This is an automated email from the ASF dual-hosted git repository. amestry pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
commit a2ccfb9f3577e911103041d8d4b91c169697f6a1 Author: Ashutosh Mestry <[email protected]> AuthorDate: Thu Feb 20 11:46:37 2020 -0800 ATLAS-3320: Import Service. Support concurrent ingest. --- .../repository/graphdb/janus/AtlasJanusGraph.java | 2 +- .../java/org/apache/atlas/AtlasConfiguration.java | 1 + .../atlas/model/impexp/AtlasImportRequest.java | 43 +++- .../java/org/apache/atlas/pc/WorkItemConsumer.java | 11 +- .../java/org/apache/atlas/pc/WorkItemManager.java | 9 +- .../apache/atlas/GraphTransactionInterceptor.java | 4 + .../atlas/repository/impexp/AuditsWriter.java | 3 +- .../atlas/repository/impexp/ImportService.java | 22 +- .../repository/impexp/ZipExportFileNames.java | 4 + .../atlas/repository/impexp/ZipSourceDirect.java | 269 +++++++++++++++++++++ .../migration/ZipFileMigrationImporter.java | 58 ++++- .../repository/patches/UniqueAttributePatch.java | 4 +- .../repository/store/graph/AtlasEntityStore.java | 8 + .../store/graph/v2/AtlasEntityStoreV2.java | 11 +- .../store/graph/v2/AtlasRelationshipStoreV2.java | 4 + .../store/graph/v2/BulkImporterImpl.java | 228 ++++------------- .../store/graph/v2/EntityGraphMapper.java | 41 +++- .../graph/v2/bulkimport/ImportStrategy.java} | 22 +- .../store/graph/v2/bulkimport/MigrationImport.java | 122 ++++++++++ .../RegularImport.java} | 76 ++---- .../graph/v2/bulkimport/pc/EntityConsumer.java | 213 ++++++++++++++++ .../v2/bulkimport/pc/EntityConsumerBuilder.java | 50 ++++ .../v2/bulkimport/pc/EntityCreationManager.java | 130 ++++++++++ .../graph/v2/bulkimport/pc/StatusReporter.java | 131 ++++++++++ .../atlas/repository/impexp/ImportServiceTest.java | 16 ++ .../repository/impexp/MigrationImportTest.java | 77 ++++++ .../repository/impexp/StatusReporterTest.java | 99 ++++++++ .../atlas/repository/impexp/ZipDirectTest.java | 61 +++++ .../impexp/ZipFileResourceTestUtils.java | 7 +- repository/src/test/resources/zip-direct-1.zip | Bin 0 -> 22 bytes repository/src/test/resources/zip-direct-2.zip | Bin 0 -> 1720553 bytes 
31 files changed, 1432 insertions(+), 294 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java index 4acb371..0176ba7 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java @@ -116,7 +116,7 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE } } - janusGraph = (StandardJanusGraph) AtlasJanusGraphDatabase.getGraphInstance(); + janusGraph = (StandardJanusGraph) graphInstance; } @Override diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 1a0d0cc..f8d7f8c 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -64,6 +64,7 @@ public enum AtlasConfiguration { CUSTOM_ATTRIBUTE_VALUE_MAX_LENGTH("atlas.custom.attribute.value.max.length", 500), LABEL_MAX_LENGTH("atlas.entity.label.max.length", 50), IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", ""), + MIGRATION_IMPORT_START_POSITION("atlas.migration.import.start.position", 0), LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false); private static final Configuration APPLICATION_PROPERTIES; diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java index 0b3ede9..0ad3673 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java @@ -44,10 +44,16 @@ public class AtlasImportRequest implements Serializable { public static final String TRANSFORMS_KEY = "transforms"; public static final String TRANSFORMERS_KEY 
= "transformers"; public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom"; - private static final String START_POSITION_KEY = "startPosition"; + public static final String OPTION_KEY_MIGRATION = "migration"; + public static final String OPTION_KEY_NUM_WORKERS = "numWorkers"; + public static final String OPTION_KEY_BATCH_SIZE = "batchSize"; + public static final String OPTION_KEY_FORMAT = "format"; + public static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect"; + public static final String START_POSITION_KEY = "startPosition"; private static final String START_GUID_KEY = "startGuid"; private static final String FILE_NAME_KEY = "fileName"; private static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition"; + private static final String OPTION_KEY_STREAM_SIZE = "size"; private Map<String, String> options; @@ -108,7 +114,7 @@ public class AtlasImportRequest implements Serializable { return null; } - return (String) this.options.get(key); + return this.options.get(key); } @JsonIgnore @@ -121,10 +127,41 @@ public class AtlasImportRequest implements Serializable { return isReplicationOptionSet() ? options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY; } + @JsonIgnore + public int getOptionKeyNumWorkers() { + return getOptionsValue(OPTION_KEY_NUM_WORKERS, 1); + } + + @JsonIgnore + public int getOptionKeyBatchSize() { + return getOptionsValue(OPTION_KEY_BATCH_SIZE, 1); + } + + private int getOptionsValue(String optionKeyBatchSize, int defaultValue) { + String optionsValue = getOptionForKey(optionKeyBatchSize); + + return StringUtils.isEmpty(optionsValue) ? 
+ defaultValue : + Integer.valueOf(optionsValue); + } + @JsonAnySetter public void setOption(String key, String value) { if (null == options) { options = new HashMap<>(); } options.put(key, value); - }} + } + + public void setSizeOption(int size) { + setOption(OPTION_KEY_STREAM_SIZE, Integer.toString(size)); + } + + public int getSizeOption() { + if (!this.options.containsKey(OPTION_KEY_STREAM_SIZE)) { + return 1; + } + + return Integer.valueOf(this.options.get(OPTION_KEY_STREAM_SIZE)); + } +} diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java index 9ba4bf4..dd76697 100644 --- a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java +++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java @@ -21,6 +21,7 @@ package org.apache.atlas.pc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -37,7 +38,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { private final AtomicBoolean isDirty = new AtomicBoolean(false); private final AtomicLong maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS); private CountDownLatch countdownLatch; - private BlockingQueue<Object> results; + private Queue<Object> results; public WorkItemConsumer(BlockingQueue<T> queue) { this.queue = queue; @@ -101,11 +102,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { protected abstract void processItem(T item); protected void addResult(Object value) { - try { - results.put(value); - } catch (InterruptedException e) { - LOG.error("Interrupted while adding result: {}", value); - } + results.add(value); } protected void updateCommitTime(long commitTime) { @@ -118,7 +115,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { this.countdownLatch = countdownLatch; } - public <V> void 
setResults(BlockingQueue<Object> queue) { + public <V> void setResults(Queue<Object> queue) { this.results = queue; } } diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java index a7ba67c..351421e 100644 --- a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java +++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; +import java.util.Queue; import java.util.concurrent.*; public class WorkItemManager<T, U extends WorkItemConsumer> { @@ -33,7 +34,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { private final ExecutorService service; private final List<U> consumers = new ArrayList<>(); private CountDownLatch countdownLatch; - private BlockingQueue<Object> resultsQueue; + private Queue<Object> resultsQueue; public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) { this.numWorkers = numWorkers; @@ -49,13 +50,13 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { this(builder, "workItemConsumer", batchSize, numWorkers, false); } - public void setResultsCollection(BlockingQueue<Object> resultsQueue) { + public void setResultsCollection(Queue<Object> resultsQueue) { this.resultsQueue = resultsQueue; } private void createConsumers(WorkItemBuilder builder, int numWorkers, boolean collectResults) { if (collectResults) { - setResultsCollection(new LinkedBlockingQueue<>()); + setResultsCollection(new ConcurrentLinkedQueue<>()); } for (int i = 0; i < numWorkers; i++) { @@ -124,7 +125,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { LOG.info("WorkItemManager: Shutdown done!"); } - public BlockingQueue getResults() { + public Queue getResults() { return this.resultsQueue; } diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java 
b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java index bbe0dc5..57e454a 100644 --- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java +++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java @@ -199,6 +199,10 @@ public class GraphTransactionInterceptor implements MethodInterceptor { return cache.get(guid); } + public static void clearCache() { + guidVertexCache.get().clear(); + } + boolean logException(Throwable t) { if (t instanceof AtlasBaseException) { Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode(); diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java index 55990f7..373921d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java @@ -247,7 +247,8 @@ public class AuditsWriter { } updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids, - Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker()); + Constants.ATTR_NAME_REPLICATED_FROM, + (result.getExportResult() != null) ? 
result.getExportResult().getChangeMarker() : 0); } public void add(String userName, String sourceCluster, long startTime, diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index 27001e3..cd1deab 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -92,7 +92,7 @@ public class ImportService { request = new AtlasImportRequest(); } - EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); + EntityImportStream source = createZipSource(request, inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); return run(source, request, userName, hostName, requestingIP); } @@ -248,8 +248,18 @@ public class ImportService { return (int) (endTime - startTime); } - private EntityImportStream createZipSource(InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException { + private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException { try { + if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) { + LOG.info("Migration mode: Detected...", request.getOptions().get("size")); + return getZipDirectEntityImportStream(request, inputStream); + } + + if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) && + request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT) ) { + return getZipDirectEntityImportStream(request, inputStream); + } + if (StringUtils.isEmpty(configuredTemporaryDirectory)) { return new ZipSource(inputStream); } @@ -260,9 +270,15 @@ public class ImportService { } } + private EntityImportStream 
getZipDirectEntityImportStream(AtlasImportRequest request, InputStream inputStream) throws IOException, AtlasBaseException { + ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, request.getSizeOption()); + LOG.info("Using ZipSourceDirect: Size: {} entities", zipSourceDirect.size()); + return zipSourceDirect; + } + @VisibleForTesting boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest importRequest, AtlasExportRequest exportRequest) { - if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) { + if (exportRequest == null || CollectionUtils.isEmpty(exportRequest.getItemsToExport())) { return false; } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java index 351b475..8347b91 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java @@ -31,4 +31,8 @@ public enum ZipExportFileNames { public String toString() { return this.name; } + + public String toEntryFileName() { + return this.name + ".json"; + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java new file mode 100644 index 0000000..260c4af --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.entitytransform.BaseEntityHandler; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasExportResult; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.repository.store.graph.v2.EntityImportStream; +import org.apache.atlas.type.AtlasType; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import static org.apache.atlas.AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP; + +public class ZipSourceDirect implements EntityImportStream { + private static final Logger LOG = LoggerFactory.getLogger(ZipSourceDirect.class); + + private final ZipInputStream zipInputStream; + private int currentPosition; + + private ImportTransforms importTransform; + private List<BaseEntityHandler> entityHandlers; + private AtlasTypesDef typesDef; + private ZipEntry zipEntryNext; + private int streamSize = 1; + + public ZipSourceDirect(InputStream inputStream, int streamSize) throws IOException, AtlasBaseException { + this.zipInputStream = new ZipInputStream(inputStream); + this.streamSize = streamSize; + prepareStreamForFetch(); + } + + @Override + public 
ImportTransforms getImportTransform() { return this.importTransform; } + + @Override + public void setImportTransform(ImportTransforms importTransform) { + this.importTransform = importTransform; + } + + @Override + public List<BaseEntityHandler> getEntityHandlers() { + return entityHandlers; + } + + @Override + public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) { + this.entityHandlers = entityHandlers; + } + + @Override + public AtlasTypesDef getTypesDef() throws AtlasBaseException { + return this.typesDef; + } + + @Override + public + AtlasExportResult getExportResult() throws AtlasBaseException { + return new AtlasExportResult(); + } + + @Override + public List<String> getCreationOrder() { + return new ArrayList<>(); + } + + @Override + public int getPosition() { + return currentPosition; + } + + @Override + public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String json) throws AtlasBaseException { + if (StringUtils.isEmpty(json)) { + return null; + } + + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, json); + + if (importTransform != null) { + entityWithExtInfo = importTransform.apply(entityWithExtInfo); + } + + if (entityHandlers != null) { + applyTransformers(entityWithExtInfo); + } + + return entityWithExtInfo; + } + + @Override + public boolean hasNext() { + return (this.zipEntryNext != null + && !zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toEntryFileName()) + && !zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toEntryFileName())); + } + + @Override + public AtlasEntity next() { + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo(); + + return entityWithExtInfo != null ? 
entityWithExtInfo.getEntity() : null; + } + + @Override + public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() { + try { + if (hasNext()) { + String json = moveNext(); + return getEntityWithExtInfo(json); + } + } catch (AtlasBaseException e) { + LOG.error("getNextEntityWithExtInfo", e); + } + return null; + } + + @Override + public void reset() { + currentPosition = 0; + } + + @Override + public AtlasEntity getByGuid(String guid) { + try { + return getEntity(guid); + } catch (AtlasBaseException e) { + LOG.error("getByGuid: {} failed!", guid, e); + return null; + } + } + + @Override + public void onImportComplete(String guid) { + } + + @Override + public void setPosition(int index) { + try { + for (int i = 0; i < index; i++) { + moveNextEntry(); + } + } + catch (IOException e) { + LOG.error("Error setting position: {}. Position may be beyond the stream size.", index); + } + } + + @Override + public void setPositionUsingEntityGuid(String guid) { + } + + @Override + public void close() { + } + + private void applyTransformers(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { + if (entityWithExtInfo == null) { + return; + } + + transform(entityWithExtInfo.getEntity()); + + if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) { + for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { + transform(e); + } + } + } + + private void transform(AtlasEntity e) { + for (BaseEntityHandler handler : entityHandlers) { + handler.transform(e); + } + } + + private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException { + try { + return AtlasType.fromJson(jsonData, clazz); + + } catch (Exception e) { + throw new AtlasBaseException("Error converting file to JSON.", e); + } + } + + private AtlasEntity getEntity(String guid) throws AtlasBaseException { + AtlasEntity.AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid); + return (extInfo != null) ? 
extInfo.getEntity() : null; + } + + public int size() { + return this.streamSize; + } + + private String moveNext() { + try { + moveNextEntry(); + return getJsonPayloadFromZipEntryStream(this.zipInputStream); + } catch (IOException e) { + LOG.error("moveNext failed!", e); + } + + return null; + } + + private void moveNextEntry() throws IOException { + this.zipEntryNext = this.zipInputStream.getNextEntry(); + this.currentPosition++; + } + + private void prepareStreamForFetch() throws AtlasBaseException, IOException { + moveNextEntry(); + if (this.zipEntryNext == null) { + throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP."); + } + + if (this.zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toEntryFileName())) { + String json = getJsonPayloadFromZipEntryStream(this.zipInputStream); + this.typesDef = AtlasType.fromJson(json, AtlasTypesDef.class); + } + } + + private String getJsonPayloadFromZipEntryStream(ZipInputStream zipInputStream) { + try { + final int BUFFER_LENGTH = 4096; + byte[] buf = new byte[BUFFER_LENGTH]; + + int n = 0; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + while ((n = zipInputStream.read(buf, 0, BUFFER_LENGTH)) > -1) { + bos.write(buf, 0, n); + } + + return bos.toString(); + } catch (IOException ex) { + LOG.error("Error fetching string from entry!", ex); + } + + return null; + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java index ca0bc41..69d78cd 100644 --- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java +++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java @@ -24,6 +24,7 @@ import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasImportRequest; 
import org.apache.atlas.repository.impexp.ImportService; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,11 +33,20 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; +import java.util.zip.ZipFile; + +import static org.apache.atlas.AtlasConfiguration.MIGRATION_IMPORT_START_POSITION; public class ZipFileMigrationImporter implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class); - private static String ENV_USER_NAME = "user.name"; + private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers"; + private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size"; + private static final String DEFAULT_NUMBER_OF_WORKDERS = "4"; + private static final String DEFAULT_BATCH_SIZE = "100"; + private static final String ZIP_FILE_COMMENT = "streamSize"; + + private final static String ENV_USER_NAME = "user.name"; private final ImportService importService; private final String fileToImport; @@ -52,7 +62,8 @@ public class ZipFileMigrationImporter implements Runnable { FileWatcher fileWatcher = new FileWatcher(fileToImport); fileWatcher.start(); - performImport(new FileInputStream(new File(fileToImport))); + int streamSize = getStreamSizeFromComment(fileToImport); + performImport(new FileInputStream(new File(fileToImport)), streamSize); } catch (IOException e) { LOG.error("Migration Import: IO Error!", e); } catch (AtlasBaseException e) { @@ -60,19 +71,44 @@ public class ZipFileMigrationImporter implements Runnable { } } - private void performImport(InputStream fs) throws AtlasBaseException { + private int getStreamSizeFromComment(String fileToImport) { + int ret = 1; + try { + ZipFile zipFile = new ZipFile(fileToImport); + String streamSizeComment = zipFile.getComment(); + ret = 
processZipFileStreamSizeComment(streamSizeComment); + zipFile.close(); + } catch (IOException e) { + LOG.error("Error opening ZIP file: {}", fileToImport, e); + } + + return ret; + } + + private int processZipFileStreamSizeComment(String streamSizeComment) { + if (!StringUtils.isNotEmpty(streamSizeComment) || !StringUtils.startsWith(streamSizeComment, ZIP_FILE_COMMENT)) { + return 1; + } + + String s = StringUtils.substringAfter(streamSizeComment, ":"); + LOG.debug("ZipFileMigrationImporter: streamSize: {}", streamSizeComment); + + return Integer.valueOf(s); + } + + private void performImport(InputStream fs, int streamSize) throws AtlasBaseException { try { LOG.info("Migration Import: {}: Starting...", fileToImport); RequestContext.get().setUser(getUserNameFromEnvironment(), null); - importService.run(fs, getImportRequest(), + importService.run(fs, getImportRequest(streamSize), getUserNameFromEnvironment(), InetAddress.getLocalHost().getHostName(), InetAddress.getLocalHost().getHostAddress()); } catch (Exception ex) { - LOG.error("Error loading zip for migration", ex); + LOG.error("Migration Import: Error loading zip for migration!", ex); throw new AtlasBaseException(ex); } finally { LOG.info("Migration Import: {}: Done!", fileToImport); @@ -83,8 +119,16 @@ public class ZipFileMigrationImporter implements Runnable { return System.getProperty(ENV_USER_NAME); } - private AtlasImportRequest getImportRequest() throws AtlasException { - return new AtlasImportRequest(); + private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException { + AtlasImportRequest request = new AtlasImportRequest(); + + request.setSizeOption(streamSize); + request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true"); + request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKDERS)); + request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, 
getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE)); + request.setOption(AtlasImportRequest.START_POSITION_KEY, Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt())); + + return request; } private String getPropertyValue(String property, String defaultValue) throws AtlasException { diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java index 2b58119..bee6378 100644 --- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java +++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java @@ -81,9 +81,9 @@ public class UniqueAttributePatch extends AtlasPatchHandler { AtlasGraph graph = getGraph(); for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) { - LOG.info("finding entities of type {}", entityType.getTypeName()); - + LOG.info("finding entities of type: {}", entityType.getTypeName()); Iterable<Object> iterable = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityType.getTypeName()).vertexIds(); + LOG.info("found entities of type: {}", entityType.getTypeName()); int count = 0; for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java index 39ea3f8..805531c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java @@ -150,6 +150,14 @@ public interface AtlasEntityStore { EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException; /** + * Create or update entities with parameters necessary for import process without commit. 
Caller will have to do take care of commit. + * @param entityStream AtlasEntityStream + * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed + * @throws AtlasBaseException + */ + EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException; + + /** * Update a single entity * @param objectId ID of the entity * @param updatedEntityInfo updated entity information diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 30f5e5a..6f6ee17 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -332,6 +332,11 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { } @Override + public EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException { + return createOrUpdate(entityStream, false, true, true); + } + + @Override @GraphTransaction public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException { if (LOG.isDebugEnabled()) { @@ -1210,8 +1215,10 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { ret.setGuidAssignments(context.getGuidAssignments()); - // Notify the change listeners - entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress()); + if (!RequestContext.get().isImportInProgress()) { + // Notify the change listeners + entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress()); + } if (LOG.isDebugEnabled()) { LOG.debug("<== createOrUpdate()"); diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java index fdf117a..857b709 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java @@ -929,6 +929,10 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore { } private void sendNotifications(AtlasRelationship ret, OperationType relationshipUpdate) throws AtlasBaseException { + if (entityChangeNotifier == null) { + return; + } + entityChangeNotifier.notifyPropagatedEntities(); if (notificationsEnabled){ entityChangeNotifier.notifyRelationshipMutation(ret, relationshipUpdate); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java index 54c32c5..4526002 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java @@ -18,33 +18,30 @@ package org.apache.atlas.repository.store.graph.v2; import com.google.common.annotations.VisibleForTesting; -import org.apache.atlas.AtlasConfiguration; -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.RequestContext; -import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import 
org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; +import org.apache.atlas.repository.graph.AtlasGraphProvider; +import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.BulkImporter; +import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy; +import org.apache.atlas.repository.store.graph.v2.bulkimport.MigrationImport; +import org.apache.atlas.repository.store.graph.v2.bulkimport.RegularImport; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.type.Constants; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.inject.Inject; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Set; @@ -55,131 +52,24 @@ public class BulkImporterImpl implements BulkImporter { private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class); private final AtlasEntityStore entityStore; - private final EntityGraphRetriever entityGraphRetriever; private AtlasTypeRegistry typeRegistry; - private final int MAX_ATTEMPTS = 2; - private boolean directoryBasedImportConfigured; @Inject public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { this.entityStore = entityStore; - this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry); this.typeRegistry = typeRegistry; - this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); } @Override public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult 
importResult) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> bulkImport()"); - } - - if (entityStream == null || !entityStream.hasNext()) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update."); - } - - EntityMutationResponse ret = new EntityMutationResponse(); - ret.setGuidAssignments(new HashMap<>()); - - Set<String> processedGuids = new HashSet<>(); - float currentPercent = 0f; - List<String> residualList = new ArrayList<>(); - - EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList); - - while (entityImportStreamWithResidualList.hasNext()) { - AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo(); - AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; - - if (entity == null) { - continue; - } - - for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { - try { - AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null); - EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream); - - if (resp.getGuidAssignments() != null) { - ret.getGuidAssignments().putAll(resp.getGuidAssignments()); - } - - currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, - entityStream.getPosition(), - entityImportStreamWithResidualList.getStreamSize(), - currentPercent); - - entityStream.onImportComplete(entity.getGuid()); - break; - } catch (AtlasBaseException e) { - if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) { - throw e; - } - break; - } catch (AtlasSchemaViolationException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Entity: {}", entity.getGuid(), e); - } - - if (attempt == 0) { - updateVertexGuid(entity); - } else { - LOG.error("Guid update failed: {}", entityWithExtInfo.getEntity().getGuid()); - 
throw e; - } - } catch (Throwable e) { - AtlasBaseException abe = new AtlasBaseException(e); - if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) { - throw abe; - } - - LOG.warn("Exception: {}", entity.getGuid(), e); - break; - } finally { - RequestContext.get().clearCache(); - } - } - } - - importResult.getProcessedEntities().addAll(processedGuids); - LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size()); - - return ret; - } - - @GraphTransaction - public void updateVertexGuid(AtlasEntity entity) { - String entityGuid = entity.getGuid(); - AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity); - - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); - String vertexGuid = null; - try { - vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes()); - } catch (AtlasBaseException e) { - LOG.warn("Entity: {}: Does not exist!", objectId); - return; - } - - if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) { - return; - } - - AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid); - if (v == null) { - return; - } - - addHistoricalGuid(v, vertexGuid); - AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid); - - LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid()); - } - - private void addHistoricalGuid(AtlasVertex v, String vertexGuid) { - String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class); - - AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid)); + ImportStrategy importStrategy = + (importResult.getRequest().getOptions() != null && + importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) + ? 
new MigrationImport(new AtlasGraphProvider(), this.typeRegistry) + : new RegularImport(this.entityStore, this.typeRegistry); + + LOG.info("BulkImportImpl: {}", importStrategy.getClass().getSimpleName()); + return importStrategy.run(entityStream, importResult); } @VisibleForTesting @@ -193,38 +83,16 @@ public class BulkImporterImpl implements BulkImporter { return json; } - private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) { - if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) { - return false; - } - - lineageList.add(guid); - - return true; - } - - private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity, - EntityMutationResponse resp, - AtlasImportResult importResult, - Set<String> processedGuids, - int currentIndex, int streamSize, float currentPercent) { - if (!directoryBasedImportConfigured) { - updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); - updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); - updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); - } - - String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid()); - - return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported); - } - @VisibleForTesting - static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) { + public static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) { final double tolerance = 0.000001; final int MAX_PERCENT = 100; int maxSize = (currentIndex <= streamSize) ? 
streamSize : currentIndex; + if (maxSize <= 0) { + return currentPercent; + } + float percent = (float) ((currentIndex * MAX_PERCENT) / maxSize); boolean updateLog = Double.compare(percent, currentPercent) > tolerance; float updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent); @@ -236,7 +104,7 @@ public class BulkImporterImpl implements BulkImporter { return updatedPercent; } - private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) { + public static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) { if (list == null) { return; } @@ -251,41 +119,37 @@ public class BulkImporterImpl implements BulkImporter { } } - private static class EntityImportStreamWithResidualList { - private final EntityImportStream stream; - private final List<String> residualList; - private boolean navigateResidualList; - private int currentResidualListIndex; - + public static void updateVertexGuid(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever, AtlasEntity entity) { + String entityGuid = entity.getGuid(); + AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity); - public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) { - this.stream = stream; - this.residualList = residualList; - this.navigateResidualList = false; - this.currentResidualListIndex = 0; + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + String vertexGuid = null; + try { + vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes()); + } catch (AtlasBaseException e) { + LOG.warn("Entity: {}: Does not exist!", objectId); + return; } - public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() { - if (navigateResidualList == false) { 
- return stream.getNextEntityWithExtInfo(); - } else { - stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++)); - return stream.getNextEntityWithExtInfo(); - } + if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) { + return; } - public boolean hasNext() { - if (!navigateResidualList) { - boolean streamHasNext = stream.hasNext(); - navigateResidualList = (streamHasNext == false); - return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size()); - } else { - return (currentResidualListIndex < residualList.size()); - } + AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid); + if (v == null) { + return; } - public int getStreamSize() { - return stream.size() + residualList.size(); - } + addHistoricalGuid(v, vertexGuid); + AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid); + + LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid()); + } + + public static void addHistoricalGuid(AtlasVertex v, String vertexGuid) { + String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class); + + AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid)); } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 2f3aad0..e76b341 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -361,7 +361,9 @@ public class EntityGraphMapper { updateLabels(vertex, labels); - entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), addedLabels, removedLabels); + if (entityChangeNotifier != null) { + entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), 
addedLabels, removedLabels); + } } public void addLabels(AtlasVertex vertex, Set<String> labels) throws AtlasBaseException { @@ -378,7 +380,10 @@ public class EntityGraphMapper { if (!updatedLabels.equals(existingLabels)) { updateLabels(vertex, updatedLabels); updatedLabels.removeAll(existingLabels); - entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), updatedLabels, null); + + if (entityChangeNotifier != null) { + entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), updatedLabels, null); + } } } } @@ -395,7 +400,10 @@ public class EntityGraphMapper { if (!updatedLabels.equals(existingLabels)) { updateLabels(vertex, updatedLabels); existingLabels.removeAll(updatedLabels); - entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), null, existingLabels); + + if (entityChangeNotifier != null) { + entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), null, existingLabels); + } } } } @@ -1948,7 +1956,9 @@ public class EntityGraphMapper { Set<AtlasVertex> vertices = addedClassifications.get(classification); List<AtlasEntity> propagatedEntities = updateClassificationText(classification, vertices); - entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification)); + if (entityChangeNotifier != null) { + entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification)); + } } RequestContext.get().endMetricRecord(metric); @@ -2056,7 +2066,10 @@ public class EntityGraphMapper { AtlasEntity entity = updateClassificationText(entry.getKey()); List<AtlasClassification> deletedClassificationNames = entry.getValue(); - entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames); + + if (entityChangeNotifier != null) { + entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames); + } } } @@ -2283,17 +2296,19 @@ public class 
EntityGraphMapper { notificationVertices.addAll(entitiesToPropagateTo); } - for (AtlasVertex vertex : notificationVertices) { - String entityGuid = GraphHelper.getGuid(vertex); - AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES); + if (entityChangeNotifier != null) { + for (AtlasVertex vertex : notificationVertices) { + String entityGuid = GraphHelper.getGuid(vertex); + AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES); - if (isActive(entity)) { - vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity)); - entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications); + if (isActive(entity)) { + vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity)); + entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications); + } } } - if (MapUtils.isNotEmpty(removedPropagations)) { + if (entityChangeNotifier != null && MapUtils.isNotEmpty(removedPropagations)) { for (AtlasClassification classification : removedPropagations.keySet()) { List<AtlasVertex> propagatedVertices = removedPropagations.get(classification); List<AtlasEntity> propagatedEntities = updateClassificationText(classification, propagatedVertices); @@ -2526,7 +2541,7 @@ public class EntityGraphMapper { private List<AtlasEntity> updateClassificationText(AtlasClassification classification, Collection<AtlasVertex> propagatedVertices) throws AtlasBaseException { List<AtlasEntity> propagatedEntities = new ArrayList<>(); - if(CollectionUtils.isNotEmpty(propagatedVertices)) { + if (fullTextMapperV2 != null && CollectionUtils.isNotEmpty(propagatedVertices)) { for(AtlasVertex vertex : propagatedVertices) { AtlasEntity entity = instanceConverter.getAndCacheEntity(GraphHelper.getGuid(vertex), ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES); 
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java similarity index 58% copy from repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java copy to repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java index 351b475..6b70eab 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,20 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.atlas.repository.impexp; -public enum ZipExportFileNames { - ATLAS_EXPORT_INFO_NAME("atlas-export-info"), - ATLAS_EXPORT_ORDER_NAME("atlas-export-order"), - ATLAS_TYPESDEF_NAME("atlas-typesdef"); +package org.apache.atlas.repository.store.graph.v2.bulkimport; - public final String name; - ZipExportFileNames(String name) { - this.name = name; - } +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.repository.store.graph.v2.EntityImportStream; - @Override - public String toString() { - return this.name; - } +public abstract class ImportStrategy { + public abstract EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException; } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java new file mode 100644 index 0000000..8c66656 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.store.graph.v2.bulkimport; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.repository.converters.AtlasFormatConverters; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; +import org.apache.atlas.repository.graph.AtlasGraphProvider; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; +import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2; +import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityImportStream; +import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityConsumerBuilder; +import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityCreationManager; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MigrationImport extends ImportStrategy { + private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class); + + private final AtlasTypeRegistry typeRegistry; + private AtlasGraph atlasGraph; + private EntityGraphRetriever entityGraphRetriever; + private EntityGraphMapper entityGraphMapper; + private AtlasEntityStore entityStore; + + public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) { + this.typeRegistry = 
typeRegistry; + setupEntityStore(atlasGraphProvider, typeRegistry); + LOG.info("MigrationImport: Using bulkLoading..."); + } + + public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { + if (entityStream == null || !entityStream.hasNext()) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update."); + } + + if (importResult.getRequest() == null) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "importResult should contain request"); + } + + int index = 0; + int streamSize = entityStream.size(); + EntityMutationResponse ret = new EntityMutationResponse(); + EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult, streamSize); + + try { + LOG.info("Migration Import: Size: {}: Starting...", streamSize); + index = creationManager.read(entityStream); + creationManager.drain(); + creationManager.extractResults(); + } catch (Exception ex) { + LOG.error("Migration Import: Error: Current position: {}", index, ex); + } finally { + shutdownEntityCreationManager(creationManager); + } + + LOG.info("Migration Import: Size: {}: Done!", streamSize); + return ret; + } + + private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult, int streamSize) { + int batchSize = importResult.getRequest().getOptionKeyBatchSize(); + int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers()); + + EntityConsumerBuilder consumerBuilder = + new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize); + + return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, streamSize); + } + + private static int getNumWorkers(int numWorkersFromOptions) { + int ret = (numWorkersFromOptions > 0) ? 
numWorkersFromOptions : 1; + LOG.info("Migration Import: Setting numWorkers: {}", ret); + return ret; + } + + private void setupEntityStore(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) { + this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry); + this.atlasGraph = atlasGraphProvider.getBulkLoading(); + DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(typeRegistry); + + AtlasRelationshipStore relationshipStore = new AtlasRelationshipStoreV2(typeRegistry, deleteDelegate, null); + AtlasFormatConverters formatConverters = new AtlasFormatConverters(typeRegistry); + AtlasInstanceConverter instanceConverter = new AtlasInstanceConverter(typeRegistry, formatConverters); + this.entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, atlasGraph, relationshipStore, null, instanceConverter, null); + this.entityStore = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, null, entityGraphMapper); + } + + private void shutdownEntityCreationManager(EntityCreationManager creationManager) { + try { + creationManager.shutdown(); + } catch (InterruptedException e) { + LOG.error("Migration Import: Shutdown: Interrupted!", e); + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java similarity index 80% copy from repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java copy to repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java index 54c32c5..4cc8ed4 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java @@ -6,16 +6,18 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * 
with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.atlas.repository.store.graph.v2; + +package org.apache.atlas.repository.store.graph.v2.bulkimport; + import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.AtlasConfiguration; @@ -33,15 +35,17 @@ import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.atlas.repository.store.graph.BulkImporter; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityImportStream; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; -import javax.inject.Inject; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -49,27 +53,25 @@ import java.util.List; import java.util.Set; import static org.apache.atlas.repository.Constants.HISTORICAL_GUID_PROPERTY_KEY; +import static org.apache.atlas.repository.store.graph.v2.BulkImporterImpl.updateImportProgress; 
-@Component -public class BulkImporterImpl implements BulkImporter { - private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class); - +public class RegularImport extends ImportStrategy { + private static final Logger LOG = LoggerFactory.getLogger(RegularImport.class); + private static final int MAX_ATTEMPTS = 3; private final AtlasEntityStore entityStore; + private final AtlasTypeRegistry typeRegistry; private final EntityGraphRetriever entityGraphRetriever; - private AtlasTypeRegistry typeRegistry; - private final int MAX_ATTEMPTS = 2; private boolean directoryBasedImportConfigured; - @Inject - public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { + public RegularImport(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { this.entityStore = entityStore; - this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry); this.typeRegistry = typeRegistry; + this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry); this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString()); } @Override - public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { + public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { if (LOG.isDebugEnabled()) { LOG.debug("==> bulkImport()"); } @@ -81,7 +83,7 @@ public class BulkImporterImpl implements BulkImporter { EntityMutationResponse ret = new EntityMutationResponse(); ret.setGuidAssignments(new HashMap<>()); - Set<String> processedGuids = new HashSet<>(); + Set<String> processedGuids = new HashSet<>(); float currentPercent = 0f; List<String> residualList = new ArrayList<>(); @@ -209,9 +211,9 @@ public class BulkImporterImpl implements BulkImporter { Set<String> processedGuids, int currentIndex, int streamSize, float currentPercent) { if 
(!directoryBasedImportConfigured) { - updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); - updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); - updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); + BulkImporterImpl.updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); + BulkImporterImpl.updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); + BulkImporterImpl.updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); } String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid()); @@ -219,38 +221,6 @@ public class BulkImporterImpl implements BulkImporter { return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported); } - @VisibleForTesting - static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) { - final double tolerance = 0.000001; - final int MAX_PERCENT = 100; - - int maxSize = (currentIndex <= streamSize) ? streamSize : currentIndex; - float percent = (float) ((currentIndex * MAX_PERCENT) / maxSize); - boolean updateLog = Double.compare(percent, currentPercent) > tolerance; - float updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? 
++currentPercent : currentPercent); - - if (updateLog) { - log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), maxSize, additionalInfo); - } - - return updatedPercent; - } - - private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) { - if (list == null) { - return; - } - - for (AtlasEntityHeader h : list) { - if (processedGuids.contains(h.getGuid())) { - continue; - } - - processedGuids.add(h.getGuid()); - importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName())); - } - } - private static class EntityImportStreamWithResidualList { private final EntityImportStream stream; private final List<String> residualList; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java new file mode 100644 index 0000000..bb74205 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.store.graph.v2.bulkimport.pc; + +import org.apache.atlas.GraphTransactionInterceptor; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.pc.WorkItemConsumer; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport; +import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWithExtInfo> { + private static final Logger LOG = LoggerFactory.getLogger(EntityConsumer.class); + private static final int MAX_COMMIT_RETRY_COUNT = 3; + + private final int batchSize; + private AtomicLong counter = new AtomicLong(1); + private AtomicLong currentBatch = new AtomicLong(1); + + private final AtlasGraph atlasGraph; + private final AtlasEntityStore entityStoreV2; + private final AtlasTypeRegistry typeRegistry; + private final EntityGraphRetriever entityGraphRetriever; + + private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<>(); + private List<EntityMutationResponse> localResults = new ArrayList<>(); + + public EntityConsumer(AtlasGraph atlasGraph, 
AtlasEntityStore entityStore, + EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry, + BlockingQueue queue, int batchSize) { + super(queue); + + this.atlasGraph = atlasGraph; + this.entityStoreV2 = entityStore; + this.entityGraphRetriever = entityGraphRetriever; + this.typeRegistry = typeRegistry; + this.batchSize = batchSize; + } + + @Override + protected void processItem(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { + int delta = (MapUtils.isEmpty(entityWithExtInfo.getReferredEntities()) + ? 1 + : entityWithExtInfo.getReferredEntities().size()) + 1; + + long currentCount = counter.addAndGet(delta); + currentBatch.addAndGet(delta); + entityBuffer.add(entityWithExtInfo); + + try { + processEntity(entityWithExtInfo, currentCount); + attemptCommit(); + } catch (Exception e) { + LOG.info("Data loss: Please re-submit!", e); + } + } + + private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) { + try { + RequestContext.get().setImportInProgress(true); + AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null); + + LOG.debug("Processing: {}", currentCount); + EntityMutationResponse result = entityStoreV2.createOrUpdateForImportNoCommit(oneEntityStream); + localResults.add(result); + } catch (AtlasBaseException e) { + addResult(entityWithExtInfo.getEntity().getGuid()); + LOG.warn("Exception: {}", entityWithExtInfo.getEntity().getGuid(), e); + } catch (AtlasSchemaViolationException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Entity: {}", entityWithExtInfo.getEntity().getGuid(), e); + } + + BulkImporterImpl.updateVertexGuid(typeRegistry, entityGraphRetriever, entityWithExtInfo.getEntity()); + } + } + + private void attemptCommit() { + if (currentBatch.get() < batchSize) { + return; + } + + doCommit(); + } + + @Override + protected void doCommit() { + for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) { + if 
(commitWithRetry(retryCount)) { + return; + } + } + + LOG.error("Retries exceeded! Potential data loss! Please correct data and re-attempt. Buffer: {}: Counter: {}", entityBuffer.size(), counter.get()); + clear(); + } + + @Override + protected void commitDirty() { + super.commitDirty(); + LOG.info("Total: Commit: {}", counter.get()); + counter.set(0); + } + + private boolean commitWithRetry(int retryCount) { + try { + atlasGraph.commit(); + if (LOG.isDebugEnabled()) { + LOG.debug("Commit: Done!: Buffer: {}: Batch: {}: Counter: {}", entityBuffer.size(), currentBatch.get(), counter.get()); + } + + dispatchResults(); + return true; + } catch (Exception ex) { + rollbackPauseRetry(retryCount, ex); + return false; + } + } + + private void rollbackPauseRetry(int retryCount, Exception ex) { + atlasGraph.rollback(); + clearCache(); + + LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount); + pause(retryCount); + if (ex.getClass().getName().endsWith("JanusGraphException") && retryCount >= MAX_COMMIT_RETRY_COUNT) { + LOG.warn("Commit error! 
Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex); + } else { + LOG.info("Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount); + } + retryProcessEntity(retryCount); + } + + private void retryProcessEntity(int retryCount) { + LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount); + for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) { + processEntity(e, counter.get()); + } + LOG.info("Replaying: Done!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount); + } + + private void dispatchResults() { + localResults.stream().forEach(x -> { + addResultsFromResponse(x.getCreatedEntities()); + addResultsFromResponse(x.getUpdatedEntities()); + addResultsFromResponse(x.getDeletedEntities()); + }); + + clear(); + } + + private void pause(int retryCount) { + try { + Thread.sleep(1000 * retryCount); + } catch (InterruptedException e) { + LOG.error("pause: Interrupted!", e); + } + } + + private void addResultsFromResponse(List<AtlasEntityHeader> entities) { + if (CollectionUtils.isEmpty(entities)) { + return; + } + + for (AtlasEntityHeader eh : entities) { + addResult(eh.getGuid()); + } + } + + private void clear() { + localResults.clear(); + entityBuffer.clear(); + clearCache(); + currentBatch.set(0); + } + + private void clearCache() { + GraphTransactionInterceptor.clearCache(); + RequestContext.get().clearCache(); + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java new file mode 100644 index 0000000..69d33b2 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasTypeRegistry;

import java.util.concurrent.BlockingQueue;

/**
 * Factory used by the work-item framework to create one {@link EntityConsumer}
 * per worker thread, each sharing the same graph, store, and type registry.
 */
public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, AtlasEntity.AtlasEntityWithExtInfo> {
    private final AtlasGraph           atlasGraph;
    private final AtlasEntityStore     entityStore;
    private final EntityGraphRetriever entityGraphRetriever;
    private final AtlasTypeRegistry    typeRegistry;
    private final int                  batchSize;

    public EntityConsumerBuilder(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
                                 EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry, int batchSize) {
        this.atlasGraph           = atlasGraph;
        this.entityStore          = entityStore;
        this.entityGraphRetriever = entityGraphRetriever;
        this.typeRegistry         = typeRegistry;
        this.batchSize            = batchSize;
    }

    /** Builds a consumer bound to the given work queue; batch size controls commit frequency. */
    @Override
    public EntityConsumer build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) {
        return new EntityConsumer(this.atlasGraph, this.entityStore, this.entityGraphRetriever, this.typeRegistry, queue, this.batchSize);
    }
}
+ */ + +package org.apache.atlas.repository.store.graph.v2.bulkimport.pc; + +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.pc.WorkItemBuilder; +import org.apache.atlas.pc.WorkItemManager; +import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl; +import org.apache.atlas.repository.store.graph.v2.EntityImportStream; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager { + private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class); + private static final String WORKER_PREFIX = "migration-import"; + + private final StatusReporter<String, String> statusReporter; + private final AtlasImportResult importResult; + private final int streamSize; + private final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min + private String currentTypeName; + private float currentPercent; + + public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult, int streamSize) { + super(builder, WORKER_PREFIX, batchSize, numWorkers, true); + this.importResult = importResult; + this.streamSize = streamSize; + + this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION); + } + + public int read(EntityImportStream entityStream) { + int currentIndex = 0; + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo; + while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) { + AtlasEntity entity = entityWithExtInfo != null ? 
entityWithExtInfo.getEntity() : null; + if (entity == null) { + continue; + } + + try { + produce(currentIndex++, entity.getTypeName(), entityWithExtInfo); + } catch (Throwable e) { + LOG.warn("Exception: {}", entity.getGuid(), e); + break; + } + } + return currentIndex; + } + + private void produce(int currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { + String previousTypeName = getCurrentTypeName(); + + if (StringUtils.isNotEmpty(typeName) + && StringUtils.isNotEmpty(previousTypeName) + && !StringUtils.equals(previousTypeName, typeName)) { + LOG.info("Waiting: '{}' to complete...", previousTypeName); + super.drain(); + LOG.info("Switching entity type processing: From: '{}' To: '{}'...", previousTypeName, typeName); + } + + setCurrentTypeName(typeName); + statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), String.format("%s:%s", entityWithExtInfo.getEntity().getTypeName(), currentIndex)); + super.checkProduce(entityWithExtInfo); + extractResults(); + } + + public void extractResults() { + Object result; + while (((result = getResults().poll())) != null) { + statusReporter.processed((String) result); + } + + logStatus(); + } + + private void logStatus() { + String ack = statusReporter.ack(); + if (StringUtils.isEmpty(ack)) { + return; + } + + String[] split = ack.split(":"); + if (split.length == 0 || split.length < 2) { + return; + } + + importResult.incrementMeticsCounter(split[0]); + this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(split[1]), getStreamSize(), getCurrentPercent()); + } + + private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) { + String lastEntityImported = String.format("entity:last-imported:%s:(%s)", typeNameGuid, currentIndex); + return BulkImporterImpl.updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported); + } + + private String getCurrentTypeName() { + return 
this.currentTypeName; + } + + private void setCurrentTypeName(String typeName) { + this.currentTypeName = typeName; + } + + private float getCurrentPercent() { + return this.currentPercent; + } + + private int getStreamSize() { + return this.streamSize; + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java new file mode 100644 index 0000000..1cd9860 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;

import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/**
 * Tracks items that were produced (in order) against items reported as processed, and
 * acknowledges the highest contiguous completion index. {@code ack()} drains produced
 * items from the head of the (insertion-ordered) map as long as each has been processed,
 * returning the value of the last drained entry. If the head item stays unprocessed for
 * longer than {@code timeOut}, it is force-acknowledged with a warning.
 *
 * @param <T> item key type (e.g. entity guid)
 * @param <U> acknowledgement payload type (e.g. "type:index" marker)
 */
public class StatusReporter<T, U> {
    private static final Logger LOG = LoggerFactory.getLogger(StatusReporter.class);

    // Insertion order matters: ack() only advances through the head of this map.
    private final Map<T, U> producedItems = new LinkedHashMap<>();
    private final Set<T> processedSet = new HashSet<>();
    // Head item currently being watched for time-out, paired with the time watching began.
    private TypesUtil.Pair<T, Long> watchedItem;
    private final long timeOut;

    public StatusReporter(long timeOut) {
        this.timeOut = timeOut;
    }

    public void produced(T item, U index) {
        this.producedItems.put(item, index);
    }

    public void processed(T item) {
        this.processedSet.add(item);
    }

    public void processed(T[] index) {
        this.processedSet.addAll(Arrays.asList(index));
    }

    /**
     * @return the payload of the most recent contiguously-completed item, or null if the
     *         head of the produced queue has not been processed (and has not timed out)
     */
    public U ack() {
        U ack = null;
        U ret;
        Map.Entry<T, U> firstElement;
        do {
            firstElement = getFirstElement(this.producedItems);
            ret = completionIndex(firstElement);
            if (ret != null) {
                ack = ret;
            }
        } while (ret != null);

        return addToWatchIfNeeded(ack, firstElement);
    }

    private U addToWatchIfNeeded(U ack, Map.Entry<T, U> firstElement) {
        if (ack == null && firstElement != null) {
            ack = addToWatch(firstElement.getKey());
        } else {
            resetWatchItem();
        }
        return ack;
    }

    private void resetWatchItem() {
        this.watchedItem = null;
    }

    private U addToWatch(T key) {
        createNewWatchItem(key);
        if (!hasTimedOut(this.watchedItem)) {
            return null;
        }

        T producedItemKey = this.watchedItem.left;
        resetWatchItem();
        LOG.warn("Item: {}: Was produced but not successfully processed!", producedItemKey);
        return this.producedItems.get(producedItemKey);
    }

    private void createNewWatchItem(T key) {
        // Keep watching the same head item until it is acknowledged or times out.
        if (this.watchedItem != null) {
            return;
        }

        this.watchedItem = new TypesUtil.Pair<>(key, System.currentTimeMillis());
    }

    private boolean hasTimedOut(TypesUtil.Pair<T, Long> watchedItem) {
        if (watchedItem == null) {
            return false;
        }

        return (System.currentTimeMillis() - watchedItem.right) >= timeOut;
    }

    private Map.Entry<T, U> getFirstElement(Map<T, U> map) {
        if (map.isEmpty()) {
            return null;
        }

        return map.entrySet().iterator().next();
    }

    private U completionIndex(Map.Entry<T, U> lookFor) {
        if (lookFor == null || !processedSet.contains(lookFor.getKey())) {
            return null;
        }

        U ack = lookFor.getValue();
        producedItems.remove(lookFor.getKey());
        // FIX: previously removed the Map.Entry itself from a Set<T> of keys — a no-op
        // (Set.remove(Object) never matched), so processedSet grew without bound.
        processedSet.remove(lookFor.getKey());
        return ack;
    }
}
getDefaultImportRequest(); diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java new file mode 100644 index 0000000..2a22d88 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.atlas.repository.impexp;

import com.google.inject.Inject;
import org.apache.atlas.TestModules;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

import java.io.IOException;
import java.io.InputStream;

import static org.testng.Assert.assertNotNull;

/**
 * Exercises the migration-mode import path end-to-end using a pre-built export zip.
 */
@Guice(modules = TestModules.TestOnlyModule.class)
public class MigrationImportTest extends ExportImportTestBase {

    private final ImportService importService;

    @Inject
    AtlasTypeRegistry typeRegistry;

    @Inject
    private AtlasTypeDefStore typeDefStore;

    @Inject
    private EntityDiscoveryService discoveryService;

    @Inject
    AtlasEntityStore entityStore;

    @Inject
    AtlasGraph atlasGraph;

    @Inject
    public MigrationImportTest(ImportService importService) {
        this.importService = importService;
    }

    @Test
    public void simpleImport() throws IOException, AtlasBaseException {
        InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip");

        // The "migration" option routes the import through the direct (streaming) code path.
        AtlasImportRequest request = new AtlasImportRequest();
        request.setOption("migration", "true");

        AtlasImportResult importResult = importService.run(inputStream, request, null, null, null);

        assertNotNull(importResult);
    }
}
b/repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java new file mode 100644 index 0000000..5e15023 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.StatusReporter; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; + +public class StatusReporterTest { + @Test + public void noneProducedNoneReported() { + StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(100); + assertNull(statusReporter.ack()); + } + + @Test + public void producedButNotAcknowledged() { + StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems(); + assertNull(statusReporter.ack()); + } + + @Test + public void producedAcknowledged() { + StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems(); + statusReporter.processed(1); + + assertEquals(java.util.Optional.of(100).get(), statusReporter.ack()); + } + + @Test + public void producedAcknowledgeMaxAvailableInSequence() { + StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems(); + + statusReporter.processed(new Integer[]{1, 3, 5}); + + assertEquals(java.util.Optional.of(100).get(), statusReporter.ack()); + } + + @Test + public void producedAcknowledgeMaxAvailableInSequence2() { + StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems(); + statusReporter.processed(new Integer[]{1, 2, 3, 6, 5}); + + assertEquals(java.util.Optional.of(300).get(), statusReporter.ack()); + } + + @Test + public void producedSetDisjointWithAckSet() { + StatusReporter<Integer, Integer> statusReporter = new StatusReporter(100); + statusReporter.produced(11, 1000); + statusReporter.produced(12, 2000); + statusReporter.produced(13, 3000); + + statusReporter.processed(new Integer[]{1, 11, 12, 13}); + + assertEquals(java.util.Optional.of(3000).get(), statusReporter.ack()); + } + + @Test + public void missingAck() throws InterruptedException { + StatusReporter<Integer, Integer> statusReporter = 
createStatusReportWithItems(2, 3, 4); + + assertNull(statusReporter.ack()); + Thread.sleep(1002); + assertEquals(java.util.Optional.of(100).get(), statusReporter.ack()); + } + + private StatusReporter<Integer, Integer> createStatusReportWithItems(Integer... processed) { + StatusReporter<Integer, Integer> statusReporter = new StatusReporter(1000); + statusReporter.produced(1, 100); + statusReporter.produced(2, 200); + statusReporter.produced(3, 300); + statusReporter.produced(4, 400); + statusReporter.produced(5, 500); + statusReporter.produced(6, 600); + + statusReporter.processed(processed); + + return statusReporter; + } +} diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java new file mode 100644 index 0000000..d191d8c --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.atlas.repository.impexp;

import org.apache.atlas.exception.AtlasBaseException;
import org.testng.annotations.Test;

import java.io.IOException;
import java.io.InputStream;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;

/**
 * Tests for {@link ZipSourceDirect}: an empty zip must be rejected, and a valid export
 * zip must yield its type definitions, export result, and the expected entity count.
 */
public class ZipDirectTest {
    @Test(expectedExceptions = AtlasBaseException.class)
    public void loadFileEmpty() throws IOException, AtlasBaseException {
        InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-1.zip");
        new ZipSourceDirect(inputStream, 1);
    }

    @Test
    public void loadFile() throws IOException, AtlasBaseException {
        final int EXPECTED_ENTITY_COUNT = 3434;

        InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip");
        ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, EXPECTED_ENTITY_COUNT);

        // (assertNotNull on a freshly constructed object was removed — it is always true.)
        assertNotNull(zipSourceDirect.getTypesDef());
        assertFalse(zipSourceDirect.getTypesDef().getEntityDefs().isEmpty());
        assertNotNull(zipSourceDirect.getExportResult());

        // The loop condition already guarantees non-null; just count the entities streamed.
        int count = 0;
        while (zipSourceDirect.getNextEntityWithExtInfo() != null) {
            count++;
        }

        assertEquals(count, EXPECTED_ENTITY_COUNT);
    }
}
@@ -317,7 +317,9 @@ public class ZipFileResourceTestUtils { } public static AtlasImportRequest getDefaultImportRequest() { - return new AtlasImportRequest(); + AtlasImportRequest atlasImportRequest = new AtlasImportRequest(); + atlasImportRequest.setOption("migration", "true"); + return atlasImportRequest; } @@ -336,7 +338,8 @@ public class ZipFileResourceTestUtils { final String hostName = "localhost"; final String userName = "admin"; - AtlasImportResult result = importService.run(inputStream, userName, hostName, requestingIP); + AtlasImportRequest request = getDefaultImportRequest(); + AtlasImportResult result = runImportWithParameters(importService, request, inputStream); assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS); return result; } diff --git a/repository/src/test/resources/zip-direct-1.zip b/repository/src/test/resources/zip-direct-1.zip new file mode 100644 index 0000000..15cb0ec Binary files /dev/null and b/repository/src/test/resources/zip-direct-1.zip differ diff --git a/repository/src/test/resources/zip-direct-2.zip b/repository/src/test/resources/zip-direct-2.zip new file mode 100644 index 0000000..e7b8617 Binary files /dev/null and b/repository/src/test/resources/zip-direct-2.zip differ
