This is an automated email from the ASF dual-hosted git repository. amestry pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
commit e8661ecbc49e750b3e962aa999f11b2a94d8fe27 Author: Ashutosh Mestry <[email protected]> AuthorDate: Mon Mar 2 08:14:04 2020 -0800 ATLAS-3641: Import Service: Support zipDirect format of import. --- .../atlas/model/impexp/AtlasImportRequest.java | 20 +- .../java/org/apache/atlas/utils/AtlasJson.java | 4 + .../atlas/repository/impexp/ImportService.java | 17 +- .../repository/impexp/ZipExportFileNames.java | 4 + .../atlas/repository/impexp/ZipSourceDirect.java | 333 +++++++++++++++++++++ .../migration/ZipFileMigrationImporter.java | 8 +- .../atlas/repository/impexp/ImportServiceTest.java | 15 + .../atlas/repository/impexp/ZipDirectTest.java | 83 +++++ 8 files changed, 474 insertions(+), 10 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java index 0b3ede9..3362bf1 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java @@ -44,10 +44,13 @@ public class AtlasImportRequest implements Serializable { public static final String TRANSFORMS_KEY = "transforms"; public static final String TRANSFORMERS_KEY = "transformers"; public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom"; + public static final String OPTION_KEY_FORMAT = "format"; + public static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect"; private static final String START_POSITION_KEY = "startPosition"; private static final String START_GUID_KEY = "startGuid"; private static final String FILE_NAME_KEY = "fileName"; private static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition"; + private static final String OPTION_KEY_STREAM_SIZE = "size"; private Map<String, String> options; @@ -108,7 +111,7 @@ public class AtlasImportRequest implements Serializable { return null; } - return (String) this.options.get(key); + return this.options.get(key); } @JsonIgnore @@ -127,4 +130,17 @@ public class AtlasImportRequest implements Serializable { options = new HashMap<>(); } options.put(key, value); - }} + } + + public void setSizeOption(int size) { + setOption(OPTION_KEY_STREAM_SIZE, Integer.toString(size)); + } + + public int getSizeOption() { + if (!this.options.containsKey(OPTION_KEY_STREAM_SIZE)) { + return 1; + } + + return Integer.valueOf(this.options.get(OPTION_KEY_STREAM_SIZE)); + } +} diff --git a/intg/src/main/java/org/apache/atlas/utils/AtlasJson.java b/intg/src/main/java/org/apache/atlas/utils/AtlasJson.java index 1c13860..abeddf6 100644 --- a/intg/src/main/java/org/apache/atlas/utils/AtlasJson.java +++ b/intg/src/main/java/org/apache/atlas/utils/AtlasJson.java @@ -251,6 +251,10 @@ public class AtlasJson { return ret; } + public static ObjectCodec getMapper() { + return mapper; + } + static class DateSerializer extends JsonSerializer<Date> { @Override public void serialize(Date value, JsonGenerator jgen, SerializerProvider provider) throws IOException { diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index 27001e3..1964ade 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -92,7 +92,7 @@ public class ImportService { request = new AtlasImportRequest(); } - EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); + EntityImportStream source = createZipSource(request, inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); return run(source, request, userName, hostName, requestingIP); } @@ -248,8 +248,13 @@ public class ImportService { return (int) (endTime - startTime); } - private EntityImportStream createZipSource(InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException { + private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException { try { + if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) && + request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT) ) { + return getZipDirectEntityImportStream(request, inputStream); + } + if (StringUtils.isEmpty(configuredTemporaryDirectory)) { return new ZipSource(inputStream); } @@ -260,9 +265,15 @@ public class ImportService { } } + private EntityImportStream getZipDirectEntityImportStream(AtlasImportRequest request, InputStream inputStream) throws IOException, AtlasBaseException { + ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, request.getSizeOption()); + LOG.info("Using ZipSourceDirect: Size: {} entities", zipSourceDirect.size()); + return zipSourceDirect; + } + @VisibleForTesting boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest importRequest, AtlasExportRequest exportRequest) { - if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) { + if (exportRequest == null || CollectionUtils.isEmpty(exportRequest.getItemsToExport())) { return false; } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java index 351b475..8347b91 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java @@ -31,4 +31,8 @@ public enum ZipExportFileNames { public String toString() { return this.name; } + + public String toEntryFileName() { + return this.name + ".json"; + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java new file mode 100644 index 0000000..cb5a7ac --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java @@ -0,0 +1,333 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.impexp; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.atlas.entitytransform.BaseEntityHandler; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasExportResult; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.repository.store.graph.v2.EntityImportStream; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.utils.AtlasJson; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import static org.apache.atlas.AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP; + +public class ZipSourceDirect implements EntityImportStream { + private static final Logger LOG = LoggerFactory.getLogger(ZipSourceDirect.class); + private static final String ZIP_ENTRY_ENTITIES = "entities.json"; + + private final ZipInputStream zipInputStream; + private int currentPosition; + + private ImportTransforms importTransform; + private List<BaseEntityHandler> entityHandlers; + private AtlasTypesDef typesDef; + private int streamSize = 1; + + EntitiesArrayParser entitiesArrayParser; + + public ZipSourceDirect(InputStream inputStream, int streamSize) throws IOException, AtlasBaseException { + this.zipInputStream = new ZipInputStream(inputStream); + this.streamSize = streamSize; + prepareStreamForFetch(); + } + + @Override + public ImportTransforms getImportTransform() { + return this.importTransform; + } + + @Override + public void setImportTransform(ImportTransforms importTransform) { + this.importTransform = importTransform; + } + + @Override + public List<BaseEntityHandler> getEntityHandlers() { + return entityHandlers; + } + + @Override + public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) { + this.entityHandlers = entityHandlers; + } + + @Override + public AtlasTypesDef getTypesDef() throws AtlasBaseException { + return this.typesDef; + } + + @Override + public AtlasExportResult getExportResult() throws AtlasBaseException { + return new AtlasExportResult(); + } + + @Override + public List<String> getCreationOrder() { + return new ArrayList<>(); + } + + @Override + public int getPosition() { + return currentPosition; + } + + @Override + public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String json) throws AtlasBaseException { + if (StringUtils.isEmpty(json)) { + return null; + } + + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, json); + + if (importTransform != null) { + entityWithExtInfo = importTransform.apply(entityWithExtInfo); + } + + if (entityHandlers != null) { + applyTransformers(entityWithExtInfo); + } + + return entityWithExtInfo; + } + + @Override + public boolean hasNext() { + return (this.entitiesArrayParser != null && entitiesArrayParser.hasNext()); + } + + @Override + public AtlasEntity next() { + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo(); + + return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; + } + + @Override + public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() { + try { + if (hasNext()) { + String json = moveNext(); + return getEntityWithExtInfo(json); + } + } catch (AtlasBaseException e) { + LOG.error("getNextEntityWithExtInfo", e); + } + return null; + } + + @Override + public void reset() { + currentPosition = 0; + } + + @Override + public AtlasEntity getByGuid(String guid) { + try { + return getEntity(guid); + } catch (AtlasBaseException e) { + LOG.error("getByGuid: {} failed!", guid, e); + return null; + } + } + + @Override + public void onImportComplete(String guid) { + } + + @Override + public void setPosition(int index) { + try { + for (int i = 0; i < index; i++) { + moveNextEntry(); + } + } catch (IOException e) { + LOG.error("Error setting position: {}. Position may be beyond the stream size.", index); + } + } + + @Override + public void setPositionUsingEntityGuid(String guid) { + } + + @Override + public void close() { + if (this.entitiesArrayParser != null) { + this.entitiesArrayParser.close(); + } + } + + private void applyTransformers(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { + if (entityWithExtInfo == null) { + return; + } + + transform(entityWithExtInfo.getEntity()); + + if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) { + for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { + transform(e); + } + } + } + + private void transform(AtlasEntity e) { + for (BaseEntityHandler handler : entityHandlers) { + handler.transform(e); + } + } + + private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException { + try { + return AtlasType.fromJson(jsonData, clazz); + + } catch (Exception e) { + throw new AtlasBaseException("Error converting file to JSON.", e); + } + } + + private AtlasEntity getEntity(String guid) throws AtlasBaseException { + AtlasEntity.AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid); + return (extInfo != null) ? extInfo.getEntity() : null; + } + + public int size() { + return this.streamSize; + } + + private String moveNext() { + try { + moveNextEntry(); + return entitiesArrayParser.next(); + } catch (IOException e) { + LOG.error("moveNext failed!", e); + } + + return null; + } + + private void moveNextEntry() throws IOException { + this.currentPosition++; + } + + private void prepareStreamForFetch() throws AtlasBaseException, IOException { + ZipEntry zipEntryNext = zipInputStream.getNextEntry(); + if (zipEntryNext == null) { + throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP."); + } + + if (zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toEntryFileName())) { + String json = getJsonPayloadFromZipEntryStream(this.zipInputStream); + this.typesDef = AtlasType.fromJson(json, AtlasTypesDef.class); + } + + zipEntryNext = zipInputStream.getNextEntry(); + if (zipEntryNext.getName().equals(ZIP_ENTRY_ENTITIES)) { + this.entitiesArrayParser = new EntitiesArrayParser(zipInputStream); + } else { + throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP. " + ZIP_ENTRY_ENTITIES + " could not be found!"); + } + } + + private String getJsonPayloadFromZipEntryStream(ZipInputStream zipInputStream) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + IOUtils.copy(zipInputStream, bos); + } catch (IOException e) { + LOG.error("Streaming copying failed!", e); + return null; + } + return bos.toString(); + } + + static class EntitiesArrayParser { + private static final String EMPTY_OBJECT = "{}"; + + private final JsonFactory factory; + private final JsonParser parser; + private boolean hasNext; + + public EntitiesArrayParser(InputStream inputStream) throws IOException { + this.factory = AtlasJson.getMapper().getFactory(); + this.parser = factory.createParser(inputStream); + + parseNext(); + } + + public String next() throws IOException { + JsonToken jsonToken = parseNext(); + if (!hasNext) { + return null; + } + + if (jsonToken != null && jsonToken == JsonToken.START_OBJECT) { + JsonNode node = parser.readValueAsTree(); + return validate(node.toString()); + } + return null; + + } + + private JsonToken parseNext() throws IOException { + JsonToken jsonToken = this.parser.nextToken(); + hasNext = (jsonToken != null) && (jsonToken != JsonToken.END_ARRAY); + return jsonToken; + } + + private String validate(String payload) { + if (payload.equals(EMPTY_OBJECT)) { + hasNext = false; + close(); + return null; + } + + return payload; + } + + public boolean hasNext() { + return hasNext; + } + + public void close() { + try { + this.parser.close(); + } catch (IOException e) { + LOG.error("Error closing parser!", e); + } + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java index ca0bc41..f552525 100644 --- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java +++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java @@ -84,10 +84,8 @@ public class ZipFileMigrationImporter implements Runnable { } private AtlasImportRequest getImportRequest() throws AtlasException { - return new AtlasImportRequest(); - } - - private String getPropertyValue(String property, String defaultValue) throws AtlasException { - return ApplicationProperties.get().getString(property, defaultValue); + AtlasImportRequest request = new AtlasImportRequest(); + request.setOption(AtlasImportRequest.OPTION_KEY_FORMAT, AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT); + return request; } } diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java index c14850f..116ffa7 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java @@ -186,6 +186,11 @@ public class ImportServiceTest extends ExportImportTestBase { return getZipSource("salesNewTypeAttrs-next.zip"); } + @DataProvider(name = "zip-direct-3") + public static Object[][] getZipDirect3(ITestContext context) throws IOException, AtlasBaseException { + return getZipSource("zip-direct-3.zip"); + } + @Test(dataProvider = "salesNewTypeAttrs-next", dependsOnMethods = "importDB4") public void importDB5(InputStream inputStream) throws AtlasBaseException, IOException { final String newEnumDefName = "database_action"; @@ -346,6 +351,16 @@ public class ImportServiceTest extends ExportImportTestBase { } } + @Test(dataProvider = "zip-direct-3", expectedExceptions = AtlasBaseException.class) + public void zipDirectSample(InputStream inputStream) throws IOException, AtlasBaseException { + loadBaseModel(); + loadFsModel(); + + AtlasImportRequest request = new AtlasImportRequest(); + request.setOption(AtlasImportRequest.OPTION_KEY_FORMAT, AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT); + runImportWithParameters(importService, request, inputStream); + } + @DataProvider(name = "relationshipLineage") public static Object[][] getImportWithRelationships(ITestContext context) throws IOException, AtlasBaseException { return getZipSource("rel-lineage.zip"); diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java new file mode 100644 index 0000000..faa31c3 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasExportResult; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.testng.annotations.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +public class ZipDirectTest { + @Test(expectedExceptions = AtlasBaseException.class) + public void loadFileEmpty() throws IOException, AtlasBaseException { + InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-1.zip"); + new ZipSourceDirect(inputStream, 1); + } + + @Test + public void loadFile() throws IOException, AtlasBaseException { + final int EXPECTED_ENTITY_COUNT = 3; + + InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip"); + ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, EXPECTED_ENTITY_COUNT); + + assertNotNull(zipSourceDirect); + assertNotNull(zipSourceDirect.getTypesDef()); + assertTrue(zipSourceDirect.getTypesDef().getEntityDefs().size() > 0); + assertNotNull(zipSourceDirect.getExportResult()); + + int count = 0; + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo; + while((entityWithExtInfo = zipSourceDirect.getNextEntityWithExtInfo()) != null) { + assertNotNull(entityWithExtInfo); + count++; + } + + assertEquals(count, EXPECTED_ENTITY_COUNT); + } + + @Test + public void entitiesParserTest() throws IOException { + String object1 = "{\"type\":\"hdfs_path\"}"; + String object2 = "{\"type\":\"hive_db\"}"; + String entities = "[" + object1 + "," + object2 + ",{}]"; + InputStream inputStream = new ByteArrayInputStream(entities.getBytes()); + ZipSourceDirect.EntitiesArrayParser entitiesArrayParser = new ZipSourceDirect.EntitiesArrayParser(inputStream); + + Object o = entitiesArrayParser.next(); + + assertNotNull(o); + assertEquals(o, object1); + + o = entitiesArrayParser.next(); + assertEquals(o, object2); + + o = entitiesArrayParser.next(); + assertNull(o); + } +}
