ATLAS-2555: Migration-Import: Support for BigInteger, BigDecimal. Unit tests.


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/15967a93
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/15967a93
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/15967a93

Branch: refs/heads/master
Commit: 15967a9309b1fe8ca3645d82abd4749017c5553a
Parents: 7515915
Author: Ashutosh Mestry <[email protected]>
Authored: Fri Apr 13 09:58:53 2018 -0700
Committer: Ashutosh Mestry <[email protected]>
Committed: Fri Apr 13 10:28:26 2018 -0700

----------------------------------------------------------------------
 .../atlas/repository/graphdb/AtlasGraph.java    |    3 +
 .../graphdb/janus/AtlasJanusGraph.java          |    7 +
 .../graphdb/janus/AtlasJanusGraphDatabase.java  |    6 +
 .../janus/migration/AtlasGraphSONReader.java    |   10 +
 .../janus/migration/GraphSONTokensTP2.java      |   47 +-
 .../janus/migration/GraphSONUtility.java        |   20 +-
 .../janus/migration/JsonNodeParsers.java        |   36 +-
 .../janus/migration/JsonNodeProcessManager.java |    4 +-
 .../janus/migration/MappedElementCache.java     |   24 +-
 .../janus/migration/PostProcessManager.java     |    2 +-
 .../janus/migration/ReaderStatusManager.java    |   48 +-
 .../janus/migration/RelationshipTypeCache.java  |    5 -
 .../janus/migration/pc/WorkItemConsumer.java    |    2 +-
 .../janus/migration/pc/WorkItemManager.java     |    2 +-
 .../graphdb/janus/migration/BaseUtils.java      |  118 +
 .../GraphSONUtilityPostProcessTest.java         |   95 +
 .../janus/migration/GraphSONUtilityTest.java    |  104 +
 .../janus/migration/JsonNodeParsersTest.java    |  122 +
 .../janus/migration/MappedElementCacheTest.java |   72 +
 .../migration/ReaderStatusManagerTest.java      |   61 +
 .../janus/migration/WorkItemConsumerTest.java   |   96 +
 .../janus/migration/WorkItemManagerTest.java    |  116 +
 .../test/resources/atlas-migration-data.json    | 7338 ++++++++++++++++++
 .../janus/src/test/resources/col-legacy.json    |   73 +
 .../src/test/resources/db-type-legacy.json      |   84 +
 .../janus/src/test/resources/db-v-65544.json    |   78 +
 .../janus/src/test/resources/edge-legacy.json   |   27 +
 .../src/test/resources/table-v-147504.json      |  121 +
 .../repository/graphdb/titan0/Titan0Graph.java  |    6 +
 .../impexp/MigrationProgressService.java        |   91 +-
 .../migration/DataMigrationService.java         |    4 +-
 .../migration/HiveParititionTest.java           |    9 +-
 .../repository/migration/HiveStocksTest.java    |    9 +-
 .../migration/MigrationBaseAsserts.java         |    3 +-
 .../migration/MigrationProgressServiceTest.java |  141 +
 .../atlas/repository/migration/PathTest.java    |   84 +
 .../migration/RelationshipMappingTest.java      |    1 +
 .../resources/path_db/atlas-migration-data.json | 2189 ++++++
 .../path_db/atlas-migration-typesdef.json       |    1 +
 39 files changed, 11104 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java
----------------------------------------------------------------------
diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java
index f252dc3..607baf6 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java
@@ -28,6 +28,7 @@ import javax.script.ScriptException;
 
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.groovy.GroovyExpression;
+import org.apache.atlas.model.impexp.MigrationStatus;
 import org.apache.atlas.type.AtlasType;
 
 /**
@@ -320,4 +321,6 @@ public interface AtlasGraph<V, E> {
     boolean isMultiProperty(String name);
 
     void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException;
+
+    MigrationStatus getMigrationStatus();
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
----------------------------------------------------------------------
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
index 2a1ce4e..c0b9c17 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
@@ -25,6 +25,7 @@ import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.groovy.GroovyExpression;
+import org.apache.atlas.model.impexp.MigrationStatus;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
@@ -33,6 +34,7 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
 import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.graphdb.GremlinVersion;
+import org.apache.atlas.repository.graphdb.janus.migration.ReaderStatusManager;
 import org.apache.atlas.repository.graphdb.janus.query.AtlasJanusGraphQuery;
 import org.apache.atlas.repository.graphdb.utils.IteratorToIterableAdapter;
 import org.apache.atlas.type.AtlasType;
@@ -459,4 +461,9 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
     public void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException {
         AtlasJanusGraphDatabase.loadLegacyGraphSON(relationshipCache, fs);
     }
+
+    @Override
+    public MigrationStatus getMigrationStatus() {
+        return AtlasJanusGraphDatabase.getMigrationStatus();
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
----------------------------------------------------------------------
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
index c9407bb..86cd299 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
@@ -21,9 +21,11 @@ package org.apache.atlas.repository.graphdb.janus;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.MigrationStatus;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.GraphDatabase;
 import org.apache.atlas.repository.graphdb.janus.migration.AtlasGraphSONReader;
+import org.apache.atlas.repository.graphdb.janus.migration.ReaderStatusManager;
 import org.apache.atlas.repository.graphdb.janus.serializer.BigDecimalSerializer;
 import org.apache.atlas.repository.graphdb.janus.serializer.BigIntegerSerializer;
 import org.apache.atlas.repository.graphdb.janus.serializer.TypeCategorySerializer;
@@ -259,4 +261,8 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
             LOG.info("Done! loadLegacyGraphSON.");
         }
     }
+
+    public static MigrationStatus getMigrationStatus() {
+        return ReaderStatusManager.get(getGraphInstance());
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java
----------------------------------------------------------------------
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java
index 099e6b9..636e6e8 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java
@@ -103,6 +103,16 @@ public final class AtlasGraphSONReader {
                         processElement(parser, new JsonNodeParsers.ParseEdge(), startIndex);
                         break;
 
+                    case GraphSONTokensTP2.VERTEX_COUNT:
+                        parser.nextToken();
+                        LOG.info("Vertex count: {}", parser.getLongValue());
+                        break;
+
+                    case GraphSONTokensTP2.EDGE_COUNT:
+                        parser.nextToken();
+                        LOG.info("Edge count: {}", parser.getLongValue());
+                        break;
+
                     default:
                         throw new IllegalStateException(String.format("Unexpected token in GraphSON - %s", fieldName));
                 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONTokensTP2.java
----------------------------------------------------------------------
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONTokensTP2.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONTokensTP2.java
index e8b0758..c44dfac 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONTokensTP2.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONTokensTP2.java
@@ -19,27 +19,32 @@
 package org.apache.atlas.repository.graphdb.janus.migration;
 
 public final class GraphSONTokensTP2 {
-    public static final String _ID          = "_id";
-    public static final String _LABEL       = "_label";
-    public static final String _TYPE        = "_type";
-    public static final String _OUT_V       = "_outV";
-    public static final String _IN_V        = "_inV";
-    public static final String VALUE        = "value";
-    public static final String TYPE         = "type";
-    public static final String TYPE_LIST    = "list";
-    public static final String TYPE_STRING  = "string";
-    public static final String TYPE_DOUBLE  = "double";
-    public static final String TYPE_INTEGER = "integer";
-    public static final String TYPE_FLOAT   = "float";
-    public static final String TYPE_MAP     = "map";
-    public static final String TYPE_BOOLEAN = "boolean";
-    public static final String TYPE_LONG    = "long";
-    public static final String TYPE_SHORT   = "short";
-    public static final String TYPE_BYTE    = "byte";
-    public static final String TYPE_UNKNOWN = "unknown";
-    public static final String VERTICES     = "vertices";
-    public static final String EDGES        = "edges";
-    public static final String MODE         = "mode";
+    public static final String _ID              = "_id";
+    public static final String _LABEL           = "_label";
+    public static final String _TYPE            = "_type";
+    public static final String _OUT_V           = "_outV";
+    public static final String _IN_V            = "_inV";
+    public static final String VALUE            = "value";
+    public static final String TYPE             = "type";
+    public static final String TYPE_LIST        = "list";
+    public static final String TYPE_STRING      = "string";
+    public static final String TYPE_DOUBLE      = "double";
+    public static final String TYPE_INTEGER     = "integer";
+    public static final String TYPE_FLOAT       = "float";
+    public static final String TYPE_MAP         = "map";
+    public static final String TYPE_BOOLEAN     = "boolean";
+    public static final String TYPE_LONG        = "long";
+    public static final String TYPE_SHORT       = "short";
+    public static final String TYPE_BYTE        = "byte";
+    public static final String TYPE_BIG_DECIMAL = "bigdecimal";
+    public static final String TYPE_BIG_INTEGER = "biginteger";
+    public static final String TYPE_DATE        = "date";
+    public static final String TYPE_UNKNOWN     = "unknown";
+    public static final String VERTICES         = "vertices";
+    public static final String EDGES            = "edges";
+    public static final String MODE             = "mode";
+    public static final String VERTEX_COUNT     = "vertexCount";
+    public static final String EDGE_COUNT       = "edgeCount";
 
     private GraphSONTokensTP2() {
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java
----------------------------------------------------------------------
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java
index 5120935..ec320b0 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.graphdb.janus.migration;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.repository.Constants;
+import org.apache.atlas.type.AtlasBuiltInTypes;
 import org.apache.commons.lang.StringUtils;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -43,6 +44,8 @@ class GraphSONUtility {
     private static final String EMPTY_STRING = "";
 
     private final RelationshipTypeCache relationshipTypeCache;
+    private static AtlasBuiltInTypes.AtlasBigIntegerType bigIntegerType = new AtlasBuiltInTypes.AtlasBigIntegerType();
+    private static AtlasBuiltInTypes.AtlasBigDecimalType bigDecimalType = new AtlasBuiltInTypes.AtlasBigDecimalType();
 
     public GraphSONUtility(final RelationshipTypeCache relationshipTypeCache) {
         this.relationshipTypeCache = relationshipTypeCache;
@@ -187,7 +190,9 @@ class GraphSONUtility {
         if (StringUtils.isNotEmpty(typeName)) {
             props.put(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName);
         } else {
-            LOG.debug("Could not find relationship type for: {}", edgeLabel);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Could not find relationship type for: {}", edgeLabel);
+            }
         }
     }
 
@@ -223,6 +228,7 @@ class GraphSONUtility {
         }
     }
 
+    @VisibleForTesting
     static Map<String, Object> readProperties(final JsonNode node) {
         final Map<String, Object>                   map      = new HashMap<>();
         final Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
@@ -267,7 +273,13 @@ class GraphSONUtility {
         } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_INTEGER)) {
             propertyValue = node.get(GraphSONTokensTP2.VALUE).intValue();
         } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_LONG)) {
-            propertyValue = node.get(GraphSONTokensTP2.VALUE).longValue();
+            propertyValue = node.get(GraphSONTokensTP2.VALUE).asLong();
+        } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_BIG_DECIMAL)) {
+            propertyValue = bigDecimalType.getNormalizedValue(node.get(GraphSONTokensTP2.VALUE));
+        } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_BIG_INTEGER)) {
+            propertyValue = bigIntegerType.getNormalizedValue(node.get(GraphSONTokensTP2.VALUE));
+        } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_DATE)) {
+            propertyValue = new Date(node.get(GraphSONTokensTP2.VALUE).asLong());
         } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_STRING)) {
             propertyValue = node.get(GraphSONTokensTP2.VALUE).textValue();
         } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_LIST)) {
@@ -308,6 +320,10 @@ class GraphSONUtility {
                 theValue = node.longValue();
             } else if (node.isTextual()) {
                 theValue = node.textValue();
+            } else if (node.isBigDecimal()) {
+                theValue = node.decimalValue();
+            } else if (node.isBigInteger()) {
+                theValue = node.bigIntegerValue();
             } else if (node.isArray()) {
                 // this is an array so just send it back so that it can be
                 // reprocessed to its primitive components

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsers.java
----------------------------------------------------------------------
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsers.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsers.java
index acf8cb2..cd65460 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsers.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsers.java
@@ -73,42 +73,10 @@ public class JsonNodeParsers {
             return el;
         }
 
-        static Object getTypedValueFromJsonNode(final JsonNode node) {
-            Object theValue = null;
-
-            if (node != null && !node.isNull()) {
-                if (node.isBoolean()) {
-                    theValue = node.booleanValue();
-                } else if (node.isDouble()) {
-                    theValue = node.doubleValue();
-                } else if (node.isFloatingPointNumber()) {
-                    theValue = node.floatValue();
-                } else if (node.isInt()) {
-                    theValue = node.intValue();
-                } else if (node.isLong()) {
-                    theValue = node.longValue();
-                } else if (node.isTextual()) {
-                    theValue = node.textValue();
-                } else if (node.isArray()) {
-                    // this is an array so just send it back so that it can be
-                    // reprocessed to its primitive components
-                    theValue = node;
-                } else if (node.isObject()) {
-                    // this is an object so just send it back so that it can be
-                    // reprocessed to its primitive components
-                    theValue = node;
-                } else {
-                    theValue = node.textValue();
-                }
-            }
-
-            return theValue;
-        }
     }
 
     static class ParseEdge extends ParseElement {
         private static final String MESSAGE_EDGE          = "edge";
-        private static final String TYPE_NAME_NODE_NAME   = Constants.VERTEX_TYPE_PROPERTY_KEY;
 
 
         @Override
@@ -118,7 +86,7 @@ public class JsonNodeParsers {
 
         @Override
         Object getId(JsonNode node) {
-            return getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));
+            return utility.getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));
         }
 
         @Override
@@ -163,7 +131,7 @@ public class JsonNodeParsers {
 
         @Override
         Object getId(JsonNode node) {
-            return getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));
+            return utility.getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java
----------------------------------------------------------------------
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java
index c1e9d20..6d24619 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java
@@ -108,7 +108,7 @@ public class JsonNodeProcessManager {
             display("commit-size: {}: Done!", size);
         }
 
-        private void updateSchema(Map<String, Object> schema, org.apache.tinkerpop.shaded.jackson.databind.JsonNode node) {
+        private void updateSchema(Map<String, Object> schema, JsonNode node) {
             synchronized (graph) {
                 String typeName = parseElement.getType(node);
 
@@ -142,7 +142,7 @@ public class JsonNodeProcessManager {
 
             try {
                 Thread.sleep(WAIT_DURATION_AFTER_COMMIT_EXCEPTION);
-                for (org.apache.tinkerpop.shaded.jackson.databind.JsonNode n : nodes) {
+                for (JsonNode n : nodes) {
                     parseElement.parse(bulkLoadGraph, cache, n);
                 }
                 commitBulk();

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java
----------------------------------------------------------------------
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java
index fe9e327..cca72ad 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java
@@ -18,6 +18,7 @@
 
 package org.apache.atlas.repository.graphdb.janus.migration;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.utils.LruCache;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -33,8 +34,11 @@ import static org.apache.atlas.repository.Constants.VERTEX_ID_IN_IMPORT_KEY;
 public class MappedElementCache {
     private static final Logger LOG = LoggerFactory.getLogger(MappedElementCache.class);
 
-    private final Map<Object, Vertex> lruVertexCache = new LruCache<>(500, 100000);
-    private final Map<String, String> lruEdgeCache   = new LruCache<>(500, 100000);
+    @VisibleForTesting
+    final Map<Object, Vertex> lruVertexCache = new LruCache<>(500, 100000);
+
+    @VisibleForTesting
+    final Map<String, String> lruEdgeCache   = new LruCache<>(500, 100000);
 
     public Vertex getMappedVertex(Graph gr, Object key) {
         try {
@@ -83,7 +87,8 @@ public class MappedElementCache {
         }
     }
 
-    private Vertex fetchVertex(Graph gr, Object key) {
+    @VisibleForTesting
+    Vertex fetchVertex(Graph gr, Object key) {
         try {
             return gr.traversal().V().has(VERTEX_ID_IN_IMPORT_KEY, key).next();
         } catch (Exception ex) {
@@ -92,7 +97,8 @@ public class MappedElementCache {
         }
     }
 
-    private Edge fetchEdge(Graph gr, String key) {
+    @VisibleForTesting
+    Edge fetchEdge(Graph gr, String key) {
         try {
             return gr.traversal().E().has(EDGE_ID_IN_IMPORT_KEY, key).next();
         } catch (Exception ex) {
@@ -101,16 +107,8 @@ public class MappedElementCache {
         }
     }
 
-    public void clearVertexCache() {
+    public void clearAll() {
         lruVertexCache.clear();
-    }
-
-    public void clearEdgeCache() {
         lruEdgeCache.clear();
     }
-
-    public void clearAll() {
-        clearVertexCache();
-        clearEdgeCache();
-    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java
----------------------------------------------------------------------
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java
index 4c5e357..d0a65f7 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.BlockingQueue;
 
 public class PostProcessManager {
-    private static class Consumer extends WorkItemConsumer<Object> {
+    static class Consumer extends WorkItemConsumer<Object> {
         private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
 
         private final Graph              bulkLoadGraph;

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManager.java
----------------------------------------------------------------------
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManager.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManager.java
index 67dbf22..11732c4 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManager.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.atlas.repository.graphdb.janus.migration;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.model.impexp.MigrationStatus;
 import org.apache.atlas.repository.Constants;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -43,7 +45,8 @@ public class ReaderStatusManager {
     public static final String STATUS_SUCCESS     = "SUCCESS";
     public static final String STATUS_FAILED      = "FAILED";
 
-    private Object migrationStatusId = null;
+    @VisibleForTesting
+    Object migrationStatusId = null;
     private Vertex migrationStatus   = null;
 
     public ReaderStatusManager(Graph graph, Graph bulkLoadGraph) {
@@ -71,7 +74,10 @@ public class ReaderStatusManager {
 
     public void update(Graph graph, Long counter) {
         migrationStatus.property(CURRENT_INDEX_PROPERTY, counter);
-        graph.tx().commit();
+
+        if(graph.features().graph().supportsTransactions()) {
+            graph.tx().commit();
+        }
     }
 
     public void update(Graph graph, Long counter, String status) {
@@ -91,7 +97,7 @@ public class ReaderStatusManager {
         return g.V(migrationStatusId).next();
     }
 
-    private Vertex fetchUsingTypeName(GraphTraversalSource g) {
+    private static Vertex fetchUsingTypeName(GraphTraversalSource g) {
         GraphTraversal src = g.V().has(Constants.ENTITY_TYPE_PROPERTY_KEY, MIGRATION_STATUS_TYPE_NAME);
         return src.hasNext() ? (Vertex) src.next() : null;
     }
@@ -109,8 +115,42 @@ public class ReaderStatusManager {
 
         migrationStatusId = v.id();
 
-        rGraph.tx().commit();
+        if(rGraph.features().graph().supportsTransactions()) {
+            rGraph.tx().commit();
+        }
 
         LOG.info("migrationStatus vertex created! v[{}]", migrationStatusId);
     }
+
+    public static MigrationStatus updateFromVertex(Graph graph, MigrationStatus ms) {
+        Vertex vertex = fetchUsingTypeName(graph.traversal());
+
+        if(ms == null) {
+            ms = new MigrationStatus();
+        }
+
+        ms.setStartTime((Date) vertex.property(START_TIME_PROPERTY).value());
+        ms.setEndTime((Date) vertex.property(END_TIME_PROPERTY).value());
+        ms.setCurrentIndex((Long) vertex.property(CURRENT_INDEX_PROPERTY).value());
+        ms.setOperationStatus((String) vertex.property(OPERATION_STATUS_PROPERTY).value());
+        ms.setTotalCount((Long) vertex.property(TOTAL_COUNT_PROPERTY).value());
+
+        return ms;
+    }
+
+    public static MigrationStatus get(Graph graph) {
+        MigrationStatus ms = new MigrationStatus();
+        try {
+            Vertex v = fetchUsingTypeName(graph.traversal());
+            ms.setStartTime((Date) v.property(START_TIME_PROPERTY).value());
+            ms.setEndTime((Date) v.property(END_TIME_PROPERTY).value());
+            ms.setCurrentIndex((long) v.property(CURRENT_INDEX_PROPERTY).value());
+            ms.setOperationStatus((String) v.property(OPERATION_STATUS_PROPERTY).value());
+            ms.setTotalCount((long) v.property(TOTAL_COUNT_PROPERTY).value());
+        } catch (Exception ex) {
+            LOG.error("get: failed!", ex);
+        }
+
+        return ms;
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipTypeCache.java
----------------------------------------------------------------------
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipTypeCache.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipTypeCache.java
index 48b3595..e4e8264 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipTypeCache.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipTypeCache.java
@@ -18,14 +18,9 @@
 
 package org.apache.atlas.repository.graphdb.janus.migration;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 
 public class RelationshipTypeCache {

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemConsumer.java
----------------------------------------------------------------------
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemConsumer.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemConsumer.java
index 5de8db9..39b50de 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemConsumer.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemConsumer.java
@@ -86,7 +86,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
 
     protected abstract void processItem(T item);
 
-    private void updateCommitTime(long commitTime) {
+    protected void updateCommitTime(long commitTime) {
         if (this.maxCommitTimeSeconds < commitTime) {
             this.maxCommitTimeSeconds = commitTime;
         }

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemManager.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemManager.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemManager.java
index 7e21495..d50ada4 100644
--- 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemManager.java
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemManager.java
@@ -55,7 +55,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
     public void shutdown() throws InterruptedException {
         int avgCommitTimeSeconds = getAvgCommitTimeSeconds() * 2;
 
-        LOG.info("WorkItemManager: Shutdown started. Will wait for: {} 
seconds...", avgCommitTimeSeconds);
+        LOG.info("WorkItemManager: Shutdown started. Will wait for: {} 
minutes...", avgCommitTimeSeconds);
 
         service.shutdown();
         service.awaitTermination(avgCommitTimeSeconds, TimeUnit.MINUTES);

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/BaseUtils.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/BaseUtils.java
 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/BaseUtils.java
new file mode 100644
index 0000000..de43e0f
--- /dev/null
+++ 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/BaseUtils.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graphdb.janus.migration;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
+import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
+import org.testng.ITestContext;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashMap;
+
+import static org.testng.AssertJUnit.assertTrue;
+
+/**
+ * Common fixture for the migration unit tests in this package.
+ *
+ * Resolves JSON fixtures under {@code src/test/resources} (relative to the
+ * {@code user.dir} system property), parses them into {@link JsonNode} trees and
+ * exposes them as TestNG data providers that subclasses reference by name.
+ */
+public class BaseUtils {
+    private static final String RESOURCES_DIR_RELATIVE_PATH = "/src/test/resources/";
+    private String resourceDir;
+
+    protected final RelationshipTypeCache emptyRelationshipCache = new RelationshipTypeCache(new HashMap<>());
+    protected GraphSONUtility graphSONUtility;
+
+    /**
+     * Reads the named fixture and wraps the parsed tree in the single-row,
+     * single-column array shape that TestNG data providers return.
+     *
+     * @param s file name relative to the test resources directory
+     * @return {@code {{JsonNode}}} holding the parsed file
+     * @throws IOException if the file cannot be read or parsed
+     */
+    protected Object[][] getJsonNodeFromFile(String s) throws IOException {
+        File f = new File(getFilePath(s));
+        // Read with an explicit charset: the single-argument overload falls back to the
+        // platform default, which makes the fixtures environment dependent.
+        return new Object[][]{{getEntityNode(FileUtils.readFileToString(f, "UTF-8"))}};
+    }
+
+    /**
+     * @param fileName file name relative to the test resources directory
+     * @return path of the file below the resolved resources directory
+     */
+    protected String getFilePath(String fileName) {
+        return Paths.get(resourceDir, fileName).toString();
+    }
+
+    /** Resolves the resources directory and creates the shared {@link GraphSONUtility}. */
+    @BeforeClass
+    public void setup() {
+        resourceDir = System.getProperty("user.dir") + RESOURCES_DIR_RELATIVE_PATH;
+        graphSONUtility = new GraphSONUtility(emptyRelationshipCache);
+    }
+
+    /**
+     * @param node element node read from a fixture
+     * @return the typed value stored under {@code GraphSONTokensTP2._ID}
+     */
+    protected Object getId(JsonNode node) {
+        // getTypedValueFromJsonNode is static; call it on the class rather than on an instance.
+        return GraphSONUtility.getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));
+    }
+
+
+    /** Parses a JSON string with a GraphSON mapper that has embedded types switched off. */
+    private JsonNode getEntityNode(String json) throws IOException {
+        GraphSONMapper.Builder builder = GraphSONMapper.build();
+        final ObjectMapper mapper = builder.embedTypes(false).create().createMapper();
+        return mapper.readTree(json);
+    }
+
+    /** Adds the vertex described by {@code node} to {@code tg} using a fresh utility instance. */
+    protected void addVertex(TinkerGraph tg, JsonNode node) {
+        GraphSONUtility utility = new GraphSONUtility(emptyRelationshipCache);
+        utility.vertexFromJson(tg, node);
+    }
+
+    /**
+     * Loads the db and table vertex fixtures and then the edge fixture into {@code tg}.
+     *
+     * @throws IOException if any fixture cannot be read
+     */
+    protected void addEdge(TinkerGraph tg, MappedElementCache cache) throws IOException {
+        GraphSONUtility gu = graphSONUtility;
+
+        // The data providers ignore their ITestContext argument, so null is sufficient here.
+        gu.vertexFromJson(tg, (JsonNode) (getDBV(null)[0][0]));
+        gu.vertexFromJson(tg, (JsonNode) (getTableV(null))[0][0]);
+        gu.edgeFromJson(tg, cache, (JsonNode) getEdge(null)[0][0]);
+    }
+
+    /**
+     * @return the first vertex whose {@code __typeName} is {@code hive_table};
+     *         the assertion fails when the graph contains none
+     */
+    protected Vertex fetchTableVertex(TinkerGraph tg) {
+        GraphTraversal<Vertex, Vertex> query = tg.traversal().V().has("__typeName", "hive_table");
+        assertTrue(query.hasNext());
+
+        return query.next();
+    }
+
+    @DataProvider(name = "col1")
+    public Object[][] getCol1(ITestContext context) throws IOException {
+        return getJsonNodeFromFile("col-legacy.json");
+    }
+
+    @DataProvider(name = "dbType")
+    public Object[][] getDbType(ITestContext context) throws IOException {
+        return getJsonNodeFromFile("db-type-legacy.json");
+    }
+
+    @DataProvider(name = "edge")
+    public Object[][] getEdge(ITestContext context) throws IOException {
+        return getJsonNodeFromFile("edge-legacy.json");
+    }
+
+    @DataProvider(name = "dbV")
+    public Object[][] getDBV(ITestContext context) throws IOException {
+        return getJsonNodeFromFile("db-v-65544.json");
+    }
+
+
+    @DataProvider(name = "tableV")
+    public Object[][] getTableV(ITestContext context) throws IOException {
+        return getJsonNodeFromFile("table-v-147504.json");
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityPostProcessTest.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityPostProcessTest.java
 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityPostProcessTest.java
new file mode 100644
index 0000000..4d73c78
--- /dev/null
+++ 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityPostProcessTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graphdb.janus.migration;
+
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.eclipse.jetty.util.BlockingArrayQueue;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.AssertJUnit.assertTrue;
+
+/**
+ * Exercises the post-processing step that rewrites edge ids stored in a vertex's
+ * list property ({@code hive_table.columns}).
+ *
+ * The three tests share one graph and one table vertex and are chained with
+ * {@code dependsOnMethods}, so they must run in the declared order.
+ */
+public class GraphSONUtilityPostProcessTest extends BaseUtils {
+    static final String HIVE_COLUMNS_PROPERTY = "hive_table.columns";
+    static final String edgeId1 = "816u-35tc-ao0l-47so";
+    static final String edgeId2 = "82rq-35tc-ao0l-2glc";
+
+    static final String edgeId1x = "816u-35tc-ao0l-xxxx";
+    static final String edgeId2x = "82rq-35tc-ao0l-xxxx";
+
+    private TinkerGraph tg;
+    private MappedElementCache cache = new MappedElementCache();
+    private Vertex tableV;
+
+    /** With nothing in the edge cache, the list property keeps its original ids. */
+    @Test
+    public void noRefNoUpdate() throws IOException {
+        tg = TinkerGraph.open();
+        graphSONUtility = new GraphSONUtility(emptyRelationshipCache);
+
+        addEdge(tg, cache);
+
+        tableV = fetchTableVertex(tg);
+        assertNotNull(tableV);
+
+        assertListProperty(HIVE_COLUMNS_PROPERTY, edgeId1, edgeId2, tableV);
+
+        graphSONUtility.replaceReferencedEdgeIdForList(tg, cache, tableV, HIVE_COLUMNS_PROPERTY);
+        assertListProperty(HIVE_COLUMNS_PROPERTY, edgeId1, edgeId2, tableV);
+    }
+
+    /** Once the edge cache maps old ids to new ones, the list property is rewritten. */
+    @Test(dependsOnMethods = "noRefNoUpdate")
+    public void refFoundVertexUpdated() throws IOException {
+
+        cache.lruEdgeCache.put(edgeId1, edgeId1x);
+        cache.lruEdgeCache.put(edgeId2, edgeId2x);
+
+        graphSONUtility.replaceReferencedEdgeIdForList(tg, cache, tableV, HIVE_COLUMNS_PROPERTY);
+        assertListProperty(HIVE_COLUMNS_PROPERTY, edgeId1x, edgeId2x, tableV);
+    }
+
+    /** Runs the same replacement through {@code PostProcessManager.Consumer}, mapping the ids back. */
+    @Test(dependsOnMethods = "refFoundVertexUpdated")
+    public void updateUsingPostProcessConsumer() throws IOException {
+        // A separate cache holding the reverse mapping; named so it does not shadow the field.
+        MappedElementCache reverseCache = new MappedElementCache();
+        BlockingQueue<Object> bc = new BlockingArrayQueue<>();
+        PostProcessManager.Consumer consumer = new PostProcessManager.Consumer(bc, tg, graphSONUtility,
+                new String[] {HIVE_COLUMNS_PROPERTY}, reverseCache, 5);
+
+        reverseCache.lruEdgeCache.put(edgeId1x, edgeId1);
+        reverseCache.lruEdgeCache.put(edgeId2x, edgeId2);
+        consumer.processItem(tableV.id());
+
+        assertListProperty(HIVE_COLUMNS_PROPERTY, edgeId1, edgeId2, tableV);
+    }
+
+    /**
+     * Asserts that {@code vertex} has a two-element list under {@code propertyName}
+     * holding exactly the two expected ids, in order.
+     */
+    private void assertListProperty(String propertyName, String expectedFirst, String expectedSecond, Vertex vertex) {
+        assertTrue(vertex.property(propertyName).isPresent());
+        List<?> list = (List<?>) vertex.property(propertyName).value();
+
+        assertEquals(list.size(), 2);
+        assertEquals(list.get(0), expectedFirst);
+        assertEquals(list.get(1), expectedSecond);
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityTest.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityTest.java
 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityTest.java
new file mode 100644
index 0000000..794b547
--- /dev/null
+++ 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graphdb.janus.migration;
+
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
+import org.testng.Assert;
+import org.testng.ITestContext;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.atlas.repository.Constants.EDGE_ID_IN_IMPORT_KEY;
+import static org.apache.atlas.repository.Constants.VERTEX_ID_IN_IMPORT_KEY;
+import static org.testng.Assert.*;
+
+/**
+ * Unit tests for {@link GraphSONUtility}: reading ids and properties from legacy
+ * GraphSON nodes and adding vertices and edges to an in-memory TinkerGraph.
+ */
+public class GraphSONUtilityTest extends BaseUtils {
+
+    /** The id of the column fixture is read back as the integer 98336. */
+    @Test(dataProvider = "col1")
+    public void idFetch(JsonNode node) {
+        Object o = GraphSONUtility.getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));
+
+        assertNotNull(o);
+        assertEquals((int) o, 98336);
+    }
+
+    /** Properties read from the column fixture come back with the expected Java types. */
+    @Test(dataProvider = "col1")
+    public void verifyReadProperties(JsonNode node) {
+        Map<String, Object> props = GraphSONUtility.readProperties(node);
+
+        // Check for null before dereferencing; otherwise a null map surfaces as an NPE
+        // instead of an assertion failure.
+        assertNotNull(props);
+
+        assertEquals(props.get("__superTypeNames").getClass(), ArrayList.class);
+        assertEquals(props.get("Asset.name").getClass(), String.class);
+        assertEquals(props.get("hive_column.position").getClass(), Integer.class);
+        assertEquals(props.get("__timestamp").getClass(), Long.class);
+    }
+
+    /** A data node yields one vertex carrying the import-id property, and a null return value. */
+    @Test(dataProvider = "col1")
+    public void dataNodeReadAndVertexAddedToGraph(JsonNode entityNode) throws IOException {
+        TinkerGraph tg = TinkerGraph.open();
+        GraphSONUtility gu = new GraphSONUtility(emptyRelationshipCache);
+        Map<String, Object> map = gu.vertexFromJson(tg, entityNode);
+
+        assertNull(map);
+        assertEquals((long) tg.traversal().V().count().next(), 1L);
+
+        Vertex v = tg.vertices().next();
+        assertTrue(v.property(VERTEX_ID_IN_IMPORT_KEY).isPresent());
+    }
+
+    /** A type-definition node does not add a vertex to the graph. */
+    @Test(dataProvider = "dbType")
+    public void typeNodeReadAndVertexNotAddedToGraph(JsonNode entityNode) throws IOException {
+        TinkerGraph tg = TinkerGraph.open();
+        GraphSONUtility gu = new GraphSONUtility(emptyRelationshipCache);
+        gu.vertexFromJson(tg, entityNode);
+
+        assertEquals((long) tg.traversal().V().count().next(), 0L);
+    }
+
+    /** Two vertices plus the edge fixture produce one edge carrying the import-id property. */
+    @Test
+    public void edgeReadAndAddedToGraph() throws IOException {
+        TinkerGraph tg = TinkerGraph.open();
+        GraphSONUtility gu = new GraphSONUtility(emptyRelationshipCache);
+
+        Map<String, Object> m = gu.vertexFromJson(tg, (JsonNode) (getDBV(null)[0][0]));
+        assertNull(m);
+
+        m = gu.vertexFromJson(tg, (JsonNode) (getTableV(null))[0][0]);
+        assertNull(m);
+
+        m = gu.edgeFromJson(tg, new MappedElementCache(), (JsonNode) getEdge(null)[0][0]);
+        assertNull(m);
+
+        assertEquals((long) tg.traversal().V().count().next(), 2L);
+        assertEquals((long) tg.traversal().E().count().next(), 1L);
+
+        Edge e = tg.edges().next();
+        assertTrue(e.property(EDGE_ID_IN_IMPORT_KEY).isPresent());
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsersTest.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsersTest.java
 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsersTest.java
new file mode 100644
index 0000000..170b1af
--- /dev/null
+++ 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsersTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graphdb.janus.migration;
+
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNotNull;
+
+/**
+ * Unit tests for the {@code JsonNodeParsers.ParseVertex} and
+ * {@code JsonNodeParsers.ParseEdge} element parsers.
+ *
+ * {@code assertEquals} and {@code assertNotNull} here are the {@code AssertJUnit}
+ * variants, which take the expected value first.
+ */
+public class JsonNodeParsersTest extends BaseUtils {
+
+    /** Parses the column fixture and looks the vertex up by graph id and by original id. */
+    @Test(dataProvider = "col1")
+    public void parseVertex(JsonNode nd) {
+        final int COL1_ORIGINAL_ID = 98336;
+
+        Object nodeId = getId(nd);
+        TinkerGraph tg = TinkerGraph.open();
+
+        JsonNodeParsers.ParseElement pe = new JsonNodeParsers.ParseVertex();
+        pe.setContext(graphSONUtility);
+        pe.parse(tg, new MappedElementCache(),  nd);
+
+        Vertex v = tg.vertices().next();
+        Vertex vUsingPe = (Vertex) pe.get(tg, nodeId);
+        Vertex vUsingOriginalId = (Vertex) pe.getByOriginalId(tg, COL1_ORIGINAL_ID);
+        Vertex vUsingOriginalId2 = (Vertex) pe.getByOriginalId(tg, nd);
+
+        // NOTE(review): this passes the Vertex itself as the id argument, whereas
+        // parseEdge passes nodeId - confirm that is intended.
+        updateParseElement(tg, pe, vUsingPe);
+
+        assertNotNull(v);
+        assertNotNull(vUsingPe);
+        assertNotNull(vUsingOriginalId);
+        assertNotNull(vUsingOriginalId2);
+
+        assertEquals(v.id(), vUsingPe.id());
+        assertEquals(nodeId, pe.getId(nd));
+        assertFalse(pe.isTypeNode(nd));
+        assertEquals("\"hive_column\"", pe.getType(nd));
+        assertEquals(v.id(), vUsingOriginalId.id());
+        assertEquals(v.id(), vUsingOriginalId2.id());
+
+        assertProperties(vUsingPe);
+    }
+
+    /** Parses two vertices and the edge fixture, then looks the edge up by graph id and by original id. */
+    @Test(dataProvider = "edge")
+    public void parseEdge(JsonNode nd) throws IOException {
+        final String EDGE_ORIGINAL_ID = "8k5i-35tc-acyd-1eko";
+        Object nodeId = getId(nd);
+
+        TinkerGraph tg = TinkerGraph.open();
+        MappedElementCache cache = new MappedElementCache();
+        JsonNodeParsers.ParseElement peVertex = new JsonNodeParsers.ParseVertex();
+        peVertex.setContext(graphSONUtility);
+
+        peVertex.parse(tg, cache, (JsonNode) (getDBV(null)[0][0]));
+        peVertex.parse(tg, cache, (JsonNode) (getTableV(null)[0][0]));
+
+        JsonNodeParsers.ParseElement pe = new JsonNodeParsers.ParseEdge();
+        pe.setContext(graphSONUtility);
+        pe.parse(tg, cache, (JsonNode) getEdge(null)[0][0]);
+
+        updateParseElement(tg, pe, nodeId);
+
+        Edge e = tg.edges().next();
+        Edge eUsingPe = (Edge) pe.get(tg, nodeId);
+        Edge eUsingOriginalId = (Edge) pe.getByOriginalId(tg, EDGE_ORIGINAL_ID);
+        Edge eUsingOriginalId2 = (Edge) pe.getByOriginalId(tg, nd);
+
+        assertNotNull(e);
+        assertNotNull(eUsingPe);
+        assertNotNull(eUsingOriginalId);
+        assertNotNull(eUsingOriginalId2);
+
+        assertEquals(e.id(), eUsingPe.id());
+        assertEquals(nodeId, pe.getId(nd));
+        assertFalse(pe.isTypeNode(nd));
+        assertEquals(e.id(), eUsingOriginalId.id());
+        assertEquals(e.id(), eUsingOriginalId2.id());
+
+        assertProperties(e);
+    }
+
+    /** Calls {@code pe.update} with the two marker properties k1=v1 and k2=v2. */
+    private void updateParseElement(TinkerGraph tg, JsonNodeParsers.ParseElement pe, Object nodeId) {
+        Map<String, Object> props = new HashMap<>();
+        props.put("k1", "v1");
+        props.put("k2", "v2");
+        pe.update(tg, nodeId, props);
+    }
+
+    /** Asserts that both marker properties are present and that k1 holds "v1". */
+    private void assertProperties(Element v) {
+        assertNotNull(v);
+        assertTrue(v.property("k1").isPresent());
+        assertTrue(v.property("k2").isPresent());
+
+        assertEquals("v1", v.property("k1").value());
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCacheTest.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCacheTest.java
 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCacheTest.java
new file mode 100644
index 0000000..cac09d2
--- /dev/null
+++ 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCacheTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graphdb.janus.migration;
+
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
+import org.testng.ITestContext;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.testng.Assert.*;
+
+/**
+ * Unit tests for {@link MappedElementCache}.
+ *
+ * The {@code col1} data provider used below is inherited from {@link BaseUtils};
+ * it is not re-declared here because the inherited one reads the same fixture.
+ */
+public class MappedElementCacheTest extends BaseUtils {
+
+    /** Looking up a vertex by its original id finds it and leaves one vertex cache entry. */
+    @Test(dataProvider = "col1")
+    public void vertexFetch(JsonNode node) {
+        MappedElementCache cache = new MappedElementCache();
+        TinkerGraph tg = TinkerGraph.open();
+
+        addVertex(tg, node);
+
+        Vertex vx = cache.getMappedVertex(tg, 98336);
+        assertNotNull(vx);
+        assertEquals(cache.lruVertexCache.size(), 1);
+        assertEquals(cache.lruEdgeCache.size(), 0);
+    }
+
+    /** After adding an edge, the vertex cache holds two entries and the edge cache none. */
+    @Test
+    public void edgeFetch() throws IOException {
+        MappedElementCache cache = new MappedElementCache();
+        TinkerGraph tg = TinkerGraph.open();
+
+        addEdge(tg, cache);
+
+        assertEquals(cache.lruVertexCache.size(), 2);
+        assertEquals(cache.lruEdgeCache.size(), 0);
+    }
+
+
+    /** Fetching ids that are not in an empty graph returns null for both vertices and edges. */
+    @Test
+    public void nonExistentVertexReturnsNull() {
+        TinkerGraph tg = TinkerGraph.open();
+        MappedElementCache cache = new MappedElementCache();
+
+        assertNull(cache.fetchVertex(tg, 1111));
+        assertNull(cache.fetchEdge(tg, "abcd"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManagerTest.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManagerTest.java
 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManagerTest.java
new file mode 100644
index 0000000..8c3d157
--- /dev/null
+++ 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManagerTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graphdb.janus.migration;
+
+import org.apache.atlas.model.impexp.MigrationStatus;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Unit tests for {@link ReaderStatusManager}, run against an in-memory TinkerGraph.
+ */
+public class ReaderStatusManagerTest {
+    /** A new manager starts at index 0 and stores a status vertex with default values. */
+    @Test
+    public void createsNewStatusNode() {
+        final TinkerGraph graph = TinkerGraph.open();
+        final ReaderStatusManager statusManager = new ReaderStatusManager(graph, graph);
+
+        assertEquals(statusManager.getStartIndex(), 0L);
+        assertNotNull(graph.traversal().V(statusManager.migrationStatusId).next());
+
+        final MigrationStatus status = ReaderStatusManager.updateFromVertex(graph, null);
+
+        assertEquals(status.getCurrentIndex(), 0L);
+        assertEquals(status.getTotalCount(), 0L);
+        assertEquals(status.getOperationStatus(), "NOT STARTED");
+        assertNotNull(status.getStartTime());
+        assertNotNull(status.getEndTime());
+    }
+
+    /** After update and end, the stored status carries the final count and operation status. */
+    @Test
+    public void verifyUpdates() {
+        final long finalCount = 1001L;
+        final String finalStatus = "SUCCESS";
+        final TinkerGraph graph = TinkerGraph.open();
+        final ReaderStatusManager statusManager = new ReaderStatusManager(graph, graph);
+
+        statusManager.update(graph, 1000L, "IN PROGRESS");
+        statusManager.end(graph, finalCount, finalStatus);
+
+        final MigrationStatus status = ReaderStatusManager.updateFromVertex(graph, null);
+
+        assertEquals(status.getCurrentIndex(), finalCount);
+        assertEquals(status.getTotalCount(), finalCount);
+        assertEquals(status.getOperationStatus(), finalStatus);
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemConsumerTest.java
 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemConsumerTest.java
new file mode 100644
index 0000000..4cd4c2c
--- /dev/null
+++ 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemConsumerTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graphdb.janus.migration;
+
+import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemConsumer;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link WorkItemConsumer#run()}, using a spy subclass that records
+ * whether {@code commitDirty} and {@code updateCommitTime} were invoked.
+ */
+public class WorkItemConsumerTest {
+
+    /**
+     * Consumer whose work methods are no-ops and which only records hook invocations.
+     * Declared static because it needs no reference to the enclosing test instance.
+     */
+    private static class IntegerConsumerSpy extends WorkItemConsumer<Integer> {
+        private boolean commitDirtyCalled = false;
+        private boolean updateCommitTimeCalled;
+
+        public IntegerConsumerSpy(BlockingQueue<Integer> queue) {
+            super(queue);
+        }
+
+        @Override
+        protected void doCommit() {
+
+        }
+
+        @Override
+        protected void processItem(Integer item) {
+
+        }
+
+        @Override
+        protected void commitDirty() {
+            commitDirtyCalled = true;
+            super.commitDirty();
+        }
+
+        // Deliberately does not call super: only the invocation is recorded.
+        @Override
+        protected void updateCommitTime(long commitTime) {
+            updateCommitTimeCalled = true;
+        }
+
+        public boolean isCommitDirtyCalled() {
+            return commitDirtyCalled;
+        }
+
+        public boolean isUpdateCommitTimeCalled() {
+            return updateCommitTimeCalled;
+        }
+    }
+
+
+    /** On an empty queue, run() still calls commitDirty but never updates the commit time. */
+    @Test
+    public void runOnEmptyQueueCallsCommitDirtyButNotUpdateCommitTime() {
+        BlockingQueue<Integer> bc = new LinkedBlockingQueue<>(5);
+
+        IntegerConsumerSpy ic = new IntegerConsumerSpy(bc);
+        ic.run();
+
+        assertTrue(bc.isEmpty());
+        assertTrue(ic.isCommitDirtyCalled());
+        assertFalse(ic.isUpdateCommitTimeCalled());
+    }
+
+
+    /** With one queued item, run() drains the queue, calls commitDirty and updates the commit time. */
+    @Test
+    public void runOnQueueRemovesItemFromQueueCallsCommitDirty() {
+        BlockingQueue<Integer> bc = new LinkedBlockingQueue<>(5);
+        bc.add(1);
+
+        IntegerConsumerSpy ic = new IntegerConsumerSpy(bc);
+        ic.run();
+
+        assertTrue(bc.isEmpty());
+        assertTrue(ic.isCommitDirtyCalled());
+        assertTrue(ic.isUpdateCommitTimeCalled());
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemManagerTest.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemManagerTest.java
 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemManagerTest.java
new file mode 100644
index 0000000..25488bd
--- /dev/null
+++ 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemManagerTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graphdb.janus.migration;
+
+import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemBuilder;
+import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemConsumer;
+import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemManager;
+import org.apache.commons.lang3.RandomUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.SkipException;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Tests for {@link WorkItemManager}: every item handed to {@code produce} must reach
+ * the consumers created through the supplied {@link WorkItemBuilder}.
+ */
+public class WorkItemManagerTest {
+    private static final Logger LOG = LoggerFactory.getLogger(WorkItemManagerTest.class);
+
+    /**
+     * Consumer that appends every processed item to a shared target queue and
+     * simulates a slow commit by sleeping.
+     */
+    private static class IntegerConsumer extends WorkItemConsumer<Integer> {
+
+        private final ConcurrentLinkedQueue<Integer> target;
+
+        /**
+         * @param queue  queue this consumer takes its work items from
+         * @param target collection that receives each processed item
+         */
+        public IntegerConsumer(BlockingQueue<Integer> queue, ConcurrentLinkedQueue<Integer> target) {
+            super(queue);
+            this.target = target;
+        }
+
+        /**
+         * Simulates commit latency with a randomized sleep of 200-280 ms.
+         */
+        @Override
+        protected void doCommit() {
+            try {
+                Thread.sleep(20 * RandomUtils.nextInt(10, 15));
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so the owning worker can observe it.
+                Thread.currentThread().interrupt();
+                LOG.warn("doCommit: interrupted while simulating commit latency", e);
+            }
+        }
+
+        /**
+         * Appends the item to the shared target queue.
+         *
+         * @param item the work item taken from the queue
+         */
+        @Override
+        protected void processItem(Integer item) {
+            LOG.info("adding: {}: size: {}", item, target.size());
+            target.add(item);
+        }
+    }
+
+    /**
+     * Builder that hands the same target queue to every consumer it creates, so the
+     * test can inspect everything that was processed.
+     */
+    private static class IntegerConsumerBuilder implements WorkItemBuilder<IntegerConsumer, Integer> {
+        final ConcurrentLinkedQueue<Integer> integers = new ConcurrentLinkedQueue<>();
+
+        @Override
+        public IntegerConsumer build(BlockingQueue<Integer> queue) {
+            return new IntegerConsumer(queue, integers);
+        }
+    }
+
+    /**
+     * With a single worker, items must be processed in the order they were produced.
+     */
+    @Test
+    public void oneWorkerSequences() {
+        IntegerConsumerBuilder cb = new IntegerConsumerBuilder();
+        int numberOfItems = 10;
+
+        produceAndShutdown(cb, 1, numberOfItems);
+
+        assertEquals(cb.integers.size(), numberOfItems);
+        Integer[] ints = cb.integers.toArray(new Integer[0]);
+        for (int i = 0; i < numberOfItems; i++) {
+            // Compare as ints with a String message. A third numeric argument selects the
+            // (actual, expected, delta) overload, which would tolerate an error of up to i.
+            assertEquals(ints[i].intValue(), i, "unexpected item at index " + i);
+        }
+    }
+
+    /**
+     * With several workers the processing order is not deterministic, so only the
+     * total number of processed items is checked.
+     */
+    @Test
+    public void multipleWorkersUnpredictableSequence() {
+        IntegerConsumerBuilder cb = new IntegerConsumerBuilder();
+        int numberOfItems = 100;
+
+        produceAndShutdown(cb, 5, numberOfItems);
+
+        assertEquals(cb.integers.size(), numberOfItems);
+    }
+
+    /**
+     * Produces the integers {@code 0..numberOfItems-1} through a fresh manager and then
+     * shuts it down.
+     *
+     * @param cb            builder supplying the consumers
+     * @param numWorkers    number of workers the manager is created with
+     * @param numberOfItems how many items to produce
+     * @throws SkipException if the calling thread is interrupted while producing or
+     *                       shutting down
+     */
+    private void produceAndShutdown(IntegerConsumerBuilder cb, int numWorkers, int numberOfItems) {
+        try {
+            WorkItemManager<Integer, WorkItemConsumer> wi = getWorkItemManager(cb, numWorkers);
+            for (int i = 0; i < numberOfItems; i++) {
+                wi.produce(i);
+            }
+
+            wi.shutdown();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new SkipException("Test skipped!", e);
+        }
+    }
+
+    /**
+     * Creates the manager under test.
+     *
+     * @param cb         builder supplying the consumers
+     * @param numWorkers number of workers
+     * @return a new manager; the literal 5 is the constructor's second argument
+     *         (NOTE(review): presumably a batch size or queue capacity - confirm
+     *         against WorkItemManager)
+     */
+    private WorkItemManager<Integer, WorkItemConsumer> getWorkItemManager(IntegerConsumerBuilder cb, int numWorkers) {
+        return new WorkItemManager<>(cb, 5, numWorkers);
+    }
+}

Reply via email to