This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 206c3ed4d6ac686ae03adedd16764f09024658bd Author: Ashutosh Mestry <[email protected]> AuthorDate: Mon Mar 16 09:58:27 2020 -0700 ATLAS-3663: ZipFileMigrator: Automatic Resume During Migration. (cherry picked from commit 3d0c98779f16c7f0c6faa2b441877458ea152675) --- .../atlas/model/impexp/AtlasImportRequest.java | 3 +- .../model/migration/MigrationImportStatus.java | 62 ++++++ .../java/org/apache/atlas/pc/StatusReporter.java | 41 +++- .../org/apache/atlas/pc/StatusReporterTest.java | 18 ++ .../atlas/repository/impexp/ImportService.java | 10 +- .../impexp/MigrationProgressService.java | 42 +++- .../migration/DataMigrationStatusService.java | 245 +++++++++++++++++++++ .../migration/ZipFileMigrationImporter.java | 43 +++- .../store/graph/v2/bulkimport/MigrationImport.java | 21 +- .../graph/v2/bulkimport/pc/EntityConsumer.java | 5 +- .../v2/bulkimport/pc/EntityCreationManager.java | 19 +- .../impexp/DataMigrationStatusServiceTest.java | 66 ++++++ 12 files changed, 548 insertions(+), 27 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java index 09dafdf..2c18704 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java @@ -44,12 +44,13 @@ public class AtlasImportRequest implements Serializable { public static final String TRANSFORMS_KEY = "transforms"; public static final String TRANSFORMERS_KEY = "transformers"; public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom"; + public static final String OPTION_KEY_MIGRATION_FILE_NAME = "migrationFileName"; public static final String OPTION_KEY_MIGRATION = "migration"; public static final String OPTION_KEY_NUM_WORKERS = "numWorkers"; public static final String OPTION_KEY_BATCH_SIZE = "batchSize"; public static final String OPTION_KEY_FORMAT = "format"; public static final String 
OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect"; - private static final String START_POSITION_KEY = "startPosition"; + public static final String START_POSITION_KEY = "startPosition"; private static final String START_GUID_KEY = "startGuid"; private static final String FILE_NAME_KEY = "fileName"; private static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition"; diff --git a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java new file mode 100644 index 0000000..3430fda --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.model.migration; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.atlas.model.AtlasBaseModelObject; +import org.apache.atlas.model.impexp.MigrationStatus; +import org.apache.commons.lang.StringUtils; + +import java.io.Serializable; +import java.util.Date; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class MigrationImportStatus extends MigrationStatus { + private String name; + + public MigrationImportStatus() { + } + + public MigrationImportStatus(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(", name=").append(name); + sb.append(super.toString()); + return sb.toString(); + } +} diff --git a/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java b/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java index f84e8d0..7baf973 100644 --- a/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java +++ b/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java @@ -18,6 +18,9 @@ package org.apache.atlas.pc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Arrays; import java.util.HashSet; import java.util.LinkedHashMap; @@ -25,8 +28,20 @@ import java.util.Map; import java.util.Set; public class StatusReporter<T, U> { + private static final Logger LOG = LoggerFactory.getLogger(StatusReporter.class); + private 
Map<T,U> producedItems = new LinkedHashMap<>(); private Set<T> processedSet = new HashSet<>(); + private long timeoutDuration; + private long lastAck; + + public StatusReporter() { + this.timeoutDuration = -1; + } + + public StatusReporter(long timeoutDurationInMs) { + this.timeoutDuration = timeoutDurationInMs; + } public void produced(T item, U index) { this.producedItems.put(item, index); @@ -44,7 +59,8 @@ public class StatusReporter<T, U> { U ack = null; U ret; do { - ret = completionIndex(getFirstElement(this.producedItems)); + Map.Entry<T, U> firstElement = getFirstElement(this.producedItems); + ret = completionIndex(firstElement); if (ret != null) { ack = ret; } @@ -63,13 +79,32 @@ public class StatusReporter<T, U> { private U completionIndex(Map.Entry<T, U> lookFor) { U ack = null; - if (lookFor == null || !processedSet.contains(lookFor.getKey())) { + if (lookFor == null) { return ack; } - ack = lookFor.getValue(); + if (hasTimeoutDurationReached(System.currentTimeMillis())) { + LOG.warn("Ack: Timeout: {} - {}", lookFor.getKey(), lookFor.getValue()); + return acknowledged(lookFor); + } + + if (!processedSet.contains(lookFor.getKey())) { + return ack; + } + + return acknowledged(lookFor); + } + + private U acknowledged(Map.Entry<T, U> lookFor) { + U ack = lookFor.getValue(); producedItems.remove(lookFor.getKey()); processedSet.remove(lookFor); return ack; } + + private boolean hasTimeoutDurationReached(long now) { + boolean b = (this.timeoutDuration > -1) && (this.lastAck != 0) && ((now - this.lastAck) >= timeoutDuration); + lastAck = System.currentTimeMillis(); + return b; + } } diff --git a/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java b/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java index 3e50562..45bdbb0 100644 --- a/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java +++ b/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java @@ -23,6 +23,8 @@ import org.testng.annotations.Test; import 
java.util.concurrent.BlockingQueue; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; public class StatusReporterTest { private static class IntegerConsumer extends WorkItemConsumer<Integer> { @@ -91,4 +93,20 @@ public class StatusReporterTest { statusReporter.processed((Integer) result); } } + + @Test + public void reportWithTimeout() throws InterruptedException { + StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(2000); + statusReporter.produced(1, 100); + statusReporter.produced(2, 200); + + statusReporter.processed(2); + Integer ack = statusReporter.ack(); + assertNull(ack); + + Thread.sleep(3000); + ack = statusReporter.ack(); + assertNotNull(ack); + assertEquals(ack, Integer.valueOf(200)); + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index c18c4ab..1d29bf8 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -235,6 +235,10 @@ public class ImportService { result.incrementMeticsCounter("duration", getDuration(this.endTimestamp, this.startTimestamp)); result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS); + if (isMigrationMode(result.getRequest())) { + return; + } + auditsWriter.write(userName, result, startTimestamp, endTimestamp, importSource.getCreationOrder()); } @@ -250,7 +254,7 @@ public class ImportService { private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException { try { - if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION) || (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) && + if (isMigrationMode(request) || 
(request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) && request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT))) { LOG.info("ZipSource Format: ZipDirect: Size: {}", request.getOptions().get("size")); return getZipDirectEntityImportStream(request, inputStream); @@ -288,4 +292,8 @@ public class ImportService { exportRequest.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL) && exportRequest.getSkipLineageOptionValue(); } + + private boolean isMigrationMode(AtlasImportRequest request) { + return request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION); + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java index 54ae32a..6bb5f1e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java @@ -19,35 +19,61 @@ package org.apache.atlas.repository.impexp; import com.google.common.annotations.VisibleForTesting; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; import org.apache.atlas.annotation.AtlasService; import org.apache.atlas.model.impexp.MigrationStatus; +import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graphdb.GraphDBMigrator; +import org.apache.atlas.repository.migration.DataMigrationStatusService; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.inject.Inject; import javax.inject.Singleton; +import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME; + @AtlasService @Singleton public class MigrationProgressService { private static 
final Logger LOG = LoggerFactory.getLogger(MigrationProgressService.class); + private static final String FILE_EXTENSION_ZIP = ".zip"; public static final String MIGRATION_QUERY_CACHE_TTL = "atlas.migration.query.cache.ttlInSecs"; @VisibleForTesting - static long DEFAULT_CACHE_TTL_IN_SECS = 30 * 1000; // 30 secs + static long DEFAULT_CACHE_TTL_IN_SECS = 120 * 1000; // 120 secs private final long cacheValidity; private final GraphDBMigrator migrator; private MigrationStatus cachedStatus; private long cacheExpirationTime = 0; + private DataMigrationStatusService dataMigrationStatusService; + private boolean zipFileBasedMigrationImport; @Inject public MigrationProgressService(Configuration configuration, GraphDBMigrator migrator) { this.migrator = migrator; this.cacheValidity = (configuration != null) ? configuration.getLong(MIGRATION_QUERY_CACHE_TTL, DEFAULT_CACHE_TTL_IN_SECS) : DEFAULT_CACHE_TTL_IN_SECS; + + this.zipFileBasedMigrationImport = isZipFileBasedMigrationEnabled(); + initConditionallyZipFileBasedMigrator(); + } + + private void initConditionallyZipFileBasedMigrator() { + if (!zipFileBasedMigrationImport) { + return; + } + + dataMigrationStatusService = new DataMigrationStatusService(AtlasGraphProvider.getGraphInstance()); + dataMigrationStatusService.init(getFileNameFromMigrationProperty()); + } + + private boolean isZipFileBasedMigrationEnabled() { + return StringUtils.endsWithIgnoreCase(getFileNameFromMigrationProperty(), FILE_EXTENSION_ZIP); } public MigrationStatus getStatus() { @@ -58,7 +84,11 @@ public class MigrationProgressService { long currentTime = System.currentTimeMillis(); if(resetCache(currentTime)) { - cachedStatus = migrator.getMigrationStatus(); + if (this.zipFileBasedMigrationImport) { + cachedStatus = dataMigrationStatusService.getStatus(); + } else { + cachedStatus = migrator.getMigrationStatus(); + } } return cachedStatus; @@ -73,4 +103,12 @@ public class MigrationProgressService { return ret; } + + public String
getFileNameFromMigrationProperty() { + try { + return ApplicationProperties.get().getString(ATLAS_MIGRATION_MODE_FILENAME, StringUtils.EMPTY); + } catch (AtlasException e) { + return StringUtils.EMPTY; + } + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java new file mode 100644 index 0000000..8c7a3a8 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.repository.migration; + +import org.apache.atlas.model.migration.MigrationImportStatus; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graph.AtlasGraphProvider; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasIndexQuery; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.Iterator; + +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getEncodedProperty; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty; +import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.encodePropertyKey; +import static org.apache.atlas.type.Constants.INTERNAL_PROPERTY_KEY_PREFIX; + +public class DataMigrationStatusService { + private static final Logger LOG = LoggerFactory.getLogger(DataMigrationStatusService.class); + private final MigrationStatusVertexManagement migrationStatusVertexManagement; + + private MigrationImportStatus status; + + public DataMigrationStatusService() { + this.migrationStatusVertexManagement = new MigrationStatusVertexManagement(AtlasGraphProvider.getGraphInstance()); + } + + public DataMigrationStatusService(AtlasGraph atlasGraph) { + this.migrationStatusVertexManagement = new MigrationStatusVertexManagement(atlasGraph); + } + + + public void init(String fileToImport) { + this.status = new MigrationImportStatus(fileToImport); + if (!this.migrationStatusVertexManagement.exists(fileToImport)) { + return; + } + + getCreate(fileToImport); + } + + public MigrationImportStatus getCreate(String fileName) { + return getCreate(new MigrationImportStatus(fileName)); + } + + public MigrationImportStatus getCreate(MigrationImportStatus status) { + try { + 
this.status = this.migrationStatusVertexManagement.createOrUpdate(status); + } catch (Exception ex) { + LOG.error("DataMigrationStatusService: Setting status: {}: Resulted in error!", status.getName(), ex); + } + + return this.status; + } + + public MigrationImportStatus getStatus() { + if (this.status != null && + StringUtils.isEmpty(this.status.getOperationStatus()) && + this.migrationStatusVertexManagement.exists(this.status.getName())) { + return getCreate(this.status); + } else { + return this.status; + } + } + + public MigrationImportStatus getByName(String name) { + return this.migrationStatusVertexManagement.findByName(name); + } + + public void delete() { + if (this.status == null) { + return; + } + + MigrationImportStatus status = getByName(this.status.getName()); + this.migrationStatusVertexManagement.delete(status.getName()); + this.status = null; + } + + public void savePosition(String position) { + this.status.setCurrentIndex(Long.valueOf(position)); + this.migrationStatusVertexManagement.updateVertexPartial(this.status); + } + + public void setStatus(String status) { + this.status.setOperationStatus(status); + this.migrationStatusVertexManagement.updateVertexPartial(this.status); + } + + private static class MigrationStatusVertexManagement { + public static final String PROPERTY_KEY_START_TIME = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.startTime"); + public static final String PROPERTY_KEY_SIZE = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.size"); + public static final String PROPERTY_KEY_POSITION = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.position"); + public static final String PROPERTY_KEY_STATUS = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.status"); + + private AtlasGraph atlasGraph; + private AtlasVertex vertex; + + public MigrationStatusVertexManagement(AtlasGraph atlasGraph) { + this.atlasGraph = atlasGraph; + } + + public MigrationImportStatus 
createOrUpdate(MigrationImportStatus status) { + this.vertex = findByNameInternal(status.getName()); + + if (this.vertex == null) { + this.vertex = atlasGraph.addVertex(); + LOG.info("MigrationStatusVertexManagement: Vertex created!"); + updateVertex(this.vertex, status); + } + + return to(this.vertex); + } + + public boolean exists(String name) { + return findByNameInternal(name) != null; + } + + public MigrationImportStatus findByName(String name) { + if (this.vertex != null) { + return to(this.vertex); + } + + AtlasVertex v = findByNameInternal(name); + if (v == null) { + return null; + } + + this.vertex = v; + LOG.info("MigrationImportStatus: Vertex found!"); + return to(v); + } + + public void delete(String name) { + try { + AtlasVertex vertex = findByNameInternal(name); + atlasGraph.removeVertex(vertex); + this.vertex = null; + } finally { + atlasGraph.commit(); + } + } + + private AtlasVertex findByNameInternal(String name) { + try { + String idxQueryString = String.format("%s\"%s\":\"%s\"", AtlasGraphUtilsV2.getIndexSearchPrefix(), Constants.GUID_PROPERTY_KEY, name); + AtlasIndexQuery idxQuery = atlasGraph.indexQuery(Constants.VERTEX_INDEX, idxQueryString); + Iterator<AtlasIndexQuery.Result<Object, Object>> results = idxQuery.vertices(); + + AtlasIndexQuery.Result<?, ?> qryResult = results.hasNext() ? results.next() : null; + if (qryResult != null) { + return qryResult.getVertex(); + } else { + return null; + } + } catch (Exception e) { + LOG.error("MigrationStatusVertexManagement.findByNameInternal: Failed!", e); + } finally { + atlasGraph.commit(); + } + + return null; + } + + public void updateVertexPartial(MigrationImportStatus status) { + try { + setEncodedProperty(vertex, PROPERTY_KEY_POSITION, status.getCurrentIndex()); + } catch (Exception e) { + LOG.warn("Error updating status. 
Please rely on log messages.", e); + } finally { + atlasGraph.commit(); + } + } + + private void updateVertex(AtlasVertex vertex, MigrationImportStatus status) { + try { + setEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, status.getName()); + + setEncodedProperty(vertex, PROPERTY_KEY_START_TIME, + (status.getStartTime() != null) + ? status.getStartTime().getTime() + : System.currentTimeMillis()); + + setEncodedProperty(vertex, PROPERTY_KEY_SIZE, status.getTotalCount()); + setEncodedProperty(vertex, PROPERTY_KEY_POSITION, status.getCurrentIndex()); + setEncodedProperty(vertex, PROPERTY_KEY_STATUS, status.getOperationStatus()); + } catch (Exception ex) { + LOG.error("Error updating MigrationImportStatus vertex. Status may not be persisted correctly.", ex); + } finally { + atlasGraph.commit(); + } + } + + private static MigrationImportStatus to(AtlasVertex vertex) { + MigrationImportStatus ret = new MigrationImportStatus(); + + try { + ret.setName(getEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, String.class)); + + Long dateValue = getEncodedProperty(vertex, PROPERTY_KEY_START_TIME, Long.class); + if (dateValue != null) { + ret.setStartTime(new Date(dateValue)); + } + + Long size = getEncodedProperty(vertex, PROPERTY_KEY_SIZE, Long.class); + if (size != null) { + ret.setTotalCount(size); + } + + Long position = getEncodedProperty(vertex, PROPERTY_KEY_POSITION, Long.class); + if (position != null) { + ret.setCurrentIndex(position); + } + + ret.setOperationStatus(getEncodedProperty(vertex, PROPERTY_KEY_STATUS, String.class)); + } catch (Exception ex) { + LOG.error("Error converting to MigrationImportStatus. 
Will proceed with default values.", ex); + } + + return ret; + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java index 35a76ea..f44f2a8 100644 --- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java +++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java @@ -23,6 +23,8 @@ import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.migration.MigrationImportStatus; +import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.impexp.ImportService; import org.apache.atlas.type.AtlasType; import org.apache.commons.lang.StringUtils; @@ -41,7 +43,7 @@ public class ZipFileMigrationImporter implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class); private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers"; - private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size"; + private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size"; private static final String DEFAULT_NUMBER_OF_WORKERS = "4"; private static final String DEFAULT_BATCH_SIZE = "100"; private static final String ZIP_FILE_COMMENT_ENTITIES_COUNT = "entitiesCount"; @@ -51,20 +53,24 @@ public class ZipFileMigrationImporter implements Runnable { private final ImportService importService; private final String fileToImport; + private DataMigrationStatusService dataMigrationStatusService; + private MigrationImportStatus migrationImportStatus; public ZipFileMigrationImporter(ImportService 
importService, String fileName) { this.importService = importService; this.fileToImport = fileName; + this.dataMigrationStatusService = new DataMigrationStatusService(AtlasGraphProvider.getGraphInstance()); } @Override public void run() { try { - FileWatcher fileWatcher = new FileWatcher(fileToImport); - fileWatcher.start(); + detectFileToImport(); int streamSize = getStreamSizeFromComment(fileToImport); - performImport(new FileInputStream(new File(fileToImport)), streamSize); + migrationImportStatus = getCreateMigrationStatus(fileToImport, streamSize); + performImport(fileToImport, streamSize, Long.toString(migrationImportStatus.getCurrentIndex())); + dataMigrationStatusService.setStatus("DONE"); } catch (IOException e) { LOG.error("Migration Import: IO Error!", e); } catch (AtlasBaseException e) { @@ -72,6 +78,22 @@ public class ZipFileMigrationImporter implements Runnable { } } + private MigrationImportStatus getCreateMigrationStatus(String fileName, int streamSize) { + MigrationImportStatus status = new MigrationImportStatus(fileName); + status.setTotalCount(streamSize); + + MigrationImportStatus statusRetrieved = dataMigrationStatusService.getCreate(status); + + LOG.info("DataMigrationStatusService: Position: {}", statusRetrieved.getCurrentIndex()); + dataMigrationStatusService.setStatus("STARTED"); + return statusRetrieved; + } + + private void detectFileToImport() throws IOException { + FileWatcher fileWatcher = new FileWatcher(fileToImport); + fileWatcher.start(); + } + private int getStreamSizeFromComment(String fileToImport) { int ret = 1; try { @@ -99,13 +121,13 @@ public class ZipFileMigrationImporter implements Runnable { return entitiesCount; } - private void performImport(InputStream fs, int streamSize) throws AtlasBaseException { + private void performImport(String fileToImport, int streamSize, String startPosition) throws AtlasBaseException { try { - LOG.info("Migration Import: {}: Starting...", fileToImport); - + LOG.info("Migration Import: {}: 
Starting at: {}...", fileToImport, startPosition); + InputStream fs = new FileInputStream(new File(fileToImport)); RequestContext.get().setUser(getUserNameFromEnvironment(), null); - importService.run(fs, getImportRequest(streamSize), + importService.run(fs, getImportRequest(fileToImport, streamSize, startPosition), getUserNameFromEnvironment(), InetAddress.getLocalHost().getHostName(), InetAddress.getLocalHost().getHostAddress()); @@ -122,16 +144,19 @@ public class ZipFileMigrationImporter implements Runnable { return System.getProperty(ENV_USER_NAME); } - private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException { + private AtlasImportRequest getImportRequest(String fileToImport, int streamSize, String position) throws AtlasException { AtlasImportRequest request = new AtlasImportRequest(); + request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION_FILE_NAME, fileToImport); request.setSizeOption(streamSize); request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true"); request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKERS)); request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE)); + request.setOption(AtlasImportRequest.START_POSITION_KEY, (StringUtils.isEmpty(position) ? 
"0" : position)); return request; } + private String getPropertyValue(String property, String defaultValue) throws AtlasException { return ApplicationProperties.get().getString(property, defaultValue); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java index 4c912fd..ff55e40 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java @@ -20,12 +20,14 @@ package org.apache.atlas.repository.store.graph.v2.bulkimport; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.repository.converters.AtlasFormatConverters; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.migration.DataMigrationStatusService; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate; @@ -65,10 +67,12 @@ public class MigrationImport extends ImportStrategy { throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "importResult should contain request"); } - int index = 0; + DataMigrationStatusService dataMigrationStatusService = createMigrationStatusService(importResult); + + long index = 0; int streamSize = entityStream.size(); EntityMutationResponse ret = new EntityMutationResponse(); - EntityCreationManager 
creationManager = createEntityCreationManager(atlasGraph, importResult); + EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult, dataMigrationStatusService); try { LOG.info("Migration Import: Size: {}: Starting...", streamSize); @@ -85,14 +89,23 @@ public class MigrationImport extends ImportStrategy { return ret; } - private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult) { + private DataMigrationStatusService createMigrationStatusService(AtlasImportResult importResult) { + DataMigrationStatusService dataMigrationStatusService = new DataMigrationStatusService(); + dataMigrationStatusService.init(importResult.getRequest().getOptions().get(AtlasImportRequest.OPTION_KEY_MIGRATION_FILE_NAME)); + return dataMigrationStatusService; + } + + private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, + AtlasImportResult importResult, + DataMigrationStatusService dataMigrationStatusService) { + atlasGraph = threadedAtlasGraph; int batchSize = importResult.getRequest().getOptionKeyBatchSize(); int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers()); EntityConsumerBuilder consumerBuilder = new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize); - return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult); + return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, dataMigrationStatusService); } private static int getNumWorkers(int numWorkersFromOptions) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java index e8f4b02..d0fac10 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java +++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java @@ -155,7 +155,10 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount); pause(retryCount); - LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex); + String exceptionClass = ex.getClass().getSimpleName(); + if (!exceptionClass.equals("JanusGraphException") && !exceptionClass.equals("PermanentLockingException")) { + LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex); + } retryProcessEntity(retryCount); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java index 16bb49e..177b563 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java @@ -23,6 +23,7 @@ import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.pc.StatusReporter; import org.apache.atlas.pc.WorkItemBuilder; import org.apache.atlas.pc.WorkItemManager; +import org.apache.atlas.repository.migration.DataMigrationStatusService; import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl; import org.apache.atlas.repository.store.graph.v2.EntityImportStream; import org.apache.commons.lang.StringUtils; @@ -32,24 +33,28 @@ import org.slf4j.LoggerFactory; public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager { private static final Logger LOG = 
LoggerFactory.getLogger(EntityCreationManager.class); private static final String WORKER_PREFIX = "migration-import"; + private static final long STATUS_REPORT_TIMEOUT_DURATION = 1 * 60 * 1000; // 5 min private final StatusReporter<String, String> statusReporter; private final AtlasImportResult importResult; + private final DataMigrationStatusService dataMigrationStatusService; private String currentTypeName; private float currentPercent; private EntityImportStream entityImportStream; - public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult) { + public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult, DataMigrationStatusService dataMigrationStatusService) { super(builder, WORKER_PREFIX, batchSize, numWorkers, true); this.importResult = importResult; + this.dataMigrationStatusService = dataMigrationStatusService; - this.statusReporter = new StatusReporter<>(); + this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION); } - public int read(EntityImportStream entityStream) { - int currentIndex = 0; + public long read(EntityImportStream entityStream) { + long currentIndex = this.dataMigrationStatusService.getStatus().getCurrentIndex(); AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo; this.entityImportStream = entityStream; + this.dataMigrationStatusService.setStatus("IN_PROGRESS"); while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) { AtlasEntity entity = entityWithExtInfo != null ? 
entityWithExtInfo.getEntity() : null; if (entity == null) { @@ -66,7 +71,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag return currentIndex; } - private void produce(int currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { + private void produce(long currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { String previousTypeName = getCurrentTypeName(); if (StringUtils.isNotEmpty(typeName) @@ -104,7 +109,9 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag } importResult.incrementMeticsCounter(split[0]); - this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(split[1]), this.entityImportStream.size(), getCurrentPercent()); + String currentPosition = split[1]; + dataMigrationStatusService.savePosition(currentPosition); + this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(currentPosition), this.entityImportStream.size(), getCurrentPercent()); } private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) { diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java new file mode 100644 index 0000000..bf1d9a0 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.impexp; + +import com.google.inject.Inject; +import org.apache.atlas.TestModules; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.migration.MigrationImportStatus; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.migration.DataMigrationStatusService; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import java.util.Date; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; + +@Guice(modules = TestModules.TestOnlyModule.class) +public class DataMigrationStatusServiceTest { + @Inject + AtlasGraph atlasGraph; + + @Test + public void createUpdateDelete() throws AtlasBaseException { + DataMigrationStatusService dataMigrationStatusService = new DataMigrationStatusService(atlasGraph); + + MigrationImportStatus expected = new MigrationImportStatus("/tmp/defg.zip"); + expected.setTotalCount(3333); + expected.setCurrentIndex(20); + expected.setStartTime(new Date()); + + MigrationImportStatus ret = dataMigrationStatusService.getCreate(expected); + + assertNotNull(ret); + assertEquals(ret.getName(), expected.getName()); + assertEquals(ret.getStartTime(), expected.getStartTime()); + assertEquals(ret.getTotalCount(), expected.getTotalCount()); + assertEquals(ret.getCurrentIndex(), expected.getCurrentIndex()); + + dataMigrationStatusService.savePosition("100"); + assertNotNull(dataMigrationStatusService.getStatus()); + 
assertNotNull(dataMigrationStatusService.getStatus().getCurrentIndex(), "100"); + assertNotNull(dataMigrationStatusService.getCreate(expected).getCurrentIndex(), "100"); + + dataMigrationStatusService.delete(); + assertNull(dataMigrationStatusService.getStatus()); + assertNull(dataMigrationStatusService.getByName(ret.getName())); + } +}
