ATLAS-1948: export fix to correct the import order

Signed-off-by: Madhan Neethiraj <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/24a106b4 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/24a106b4 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/24a106b4 Branch: refs/heads/feature-odf Commit: 24a106b4e1a1581b266a453cc6c920bcd06d7a9b Parents: cfb6b84 Author: ashutoshm <[email protected]> Authored: Tue Jul 18 17:08:34 2017 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Wed Jul 19 00:50:14 2017 -0700 ---------------------------------------------------------------------- .../atlas/repository/impexp/ExportService.java | 22 ++-- .../atlas/repository/impexp/ZipSource.java | 2 +- .../store/graph/v1/AtlasEntityStoreV1.java | 119 ++++++++++++++----- .../repository/impexp/ImportServiceTest.java | 24 +++- repository/src/test/resources/ctas.zip | Bin 0 -> 7674 bytes 5 files changed, 124 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/24a106b4/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java index 8f45e9f..de48573 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java @@ -98,7 +98,6 @@ public class ExportService { AtlasExportResult.OperationStatus[] statuses = processItems(request, context); processTypesDef(context); - updateSinkWithOperationMetrics(context, statuses, getOperationDuration(startTime)); } catch(Exception ex) { LOG.error("Operation failed: ", ex); @@ -113,6 +112,7 @@ public class ExportService { } private void updateSinkWithOperationMetrics(ExportContext 
context, AtlasExportResult.OperationStatus[] statuses, int duration) throws AtlasBaseException { + context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed); context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); context.sink.setTypesDef(context.result.getData().getTypesDef()); clearContextData(context); @@ -201,9 +201,10 @@ public class ExportService { processEntity(guid, context); } - if (!context.guidsLineageToProcess.isEmpty()) { - context.guidsToProcess.addAll(context.guidsLineageToProcess); - context.guidsLineageToProcess.clear(); + if (!context.lineageToProcess.isEmpty()) { + context.guidsToProcess.addAll(context.lineageToProcess); + context.lineageProcessed.addAll(context.lineageToProcess.getList()); + context.lineageToProcess.clear(); } } } catch (AtlasBaseException excp) { @@ -295,7 +296,9 @@ public class ExportService { TraversalDirection direction = context.guidDirection.get(guid); AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid); - context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid()); + if(!context.lineageProcessed.contains(guid)) { + context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid()); + } addEntity(entityWithExtInfo, context); addTypes(entityWithExtInfo.getEntity(), context); @@ -651,13 +654,18 @@ public class ExportService { list.clear(); set.clear(); } + + public List<T> getList() { + return list; + } } private class ExportContext { final Set<String> guidsProcessed = new HashSet<>(); final UniqueList<String> guidsToProcess = new UniqueList<>(); - final UniqueList<String> guidsLineageToProcess = new UniqueList<>(); + final UniqueList<String> lineageToProcess = new UniqueList<>(); + final Set<String> lineageProcessed = new HashSet<>(); final Map<String, TraversalDirection> guidDirection = new HashMap<>(); final Set<String> entityTypes = new HashSet<>(); final Set<String> 
classificationTypes = new HashSet<>(); @@ -719,7 +727,7 @@ public class ExportService { } if(isSuperTypeProcess) { - guidsLineageToProcess.add(guid); + lineageToProcess.add(guid); } guidDirection.put(guid, direction); http://git-wip-us.apache.org/repos/asf/atlas/blob/24a106b4/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java index edb816f..4c23582 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java @@ -196,7 +196,7 @@ public class ZipSource implements EntityImportStream { AtlasEntity entity = getEntity(guid); return entity; } catch (AtlasBaseException e) { - e.printStackTrace(); + LOG.error("getByGuid: {} failed!", guid, e); return null; } } http://git-wip-us.apache.org/repos/asf/atlas/blob/24a106b4/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index 5ea4ff2..f340330 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -69,7 +69,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { private final DeleteHandlerV1 deleteHandler; private final AtlasTypeRegistry typeRegistry; private final AtlasEntityChangeNotifier entityChangeNotifier; - private final EntityGraphMapper entityGraphMapper; + private final EntityGraphMapper entityGraphMapper; @Inject public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry, @@ -77,7 +77,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { this.deleteHandler = deleteHandler; this.typeRegistry = typeRegistry; this.entityChangeNotifier = entityChangeNotifier; - this.entityGraphMapper = entityGraphMapper; + this.entityGraphMapper = entityGraphMapper; } @Override @@ -123,7 +123,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { @Override @GraphTransaction public AtlasEntityWithExtInfo getByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) - throws AtlasBaseException { + throws AtlasBaseException { if (LOG.isDebugEnabled()) { LOG.debug("==> getByUniqueAttribute({}, {})", entityType.getTypeName(), uniqAttributes); } @@ -136,7 +136,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { if (ret == null) { throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(), - uniqAttributes.toString()); + uniqAttributes.toString()); } if (LOG.isDebugEnabled()) { @@ -160,29 +160,36 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { EntityMutationResponse ret = new EntityMutationResponse(); ret.setGuidAssignments(new 
HashMap<String, String>()); - Set<String> processedGuids = new HashSet<>(); - int streamSize = entityStream.size(); - float currentPercent = 0f; + Set<String> processedGuids = new HashSet<>(); + float currentPercent = 0f; - while (entityStream.hasNext()) { - AtlasEntityWithExtInfo entityWithExtInfo = entityStream.getNextEntityWithExtInfo(); + List<String> residualList = new ArrayList<>(); + EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList); + while (entityImportStreamWithResidualList.hasNext()) { + AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo(); AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; - if(entity == null || processedGuids.contains(entity.getGuid())) { + if (entity == null || processedGuids.contains(entity.getGuid())) { continue; } AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream); + try { + EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true); - EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true); - currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, - entityStream.getPosition(), streamSize, currentPercent); + if (resp.getGuidAssignments() != null) { + ret.getGuidAssignments().putAll(resp.getGuidAssignments()); + } - if (resp.getGuidAssignments() != null) { - ret.getGuidAssignments().putAll(resp.getGuidAssignments()); - } + currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, entityStream.getPosition(), + entityImportStreamWithResidualList.getStreamSize(), currentPercent); - entityStream.onImportComplete(entity.getGuid()); + entityStream.onImportComplete(entity.getGuid()); + } catch (AtlasBaseException e) { + if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) { + 
throw e; + } + } } importResult.getProcessedEntities().addAll(processedGuids); @@ -191,20 +198,28 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { return ret; } + private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) { + if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) { + return false; + } + + lineageList.add(guid); + return true; + } + private float updateImportMetrics(AtlasEntityWithExtInfo currentEntity, EntityMutationResponse resp, AtlasImportResult importResult, Set<String> processedGuids, int currentIndex, int streamSize, float currentPercent) { - updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", - currentEntity.getEntity().getTypeName(), - currentIndex, - currentEntity.getEntity().getGuid()); + currentEntity.getEntity().getTypeName(), + currentIndex, + currentEntity.getEntity().getGuid()); return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported); } @@ -214,10 +229,10 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { final double tolerance = 0.000001; final int MAX_PERCENT = 100; - float percent = (float) ((currentIndex * MAX_PERCENT)/streamSize); + float percent = (float) ((currentIndex * MAX_PERCENT) / streamSize); boolean updateLog = Double.compare(percent, currentPercent) > tolerance; float updatedPercent = (MAX_PERCENT < streamSize) ? percent : - ((updateLog) ? ++currentPercent : currentPercent); + ((updateLog) ? 
++currentPercent : currentPercent); if (updateLog) { log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo); @@ -232,7 +247,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { } for (AtlasEntityHeader h : list) { - if(processedGuids.contains(h.getGuid())) { + if (processedGuids.contains(h.getGuid())) { continue; } @@ -298,7 +313,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { @Override @GraphTransaction public EntityMutationResponse updateEntityAttributeByGuid(String guid, String attrName, Object attrValue) - throws AtlasBaseException { + throws AtlasBaseException { if (LOG.isDebugEnabled()) { LOG.debug("==> updateEntityAttributeByGuid({}, {}, {})", guid, attrName, attrValue); } @@ -490,8 +505,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { List<AtlasClassification> updatedClassifications = new ArrayList<>(); for (AtlasClassification newClassification : newClassifications) { - String classificationName = newClassification.getTypeName(); - AtlasClassification oldClassification = getClassification(guid, classificationName); + String classificationName = newClassification.getTypeName(); + AtlasClassification oldClassification = getClassification(guid, classificationName); if (oldClassification == null) { throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName); @@ -704,7 +719,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { /** * Validate if classification is not already associated with the entities - * @param guid unique entity id + * + * @param guid unique entity id * @param classifications list of classifications to be associated */ private void validateEntityAssociations(String guid, List<AtlasClassification> classifications) throws AtlasBaseException { @@ -715,7 +731,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { if (CollectionUtils.isNotEmpty(entityClassifications) && 
entityClassifications.contains(newClassification)) { throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "entity: " + guid + - ", already associated with classification: " + newClassification); + ", already associated with classification: " + newClassification); } } } @@ -734,4 +750,43 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { return ret; } + + private static class EntityImportStreamWithResidualList { + private final EntityImportStream stream; + private final List<String> residualList; + private boolean navigateResidualList; + private int currentResidualListIndex; + + + public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) { + this.stream = stream; + this.residualList = residualList; + this.navigateResidualList = false; + this.currentResidualListIndex = 0; + } + + public AtlasEntityWithExtInfo getNextEntityWithExtInfo() { + if (navigateResidualList == false) { + return stream.getNextEntityWithExtInfo(); + } else { + stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++)); + return stream.getNextEntityWithExtInfo(); + } + } + + public boolean hasNext() { + if (!navigateResidualList) { + boolean streamHasNext = stream.hasNext(); + navigateResidualList = (streamHasNext == false); + return streamHasNext ? 
streamHasNext : (currentResidualListIndex < residualList.size()); + } else { + return (currentResidualListIndex < residualList.size()); + } + } + + public int getStreamSize() { + return stream.size() + residualList.size(); + } + } + } http://git-wip-us.apache.org/repos/asf/atlas/blob/24a106b4/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java index de8e7ef..404225c 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -133,4 +133,22 @@ public class ImportServiceTest { assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName)); assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 8); } + + @DataProvider(name = "ctas") + public static Object[][] getDataFromCtas(ITestContext context) throws IOException { + return getZipSource("ctas.zip"); + } + + @Test(dataProvider = "ctas") + public void importCTAS(ZipSource zipSource) throws IOException, AtlasBaseException { + loadModelFromJson("0010-base_model.json", typeDefStore, typeRegistry); + loadModelFromJson("0030-hive_model.json", typeDefStore, typeRegistry); + + AtlasImportRequest request = getDefaultImportRequest(); + runImportWithParameters(getImportService(), getDefaultImportRequest(), zipSource); + } + + private ImportService getImportService() { + return new ImportService(typeDefStore, entityStore, typeRegistry); + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/24a106b4/repository/src/test/resources/ctas.zip ---------------------------------------------------------------------- diff --git a/repository/src/test/resources/ctas.zip b/repository/src/test/resources/ctas.zip new file mode 100644 index 0000000..a77966c Binary files /dev/null and b/repository/src/test/resources/ctas.zip differ
