Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 fb30afba4 -> 325859ab8


ATLAS-2818: Entity tagging after import.


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/325859ab
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/325859ab
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/325859ab

Branch: refs/heads/branch-0.8
Commit: 325859ab8d7023e87f65d148e421e8d8d8e5cfc0
Parents: fb30afb
Author: Ashutosh Mestry <[email protected]>
Authored: Fri Aug 24 13:19:08 2018 -0700
Committer: Ashutosh Mestry <[email protected]>
Committed: Sun Aug 26 18:08:19 2018 -0700

----------------------------------------------------------------------
 .../atlas/repository/impexp/AuditsWriter.java   |  77 +++++++------
 .../atlas/repository/impexp/ClusterService.java |   9 --
 .../atlas/repository/impexp/ImportService.java  |  32 +++---
 .../repository/impexp/ImportTransformer.java    |  44 ++++++++
 .../impexp/ImportTransformsShaper.java          |  95 ++++++++++++++++
 .../store/graph/v1/BulkImporterImpl.java        |   9 +-
 .../repository/impexp/ImportServiceTest.java    |   2 +-
 .../impexp/ImportTransformsShaperTest.java      | 108 +++++++++++++++++++
 .../repository/impexp/ImportTransformsTest.java |  32 ++++++
 ...AtlasEntityStoreV1BulkImportPercentTest.java |  10 ++
 10 files changed, 351 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java 
b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index 467d383..ced00b9 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -28,18 +28,15 @@ import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.impexp.ExportImportAuditEntry;
-import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.type.AtlasType;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 
 import javax.inject.Inject;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -101,6 +98,11 @@ public class AuditsWriter {
     private AtlasCluster saveCluster(String clusterName, String entityGuid, 
long lastModifiedTimestamp) throws AtlasBaseException {
         AtlasCluster cluster = new AtlasCluster(clusterName, clusterName);
         cluster.setAdditionalInfoRepl(entityGuid, lastModifiedTimestamp);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("saveCluster: {}", cluster);
+        }
+
         return clusterService.save(cluster);
     }
 
@@ -116,68 +118,77 @@ public class AuditsWriter {
 
     private class ExportAudits {
         private AtlasExportRequest request;
-        private AtlasCluster cluster;
         private String targetClusterName;
         private String optionKeyReplicatedTo;
         private boolean replicationOptionState;
 
-        public void add(String userName, AtlasExportResult result, long 
startTime, long endTime, List<String> entitityGuids) throws AtlasBaseException {
+        public void add(String userName, AtlasExportResult result,
+                        long startTime, long endTime,
+                        List<String> entityGuids) throws AtlasBaseException {
             optionKeyReplicatedTo = 
AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
             request = result.getRequest();
             replicationOptionState = 
isReplicationOptionSet(request.getOptions(), optionKeyReplicatedTo);
-            targetClusterName = 
getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo);
 
-            cluster = saveCluster(getCurrentClusterName());
+            saveClusters();
 
             auditService.add(userName, getCurrentClusterName(), 
targetClusterName,
                     ExportImportAuditEntry.OPERATION_EXPORT,
-                    AtlasType.toJson(result), startTime, endTime, 
!entitityGuids.isEmpty());
-
-            updateReplicationAttributeForExport(request, entitityGuids);
-        }
+                    AtlasType.toJson(result), startTime, endTime, 
!entityGuids.isEmpty());
 
-        private void updateReplicationAttributeForExport(AtlasExportRequest 
request, List<String> entityGuids) throws AtlasBaseException {
-            if(!replicationOptionState) {
+            if (result.getOperationStatus() == 
AtlasExportResult.OperationStatus.FAIL) {
                 return;
             }
 
             updateReplicationAttribute(replicationOptionState, 
targetClusterName,
-                    entityGuids, Constants.ATTR_NAME_REPLICATED_TO_CLUSTER, 
0L);
+                    entityGuids, Constants.ATTR_NAME_REPLICATED_TO_CLUSTER, 
result.getLastModifiedTimestamp());
+        }
+
+        private void saveClusters() throws AtlasBaseException {
+            saveCluster(getCurrentClusterName());
+
+            targetClusterName = 
getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo);
+            if(StringUtils.isNotEmpty(targetClusterName)) {
+                saveCluster(targetClusterName);
+            }
         }
     }
 
     private class ImportAudits {
         private AtlasImportRequest request;
         private boolean replicationOptionState;
-        private AtlasCluster cluster;
+        private String sourceClusterName;
+        private AtlasCluster sourceCluster;
         private String optionKeyReplicatedFrom;
-        private AtlasImportResult result;
 
-        public void add(String userName, AtlasImportResult result, long 
startTime, long endTime, List<String> entitityGuids) throws AtlasBaseException {
-            this.result = result;
-            request = result.getRequest();
+        public void add(String userName, AtlasImportResult result,
+                        long startTime, long endTime,
+                        List<String> entityGuids) throws AtlasBaseException {
             optionKeyReplicatedFrom = 
AtlasImportRequest.OPTION_KEY_REPLICATED_FROM;
+            request = result.getRequest();
             replicationOptionState = 
isReplicationOptionSet(request.getOptions(), optionKeyReplicatedFrom);
 
-            String sourceCluster = getClusterNameFromOptionsState();
-            cluster = saveCluster(sourceCluster);
+            saveClusters();
 
             auditService.add(userName,
-                    sourceCluster, getCurrentClusterName(),
-                    ExportImportAuditEntry.OPERATION_IMPORT, 
AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty());
+                    sourceClusterName, getCurrentClusterName(),
+                    ExportImportAuditEntry.OPERATION_IMPORT,
+                    AtlasType.toJson(result), startTime, endTime, 
!entityGuids.isEmpty());
 
-            updateReplicationAttributeForImport(entitityGuids);
-        }
+            if(result.getOperationStatus() == 
AtlasImportResult.OperationStatus.FAIL) {
+                return;
+            }
 
-        private void updateReplicationAttributeForImport(List<String> 
entityGuids) throws AtlasBaseException {
-            if(!replicationOptionState) return;
+            updateReplicationAttribute(replicationOptionState, 
this.sourceClusterName, entityGuids,
+                    Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER, 
result.getExportResult().getLastModifiedTimestamp());
+        }
 
-            String targetClusterName = cluster.getName();
+        private void saveClusters() throws AtlasBaseException {
+            saveCluster(getCurrentClusterName());
 
-            updateReplicationAttribute(replicationOptionState, 
targetClusterName,
-                    entityGuids,
-                    Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER,
-                    result.getExportResult().getLastModifiedTimestamp());
+            sourceClusterName = getClusterNameFromOptionsState();
+            if(StringUtils.isNotEmpty(sourceClusterName)) {
+                this.sourceCluster = saveCluster(sourceClusterName);
+            }
         }
 
         private String getClusterNameFromOptionsState() {

http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java
index 950850e..4462f2c 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java
@@ -19,26 +19,19 @@
 package org.apache.atlas.repository.impexp;
 
 import org.apache.atlas.AtlasException;
-import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.annotation.AtlasService;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasCluster;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.ogm.DataAccess;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
-import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
-import org.apache.atlas.repository.store.graph.v1.AtlasEntityStreamForImport;
-import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
 import org.apache.atlas.repository.store.graph.v1.EntityGraphMapper;
 import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType;
-import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -48,8 +41,6 @@ import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.atlas.repository.Constants.ATTR_NAME_REFERENCEABLE;
-
 @AtlasService
 public class ClusterService {
     private static final Logger LOG = 
LoggerFactory.getLogger(ClusterService.class);

http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 8a184fa..095f60f 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -48,17 +48,21 @@ public class ImportService {
     private final AtlasTypeDefStore typeDefStore;
     private final AtlasTypeRegistry typeRegistry;
     private final BulkImporter bulkImporter;
-    private AuditsWriter auditsWriter;
+    private final AuditsWriter auditsWriter;
+    private final ImportTransformsShaper importTransformsShaper;
 
     private long startTimestamp;
     private long endTimestamp;
 
     @Inject
-    public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry 
typeRegistry, BulkImporter bulkImporter, AuditsWriter auditsWriter) {
+    public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry 
typeRegistry, BulkImporter bulkImporter,
+                         AuditsWriter auditsWriter,
+                         ImportTransformsShaper importTransformsShaper) {
         this.typeDefStore = typeDefStore;
         this.typeRegistry = typeRegistry;
         this.bulkImporter = bulkImporter;
         this.auditsWriter = auditsWriter;
+        this.importTransformsShaper = importTransformsShaper;
     }
 
     public AtlasImportResult run(ZipSource source, String userName,
@@ -76,7 +80,7 @@ public class ImportService {
         AtlasImportResult result = new AtlasImportResult(request, userName, 
requestingIP, hostName, System.currentTimeMillis());
 
         try {
-            LOG.info("==> import(user={}, from={})", userName, requestingIP);
+            LOG.info("==> import(user={}, from={}, request={})", userName, 
requestingIP, request);
 
             String transforms = MapUtils.isNotEmpty(request.getOptions()) ? 
request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY) : null;
 
@@ -85,8 +89,6 @@ public class ImportService {
             processTypes(source.getTypesDef(), result);
             setStartPosition(request, source);
             processEntities(userName, source, result);
-
-            
result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS);
         } catch (AtlasBaseException excp) {
             LOG.error("import(user={}, from={}): failed", userName, 
requestingIP, excp);
 
@@ -110,12 +112,13 @@ public class ImportService {
             return;
         }
 
-        updateTransformsWithSubTypes(importTransform);
-        source.setImportTransform(importTransform);
+        importTransformsShaper.shape(importTransform);
 
+        source.setImportTransform(importTransform);
         if(LOG.isDebugEnabled()) {
             debugLog("   => transforms: {}", 
AtlasType.toJson(importTransform));
         }
+
     }
 
     private void debugLog(String s, Object... params) {
@@ -124,19 +127,6 @@ public class ImportService {
         LOG.debug(s, params);
     }
 
-    private void updateTransformsWithSubTypes(ImportTransforms 
importTransforms) throws AtlasBaseException {
-        String[] transformTypes = importTransforms.getTypes().toArray(new 
String[importTransforms.getTypes().size()]);
-        for (int i = 0; i < transformTypes.length; i++) {
-            String typeName = transformTypes[i];
-            AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(typeName);
-            if (entityType == null) {
-                continue;
-            }
-
-            importTransforms.addParentTransformsToSubTypes(typeName, 
entityType.getAllSubTypes());
-        }
-    }
-
     private void setStartPosition(AtlasImportRequest request, ZipSource 
source) throws AtlasBaseException {
         if (request.getStartGuid() != null) {
             source.setPositionUsingEntityGuid(request.getStartGuid());
@@ -201,6 +191,8 @@ public class ImportService {
         endTimestamp = System.currentTimeMillis();
         result.incrementMeticsCounter("duration", 
getDuration(this.endTimestamp, this.startTimestamp));
         result.setExportResult(importSource.getExportResult());
+
+        result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS);
         auditsWriter.write(userName, result, startTimestamp, endTimestamp, 
importSource.getCreationOrder());
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
index dc71c2a..213539d 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
@@ -35,6 +35,7 @@ public abstract class ImportTransformer {
     private static final String TRANSFORMER_NAME_LOWERCASE = "lowercase";
     private static final String TRANSFORMER_NAME_UPPERCASE = "uppercase";
     private static final String TRANSFORMER_NAME_REMOVE_CLASSIFICATION = 
"removeClassification";
+    private static final String TRANSFORMER_NAME_ADD_CLASSIFICATION = 
"addClassification";
     private static final String TRANSFORMER_NAME_REPLACE = "replace";
     private static final String TRANSFORMER_SET_DELETED = "setDeleted";
 
@@ -68,6 +69,9 @@ public abstract class ImportTransformer {
             ret = new ClearAttributes(name);
         } else if (key.equals(TRANSFORMER_SET_DELETED)) {
             ret = new SetDeleted();
+        } else if (key.equals(TRANSFORMER_NAME_ADD_CLASSIFICATION)) {
+            String name = (params == null || params.length < 1) ? "" : 
StringUtils.join(params, ":", 1, params.length);
+            ret = new AddClassification(name);
         } else {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error 
creating ImportTransformer. Unknown transformer: {}.", transformerSpec);
         }
@@ -145,6 +149,46 @@ public abstract class ImportTransformer {
         }
     }
 
+    /**
+     * Transformer that tags an {@link AtlasEntity} with a classification of the
+     * configured name, unless the entity already carries a classification with
+     * that type name. Objects that are not {@link AtlasEntity} instances are
+     * returned untouched.
+     */
+    static class AddClassification extends ImportTransformer {
+        private final String classificationName;
+
+        /**
+         * @param name type name of the classification to attach to entities
+         */
+        public AddClassification(String name) {
+            // Pass this transformer's own name to the base class; the previous
+            // value was copied from RemoveClassification.
+            super(TRANSFORMER_NAME_ADD_CLASSIFICATION);
+
+            this.classificationName = name;
+        }
+
+        /**
+         * Adds the classification to the entity if it is not already present.
+         *
+         * @param o object being transformed
+         * @return the same object that was passed in; when it is an
+         *         {@link AtlasEntity} its classification list may have been
+         *         created and/or extended in place
+         */
+        @Override
+        public Object apply(Object o) {
+            if (!(o instanceof AtlasEntity)) {
+                return o;
+            }
+
+            AtlasEntity entity = (AtlasEntity) o;
+            if (entity.getClassifications() == null) {
+                entity.setClassifications(new ArrayList<AtlasClassification>());
+            }
+
+            // Already tagged: leave the entity as it is so the tag is not duplicated.
+            for (AtlasClassification c : entity.getClassifications()) {
+                if (c.getTypeName().equals(classificationName)) {
+                    return entity;
+                }
+            }
+
+            entity.getClassifications().add(new AtlasClassification(classificationName));
+            return entity;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%s=%s", "AddClassification", classificationName);
+        }
+
+        /**
+         * @return type name of the classification this transformer attaches
+         */
+        public String getClassificationName() {
+            return classificationName;
+        }
+    }
+
     static class RemoveClassification extends ImportTransformer {
         private final String classificationToBeRemoved;
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java
new file mode 100644
index 0000000..62eba45
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.typedef.AtlasClassificationDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Prepares an {@link ImportTransforms} instance for use by an import:
+ * first makes sure every classification named by an
+ * {@link ImportTransformer.AddClassification} transformer is known to the type
+ * registry (creating it through the type-def store when it is not), then
+ * hands each transformed entity type's sub-types to
+ * {@link ImportTransforms#addParentTransformsToSubTypes}.
+ */
+@Component
+public class ImportTransformsShaper {
+    private static final Logger LOG = LoggerFactory.getLogger(ImportTransformsShaper.class);
+
+    private final AtlasTypeRegistry typeRegistry;
+    private final AtlasTypeDefStore typeDefStore;
+
+    @Inject
+    public ImportTransformsShaper(AtlasTypeRegistry typeRegistry, AtlasTypeDefStore typeDefStore) {
+        this.typeRegistry = typeRegistry;
+        this.typeDefStore = typeDefStore;
+    }
+
+    /**
+     * Applies both shaping steps, classifications first, then sub-types.
+     *
+     * @param importTransform transforms to shape; modified in place
+     * @throws AtlasBaseException if a missing classification cannot be created
+     */
+    public void shape(ImportTransforms importTransform) throws AtlasBaseException {
+        getCreateClassifications(importTransform);
+        updateTransformsWithSubTypes(importTransform);
+    }
+
+    /**
+     * Walks every configured transformer and, for each AddClassification
+     * found, ensures the classification it names exists.
+     */
+    private void getCreateClassifications(ImportTransforms importTransform) throws AtlasBaseException {
+        for (Map<String, List<ImportTransformer>> transformersByKey : importTransform.getTransforms().values()) {
+            for (List<ImportTransformer> transformers : transformersByKey.values()) {
+                for (ImportTransformer transformer : transformers) {
+                    if (!(transformer instanceof ImportTransformer.AddClassification)) {
+                        continue;
+                    }
+
+                    String tagName = ((ImportTransformer.AddClassification) transformer).getClassificationName();
+                    getCreateTag(tagName);
+                }
+            }
+        }
+    }
+
+    /**
+     * For each type name that has transforms and resolves to an entity type,
+     * passes that type's sub-types to addParentTransformsToSubTypes.
+     */
+    private void updateTransformsWithSubTypes(ImportTransforms importTransforms) {
+        // Iterate over an array copy of the type names rather than the live
+        // collection. NOTE(review): presumably because
+        // addParentTransformsToSubTypes adds entries while we loop - confirm.
+        int typeCount = importTransforms.getTypes().size();
+        String[] typeNames = importTransforms.getTypes().toArray(new String[typeCount]);
+
+        for (String typeName : typeNames) {
+            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+            if (entityType != null) {
+                importTransforms.addParentTransformsToSubTypes(typeName, entityType.getAllSubTypes());
+            }
+        }
+    }
+
+    /**
+     * Returns the given classification name, creating a classification
+     * definition of that name first when the type registry has none.
+     */
+    private String getCreateTag(String classificationName) throws AtlasBaseException {
+        if (typeRegistry.getClassificationDefByName(classificationName) == null) {
+            AtlasClassificationDef newDef = new AtlasClassificationDef(classificationName);
+            AtlasTypesDef typesDef = new AtlasTypesDef();
+
+            typesDef.setClassificationDefs(Collections.singletonList(newDef));
+            typeDefStore.createTypesDef(typesDef);
+            LOG.info("created classification: {}", classificationName);
+        }
+
+        return classificationName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
index 467ced7..62b393a 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
@@ -129,7 +129,7 @@ public class BulkImporterImpl implements BulkImporter {
 
         String lastEntityImported = 
String.format("entity:last-imported:%s:[%s]:(%s)", 
currentEntity.getEntity().getTypeName(), currentIndex, 
currentEntity.getEntity().getGuid());
 
-        return updateImportProgress(LOG, currentIndex + 1, streamSize, 
currentPercent, lastEntityImported);
+        return updateImportProgress(LOG, currentIndex, streamSize, 
currentPercent, lastEntityImported);
     }
 
     @VisibleForTesting
@@ -137,12 +137,13 @@ public class BulkImporterImpl implements BulkImporter {
         final double tolerance   = 0.000001;
         final int    MAX_PERCENT = 100;
 
-        float   percent        = (float) ((currentIndex * MAX_PERCENT) / 
streamSize);
+        int     maxSize        = (currentIndex <= streamSize) ? streamSize : 
currentIndex;
+        float   percent        = (float) ((currentIndex * MAX_PERCENT) / 
maxSize);
         boolean updateLog      = Double.compare(percent, currentPercent) > 
tolerance;
-        float   updatedPercent = (MAX_PERCENT < streamSize) ? percent : 
((updateLog) ? ++currentPercent : currentPercent);
+        float   updatedPercent = (MAX_PERCENT < maxSize) ? percent : 
((updateLog) ? ++currentPercent : currentPercent);
 
         if (updateLog) {
-            log.info("bulkImport(): progress: {}% (of {}) - {}", (int) 
Math.ceil(percent), streamSize, additionalInfo);
+            log.info("bulkImport(): progress: {}% (of {}) - {}", (int) 
Math.ceil(percent), maxSize, additionalInfo);
         }
 
         return updatedPercent;

http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
----------------------------------------------------------------------
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 08bbcd2..b556fe9 100644
--- 
a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -204,7 +204,7 @@ public class ImportServiceTest extends ExportImportTestBase 
{
 
     @Test
     public void importServiceProcessesIOException() {
-        ImportService importService = new ImportService(typeDefStore, 
typeRegistry, null, null);
+        ImportService importService = new ImportService(typeDefStore, 
typeRegistry, null, null,null);
         AtlasImportRequest req = mock(AtlasImportRequest.class);
 
         Answer<Map> answer = new Answer<Map>() {

http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
----------------------------------------------------------------------
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
new file mode 100644
index 0000000..f894553
--- /dev/null
+++ 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.List;
+
+import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadFsModel;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertNotNull;
+
+/**
+ * Checks that an import whose request carries an {@code addClassification}
+ * transform registers the classification type and that the imported entities
+ * come back from the entity store tagged with it.
+ */
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class ImportTransformsShaperTest extends ExportImportTestBase {
+    private static final String TAG_NAME = "REPLICATED";
+
+    @Inject
+    private AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @Inject
+    private ImportService importService;
+
+    @Inject
+    private AtlasEntityStore entityStore;
+
+    @BeforeClass
+    public void setup() throws IOException, AtlasBaseException {
+        basicSetup(typeDefStore, typeRegistry);
+        loadFsModel(typeDefStore, typeRegistry);
+    }
+
+    @Test
+    public void newTagIsCreatedAndEntitiesAreTagged() throws AtlasBaseException, IOException {
+        AtlasImportResult result = ZipFileResourceTestUtils.runImportWithParameters(importService,
+                getImportRequest(),
+                ZipFileResourceTestUtils.getZipSourceFrom("stocks.zip"));
+
+        AtlasClassificationType classification = typeRegistry.getClassificationTypeByName(TAG_NAME);
+        assertNotNull(classification);
+        assertEquals(classification.getTypeName(), TAG_NAME);
+        assertEntities(result.getProcessedEntities(), TAG_NAME);
+    }
+
+    /** Loads every processed entity from the store and checks its tagging. */
+    private void assertEntities(List<String> entityGuids, String tagName) throws AtlasBaseException {
+        for (String guid : entityGuids) {
+            AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = this.entityStore.getById(guid);
+
+            assertNotNull(entityWithExtInfo);
+            assertTag(entityWithExtInfo, tagName);
+        }
+    }
+
+    /**
+     * Checks the primary entity and then any referred entities. Previously only
+     * referred entities were inspected, so an entity without referred entities
+     * was never checked at all.
+     */
+    private void assertTag(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, String tagName) {
+        assertNotNull(entityWithExtInfo.getEntity());
+        assertTag(entityWithExtInfo.getEntity(), tagName);
+
+        if (entityWithExtInfo.getReferredEntities() == null) {
+            return;
+        }
+
+        for (AtlasEntity entity : entityWithExtInfo.getReferredEntities().values()) {
+            assertTag(entity, tagName);
+        }
+    }
+
+    /**
+     * Fails with a readable message when the entity has no classifications, and
+     * accepts the tag at any position so classifications already present in the
+     * imported data do not break the check.
+     */
+    private void assertTag(AtlasEntity entity, String tagName) {
+        assertTrue(entity.getClassifications() != null && entity.getClassifications().size() > 0,
+                String.format("%s not tagged", entity.getTypeName()));
+
+        boolean tagged = false;
+        for (int i = 0; i < entity.getClassifications().size() && !tagged; i++) {
+            tagged = tagName.equals(entity.getClassifications().get(i).getTypeName());
+        }
+
+        assertTrue(tagged, String.format("%s not tagged with %s", entity.getTypeName(), tagName));
+    }
+
+    /** Builds an import request that tags every Referenceable with {@link #TAG_NAME}. */
+    private AtlasImportRequest getImportRequest() {
+        AtlasImportRequest request = new AtlasImportRequest();
+        request.getOptions().put("transforms", "{ \"Referenceable\": { \"*\":[ \"addClassification:REPLICATED\" ] } }");
+        return request;
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
----------------------------------------------------------------------
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
index b241dda..cd623d0 100644
--- 
a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
@@ -46,6 +46,8 @@ public class ImportTransformsTest {
     private final String jsonSingleClearAttrValue = "{ \"hive_table\": { 
\"*\":[ \"clearAttrValue:replicatedToCluster\", 
\"clearAttrValue:replicatedFromCluster\" ] } }";
     private final String jsonMultipleClearAttrValue = "{ \"hive_table\": { 
\"*\":[ \"clearAttrValue:replicatedToCluster,replicatedFromCluster\" ] } }";
     private final String jsonSetDeleted = "{ \"hive_table\": { \"*\":[ 
\"setDeleted\" ] } }";
+    // Transform specs keyed by hive_table that request addClassification;
+    // the two differ only in the classification name (REPLICATED vs REPLICATED_2).
+    private final String jsonAddClasification = "{ \"hive_table\": { \"*\":[ 
\"addClassification:REPLICATED\" ] } }";
+    private final String jsonAddClasification2 = "{ \"hive_table\": { \"*\":[ 
\"addClassification:REPLICATED_2\" ] } }";
 
     private ImportTransforms transform;
     private String HIVE_TABLE_ATTR_SYNC_INFO = "hive_table.syncInfo";
@@ -192,6 +194,36 @@ public class ImportTransformsTest {
         assertEquals(entity.getStatus(),  AtlasEntity.Status.DELETED);
     }
 
+    /**
+     * Applies the addClassification transform to a hive_table entity and expects
+     * exactly one more classification than before. The same entity is then passed
+     * through the two follow-up checks, which depend on the state left by this step.
+     *
+     * @throws AtlasBaseException declared for parsing and applying the transforms
+     */
+    @Test
+    public void addClassification_AddsClassificationToEntitiy() throws AtlasBaseException {
+        AtlasEntity entity = getHiveTableAtlasEntity();
+        int existingClassificationsCount =  entity.getClassifications() != null ? entity.getClassifications().size() : 0;
+        ImportTransforms t = ImportTransforms.fromJson(jsonAddClasification);
+
+        // Check for null before t is dereferenced; placed after its use the check could never fail.
+        assertNotNull(t);
+        assertTrue(t.getTransforms().size() > 0);
+
+        t.apply(entity);
+
+        assertEquals(entity.getClassifications().size(), existingClassificationsCount + 1);
+        addClassification_ExistingClassificationsAreHandled(entity);
+        addClassification_MultipleClassificationsAreAdded(entity);
+    }
+
+    /**
+     * Re-applies the same addClassification transform to an entity that already has
+     * classifications and expects the classification count to stay unchanged.
+     *
+     * @param entity entity that must already carry at least one classification
+     * @throws AtlasBaseException declared for parsing and applying the transforms
+     */
+    private void addClassification_ExistingClassificationsAreHandled(AtlasEntity entity) throws AtlasBaseException {
+        int countBefore = 0;
+        if (entity.getClassifications() != null) {
+            countBefore = entity.getClassifications().size();
+        }
+        assertTrue(countBefore > 0);
+
+        ImportTransforms sameTagTransforms = ImportTransforms.fromJson(jsonAddClasification);
+        sameTagTransforms.apply(entity);
+
+        assertEquals(entity.getClassifications().size(), countBefore);
+    }
+
+    /**
+     * Applies the second addClassification transform and expects the entity to end
+     * up with one classification more than it had on entry.
+     *
+     * @param entity entity with a non-null classification list
+     * @throws AtlasBaseException declared for parsing and applying the transforms
+     */
+    private void addClassification_MultipleClassificationsAreAdded(AtlasEntity entity) throws AtlasBaseException {
+        int expectedCount = entity.getClassifications().size() + 1;
+
+        ImportTransforms secondTagTransforms = ImportTransforms.fromJson(jsonAddClasification2);
+        secondTagTransforms.apply(entity);
+
+        assertEquals(entity.getClassifications().size(), expectedCount);
+    }
 
     private String[] getExtEntityExpectedValues(AtlasEntityWithExtInfo 
entityWithExtInfo) {
         String[] ret = new 
String[entityWithExtInfo.getReferredEntities().size()];

http://git-wip-us.apache.org/repos/asf/atlas/blob/325859ab/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java
----------------------------------------------------------------------
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java
 
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java
index 73dfe37..656ab10 100644
--- 
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1BulkImportPercentTest.java
@@ -35,6 +35,7 @@ import static org.testng.Assert.assertTrue;
 public class AtlasEntityStoreV1BulkImportPercentTest {
 
     private final int MAX_PERCENT = 100;
+    private final float MAX_PERCENT_FLOAT = 100.0F;
     private List<Integer> percentHolder;
     private Logger log;
 
@@ -143,6 +144,15 @@ public class AtlasEntityStoreV1BulkImportPercentTest {
         assertEqualsForPercentHolder(expected);
     }
 
+    /**
+     * Reports progress for an index (5) that is past the declared stream size (4)
+     * while the current percent is already 100, and expects the returned percent
+     * to remain at 100.
+     *
+     * @throws Exception declared for the runWithSize helper
+     */
+    @Test
+    public void exceedingInitialStreamSize_KeepsPercentAt100() throws Exception {
+        runWithSize(4);
+        float f = BulkImporterImpl.updateImportProgress(log, 5, 4, 100, "additional info");
+
+        // Compare the absolute difference: a one-sided check would also accept any value below 100.
+        assertTrue(Math.abs(f - MAX_PERCENT_FLOAT) <= 0.0001);
+    }
+
     private void runWithSize(int streamSize) throws Exception {
         float currentPercent = 0;
         setupPercentHolder(streamSize);

Reply via email to