Repository: atlas Updated Branches: refs/heads/branch-0.8 fb30afba4 -> 325859ab8
ATLAS-2818: Entity tagging after import. Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/325859ab Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/325859ab Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/325859ab Branch: refs/heads/branch-0.8 Commit: 325859ab8d7023e87f65d148e421e8d8d8e5cfc0 Parents: fb30afb Author: Ashutosh Mestry <[email protected]> Authored: Fri Aug 24 13:19:08 2018 -0700 Committer: Ashutosh Mestry <[email protected]> Committed: Sun Aug 26 18:08:19 2018 -0700 ---------------------------------------------------------------------- .../atlas/repository/impexp/AuditsWriter.java | 77 +++++++------ .../atlas/repository/impexp/ClusterService.java | 9 -- .../atlas/repository/impexp/ImportService.java | 32 +++--- .../repository/impexp/ImportTransformer.java | 44 ++++++++ .../impexp/ImportTransformsShaper.java | 95 ++++++++++++++++ .../store/graph/v1/BulkImporterImpl.java | 9 +- .../repository/impexp/ImportServiceTest.java | 2 +- .../impexp/ImportTransformsShaperTest.java | 108 +++++++++++++++++++ .../repository/impexp/ImportTransformsTest.java | 32 ++++++ ...AtlasEntityStoreV1BulkImportPercentTest.java | 10 ++ 10 files changed, 351 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java index 467d383..ced00b9 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java @@ -28,18 +28,15 @@ import 
org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.ExportImportAuditEntry; -import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.repository.Constants; import org.apache.atlas.type.AtlasType; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.inject.Inject; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -101,6 +98,11 @@ public class AuditsWriter { private AtlasCluster saveCluster(String clusterName, String entityGuid, long lastModifiedTimestamp) throws AtlasBaseException { AtlasCluster cluster = new AtlasCluster(clusterName, clusterName); cluster.setAdditionalInfoRepl(entityGuid, lastModifiedTimestamp); + + if (LOG.isDebugEnabled()) { + LOG.debug("saveCluster: {}", cluster); + } + return clusterService.save(cluster); } @@ -116,68 +118,77 @@ public class AuditsWriter { private class ExportAudits { private AtlasExportRequest request; - private AtlasCluster cluster; private String targetClusterName; private String optionKeyReplicatedTo; private boolean replicationOptionState; - public void add(String userName, AtlasExportResult result, long startTime, long endTime, List<String> entitityGuids) throws AtlasBaseException { + public void add(String userName, AtlasExportResult result, + long startTime, long endTime, + List<String> entityGuids) throws AtlasBaseException { optionKeyReplicatedTo = AtlasExportRequest.OPTION_KEY_REPLICATED_TO; request = result.getRequest(); replicationOptionState = isReplicationOptionSet(request.getOptions(), optionKeyReplicatedTo); - targetClusterName = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo); - 
cluster = saveCluster(getCurrentClusterName()); + saveClusters(); auditService.add(userName, getCurrentClusterName(), targetClusterName, ExportImportAuditEntry.OPERATION_EXPORT, - AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty()); - - updateReplicationAttributeForExport(request, entitityGuids); - } + AtlasType.toJson(result), startTime, endTime, !entityGuids.isEmpty()); - private void updateReplicationAttributeForExport(AtlasExportRequest request, List<String> entityGuids) throws AtlasBaseException { - if(!replicationOptionState) { + if (result.getOperationStatus() == AtlasExportResult.OperationStatus.FAIL) { return; } updateReplicationAttribute(replicationOptionState, targetClusterName, - entityGuids, Constants.ATTR_NAME_REPLICATED_TO_CLUSTER, 0L); + entityGuids, Constants.ATTR_NAME_REPLICATED_TO_CLUSTER, result.getLastModifiedTimestamp()); + } + + private void saveClusters() throws AtlasBaseException { + saveCluster(getCurrentClusterName()); + + targetClusterName = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo); + if(StringUtils.isNotEmpty(targetClusterName)) { + saveCluster(targetClusterName); + } } } private class ImportAudits { private AtlasImportRequest request; private boolean replicationOptionState; - private AtlasCluster cluster; + private String sourceClusterName; + private AtlasCluster sourceCluster; private String optionKeyReplicatedFrom; - private AtlasImportResult result; - public void add(String userName, AtlasImportResult result, long startTime, long endTime, List<String> entitityGuids) throws AtlasBaseException { - this.result = result; - request = result.getRequest(); + public void add(String userName, AtlasImportResult result, + long startTime, long endTime, + List<String> entityGuids) throws AtlasBaseException { optionKeyReplicatedFrom = AtlasImportRequest.OPTION_KEY_REPLICATED_FROM; + request = result.getRequest(); replicationOptionState = isReplicationOptionSet(request.getOptions(), 
optionKeyReplicatedFrom); - String sourceCluster = getClusterNameFromOptionsState(); - cluster = saveCluster(sourceCluster); + saveClusters(); auditService.add(userName, - sourceCluster, getCurrentClusterName(), - ExportImportAuditEntry.OPERATION_IMPORT, AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty()); + sourceClusterName, getCurrentClusterName(), + ExportImportAuditEntry.OPERATION_IMPORT, + AtlasType.toJson(result), startTime, endTime, !entityGuids.isEmpty()); - updateReplicationAttributeForImport(entitityGuids); - } + if(result.getOperationStatus() == AtlasImportResult.OperationStatus.FAIL) { + return; + } - private void updateReplicationAttributeForImport(List<String> entityGuids) throws AtlasBaseException { - if(!replicationOptionState) return; + updateReplicationAttribute(replicationOptionState, this.sourceClusterName, entityGuids, + Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER, result.getExportResult().getLastModifiedTimestamp()); + } - String targetClusterName = cluster.getName(); + private void saveClusters() throws AtlasBaseException { + saveCluster(getCurrentClusterName()); - updateReplicationAttribute(replicationOptionState, targetClusterName, - entityGuids, - Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER, - result.getExportResult().getLastModifiedTimestamp()); + sourceClusterName = getClusterNameFromOptionsState(); + if(StringUtils.isNotEmpty(sourceClusterName)) { + this.sourceCluster = saveCluster(sourceClusterName); + } } private String getClusterNameFromOptionsState() { http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java index 950850e..4462f2c 100644 --- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java @@ -19,26 +19,19 @@ package org.apache.atlas.repository.impexp; import org.apache.atlas.AtlasException; -import org.apache.atlas.RequestContextV1; import org.apache.atlas.annotation.AtlasService; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasCluster; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; -import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.ogm.DataAccess; import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; -import org.apache.atlas.repository.store.graph.v1.AtlasEntityStreamForImport; -import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1; import org.apache.atlas.repository.store.graph.v1.EntityGraphMapper; import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasStructType; -import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -48,8 +41,6 @@ import javax.inject.Inject; import java.util.ArrayList; import java.util.List; -import static org.apache.atlas.repository.Constants.ATTR_NAME_REFERENCEABLE; - @AtlasService public class ClusterService { private static final Logger LOG = LoggerFactory.getLogger(ClusterService.class); http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java 
---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index 8a184fa..095f60f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -48,17 +48,21 @@ public class ImportService { private final AtlasTypeDefStore typeDefStore; private final AtlasTypeRegistry typeRegistry; private final BulkImporter bulkImporter; - private AuditsWriter auditsWriter; + private final AuditsWriter auditsWriter; + private final ImportTransformsShaper importTransformsShaper; private long startTimestamp; private long endTimestamp; @Inject - public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, BulkImporter bulkImporter, AuditsWriter auditsWriter) { + public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, BulkImporter bulkImporter, + AuditsWriter auditsWriter, + ImportTransformsShaper importTransformsShaper) { this.typeDefStore = typeDefStore; this.typeRegistry = typeRegistry; this.bulkImporter = bulkImporter; this.auditsWriter = auditsWriter; + this.importTransformsShaper = importTransformsShaper; } public AtlasImportResult run(ZipSource source, String userName, @@ -76,7 +80,7 @@ public class ImportService { AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis()); try { - LOG.info("==> import(user={}, from={})", userName, requestingIP); + LOG.info("==> import(user={}, from={}, request={})", userName, requestingIP, request); String transforms = MapUtils.isNotEmpty(request.getOptions()) ? 
request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY) : null; @@ -85,8 +89,6 @@ public class ImportService { processTypes(source.getTypesDef(), result); setStartPosition(request, source); processEntities(userName, source, result); - - result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS); } catch (AtlasBaseException excp) { LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp); @@ -110,12 +112,13 @@ public class ImportService { return; } - updateTransformsWithSubTypes(importTransform); - source.setImportTransform(importTransform); + importTransformsShaper.shape(importTransform); + source.setImportTransform(importTransform); if(LOG.isDebugEnabled()) { debugLog(" => transforms: {}", AtlasType.toJson(importTransform)); } + } private void debugLog(String s, Object... params) { @@ -124,19 +127,6 @@ public class ImportService { LOG.debug(s, params); } - private void updateTransformsWithSubTypes(ImportTransforms importTransforms) throws AtlasBaseException { - String[] transformTypes = importTransforms.getTypes().toArray(new String[importTransforms.getTypes().size()]); - for (int i = 0; i < transformTypes.length; i++) { - String typeName = transformTypes[i]; - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); - if (entityType == null) { - continue; - } - - importTransforms.addParentTransformsToSubTypes(typeName, entityType.getAllSubTypes()); - } - } - private void setStartPosition(AtlasImportRequest request, ZipSource source) throws AtlasBaseException { if (request.getStartGuid() != null) { source.setPositionUsingEntityGuid(request.getStartGuid()); @@ -201,6 +191,8 @@ public class ImportService { endTimestamp = System.currentTimeMillis(); result.incrementMeticsCounter("duration", getDuration(this.endTimestamp, this.startTimestamp)); result.setExportResult(importSource.getExportResult()); + + result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS); auditsWriter.write(userName, result, 
startTimestamp, endTimestamp, importSource.getCreationOrder()); } http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java index dc71c2a..213539d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java @@ -35,6 +35,7 @@ public abstract class ImportTransformer { private static final String TRANSFORMER_NAME_LOWERCASE = "lowercase"; private static final String TRANSFORMER_NAME_UPPERCASE = "uppercase"; private static final String TRANSFORMER_NAME_REMOVE_CLASSIFICATION = "removeClassification"; + private static final String TRANSFORMER_NAME_ADD_CLASSIFICATION = "addClassification"; private static final String TRANSFORMER_NAME_REPLACE = "replace"; private static final String TRANSFORMER_SET_DELETED = "setDeleted"; @@ -68,6 +69,9 @@ public abstract class ImportTransformer { ret = new ClearAttributes(name); } else if (key.equals(TRANSFORMER_SET_DELETED)) { ret = new SetDeleted(); + } else if (key.equals(TRANSFORMER_NAME_ADD_CLASSIFICATION)) { + String name = (params == null || params.length < 1) ? "" : StringUtils.join(params, ":", 1, params.length); + ret = new AddClassification(name); } else { throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error creating ImportTransformer. 
Unknown transformer: {}.", transformerSpec); } @@ -145,6 +149,46 @@ public abstract class ImportTransformer { } } + static class AddClassification extends ImportTransformer { + private final String classificationName; + + public AddClassification(String name) { + super(TRANSFORMER_NAME_ADD_CLASSIFICATION); + + this.classificationName = name; + } + + @Override + public Object apply(Object o) { + if (!(o instanceof AtlasEntity)) { + return o; + } + + AtlasEntity entity = (AtlasEntity) o; + if(entity.getClassifications() == null) { + entity.setClassifications(new ArrayList<AtlasClassification>()); + } + + for (AtlasClassification c : entity.getClassifications()) { + if (c.getTypeName().equals(classificationName)) { + return entity; + } + } + + entity.getClassifications().add(new AtlasClassification(classificationName)); + return entity; + } + + @Override + public String toString() { + return String.format("%s=%s", "AddClassification", classificationName); + } + + public String getClassificationName() { + return classificationName; + } + } + static class RemoveClassification extends ImportTransformer { private final String classificationToBeRemoved; http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java new file mode 100644 index 0000000..62eba45 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.typedef.AtlasClassificationDef; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +@Component +public class ImportTransformsShaper { + private static final Logger LOG = LoggerFactory.getLogger(ImportTransformsShaper.class); + + private final AtlasTypeRegistry typeRegistry; + private final AtlasTypeDefStore typeDefStore; + + @Inject + public ImportTransformsShaper(AtlasTypeRegistry typeRegistry, AtlasTypeDefStore typeDefStore) { + this.typeRegistry = typeRegistry; + this.typeDefStore = typeDefStore; + } + + public void shape(ImportTransforms importTransform) throws AtlasBaseException { + getCreateClassifications(importTransform); + updateTransformsWithSubTypes(importTransform); + } + + private void getCreateClassifications(ImportTransforms importTransform) throws AtlasBaseException { + Map<String, Map<String, List<ImportTransformer>>> mapMapList = importTransform.getTransforms(); + for 
(Map<String, List<ImportTransformer>> mapList : mapMapList.values()) { + for (List<ImportTransformer> list : mapList.values()) { + for (ImportTransformer importTransformer : list) { + if((importTransformer instanceof ImportTransformer.AddClassification)) { + + ImportTransformer.AddClassification addClassification = (ImportTransformer.AddClassification) importTransformer; + getCreateTag(addClassification.getClassificationName()); + } + } + } + } + } + + private void updateTransformsWithSubTypes(ImportTransforms importTransforms) { + String[] transformTypes = importTransforms.getTypes().toArray(new String[importTransforms.getTypes().size()]); + for (int i = 0; i < transformTypes.length; i++) { + String typeName = transformTypes[i]; + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + if (entityType == null) { + continue; + } + + importTransforms.addParentTransformsToSubTypes(typeName, entityType.getAllSubTypes()); + } + } + + private String getCreateTag(String classificationName) throws AtlasBaseException { + AtlasClassificationDef classificationDef = typeRegistry.getClassificationDefByName(classificationName); + if(classificationDef != null) { + return classificationName; + } + + classificationDef = new AtlasClassificationDef(classificationName); + AtlasTypesDef typesDef = new AtlasTypesDef(); + typesDef.setClassificationDefs(Collections.singletonList(classificationDef)); + typeDefStore.createTypesDef(typesDef); + LOG.info("created classification: {}", classificationName); + return classificationName; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java index 467ced7..62b393a 
100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java @@ -129,7 +129,7 @@ public class BulkImporterImpl implements BulkImporter { String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid()); - return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported); + return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported); } @VisibleForTesting @@ -137,12 +137,13 @@ public class BulkImporterImpl implements BulkImporter { final double tolerance = 0.000001; final int MAX_PERCENT = 100; - float percent = (float) ((currentIndex * MAX_PERCENT) / streamSize); + int maxSize = (currentIndex <= streamSize) ? streamSize : currentIndex; + float percent = (float) ((currentIndex * MAX_PERCENT) / maxSize); boolean updateLog = Double.compare(percent, currentPercent) > tolerance; - float updatedPercent = (MAX_PERCENT < streamSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent); + float updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? 
++currentPercent : currentPercent); if (updateLog) { - log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo); + log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), maxSize, additionalInfo); } return updatedPercent; http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java index 08bbcd2..b556fe9 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java @@ -204,7 +204,7 @@ public class ImportServiceTest extends ExportImportTestBase { @Test public void importServiceProcessesIOException() { - ImportService importService = new ImportService(typeDefStore, typeRegistry, null, null); + ImportService importService = new ImportService(typeDefStore, typeRegistry, null, null,null); AtlasImportRequest req = mock(AtlasImportRequest.class); Answer<Map> answer = new Answer<Map>() { http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java new file mode 100644 index 0000000..f894553 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.TestModules; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import javax.inject.Inject; +import java.io.IOException; +import java.util.List; + +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadFsModel; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.AssertJUnit.assertNotNull; + +@Guice(modules = TestModules.TestOnlyModule.class) +public class ImportTransformsShaperTest extends ExportImportTestBase { + @Inject + AtlasTypeRegistry typeRegistry; + + @Inject + private AtlasTypeDefStore typeDefStore; + + @Inject + private ImportService importService; + + @Inject + 
private AtlasEntityStore entityStore; + + private final String TAG_NAME = "REPLICATED"; + + @BeforeClass + public void setup() throws IOException, AtlasBaseException { + basicSetup(typeDefStore, typeRegistry); + loadFsModel(typeDefStore, typeRegistry); + } + + @Test + public void newTagIsCreatedAndEntitiesAreTagged() throws AtlasBaseException, IOException { + AtlasImportResult result = ZipFileResourceTestUtils.runImportWithParameters(importService, + getImporRequest(), + ZipFileResourceTestUtils.getZipSourceFrom("stocks.zip")); + + AtlasClassificationType classification = typeRegistry.getClassificationTypeByName(TAG_NAME); + assertNotNull(classification); + assertEntities(result.getProcessedEntities(), TAG_NAME); + } + private void assertEntities(List<String> entityGuids, String tagName) throws AtlasBaseException { + for (String guid : entityGuids) { + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = this.entityStore.getById(guid); + + assertNotNull(entityWithExtInfo); + assertTag(entityWithExtInfo, tagName); + } + } + + private void assertTag(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, String tagName) { + if(entityWithExtInfo.getReferredEntities() == null || entityWithExtInfo.getReferredEntities().size() == 0) { + return; + } + + for (AtlasEntity entity : entityWithExtInfo.getReferredEntities().values()) { + assertTag(entity, tagName); + } + } + + private void assertTag(AtlasEntity entity, String tagName) { + + assertTrue(entity.getClassifications().size() > 0, + String.format("%s not tagged", entity.getTypeName())); + assertEquals(entity.getClassifications().get(0).getTypeName(), tagName); + } + + + private AtlasImportRequest getImporRequest() { + AtlasImportRequest request = new AtlasImportRequest(); + request.getOptions().put("transforms", "{ \"Referenceable\": { \"*\":[ \"addClassification:REPLICATED\" ] } }"); + return request; + } + +} 
http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java index b241dda..cd623d0 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java @@ -46,6 +46,8 @@ public class ImportTransformsTest { private final String jsonSingleClearAttrValue = "{ \"hive_table\": { \"*\":[ \"clearAttrValue:replicatedToCluster\", \"clearAttrValue:replicatedFromCluster\" ] } }"; private final String jsonMultipleClearAttrValue = "{ \"hive_table\": { \"*\":[ \"clearAttrValue:replicatedToCluster,replicatedFromCluster\" ] } }"; private final String jsonSetDeleted = "{ \"hive_table\": { \"*\":[ \"setDeleted\" ] } }"; + private final String jsonAddClasification = "{ \"hive_table\": { \"*\":[ \"addClassification:REPLICATED\" ] } }"; + private final String jsonAddClasification2 = "{ \"hive_table\": { \"*\":[ \"addClassification:REPLICATED_2\" ] } }"; private ImportTransforms transform; private String HIVE_TABLE_ATTR_SYNC_INFO = "hive_table.syncInfo"; @@ -192,6 +194,36 @@ public class ImportTransformsTest { assertEquals(entity.getStatus(), AtlasEntity.Status.DELETED); } + @Test + public void addClassification_AddsClassificationToEntitiy() throws AtlasBaseException { + AtlasEntity entity = getHiveTableAtlasEntity(); + int existingClassificationsCount = entity.getClassifications() != null ? 
entity.getClassifications().size() : 0; + ImportTransforms t = ImportTransforms.fromJson(jsonAddClasification); + + assertTrue(t.getTransforms().size() > 0); + + t.apply(entity); + + assertNotNull(t); + assertEquals(entity.getClassifications().size(), existingClassificationsCount + 1); + addClassification_ExistingClassificationsAreHandled(entity); + addClassification_MultipleClassificationsAreAdded(entity); + } + + private void addClassification_ExistingClassificationsAreHandled(AtlasEntity entity) throws AtlasBaseException { + int existingClassificationsCount = entity.getClassifications() != null ? entity.getClassifications().size() : 0; + assertTrue(existingClassificationsCount > 0); + ImportTransforms.fromJson(jsonAddClasification).apply(entity); + + assertEquals(entity.getClassifications().size(), existingClassificationsCount); + } + + private void addClassification_MultipleClassificationsAreAdded(AtlasEntity entity) throws AtlasBaseException { + int existingClassificationsCount = entity.getClassifications().size(); + ImportTransforms.fromJson(jsonAddClasification2).apply(entity); + + assertEquals(entity.getClassifications().size(), existingClassificationsCount + 1); + } private String[] getExtEntityExpectedValues(AtlasEntityWithExtInfo entityWithExtInfo) { String[] ret = new String[entityWithExtInfo.getReferredEntities().size()]; http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java index 73dfe37..656ab10 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java +++ 
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java @@ -35,6 +35,7 @@ import static org.testng.Assert.assertTrue; public class AtlasEntityStoreV1BulkImportPercentTest { private final int MAX_PERCENT = 100; + private final float MAX_PERCENT_FLOAT = 100.0F; private List<Integer> percentHolder; private Logger log; @@ -143,6 +144,15 @@ public class AtlasEntityStoreV1BulkImportPercentTest { assertEqualsForPercentHolder(expected); } + @Test + public void exceedingInitialStreamSize_KeepsPercentAt100() throws Exception { + runWithSize(4); + double[] expected = fillPercentHolderWith100(); + float f = BulkImporterImpl.updateImportProgress(log, 5, 4, 100, "additional info"); + + assertTrue(Math.abs(f - MAX_PERCENT_FLOAT) <= 0.0001); + } + private void runWithSize(int streamSize) throws Exception { float currentPercent = 0; setupPercentHolder(streamSize);
