This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit f30bdc9fab05ad85005d737c443079bc65cdd3ea
Author: Ashutosh Mestry <[email protected]>
AuthorDate: Tue Sep 10 15:10:00 2019 -0700

    ATLAS-3396: ZipSourceWithBackingDirectory: Implementation. Port to master.

    (cherry picked from commit 8bad6b0b6724f15ec6173769bccd33184d842f8e)
---
 distro/src/conf/atlas-application.properties       |   3 +
 docs/src/site/twiki/Import-API-Options.twiki       |  11 +
 .../java/org/apache/atlas/AtlasConfiguration.java  |   4 +-
 .../atlas/repository/impexp/ImportService.java     |  58 ++--
 .../apache/atlas/repository/impexp/ZipSource.java  |  10 +-
 .../impexp/ZipSourceWithBackingDirectory.java      | 371 +++++++++++++++++++++
 .../store/graph/v2/AtlasEntityStreamForImport.java |  52 +++
 .../store/graph/v2/BulkImporterImpl.java           |  11 +-
 .../store/graph/v2/EntityImportStream.java         |  28 ++
 .../repository/impexp/ExportIncrementalTest.java   |  68 ++--
 .../repository/impexp/ExportSkipLineageTest.java   |   7 +-
 .../atlas/repository/impexp/ImportServiceTest.java |  68 ++--
 .../impexp/ImportTransformsShaperTest.java         |   2 +-
 .../RelationshipAttributesExtractorTest.java       |   5 +-
 .../impexp/ReplicationEntityAttributeTest.java     |  22 +-
 .../impexp/ZipFileResourceTestUtils.java           |  49 +--
 .../atlas/repository/impexp/ZipSourceTest.java     |   4 +-
 .../ClassificationPropagationTest.java             |   6 +-
 .../apache/atlas/services/MetricsServiceTest.java  |   6 +-
 .../apache/atlas/web/resources/AdminResource.java  |   4 +-
 20 files changed, 670 insertions(+), 119 deletions(-)

diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 471424b..7846452 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -82,6 +82,9 @@ ${graph.index.properties}
 # Solr-specific configuration property
 atlas.graph.index.search.max-result-set-size=150
 
+######### Import Configs #########
+#atlas.import.temp.directory=/temp/import
+
 ######### Notification Configs #########
 atlas.notification.embedded=true
 atlas.kafka.data=${sys:atlas.home}/data/kafka

diff --git a/docs/src/site/twiki/Import-API-Options.twiki b/docs/src/site/twiki/Import-API-Options.twiki
index 4004e70..7f90475 100644
--- a/docs/src/site/twiki/Import-API-Options.twiki
+++ b/docs/src/site/twiki/Import-API-Options.twiki
@@ -26,6 +26,7 @@ Following options are supported for Import process:
    * Specify transforms during import operation.
    * Resume import by specifying starting entity guid.
    * Optionally import type definition.
+   * Handle large imports.
 
 ---++++ Transforms
 
@@ -133,3 +134,13 @@ curl -g -X POST -u adminuser:password -H "Content-Type: application/json" -d [email protected] "http://localhost:21000/api/atlas/admin/importfile"
 </verbatim>
+
+---++++ Handling Large Imports
+
+By default, the Import Service stores all of the data in memory. This may be limiting for ZIPs containing large amounts of data.
+
+To configure a temporary directory, use the application property _atlas.import.temp.directory_. If this property is left blank, the default in-memory implementation is used.
+
+Please ensure that there is sufficient disk space available for the operation.
+
+The contents of the directory created as the backing store for the import operation will be erased once the operation completes.
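For illustration, a minimal end-to-end sketch of the option documented above. The directory path and ZIP file name are examples; the endpoint shown is the existing multipart /api/atlas/admin/import API:

<verbatim>
# conf/atlas-application.properties: stage import ZIP contents on disk instead of in memory
atlas.import.temp.directory=/var/tmp/atlas-import

# run a large import as usual; the ZIP entries are unpacked into a per-session
# subdirectory (atlas-import-temp-<uuid>) that is deleted when the import finishes
curl -g -X POST -u adminuser:password -H "Content-Type: multipart/form-data" -H "Cache-Control: no-cache" -F data=@large-import.zip "http://localhost:21000/api/atlas/admin/import"
</verbatim>

If the property is left blank, the in-memory ZipSource implementation is used instead.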
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 38087dc..9160524 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -57,7 +57,9 @@ public enum AtlasConfiguration { //search configuration SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000), - SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100); + SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100), + + IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", ""); private static final Configuration APPLICATION_PROPERTIES; diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index 3ded798..df49ae1 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -18,6 +18,7 @@ package org.apache.atlas.repository.impexp; import com.google.common.annotations.VisibleForTesting; +import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.RequestContext; import org.apache.atlas.entitytransform.BaseEntityHandler; @@ -27,22 +28,23 @@ import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.store.graph.BulkImporter; +import org.apache.atlas.repository.store.graph.v2.EntityImportStream; import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.inject.Inject; -import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.util.List; import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMERS_KEY; @@ -72,18 +74,25 @@ public class ImportService { this.importTransformsShaper = importTransformsShaper; } - public AtlasImportResult run(ZipSource source, String userName, + public AtlasImportResult run(InputStream inputStream, String userName, String hostName, String requestingIP) throws AtlasBaseException { - return run(source, null, userName, hostName, requestingIP); + return run(inputStream, null, userName, hostName, requestingIP); } - public AtlasImportResult run(ZipSource source, AtlasImportRequest request, String userName, + public AtlasImportResult run(InputStream inputStream, AtlasImportRequest request, String userName, String hostName, String requestingIP) throws AtlasBaseException { if (request == null) { request = new AtlasImportRequest(); } + EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); + return run(source, request, userName, hostName, requestingIP); + } + + @VisibleForTesting + AtlasImportResult run(EntityImportStream source, AtlasImportRequest request, String userName, + String hostName, String requestingIP) throws AtlasBaseException { AtlasImportResult 
result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis()); try { @@ -112,7 +121,10 @@ public class ImportService { } finally { RequestContext.get().setImportInProgress(false); - source.close(); + if (source != null) { + source.close(); + } + LOG.info("<== import(user={}, from={}): status={}", userName, requestingIP, result.getOperationStatus()); } @@ -120,7 +132,7 @@ public class ImportService { } @VisibleForTesting - void setImportTransform(ZipSource source, String transforms) throws AtlasBaseException { + void setImportTransform(EntityImportStream source, String transforms) throws AtlasBaseException { ImportTransforms importTransform = ImportTransforms.fromJson(transforms); if (importTransform == null) { return; @@ -132,11 +144,10 @@ public class ImportService { if(LOG.isDebugEnabled()) { debugLog(" => transforms: {}", AtlasType.toJson(importTransform)); } - } @VisibleForTesting - void setEntityTransformerHandlers(ZipSource source, String transformersJson) throws AtlasBaseException { + void setEntityTransformerHandlers(EntityImportStream source, String transformersJson) throws AtlasBaseException { if (StringUtils.isEmpty(transformersJson)) { return; } @@ -156,7 +167,7 @@ public class ImportService { LOG.debug(s, params); } - private void setStartPosition(AtlasImportRequest request, ZipSource source) throws AtlasBaseException { + private void setStartPosition(AtlasImportRequest request, EntityImportStream source) throws AtlasBaseException { if (request.getStartGuid() != null) { source.setPositionUsingEntityGuid(request.getStartGuid()); } else if (request.getStartPosition() != null) { @@ -164,8 +175,7 @@ public class ImportService { } } - public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP) - throws AtlasBaseException { + public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP) throws AtlasBaseException { String fileName = request.getFileName(); if (StringUtils.isBlank(fileName)) { @@ -173,14 +183,11 @@ public class ImportService { } AtlasImportResult result = null; - try { LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName); - String transforms = MapUtils.isNotEmpty(request.getOptions()) ? 
request.getOptions().get(TRANSFORMS_KEY) : null; File file = new File(fileName); - ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file)), ImportTransforms.fromJson(transforms)); - result = run(source, request, userName, hostName, requestingIP); + result = run(new FileInputStream(file), request, userName, hostName, requestingIP); } catch (AtlasBaseException excp) { LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp); @@ -189,10 +196,6 @@ public class ImportService { LOG.error("import(user={}, from={}, fileName={}): file not found", userName, requestingIP, excp); throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": file not found"); - } catch (IOException excp) { - LOG.error("import(user={}, from={}, fileName={}): cannot read file", userName, requestingIP, excp); - - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": cannot read file"); } catch (Exception excp) { LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp); @@ -214,7 +217,7 @@ public class ImportService { importTypeDefProcessor.processTypes(typeDefinitionMap, result); } - private void processEntities(String userName, ZipSource importSource, AtlasImportResult result) throws AtlasBaseException { + private void processEntities(String userName, EntityImportStream importSource, AtlasImportResult result) throws AtlasBaseException { result.setExportResult(importSource.getExportResult()); this.bulkImporter.bulkImport(importSource, result); @@ -228,4 +231,17 @@ public class ImportService { private int getDuration(long endTime, long startTime) { return (int) (endTime - startTime); } + + private EntityImportStream createZipSource(InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException { + try { + if (StringUtils.isEmpty(configuredTemporaryDirectory)) { + return new ZipSource(inputStream); + } + + return new ZipSourceWithBackingDirectory(inputStream, configuredTemporaryDirectory); + } + catch (IOException ex) { + throw new AtlasBaseException(ex); + } + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java index 1ce96a8..812add9 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java @@ -82,20 +82,25 @@ public class ZipSource implements EntityImportStream { guidEntityJsonMap.get(key).equals("[]")); } + @Override public ImportTransforms getImportTransform() { return this.importTransform; } + @Override public void setImportTransform(ImportTransforms importTransform) { this.importTransform = importTransform; } + @Override public List<BaseEntityHandler> getEntityHandlers() { return entityHandlers; } + @Override public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) { this.entityHandlers = entityHandlers; } + @Override public AtlasTypesDef getTypesDef() throws AtlasBaseException { final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString(); @@ -103,6 +108,7 @@ public class ZipSource implements EntityImportStream { return convertFromJson(AtlasTypesDef.class, s); } + @Override public AtlasExportResult getExportResult() throws AtlasBaseException { final String fileName = ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString(); @@ -147,6 +153,7 @@ public class ZipSource implements EntityImportStream { 
zipInputStream.close(); } + @Override public List<String> getCreationOrder() { return this.creationOrder; } @@ -210,6 +217,7 @@ public class ZipSource implements EntityImportStream { return s; } + @Override public void close() { try { inputStream.close(); @@ -284,7 +292,7 @@ public class ZipSource implements EntityImportStream { currentPosition = index; reset(); for (int i = 0; i < creationOrder.size() && i <= index; i++) { - iterator.next(); + onImportComplete(iterator.next()); } } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceWithBackingDirectory.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceWithBackingDirectory.java new file mode 100644 index 0000000..7963800 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceWithBackingDirectory.java @@ -0,0 +1,371 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.entitytransform.BaseEntityHandler; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasExportResult; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.repository.store.graph.v2.EntityImportStream; +import org.apache.atlas.type.AtlasType; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import static org.apache.atlas.AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP; + +public class ZipSourceWithBackingDirectory implements EntityImportStream { + private static final Logger LOG = LoggerFactory.getLogger(ZipSourceWithBackingDirectory.class); + private static final String TEMPORARY_DIRECTORY_PREFIX = "atlas-import-temp-"; + private static final String EXT_JSON = ".json"; + + private Path tempDirectory; + + private ImportTransforms importTransform; + private List<BaseEntityHandler> entityHandlers; + + private ArrayList<String> creationOrder = new ArrayList<>(); + private int currentPosition; + private int numberOfEntries; + + public ZipSourceWithBackingDirectory(InputStream inputStream) throws IOException, AtlasBaseException { + this(inputStream, null); + } + + public ZipSourceWithBackingDirectory(InputStream inputStream, String backingDirectory) throws 
IOException, AtlasBaseException { + setupBackingStore(inputStream, backingDirectory); + if (isZipFileEmpty()) { + throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP."); + } + } + + @Override + public ImportTransforms getImportTransform() { return this.importTransform; } + + @Override + public void setImportTransform(ImportTransforms importTransform) { + this.importTransform = importTransform; + } + + @Override + public List<BaseEntityHandler> getEntityHandlers() { + return entityHandlers; + } + + @Override + public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) { + this.entityHandlers = entityHandlers; + } + + @Override + public AtlasTypesDef getTypesDef() throws AtlasBaseException { + return getJsonFromEntry(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString(), AtlasTypesDef.class); + } + + @Override + public AtlasExportResult getExportResult() throws AtlasBaseException { + return getJsonFromEntry(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString(), AtlasExportResult.class); + } + + @Override + public List<String> getCreationOrder() { + return creationOrder; + } + + @Override + public int getPosition() { + return currentPosition; + } + + @Override + public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException { + final File file = getFileFromTemporaryDirectory(guid + EXT_JSON); + if (!file.exists()) { + return null; + } + + String json = getJsonStringForFile(file); + if (StringUtils.isEmpty(json)) { + return null; + } + + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, json); + + if (importTransform != null) { + entityWithExtInfo = importTransform.apply(entityWithExtInfo); + } + + if (entityHandlers != null) { + applyTransformers(entityWithExtInfo); + } + + return entityWithExtInfo; + } + + @Override + public boolean hasNext() { + return (currentPosition < numberOfEntries); + } + + @Override + public AtlasEntity next() { + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo(); + + return entityWithExtInfo != null ? 
entityWithExtInfo.getEntity() : null; } + + @Override + public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() { + try { + return getEntityWithExtInfo(moveNext()); + } catch (AtlasBaseException e) { + LOG.error("getNextEntityWithExtInfo", e); + return null; + } + } + + @Override + public void reset() { + currentPosition = 0; + } + + @Override + public AtlasEntity getByGuid(String guid) { + try { + return getEntity(guid); + } catch (AtlasBaseException e) { + LOG.error("getByGuid: {} failed!", guid, e); + return null; + } + } + + + @Override + public void onImportComplete(String guid) { + getFileFromTemporaryDirectory(guid + EXT_JSON).delete(); + } + + @Override + public void setPosition(int index) { + reset(); + for (int i = 0; i < numberOfEntries && i <= index; i++) { + onImportComplete(moveNext()); + } + } + + @Override + public void setPositionUsingEntityGuid(String guid) { + if (StringUtils.isEmpty(guid)) { + return; + } + + String current; + while (currentPosition < numberOfEntries) { + current = creationOrder.get(currentPosition); + if (current.equals(guid)) { + return; + } + + moveNext(); + } + } + + @Override + public void close() { + creationOrder.clear(); + try { + LOG.info("Import: Removing temporary directory: {}", tempDirectory.toString()); + FileUtils.deleteDirectory(tempDirectory.toFile()); + } catch (IOException e) { + LOG.error("Import: Error deleting: {}", tempDirectory.toString(), e); + } + } + + private boolean isZipFileEmpty() { + return (numberOfEntries == 0); + } + + private <T> T getJsonFromEntry(String entryName, Class<T> clazz) throws AtlasBaseException { + final File file = getFileFromTemporaryDirectory(entryName + EXT_JSON); + if (!file.exists()) { + throw new AtlasBaseException(entryName + " not found!"); + } + + return convertFromJson(clazz, getJsonStringForFile(file)); + } + + private void setupBackingStore(InputStream inputStream, String backingDirectory) throws AtlasBaseException, IOException { + initTempDirectory(backingDirectory); + unzipToTempDirectory(inputStream); + setupIterator(); + } + + private void initTempDirectory(String backingDirectory) throws AtlasBaseException { + try { + tempDirectory = Files.createDirectory(Paths.get(backingDirectory, getChildDirectoryForSession())); + if (!permissionChecks(tempDirectory.toFile())) { + throw new AtlasBaseException( + String.format("Import: Temporary directory: %s does not have permissions for operation!", tempDirectory.toString())); + } + } + catch (Exception ex) { + throw new AtlasBaseException(String.format("Error creating temporary directory under: %s", backingDirectory), ex); + } + } + + private String getChildDirectoryForSession() { + return String.format("%s%s", TEMPORARY_DIRECTORY_PREFIX, UUID.randomUUID()); + } + + private boolean permissionChecks(File f) { + return f.exists() && f.isDirectory() && f.canWrite(); + } + + private void unzipToTempDirectory(InputStream inputStream) throws IOException { + LOG.info("Import: Temporary directory: {}", tempDirectory.toString()); + + ZipInputStream zis = new ZipInputStream(inputStream); + try { + ZipEntry zipEntry = zis.getNextEntry(); + while (zipEntry != null) { + String entryName = zipEntry.getName(); + + writeJsonToFile(entryName, getJsonPayloadFromZipEntryStream(zis)); + numberOfEntries++; + + zipEntry = zis.getNextEntry(); + } + + numberOfEntries -= ZipExportFileNames.values().length; + } + finally { + zis.close(); + inputStream.close(); + } + } + + private void writeJsonToFile(String entryName, byte[] jsonPayload) throws IOException {
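+        // Write one ZIP entry's JSON payload to a file of the same name under the session's backing directory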
+ File f = getFileFromTemporaryDirectory(entryName); + Files.write(f.toPath(), jsonPayload); + } + + private File getFileFromTemporaryDirectory(String entryName) { + return new File(tempDirectory.toFile(), entryName); + } + + private void setupIterator() { + try { + creationOrder = getJsonFromEntry(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString(), ArrayList.class); + } catch (AtlasBaseException e) { + LOG.error("Error fetching: {}. Error generating order.", ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString(), e); + } + + reset(); + } + + private byte[] getJsonPayloadFromZipEntryStream(ZipInputStream zipInputStream) { + try { + byte[] buf = new byte[1024]; + + int n = 0; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + while ((n = zipInputStream.read(buf, 0, 1024)) > -1) { + bos.write(buf, 0, n); + } + + return bos.toByteArray(); + } catch (IOException ex) { + LOG.error("Error fetching string from entry.", ex); + } + + return null; + } + + private String getJsonStringForFile(File file) { + try { + byte[] bytes = Files.readAllBytes(file.toPath()); + return new String(bytes); + } catch (IOException e) { + LOG.warn("Error fetching: {}", file.toString(), e); + return null; + } + } + + private void applyTransformers(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { + if (entityWithExtInfo == null) { + return; + } + + transform(entityWithExtInfo.getEntity()); + + if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) { + for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { + transform(e); + } + } + } + + private void transform(AtlasEntity e) { + for (BaseEntityHandler handler : entityHandlers) { + handler.transform(e); + } + } + + private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException { + try { + return AtlasType.fromJson(jsonData, clazz); + + } catch (Exception e) { + throw new AtlasBaseException("Error converting file to JSON.", e); + } + } + + private AtlasEntity getEntity(String guid) throws AtlasBaseException { + AtlasEntity.AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid); + return (extInfo != null) ? 
extInfo.getEntity() : null; + } + + public int size() { + return numberOfEntries; + } + + private String moveNext() { + if (currentPosition < numberOfEntries) { + return creationOrder.get(currentPosition++); + } + + return null; + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java index 6bf962e..5ad9d60 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java @@ -17,8 +17,15 @@ */ package org.apache.atlas.repository.store.graph.v2; +import org.apache.atlas.entitytransform.BaseEntityHandler; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.repository.impexp.ImportTransforms; + +import java.util.List; public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream { private int currentPosition = 0; @@ -36,6 +43,11 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent } @Override + public AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException { + return null; + } + + @Override public AtlasEntity getByGuid(String guid) { AtlasEntity ent = super.entitiesWithExtInfo.getEntity(guid); @@ -69,4 +81,44 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent public void onImportComplete(String guid) { } + + @Override + public void setImportTransform(ImportTransforms importTransform) { + + } + + @Override + public ImportTransforms getImportTransform() { + return null; + } + + @Override + public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) { + + } + + @Override + public List<BaseEntityHandler> getEntityHandlers() { + return null; + } + + @Override + public AtlasTypesDef getTypesDef() throws AtlasBaseException { + return null; + } + + @Override + public AtlasExportResult getExportResult() throws AtlasBaseException { + return null; + } + + @Override + public List<String> getCreationOrder() { + return null; + } + + @Override + public void close() { + + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java index 2f330c0..54c32c5 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java @@ -18,6 +18,7 @@ package org.apache.atlas.repository.store.graph.v2; import com.google.common.annotations.VisibleForTesting; +import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.RequestContext; import org.apache.atlas.annotation.GraphTransaction; @@ -57,12 +58,14 @@ public class BulkImporterImpl implements BulkImporter { private final EntityGraphRetriever entityGraphRetriever; private AtlasTypeRegistry typeRegistry; private final int MAX_ATTEMPTS = 2; + private boolean directoryBasedImportConfigured; @Inject public BulkImporterImpl(AtlasEntityStore entityStore, 
AtlasTypeRegistry typeRegistry) { this.entityStore = entityStore; this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry); this.typeRegistry = typeRegistry; + this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); } @Override @@ -205,9 +208,11 @@ public class BulkImporterImpl implements BulkImporter { AtlasImportResult importResult, Set<String> processedGuids, int currentIndex, int streamSize, float currentPercent) { - updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); - updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); - updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); + if (!directoryBasedImportConfigured) { + updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); + updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); + updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); + } String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid()); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityImportStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityImportStream.java index cf7ac28..c43a04e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityImportStream.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityImportStream.java @@ -18,17 +18,45 @@ package org.apache.atlas.repository.store.graph.v2; +import org.apache.atlas.entitytransform.BaseEntityHandler; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasExportResult; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.repository.impexp.ImportTransforms; + +import java.util.List; public interface EntityImportStream extends EntityStream { int size(); + void setPosition(int position); + int getPosition(); void setPositionUsingEntityGuid(String guid); AtlasEntityWithExtInfo getNextEntityWithExtInfo(); + AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException; + void onImportComplete(String guid); + + void setImportTransform(ImportTransforms importTransform); + + public ImportTransforms getImportTransform(); + + void setEntityHandlers(List<BaseEntityHandler> entityHandlers); + + List<BaseEntityHandler> getEntityHandlers(); + + AtlasTypesDef getTypesDef() throws AtlasBaseException; + + AtlasExportResult getExportResult() throws AtlasBaseException; + + List<String> getCreationOrder(); + + void close(); } diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java index 7aeb6a7..4d43852 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java @@ -33,16 +33,20 @@ import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasClassificationType; 
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.TestResourceFileUtils; +import org.apache.commons.io.IOUtils; import org.testng.ITestContext; import org.testng.SkipException; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Guice; import org.testng.annotations.Test; -import org.testng.annotations.DataProvider; import javax.inject.Inject; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -52,8 +56,8 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREM import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource; -import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters; +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -107,11 +111,13 @@ public class ExportIncrementalTest extends ExportImportTestBase { } @Test - public void atT0_ReturnsAllEntities() throws AtlasBaseException { + public void atT0_ReturnsAllEntities() throws AtlasBaseException, IOException { final int expectedEntityCount = 2; AtlasExportRequest request = getIncrementalRequest(0); - ZipSource source = runExportWithParameters(exportService, request); + InputStream inputStream = runExportWithParameters(exportService, request); + + ZipSource source = getZipSourceFromInputStream(inputStream); AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount); int count = 0; @@ -129,13 +135,15 @@ public class ExportIncrementalTest extends ExportImportTestBase { } @Test(dependsOnMethods = "atT0_ReturnsAllEntities") - public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException { + public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException, IOException { final int expectedEntityCount = 1; entityStore.addClassifications(TABLE_GUID, ImmutableList.of(classificationTypeT1.createDefaultValue())); AtlasExportRequest request = getIncrementalRequest(nextTimestamp); - ZipSource source = runExportWithParameters(exportService, request); + InputStream inputStream = runExportWithParameters(exportService, request); + + ZipSource source = getZipSourceFromInputStream(inputStream); AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount); AtlasEntity entity = null; @@ -155,7 +163,7 @@ public class ExportIncrementalTest extends ExportImportTestBase { } @Test(dependsOnMethods = "atT1_NewClassificationAttachedToTable_ReturnsChangedTable") - public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException { + public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException, IOException { final int expectedEntityCount = 1; AtlasEntity.AtlasEntityWithExtInfo tableEntity = entityStore.getById(TABLE_GUID); @@ 
-163,7 +171,9 @@ public class ExportIncrementalTest extends ExportImportTestBase { entityStore.addClassifications(COLUMN_GUID_HIGH, ImmutableList.of(typeRegistry.getClassificationTypeByName("T1").createDefaultValue())); - ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp)); + InputStream inputStream = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp)); + + ZipSource source = getZipSourceFromInputStream(inputStream); AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, expectedEntityCount); for (Map.Entry<String, AtlasEntity> entry : entities.getReferredEntities().entrySet()) { @@ -176,17 +186,26 @@ public class ExportIncrementalTest extends ExportImportTestBase { assertEquals(preExportTableEntityTimestamp, postUpdateTableEntityTimestamp); } + private ZipSource getZipSourceFromInputStream(InputStream inputStream) { + try { + return new ZipSource(inputStream); + } catch (IOException | AtlasBaseException e) { + return null; + } + } + @Test(dependsOnMethods = "atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn") public void exportingWithSameParameters_Succeeds() { - ZipSource source = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp)); + InputStream inputStream = runExportWithParameters(exportService, getIncrementalRequest(nextTimestamp)); - assertNotNull(source); + assertNotNull(getZipSourceFromInputStream(inputStream)); } @Test public void connectedExport() { - ZipSource source = runExportWithParameters(exportService, getConnected()); + InputStream inputStream = runExportWithParameters(exportService, getConnected()); + ZipSource source = getZipSourceFromInputStream(inputStream); UniqueList<String> creationOrder = new UniqueList<>(); List<String> zipCreationOrder = source.getCreationOrder(); creationOrder.addAll(zipCreationOrder); @@ -200,27 +219,29 @@ public class ExportIncrementalTest extends ExportImportTestBase { } @Test(dataProvider = "hiveDb") - public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException { - runImportWithNoParameters(importService, zipSource); + public void importHiveDb(InputStream stream) throws AtlasBaseException, IOException { + runImportWithNoParameters(importService, stream); } @Test(dependsOnMethods = "importHiveDb") - public void exportTableInrementalConnected() throws AtlasBaseException { - ZipSource source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true)); - verifyExpectedEntities(getFileNames(source), GUID_DB, GUID_TABLE_CTAS_2); + public void exportTableInrementalConnected() throws AtlasBaseException, IOException { + InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true)); - nextTimestamp = updateTimesampForNextIncrementalExport(source); + ZipSource sourceCopy = getZipSourceCopy(source); + verifyExpectedEntities(getFileNames(sourceCopy), GUID_DB, GUID_TABLE_CTAS_2); + + nextTimestamp = updateTimesampForNextIncrementalExport(sourceCopy); try { source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true)); - }catch (SkipException e){ - + } catch (SkipException e) { + throw e; } entityStore.addClassifications(GUID_TABLE_CTAS_2, ImmutableList.of(classificationTypeT1.createDefaultValue())); source = runExportWithParameters(exportService, 
getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true)); - verifyExpectedEntities(getFileNames(source), GUID_TABLE_CTAS_2); + verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_CTAS_2); } @@ -281,4 +302,11 @@ public class ExportIncrementalTest extends ExportImportTestBase { } return ret; } + + private ZipSource getZipSourceCopy(InputStream is) throws IOException, AtlasBaseException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + IOUtils.copy(is, baos); + + return new ZipSource(new ByteArrayInputStream(baos.toByteArray())); + } } diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java index 18b4a30..25e0a53 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java @@ -40,6 +40,7 @@ import org.testng.annotations.Test; import javax.inject.Inject; import java.io.IOException; +import java.io.InputStream; import java.util.Map; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel; @@ -87,11 +88,13 @@ public class ExportSkipLineageTest extends ExportImportTestBase { } @Test - public void exportWithoutLineage() { + public void exportWithoutLineage() throws IOException, AtlasBaseException { final int expectedEntityCount = 3; AtlasExportRequest request = getRequest(); - ZipSource source = runExportWithParameters(exportService, request); + InputStream inputStream = runExportWithParameters(exportService, request); + + ZipSource source = new ZipSource(inputStream); AtlasEntity.AtlasEntityWithExtInfo entities = ZipFileResourceTestUtils.getEntities(source, expectedEntityCount); int count = 0; diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java index 7044243..33fe0ad 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java @@ -52,6 +52,7 @@ import org.testng.annotations.Guice; import org.testng.annotations.Test; import java.io.IOException; +import java.io.InputStream; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -74,7 +75,6 @@ import static org.testng.Assert.assertTrue; @Guice(modules = TestModules.TestOnlyModule.class) public class ImportServiceTest extends ExportImportTestBase { - private static final Logger LOG = LoggerFactory.getLogger(ImportServiceTest.class); private static final int DEFAULT_LIMIT = 25; private final ImportService importService; @@ -124,9 +124,9 @@ public class ImportServiceTest extends ExportImportTestBase { } @Test(dataProvider = "sales") - public void importDB1(ZipSource zipSource) throws AtlasBaseException, IOException { + public void importDB1(InputStream inputStream) throws AtlasBaseException, IOException { loadBaseModel(); - runAndVerifyQuickStart_v1_Import(importService, zipSource); + runAndVerifyQuickStart_v1_Import(importService, inputStream); assertEntityCount("DB_v1", "bfe88eb8-7556-403c-8210-647013f44a44", 1); @@ -141,9 +141,9 @@ public class ImportServiceTest extends ExportImportTestBase { } @Test(dataProvider = "reporting") - public void importDB2(ZipSource zipSource) throws AtlasBaseException, 
IOException { + public void importDB2(InputStream inputStream) throws AtlasBaseException, IOException { loadBaseModel(); - runAndVerifyQuickStart_v1_Import(importService, zipSource); + runAndVerifyQuickStart_v1_Import(importService, inputStream); } @DataProvider(name = "logging") @@ -152,9 +152,9 @@ public class ImportServiceTest extends ExportImportTestBase { } @Test(dataProvider = "logging") - public void importDB3(ZipSource zipSource) throws AtlasBaseException, IOException { + public void importDB3(InputStream inputStream) throws AtlasBaseException, IOException { loadBaseModel(); - runAndVerifyQuickStart_v1_Import(importService, zipSource); + runAndVerifyQuickStart_v1_Import(importService, inputStream); } @DataProvider(name = "salesNewTypeAttrs") @@ -163,9 +163,9 @@ public class ImportServiceTest extends ExportImportTestBase { } @Test(dataProvider = "salesNewTypeAttrs", dependsOnMethods = "importDB1") - public void importDB4(ZipSource zipSource) throws AtlasBaseException, IOException { + public void importDB4(InputStream inputStream) throws AtlasBaseException, IOException { loadBaseModel(); - runImportWithParameters(importService, getDefaultImportRequest(), zipSource); + runImportWithParameters(importService, getDefaultImportRequest(), inputStream); } @DataProvider(name = "salesNewTypeAttrs-next") @@ -174,7 +174,7 @@ public class ImportServiceTest extends ExportImportTestBase { } @Test(dataProvider = "salesNewTypeAttrs-next", dependsOnMethods = "importDB4") - public void importDB5(ZipSource zipSource) throws AtlasBaseException, IOException { + public void importDB5(InputStream inputStream) throws AtlasBaseException, IOException { final String newEnumDefName = "database_action"; assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName)); @@ -184,13 +184,13 @@ public class ImportServiceTest extends ExportImportTestBase { options.put("updateTypeDefinition", "false"); request.setOptions(options); - runImportWithParameters(importService, request, zipSource); + runImportWithParameters(importService, request, inputStream); assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName)); assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 4); } @Test(dataProvider = "salesNewTypeAttrs-next", dependsOnMethods = "importDB4") - public void importDB6(ZipSource zipSource) throws AtlasBaseException, IOException { + public void importDB6(InputStream inputStream) throws AtlasBaseException, IOException { final String newEnumDefName = "database_action"; assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName)); @@ -200,7 +200,7 @@ public class ImportServiceTest extends ExportImportTestBase { options.put("updateTypeDefinition", "true"); request.setOptions(options); - runImportWithParameters(importService, request, zipSource); + runImportWithParameters(importService, request, inputStream); assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName)); assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 8); } @@ -211,11 +211,11 @@ public class ImportServiceTest extends ExportImportTestBase { } @Test(dataProvider = "ctas") - public void importCTAS(ZipSource zipSource) throws IOException, AtlasBaseException { + public void importCTAS(InputStream inputStream) throws IOException, AtlasBaseException { loadBaseModel(); loadHiveModel(); - runImportWithNoParameters(importService, zipSource); + runImportWithNoParameters(importService, inputStream); } @DataProvider(name = "stocks-legacy") @@ -224,12 +224,12 @@ public class ImportServiceTest extends 
ExportImportTestBase { } @Test(dataProvider = "stocks-legacy") - public void importLegacy(ZipSource zipSource) throws IOException, AtlasBaseException { + public void importLegacy(InputStream inputStream) throws IOException, AtlasBaseException { loadBaseModel(); loadFsModel(); loadHiveModel(); - runImportWithNoParameters(importService, zipSource); + runImportWithNoParameters(importService, inputStream); List<AtlasEntityHeader> result = getImportedEntities("hive_db", "886c5e9c-3ac6-40be-8201-fb0cebb64783"); assertEquals(result.size(), 1); @@ -244,12 +244,12 @@ public class ImportServiceTest extends ExportImportTestBase { } @Test(dataProvider = "tag-prop-2") - public void importTagProp2(ZipSource zipSource) throws IOException, AtlasBaseException { + public void importTagProp2(InputStream inputStream) throws IOException, AtlasBaseException { loadBaseModel(); loadFsModel(); loadHiveModel(); - runImportWithNoParameters(importService, zipSource); + runImportWithNoParameters(importService, inputStream); assertEntityCount("hive_db", "7d7d5a18-d992-457e-83c0-e36f5b95ebdb", 1); assertEntityCount("hive_table", "dbe729bb-c614-4e23-b845-3258efdf7a58", 1); AtlasEntity entity = assertEntity("hive_table", "092e9888-de96-4908-8be3-925ee72e3395"); @@ -260,7 +260,7 @@ public class ImportServiceTest extends ExportImportTestBase { } @Test(dataProvider = "stocks-legacy") - public void importExistingTopLevelEntity(ZipSource zipSource) throws IOException, AtlasBaseException{ + public void importExistingTopLevelEntity(InputStream inputStream) throws IOException, AtlasBaseException{ loadBaseModel(); loadFsModel(); loadHiveModel(); @@ -277,7 +277,7 @@ public class ImportServiceTest extends ExportImportTestBase { assertNotNull(createResponse); String preImportGuid = createResponse.getCreatedEntities().get(0).getGuid(); - runImportWithNoParameters(importService, zipSource); + runImportWithNoParameters(importService, inputStream); AtlasVertex v = AtlasGraphUtilsV2.findByGuid("886c5e9c-3ac6-40be-8201-fb0cebb64783"); assertNotNull(v); @@ -295,10 +295,10 @@ public class ImportServiceTest extends ExportImportTestBase { } @Test(dataProvider = "stocks-glossary") - public void importGlossary(ZipSource zipSource) throws IOException, AtlasBaseException { + public void importGlossary(InputStream inputStream) throws IOException, AtlasBaseException { loadBaseModel(); loadGlossary(); - runImportWithNoParameters(importService, zipSource); + runImportWithNoParameters(importService, inputStream); assertEntityCount("AtlasGlossary", "40c80052-3129-4f7c-8f2f-391677935416", 1); assertEntityCount("AtlasGlossaryTerm", "e93ac426-de04-4d54-a7c9-d76c1e96369b", 1); @@ -317,13 +317,13 @@ public class ImportServiceTest extends ExportImportTestBase { } @Test(dataProvider = "hdfs_path1", expectedExceptions = AtlasBaseException.class) - public void importHdfs_path1(ZipSource zipSource) throws IOException, AtlasBaseException { + public void importHdfs_path1(InputStream inputStream) throws IOException, AtlasBaseException { loadBaseModel(); loadFsModel(); loadModelFromResourcesJson("tag1.json", typeDefStore, typeRegistry); try { - runImportWithNoParameters(importService, zipSource); + runImportWithNoParameters(importService, inputStream); } catch (AtlasBaseException e) { assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.INVALID_IMPORT_ATTRIBUTE_TYPE_CHANGED); AtlasClassificationType tag1 = typeRegistry.getClassificationTypeByName("tag1"); @@ -344,11 +344,11 @@ public class ImportServiceTest extends ExportImportTestBase { } @Test(dataProvider = 
"relationshipLineage") - public void importDB8(ZipSource zipSource) throws AtlasBaseException, IOException { + public void importDB8(InputStream inputStream) throws AtlasBaseException, IOException { loadBaseModel(); loadHiveModel(); AtlasImportRequest request = getDefaultImportRequest(); - runImportWithParameters(importService, request, zipSource); + runImportWithParameters(importService, request, inputStream); } @DataProvider(name = "relationship") @@ -357,11 +357,11 @@ public class ImportServiceTest extends ExportImportTestBase { } @Test(dataProvider = "relationship") - public void importDB7(ZipSource zipSource) throws AtlasBaseException, IOException { + public void importDB7(InputStream inputStream) throws AtlasBaseException, IOException { loadBaseModel(); loadHiveModel(); AtlasImportRequest request = getDefaultImportRequest(); - runImportWithParameters(importService, request, zipSource); + runImportWithParameters(importService, request, inputStream); assertEntityCount("hive_db", "d7dc0848-fbba-4d63-9264-a460798361f5", 1); assertEntityCount("hive_table", "2fb31eaa-4bb2-4eb8-b333-a888ba7c84fe", 1); @@ -430,11 +430,12 @@ public class ImportServiceTest extends ExportImportTestBase { } @Test(dataProvider = "salesNewTypeAttrs-next") - public void transformUpdatesForSubTypes(ZipSource zipSource) throws IOException, AtlasBaseException { + public void transformUpdatesForSubTypes(InputStream inputStream) throws IOException, AtlasBaseException { loadBaseModel(); loadHiveModel(); String transformJSON = "{ \"Asset\": { \"qualifiedName\":[ \"lowercase\", \"replace:@cl1:@cl2\" ] } }"; + ZipSource zipSource = new ZipSource(inputStream); importService.setImportTransform(zipSource, transformJSON); ImportTransforms importTransforms = zipSource.getImportTransform(); @@ -444,11 +445,12 @@ public class ImportServiceTest extends ExportImportTestBase { } @Test(dataProvider = "salesNewTypeAttrs-next") - public void transformUpdatesForSubTypesAddsToExistingTransforms(ZipSource zipSource) throws IOException, AtlasBaseException { - loadBaseModel(); - loadHiveModel(); + public void transformUpdatesForSubTypesAddsToExistingTransforms(InputStream inputStream) throws IOException, AtlasBaseException { + loadBaseModel(); + loadHiveModel(); String transformJSON = "{ \"Asset\": { \"qualifiedName\":[ \"replace:@cl1:@cl2\" ] }, \"hive_table\": { \"qualifiedName\":[ \"lowercase\" ] } }"; + ZipSource zipSource = new ZipSource(inputStream); importService.setImportTransform(zipSource, transformJSON); ImportTransforms importTransforms = zipSource.getImportTransform(); diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java index 06bdaa6..78fdaca 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java @@ -66,7 +66,7 @@ public class ImportTransformsShaperTest extends ExportImportTestBase { public void newTagIsCreatedAndEntitiesAreTagged() throws AtlasBaseException, IOException { AtlasImportResult result = ZipFileResourceTestUtils.runImportWithParameters(importService, getImporRequest(), - ZipFileResourceTestUtils.getZipSourceFrom("stocks.zip")); + ZipFileResourceTestUtils.getInputStreamFrom("stocks.zip")); AtlasClassificationType classification = typeRegistry.getClassificationTypeByName(TAG_NAME); assertNotNull(classification); diff --git 
a/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
index 03d50f1..920fc28 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
@@ -41,6 +41,7 @@ import javax.inject.Inject;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.ArrayList;
@@ -106,8 +107,8 @@ public class RelationshipAttributesExtractorTest {
     }
 
     @Test(dataProvider = "hiveDb")
-    public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
-        runImportWithNoParameters(importService, zipSource);
+    public void importHiveDb(InputStream inputStream) throws AtlasBaseException, IOException {
+        runImportWithNoParameters(importService, inputStream);
     }
 
     @Test(dependsOnMethods = "importHiveDb")
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
index 92d4fb0..7a1ed18 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
@@ -24,10 +24,10 @@ import org.apache.atlas.RequestContext;
 import org.apache.atlas.TestModules;
 import org.apache.atlas.TestUtilsV2;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasServer;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.repository.Constants;
@@ -40,6 +40,7 @@ import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.TestResourceFileUtils;
+import org.apache.commons.io.IOUtils;
 import org.testng.SkipException;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -47,7 +48,10 @@ import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 
 import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
@@ -88,7 +92,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
     @Inject
     private AtlasEntityStoreV2 entityStore;
 
-    private ZipSource zipSource;
+    private InputStream inputStream;
 
     @BeforeClass
     public void setup() throws IOException, AtlasBaseException {
@@ -107,13 +111,19 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
     }
 
     @Test
-    public void exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException {
+    public void exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException {
         final int expectedEntityCount = 2;
 
         AtlasExportRequest request = getUpdateMetaInfoUpdateRequest();
-        zipSource = runExportWithParameters(exportService, request);
+        InputStream inputStream = runExportWithParameters(exportService, request);
+
+        assertNotNull(inputStream);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        IOUtils.copy(inputStream, baos);
+        this.inputStream = new ByteArrayInputStream(baos.toByteArray());
 
-        assertNotNull(zipSource);
+        ZipSource zipSource = new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
         assertNotNull(zipSource.getCreationOrder());
         assertEquals(zipSource.getCreationOrder().size(), expectedEntityCount);
@@ -139,7 +149,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
     @Test(dependsOnMethods = "exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute")
     public void importWithReplicationFromOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException {
         AtlasImportRequest request = getImportRequestWithReplicationOption();
-        AtlasImportResult importResult = runImportWithParameters(importService, request, zipSource);
+        AtlasImportResult importResult = runImportWithParameters(importService, request, inputStream);
 
         assertCluster(
                 AuditsWriter.getServerNameFromFullName(REPLICATED_FROM_CLUSTER_NAME),
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
index 76b423e..f4e84b2 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
@@ -33,12 +33,14 @@ import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
 import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
 import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.AtlasJson;
 import org.apache.atlas.utils.TestResourceFileUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +51,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -151,19 +155,11 @@ public class ZipFileResourceTestUtils {
     }
 
     public static Object[][] getZipSource(String fileName) throws IOException, AtlasBaseException {
-        return new Object[][]{{getZipSourceFrom(fileName)}};
+        return new Object[][]{{getInputStreamFrom(fileName)}};
     }
 
-    public static ZipSource getZipSourceFrom(String fileName) throws IOException, AtlasBaseException {
-        FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
-
-        return new ZipSource(fs);
-    }
-
-    private static ZipSource getZipSourceFrom(ByteArrayOutputStream baos) throws IOException, AtlasBaseException {
-        ByteArrayInputStream bis = new ByteArrayInputStream(baos.toByteArray());
-        ZipSource zipSource = new ZipSource(bis);
-
-        return zipSource;
+    public static InputStream getInputStreamFrom(String fileName) {
+        return ZipFileResourceTestUtils.getFileInputStream(fileName);
     }
 
     public static void verifyImportedEntities(List<String> creationOrder, List<String> processedEntities) {
@@ -224,7 +220,7 @@ public class ZipFileResourceTestUtils {
         }
     }
 
-    public static ZipSource runExportWithParameters(ExportService exportService, AtlasExportRequest request) {
+    public static InputStream runExportWithParameters(ExportService exportService, AtlasExportRequest request) {
         final String requestingIP = "1.0.0.0";
         final String hostName = "localhost";
         final String userName = "admin";
@@ -237,7 +233,7 @@ public class ZipFileResourceTestUtils {
             assertEquals(result.getOperationStatus(), AtlasExportResult.OperationStatus.SUCCESS);
             zipSink.close();
 
-            return getZipSourceFrom(baos);
+            return new ByteArrayInputStream(baos.toByteArray());
         }
         catch(Exception ex) {
             throw new SkipException(String.format("runExportWithParameters: %s: failed!", request.toString()));
@@ -325,27 +321,42 @@ public class ZipFileResourceTestUtils {
     }
 
-    public static AtlasImportResult runImportWithParameters(ImportService importService, AtlasImportRequest request, ZipSource source) throws AtlasBaseException, IOException {
+    public static AtlasImportResult runImportWithParameters(ImportService importService, AtlasImportRequest request, InputStream inputStream) throws AtlasBaseException, IOException {
+        final String requestingIP = "1.0.0.0";
+        final String hostName = "localhost";
+        final String userName = "admin";
+
+        AtlasImportResult result = importService.run(inputStream, request, userName, hostName, requestingIP);
+        assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
+        return result;
+    }
+
+    public static AtlasImportResult runImportWithNoParameters(ImportService importService, InputStream inputStream) throws AtlasBaseException, IOException {
         final String requestingIP = "1.0.0.0";
         final String hostName = "localhost";
         final String userName = "admin";
 
-        AtlasImportResult result = importService.run(source, request, userName, hostName, requestingIP);
+        AtlasImportResult result = importService.run(inputStream, userName, hostName, requestingIP);
         assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
         return result;
     }
 
-    public static AtlasImportResult runImportWithNoParameters(ImportService importService, ZipSource source) throws AtlasBaseException, IOException {
+    public static AtlasImportResult runImportWithNoParametersUsingBackingDirectory(ImportService importService, InputStream inputStream) throws AtlasBaseException, IOException {
         final String requestingIP = "1.0.0.0";
         final String hostName = "localhost";
         final String userName = "admin";
 
-        AtlasImportResult result = importService.run(source, userName, hostName, requestingIP);
+        EntityImportStream sourceWithBackingDirectory = new ZipSourceWithBackingDirectory(inputStream, Files.createTempDirectory("temp").toString());
+        AtlasImportResult result = importService.run(sourceWithBackingDirectory, new AtlasImportRequest(), userName, hostName, requestingIP);
         assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
         return result;
     }
 
-    public static void runAndVerifyQuickStart_v1_Import(ImportService importService, ZipSource zipSource) throws AtlasBaseException, IOException {
+    public static void runAndVerifyQuickStart_v1_Import(ImportService importService, InputStream is) throws AtlasBaseException, IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        IOUtils.copy(is, baos);
+
+        ZipSource zipSource = new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
         AtlasExportResult exportResult = zipSource.getExportResult();
         List<String> creationOrder = zipSource.getCreationOrder();
 
@@ -353,7 +364,7 @@ public class ZipFileResourceTestUtils {
         RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
 
         AtlasImportRequest request = getDefaultImportRequest();
-        AtlasImportResult result = runImportWithParameters(importService, request, zipSource);
+        AtlasImportResult result = runImportWithParameters(importService, request, new ByteArrayInputStream(baos.toByteArray()));
 
         assertNotNull(result);
         verifyImportedMetrics(exportResult, result);
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
index 7436dc0..46164e8 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java
@@ -28,6 +28,7 @@ import org.testng.annotations.Test;
 import java.io.ByteArrayInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
@@ -127,7 +128,8 @@ public class ZipSourceTest {
     }
 
     @Test(dataProvider = "sales")
-    public void iteratorSetPositionBehavor(ZipSource zipSource) throws IOException, AtlasBaseException {
+    public void iteratorSetPositionBehavor(InputStream inputStream) throws IOException, AtlasBaseException {
+        ZipSource zipSource = new ZipSource(inputStream);
         Assert.assertTrue(zipSource.hasNext());
 
         List<String> creationOrder = zipSource.getCreationOrder();
diff --git a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationTest.java b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationTest.java
index 9d6d057..6f9c05e 100644
--- a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationTest.java
@@ -51,6 +51,7 @@ import org.testng.annotations.Test;
 import javax.inject.Inject;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -601,9 +602,8 @@ public class ClassificationPropagationTest {
         }
     }
 
-    public static ZipSource getZipSource(String fileName) throws IOException, AtlasBaseException {
-        FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
-        return new ZipSource(fs);
+    public static InputStream getZipSource(String fileName) throws IOException {
+        return ZipFileResourceTestUtils.getFileInputStream(fileName);
     }
 
     private void loadSampleClassificationDefs() throws AtlasBaseException {
diff --git a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
index b2f2633..baeafd4 100644
--- a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
@@ -39,6 +39,7 @@ import org.testng.annotations.Test;
 import javax.inject.Inject;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneId;
@@ -246,9 +247,8 @@ public class MetricsServiceTest {
         }
     }
 
-    public static ZipSource getZipSource(String fileName) throws IOException, AtlasBaseException {
-        FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
-        return new ZipSource(fs);
+    public static InputStream getZipSource(String fileName) throws AtlasBaseException {
+        return ZipFileResourceTestUtils.getFileInputStream(fileName);
     }
 
     private static class TestClock extends Clock {
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index e78fcb6..464d46f 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -45,7 +45,6 @@ import org.apache.atlas.repository.impexp.ExportService;
 import org.apache.atlas.repository.impexp.ImportService;
 import org.apache.atlas.repository.impexp.MigrationProgressService;
 import org.apache.atlas.repository.impexp.ZipSink;
-import org.apache.atlas.repository.impexp.ZipSource;
 import org.apache.atlas.repository.patches.AtlasPatchManager;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.services.MetricsService;
@@ -404,9 +403,8 @@ public class AdminResource {
         try {
             AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
 
-            ZipSource zipSource = new ZipSource(inputStream);
-            result = importService.run(zipSource, request, AtlasAuthorizationUtils.getCurrentUserName(),
+            result = importService.run(inputStream, request, Servlets.getUserName(httpServletRequest),
                                        Servlets.getHostName(httpServletRequest),
                                        AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
         } catch (AtlasBaseException excp) {
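
Note (not part of the patch): a minimal sketch of driving the reworked entry point, which now takes the raw ZIP stream and constructs the appropriate source internally. Only importService.run(...) comes from this commit; the class name, file path, and identity strings below are illustrative assumptions.

import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.repository.impexp.ImportService;

import java.io.FileInputStream;
import java.io.InputStream;

public class ImportRunnerSketch {
    // Hypothetical caller: hands the raw ZIP stream to ImportService, which now
    // decides internally whether to buffer entries in memory or on disk.
    public static AtlasImportResult runImport(ImportService importService, String zipPath) throws Exception {
        try (InputStream inputStream = new FileInputStream(zipPath)) {
            return importService.run(inputStream, new AtlasImportRequest(), "admin", "localhost", "1.0.0.0");
        }
    }
}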
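
Similarly, a sketch of building the disk-backed source explicitly, mirroring runImportWithNoParametersUsingBackingDirectory above and feeding the stream-based run(...) overload. The class name and temp-directory prefix are assumptions.

import org.apache.atlas.repository.impexp.ZipSourceWithBackingDirectory;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;

import java.io.InputStream;
import java.nio.file.Files;

public class BackedSourceSketch {
    // Hypothetical helper: spills ZIP entries to a backing directory instead of
    // holding the whole archive in memory.
    static EntityImportStream createBackedSource(InputStream inputStream) throws Exception {
        String backingDir = Files.createTempDirectory("atlas-import").toString(); // illustrative prefix
        return new ZipSourceWithBackingDirectory(inputStream, backingDir);
    }
}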
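
Finally, the buffering idiom the updated tests rely on: runExportWithParameters now returns a one-shot InputStream, so the tests copy the bytes once and open independent streams, one for inspection via ZipSource and one for the subsequent import. The class and method names here are illustrative.

import org.apache.atlas.repository.impexp.ZipSource;
import org.apache.commons.io.IOUtils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

public class StreamReuseSketch {
    static InputStream inspectAndReplay(InputStream exported) throws Exception {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        IOUtils.copy(exported, baos); // drain the one-shot export stream

        // First copy: inspect the archive, e.g. assert on entity creation order.
        ZipSource zipSource = new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
        zipSource.getCreationOrder();

        // Second copy: a fresh stream a subsequent import can consume.
        return new ByteArrayInputStream(baos.toByteArray());
    }
}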
