ATLAS-2555: Migration-Import: Support for BigInteger, BigDecimal. Unit tests.
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/15967a93 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/15967a93 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/15967a93 Branch: refs/heads/master Commit: 15967a9309b1fe8ca3645d82abd4749017c5553a Parents: 7515915 Author: Ashutosh Mestry <[email protected]> Authored: Fri Apr 13 09:58:53 2018 -0700 Committer: Ashutosh Mestry <[email protected]> Committed: Fri Apr 13 10:28:26 2018 -0700 ---------------------------------------------------------------------- .../atlas/repository/graphdb/AtlasGraph.java | 3 + .../graphdb/janus/AtlasJanusGraph.java | 7 + .../graphdb/janus/AtlasJanusGraphDatabase.java | 6 + .../janus/migration/AtlasGraphSONReader.java | 10 + .../janus/migration/GraphSONTokensTP2.java | 47 +- .../janus/migration/GraphSONUtility.java | 20 +- .../janus/migration/JsonNodeParsers.java | 36 +- .../janus/migration/JsonNodeProcessManager.java | 4 +- .../janus/migration/MappedElementCache.java | 24 +- .../janus/migration/PostProcessManager.java | 2 +- .../janus/migration/ReaderStatusManager.java | 48 +- .../janus/migration/RelationshipTypeCache.java | 5 - .../janus/migration/pc/WorkItemConsumer.java | 2 +- .../janus/migration/pc/WorkItemManager.java | 2 +- .../graphdb/janus/migration/BaseUtils.java | 118 + .../GraphSONUtilityPostProcessTest.java | 95 + .../janus/migration/GraphSONUtilityTest.java | 104 + .../janus/migration/JsonNodeParsersTest.java | 122 + .../janus/migration/MappedElementCacheTest.java | 72 + .../migration/ReaderStatusManagerTest.java | 61 + .../janus/migration/WorkItemConsumerTest.java | 96 + .../janus/migration/WorkItemManagerTest.java | 116 + .../test/resources/atlas-migration-data.json | 7338 ++++++++++++++++++ .../janus/src/test/resources/col-legacy.json | 73 + .../src/test/resources/db-type-legacy.json | 84 + .../janus/src/test/resources/db-v-65544.json | 78 + .../janus/src/test/resources/edge-legacy.json | 
27 + .../src/test/resources/table-v-147504.json | 121 + .../repository/graphdb/titan0/Titan0Graph.java | 6 + .../impexp/MigrationProgressService.java | 91 +- .../migration/DataMigrationService.java | 4 +- .../migration/HiveParititionTest.java | 9 +- .../repository/migration/HiveStocksTest.java | 9 +- .../migration/MigrationBaseAsserts.java | 3 +- .../migration/MigrationProgressServiceTest.java | 141 + .../atlas/repository/migration/PathTest.java | 84 + .../migration/RelationshipMappingTest.java | 1 + .../resources/path_db/atlas-migration-data.json | 2189 ++++++ .../path_db/atlas-migration-typesdef.json | 1 + 39 files changed, 11104 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java ---------------------------------------------------------------------- diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java index f252dc3..607baf6 100644 --- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java +++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java @@ -28,6 +28,7 @@ import javax.script.ScriptException; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.groovy.GroovyExpression; +import org.apache.atlas.model.impexp.MigrationStatus; import org.apache.atlas.type.AtlasType; /** @@ -320,4 +321,6 @@ public interface AtlasGraph<V, E> { boolean isMultiProperty(String name); void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException; + + MigrationStatus getMigrationStatus(); } http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java 
---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java index 2a1ce4e..c0b9c17 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java @@ -25,6 +25,7 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.groovy.GroovyExpression; +import org.apache.atlas.model.impexp.MigrationStatus; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraphManagement; @@ -33,6 +34,7 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery; import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.GremlinVersion; +import org.apache.atlas.repository.graphdb.janus.migration.ReaderStatusManager; import org.apache.atlas.repository.graphdb.janus.query.AtlasJanusGraphQuery; import org.apache.atlas.repository.graphdb.utils.IteratorToIterableAdapter; import org.apache.atlas.type.AtlasType; @@ -459,4 +461,9 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE public void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException { AtlasJanusGraphDatabase.loadLegacyGraphSON(relationshipCache, fs); } + + @Override + public MigrationStatus getMigrationStatus() { + return AtlasJanusGraphDatabase.getMigrationStatus(); + } } 
http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java index c9407bb..86cd299 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java @@ -21,9 +21,11 @@ package org.apache.atlas.repository.graphdb.janus; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.MigrationStatus; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.GraphDatabase; import org.apache.atlas.repository.graphdb.janus.migration.AtlasGraphSONReader; +import org.apache.atlas.repository.graphdb.janus.migration.ReaderStatusManager; import org.apache.atlas.repository.graphdb.janus.serializer.BigDecimalSerializer; import org.apache.atlas.repository.graphdb.janus.serializer.BigIntegerSerializer; import org.apache.atlas.repository.graphdb.janus.serializer.TypeCategorySerializer; @@ -259,4 +261,8 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, LOG.info("Done! 
loadLegacyGraphSON."); } } + + public static MigrationStatus getMigrationStatus() { + return ReaderStatusManager.get(getGraphInstance()); + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java index 099e6b9..636e6e8 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/AtlasGraphSONReader.java @@ -103,6 +103,16 @@ public final class AtlasGraphSONReader { processElement(parser, new JsonNodeParsers.ParseEdge(), startIndex); break; + case GraphSONTokensTP2.VERTEX_COUNT: + parser.nextToken(); + LOG.info("Vertex count: {}", parser.getLongValue()); + break; + + case GraphSONTokensTP2.EDGE_COUNT: + parser.nextToken(); + LOG.info("Edge count: {}", parser.getLongValue()); + break; + default: throw new IllegalStateException(String.format("Unexpected token in GraphSON - %s", fieldName)); } http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONTokensTP2.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONTokensTP2.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONTokensTP2.java index e8b0758..c44dfac 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONTokensTP2.java +++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONTokensTP2.java @@ -19,27 +19,32 @@ package org.apache.atlas.repository.graphdb.janus.migration; public final class GraphSONTokensTP2 { - public static final String _ID = "_id"; - public static final String _LABEL = "_label"; - public static final String _TYPE = "_type"; - public static final String _OUT_V = "_outV"; - public static final String _IN_V = "_inV"; - public static final String VALUE = "value"; - public static final String TYPE = "type"; - public static final String TYPE_LIST = "list"; - public static final String TYPE_STRING = "string"; - public static final String TYPE_DOUBLE = "double"; - public static final String TYPE_INTEGER = "integer"; - public static final String TYPE_FLOAT = "float"; - public static final String TYPE_MAP = "map"; - public static final String TYPE_BOOLEAN = "boolean"; - public static final String TYPE_LONG = "long"; - public static final String TYPE_SHORT = "short"; - public static final String TYPE_BYTE = "byte"; - public static final String TYPE_UNKNOWN = "unknown"; - public static final String VERTICES = "vertices"; - public static final String EDGES = "edges"; - public static final String MODE = "mode"; + public static final String _ID = "_id"; + public static final String _LABEL = "_label"; + public static final String _TYPE = "_type"; + public static final String _OUT_V = "_outV"; + public static final String _IN_V = "_inV"; + public static final String VALUE = "value"; + public static final String TYPE = "type"; + public static final String TYPE_LIST = "list"; + public static final String TYPE_STRING = "string"; + public static final String TYPE_DOUBLE = "double"; + public static final String TYPE_INTEGER = "integer"; + public static final String TYPE_FLOAT = "float"; + public static final String TYPE_MAP = "map"; + public static final String TYPE_BOOLEAN = "boolean"; + public static final String TYPE_LONG = "long"; + public static 
final String TYPE_SHORT = "short"; + public static final String TYPE_BYTE = "byte"; + public static final String TYPE_BIG_DECIMAL = "bigdecimal"; + public static final String TYPE_BIG_INTEGER = "biginteger"; + public static final String TYPE_DATE = "date"; + public static final String TYPE_UNKNOWN = "unknown"; + public static final String VERTICES = "vertices"; + public static final String EDGES = "edges"; + public static final String MODE = "mode"; + public static final String VERTEX_COUNT = "vertexCount"; + public static final String EDGE_COUNT = "edgeCount"; private GraphSONTokensTP2() { } http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java index 5120935..ec320b0 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtility.java @@ -20,6 +20,7 @@ package org.apache.atlas.repository.graphdb.janus.migration; import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.repository.Constants; +import org.apache.atlas.type.AtlasBuiltInTypes; import org.apache.commons.lang.StringUtils; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Graph; @@ -43,6 +44,8 @@ class GraphSONUtility { private static final String EMPTY_STRING = ""; private final RelationshipTypeCache relationshipTypeCache; + private static AtlasBuiltInTypes.AtlasBigIntegerType bigIntegerType = new AtlasBuiltInTypes.AtlasBigIntegerType(); + private static AtlasBuiltInTypes.AtlasBigDecimalType 
bigDecimalType = new AtlasBuiltInTypes.AtlasBigDecimalType(); public GraphSONUtility(final RelationshipTypeCache relationshipTypeCache) { this.relationshipTypeCache = relationshipTypeCache; @@ -187,7 +190,9 @@ class GraphSONUtility { if (StringUtils.isNotEmpty(typeName)) { props.put(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName); } else { - LOG.debug("Could not find relationship type for: {}", edgeLabel); + if (LOG.isDebugEnabled()) { + LOG.debug("Could not find relationship type for: {}", edgeLabel); + } } } @@ -223,6 +228,7 @@ class GraphSONUtility { } } + @VisibleForTesting static Map<String, Object> readProperties(final JsonNode node) { final Map<String, Object> map = new HashMap<>(); final Iterator<Map.Entry<String, JsonNode>> iterator = node.fields(); @@ -267,7 +273,13 @@ class GraphSONUtility { } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_INTEGER)) { propertyValue = node.get(GraphSONTokensTP2.VALUE).intValue(); } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_LONG)) { - propertyValue = node.get(GraphSONTokensTP2.VALUE).longValue(); + propertyValue = node.get(GraphSONTokensTP2.VALUE).asLong(); + } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_BIG_DECIMAL)) { + propertyValue = bigDecimalType.getNormalizedValue(node.get(GraphSONTokensTP2.VALUE)); + } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_BIG_INTEGER)) { + propertyValue = bigIntegerType.getNormalizedValue(node.get(GraphSONTokensTP2.VALUE)); + } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_DATE)) { + propertyValue = new Date(node.get(GraphSONTokensTP2.VALUE).asLong()); } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_STRING)) { propertyValue = node.get(GraphSONTokensTP2.VALUE).textValue(); } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_LIST)) 
{ @@ -308,6 +320,10 @@ class GraphSONUtility { theValue = node.longValue(); } else if (node.isTextual()) { theValue = node.textValue(); + } else if (node.isBigDecimal()) { + theValue = node.decimalValue(); + } else if (node.isBigInteger()) { + theValue = node.bigIntegerValue(); } else if (node.isArray()) { // this is an array so just send it back so that it can be // reprocessed to its primitive components http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsers.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsers.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsers.java index acf8cb2..cd65460 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsers.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsers.java @@ -73,42 +73,10 @@ public class JsonNodeParsers { return el; } - static Object getTypedValueFromJsonNode(final JsonNode node) { - Object theValue = null; - - if (node != null && !node.isNull()) { - if (node.isBoolean()) { - theValue = node.booleanValue(); - } else if (node.isDouble()) { - theValue = node.doubleValue(); - } else if (node.isFloatingPointNumber()) { - theValue = node.floatValue(); - } else if (node.isInt()) { - theValue = node.intValue(); - } else if (node.isLong()) { - theValue = node.longValue(); - } else if (node.isTextual()) { - theValue = node.textValue(); - } else if (node.isArray()) { - // this is an array so just send it back so that it can be - // reprocessed to its primitive components - theValue = node; - } else if (node.isObject()) { - // this is an object so just send it back so that it can be - // reprocessed to its primitive components - theValue = node; 
- } else { - theValue = node.textValue(); - } - } - - return theValue; - } } static class ParseEdge extends ParseElement { private static final String MESSAGE_EDGE = "edge"; - private static final String TYPE_NAME_NODE_NAME = Constants.VERTEX_TYPE_PROPERTY_KEY; @Override @@ -118,7 +86,7 @@ public class JsonNodeParsers { @Override Object getId(JsonNode node) { - return getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID)); + return utility.getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID)); } @Override @@ -163,7 +131,7 @@ public class JsonNodeParsers { @Override Object getId(JsonNode node) { - return getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID)); + return utility.getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID)); } @Override http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java index c1e9d20..6d24619 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java @@ -108,7 +108,7 @@ public class JsonNodeProcessManager { display("commit-size: {}: Done!", size); } - private void updateSchema(Map<String, Object> schema, org.apache.tinkerpop.shaded.jackson.databind.JsonNode node) { + private void updateSchema(Map<String, Object> schema, JsonNode node) { synchronized (graph) { String typeName = parseElement.getType(node); @@ -142,7 +142,7 @@ public class JsonNodeProcessManager { try { Thread.sleep(WAIT_DURATION_AFTER_COMMIT_EXCEPTION); - for 
(org.apache.tinkerpop.shaded.jackson.databind.JsonNode n : nodes) { + for (JsonNode n : nodes) { parseElement.parse(bulkLoadGraph, cache, n); } commitBulk(); http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java index fe9e327..cca72ad 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCache.java @@ -18,6 +18,7 @@ package org.apache.atlas.repository.graphdb.janus.migration; +import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.utils.LruCache; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Graph; @@ -33,8 +34,11 @@ import static org.apache.atlas.repository.Constants.VERTEX_ID_IN_IMPORT_KEY; public class MappedElementCache { private static final Logger LOG = LoggerFactory.getLogger(MappedElementCache.class); - private final Map<Object, Vertex> lruVertexCache = new LruCache<>(500, 100000); - private final Map<String, String> lruEdgeCache = new LruCache<>(500, 100000); + @VisibleForTesting + final Map<Object, Vertex> lruVertexCache = new LruCache<>(500, 100000); + + @VisibleForTesting + final Map<String, String> lruEdgeCache = new LruCache<>(500, 100000); public Vertex getMappedVertex(Graph gr, Object key) { try { @@ -83,7 +87,8 @@ public class MappedElementCache { } } - private Vertex fetchVertex(Graph gr, Object key) { + @VisibleForTesting + Vertex fetchVertex(Graph gr, Object key) { try { return 
gr.traversal().V().has(VERTEX_ID_IN_IMPORT_KEY, key).next(); } catch (Exception ex) { @@ -92,7 +97,8 @@ public class MappedElementCache { } } - private Edge fetchEdge(Graph gr, String key) { + @VisibleForTesting + Edge fetchEdge(Graph gr, String key) { try { return gr.traversal().E().has(EDGE_ID_IN_IMPORT_KEY, key).next(); } catch (Exception ex) { @@ -101,16 +107,8 @@ public class MappedElementCache { } } - public void clearVertexCache() { + public void clearAll() { lruVertexCache.clear(); - } - - public void clearEdgeCache() { lruEdgeCache.clear(); } - - public void clearAll() { - clearVertexCache(); - clearEdgeCache(); - } } http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java index 4c5e357..d0a65f7 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/PostProcessManager.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; public class PostProcessManager { - private static class Consumer extends WorkItemConsumer<Object> { + static class Consumer extends WorkItemConsumer<Object> { private static final Logger LOG = LoggerFactory.getLogger(Consumer.class); private final Graph bulkLoadGraph; http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManager.java ---------------------------------------------------------------------- diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManager.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManager.java index 67dbf22..11732c4 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManager.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManager.java @@ -18,6 +18,8 @@ package org.apache.atlas.repository.graphdb.janus.migration; +import com.google.common.annotations.VisibleForTesting; +import org.apache.atlas.model.impexp.MigrationStatus; import org.apache.atlas.repository.Constants; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; @@ -43,7 +45,8 @@ public class ReaderStatusManager { public static final String STATUS_SUCCESS = "SUCCESS"; public static final String STATUS_FAILED = "FAILED"; - private Object migrationStatusId = null; + @VisibleForTesting + Object migrationStatusId = null; private Vertex migrationStatus = null; public ReaderStatusManager(Graph graph, Graph bulkLoadGraph) { @@ -71,7 +74,10 @@ public class ReaderStatusManager { public void update(Graph graph, Long counter) { migrationStatus.property(CURRENT_INDEX_PROPERTY, counter); - graph.tx().commit(); + + if(graph.features().graph().supportsTransactions()) { + graph.tx().commit(); + } } public void update(Graph graph, Long counter, String status) { @@ -91,7 +97,7 @@ public class ReaderStatusManager { return g.V(migrationStatusId).next(); } - private Vertex fetchUsingTypeName(GraphTraversalSource g) { + private static Vertex fetchUsingTypeName(GraphTraversalSource g) { GraphTraversal src = g.V().has(Constants.ENTITY_TYPE_PROPERTY_KEY, MIGRATION_STATUS_TYPE_NAME); return src.hasNext() ? 
(Vertex) src.next() : null; } @@ -109,8 +115,42 @@ public class ReaderStatusManager { migrationStatusId = v.id(); - rGraph.tx().commit(); + if(rGraph.features().graph().supportsTransactions()) { + rGraph.tx().commit(); + } LOG.info("migrationStatus vertex created! v[{}]", migrationStatusId); } + + public static MigrationStatus updateFromVertex(Graph graph, MigrationStatus ms) { + Vertex vertex = fetchUsingTypeName(graph.traversal()); + + if(ms == null) { + ms = new MigrationStatus(); + } + + ms.setStartTime((Date) vertex.property(START_TIME_PROPERTY).value()); + ms.setEndTime((Date) vertex.property(END_TIME_PROPERTY).value()); + ms.setCurrentIndex((Long) vertex.property(CURRENT_INDEX_PROPERTY).value()); + ms.setOperationStatus((String) vertex.property(OPERATION_STATUS_PROPERTY).value()); + ms.setTotalCount((Long) vertex.property(TOTAL_COUNT_PROPERTY).value()); + + return ms; + } + + public static MigrationStatus get(Graph graph) { + MigrationStatus ms = new MigrationStatus(); + try { + Vertex v = fetchUsingTypeName(graph.traversal()); + ms.setStartTime((Date) v.property(START_TIME_PROPERTY).value()); + ms.setEndTime((Date) v.property(END_TIME_PROPERTY).value()); + ms.setCurrentIndex((long) v.property(CURRENT_INDEX_PROPERTY).value()); + ms.setOperationStatus((String) v.property(OPERATION_STATUS_PROPERTY).value()); + ms.setTotalCount((long) v.property(TOTAL_COUNT_PROPERTY).value()); + } catch (Exception ex) { + LOG.error("get: failed!", ex); + } + + return ms; + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipTypeCache.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipTypeCache.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipTypeCache.java index 48b3595..e4e8264 100644 --- 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipTypeCache.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/RelationshipTypeCache.java @@ -18,14 +18,9 @@ package org.apache.atlas.repository.graphdb.janus.migration; -import org.apache.commons.lang.StringUtils; -import org.apache.tinkerpop.gremlin.structure.Vertex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; import java.util.Map; public class RelationshipTypeCache { http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemConsumer.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemConsumer.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemConsumer.java index 5de8db9..39b50de 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemConsumer.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemConsumer.java @@ -86,7 +86,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { protected abstract void processItem(T item); - private void updateCommitTime(long commitTime) { + protected void updateCommitTime(long commitTime) { if (this.maxCommitTimeSeconds < commitTime) { this.maxCommitTimeSeconds = commitTime; } http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemManager.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemManager.java 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemManager.java index 7e21495..d50ada4 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemManager.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/pc/WorkItemManager.java @@ -55,7 +55,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { public void shutdown() throws InterruptedException { int avgCommitTimeSeconds = getAvgCommitTimeSeconds() * 2; - LOG.info("WorkItemManager: Shutdown started. Will wait for: {} seconds...", avgCommitTimeSeconds); + LOG.info("WorkItemManager: Shutdown started. Will wait for: {} minutes...", avgCommitTimeSeconds); service.shutdown(); service.awaitTermination(avgCommitTimeSeconds, TimeUnit.MINUTES); http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/BaseUtils.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/BaseUtils.java b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/BaseUtils.java new file mode 100644 index 0000000..de43e0f --- /dev/null +++ b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/BaseUtils.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.atlas.repository.graphdb.janus.migration;

import org.apache.commons.io.FileUtils;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
import org.testng.ITestContext;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.HashMap;

import static org.testng.Assert.assertTrue;

/**
 * Common fixtures for the Janus migration tests.
 *
 * <p>Loads the legacy GraphSON JSON resources from {@code src/test/resources} (relative to the
 * {@code user.dir} system property), exposes them as TestNG data providers, and offers helpers
 * that load those nodes into a {@link TinkerGraph}.
 */
public class BaseUtils {
    private static final String resourcesDirRelativePath = "/src/test/resources/";
    private String resourceDir;

    protected final RelationshipTypeCache emptyRelationshipCache = new RelationshipTypeCache(new HashMap<>());
    protected GraphSONUtility graphSONUtility;

    /**
     * Parses a JSON resource and wraps the resulting tree in the one-row, one-column array shape
     * returned by TestNG data providers.
     *
     * @param s resource file name, resolved against the test-resources directory
     * @return a 1x1 array holding the parsed {@link JsonNode}
     * @throws IOException if the file cannot be read or parsed
     */
    protected Object[][] getJsonNodeFromFile(String s) throws IOException {
        File f = new File(getFilePath(s));
        // Decode explicitly as UTF-8. The charset-less readFileToString overload is deprecated
        // and depends on the platform default encoding.
        String json = FileUtils.readFileToString(f, StandardCharsets.UTF_8);

        return new Object[][]{{getEntityNode(json)}};
    }

    /** Resolves {@code fileName} against the test-resources directory computed in {@link #setup()}. */
    protected String getFilePath(String fileName) {
        return Paths.get(resourceDir, fileName).toString();
    }

    /** Computes the resources directory and creates a {@link GraphSONUtility} backed by an empty relationship cache. */
    @BeforeClass
    public void setup() {
        resourceDir = System.getProperty("user.dir") + resourcesDirRelativePath;
        graphSONUtility = new GraphSONUtility(emptyRelationshipCache);
    }

    /** Returns the typed value of the node's GraphSON {@code _id} field. */
    protected Object getId(JsonNode node) {
        // getTypedValueFromJsonNode is static; call it through the class rather than an instance.
        return GraphSONUtility.getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));
    }

    /** Parses {@code json} into a tree using a GraphSON mapper with type embedding disabled. */
    private JsonNode getEntityNode(String json) throws IOException {
        GraphSONMapper.Builder builder = GraphSONMapper.build();
        final ObjectMapper mapper = builder.embedTypes(false).create().createMapper();
        return mapper.readTree(json);
    }

    /** Adds the vertex described by {@code node} to {@code tg} using a freshly created {@link GraphSONUtility}. */
    protected void addVertex(TinkerGraph tg, JsonNode node) {
        GraphSONUtility utility = new GraphSONUtility(emptyRelationshipCache);
        utility.vertexFromJson(tg, node);
    }

    /**
     * Loads the database vertex, the table vertex and the edge fixture into {@code tg},
     * passing {@code cache} to {@code edgeFromJson}.
     */
    protected void addEdge(TinkerGraph tg, MappedElementCache cache) throws IOException {
        GraphSONUtility gu = graphSONUtility;

        gu.vertexFromJson(tg, (JsonNode) getDBV(null)[0][0]);
        gu.vertexFromJson(tg, (JsonNode) getTableV(null)[0][0]);
        gu.edgeFromJson(tg, cache, (JsonNode) getEdge(null)[0][0]);
    }

    /** Returns the first vertex whose {@code __typeName} is {@code hive_table}, asserting that one exists. */
    protected Vertex fetchTableVertex(TinkerGraph tg) {
        GraphTraversal<Vertex, Vertex> query = tg.traversal().V().has("__typeName", "hive_table");
        assertTrue(query.hasNext());

        return query.next();
    }

    @DataProvider(name = "col1")
    public Object[][] getCol1(ITestContext context) throws IOException {
        return getJsonNodeFromFile("col-legacy.json");
    }

    @DataProvider(name = "dbType")
    public Object[][] getDbType(ITestContext context) throws IOException {
        return getJsonNodeFromFile("db-type-legacy.json");
    }

    @DataProvider(name = "edge")
    public Object[][] getEdge(ITestContext context) throws IOException {
        return getJsonNodeFromFile("edge-legacy.json");
    }

    @DataProvider(name = "dbV")
    public Object[][] getDBV(ITestContext context) throws IOException {
        return getJsonNodeFromFile("db-v-65544.json");
    }

    @DataProvider(name = "tableV")
    public Object[][] getTableV(ITestContext context) throws IOException {
        return getJsonNodeFromFile("table-v-147504.json");
    }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityPostProcessTest.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityPostProcessTest.java b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityPostProcessTest.java new file mode 100644 index 0000000..4d73c78 --- /dev/null +++ b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityPostProcessTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.atlas.repository.graphdb.janus.migration;

import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

/**
 * Verifies that edge ids stored in the list-valued {@code hive_table.columns} property are
 * rewritten using the edge-id mappings held in a {@link MappedElementCache}.
 *
 * <p>The tests form a {@code dependsOnMethods} chain. They share the graph, cache and table vertex
 * set up by {@link #noRefNoUpdate()}.
 */
public class GraphSONUtilityPostProcessTest extends BaseUtils {
    private static final String HIVE_COLUMNS_PROPERTY = "hive_table.columns";
    private static final String EDGE_ID_1 = "816u-35tc-ao0l-47so";
    private static final String EDGE_ID_2 = "82rq-35tc-ao0l-2glc";

    private static final String EDGE_ID_1_MAPPED = "816u-35tc-ao0l-xxxx";
    private static final String EDGE_ID_2_MAPPED = "82rq-35tc-ao0l-xxxx";

    // State shared across the dependsOnMethods chain below.
    private TinkerGraph tg;
    private final MappedElementCache cache = new MappedElementCache();
    private Vertex tableV;

    @Test
    public void noRefNoUpdate() throws IOException {
        tg = TinkerGraph.open();
        graphSONUtility = new GraphSONUtility(emptyRelationshipCache);

        addEdge(tg, cache);

        tableV = fetchTableVertex(tg);
        assertNotNull(tableV);

        assertListProperty(HIVE_COLUMNS_PROPERTY, EDGE_ID_1, EDGE_ID_2, tableV);

        // No edge-id mappings have been put into the cache by this test, so the list must stay unchanged.
        graphSONUtility.replaceReferencedEdgeIdForList(tg, cache, tableV, HIVE_COLUMNS_PROPERTY);
        assertListProperty(HIVE_COLUMNS_PROPERTY, EDGE_ID_1, EDGE_ID_2, tableV);
    }

    @Test(dependsOnMethods = "noRefNoUpdate")
    public void refFoundVertexUpdated() throws IOException {
        cache.lruEdgeCache.put(EDGE_ID_1, EDGE_ID_1_MAPPED);
        cache.lruEdgeCache.put(EDGE_ID_2, EDGE_ID_2_MAPPED);

        graphSONUtility.replaceReferencedEdgeIdForList(tg, cache, tableV, HIVE_COLUMNS_PROPERTY);
        assertListProperty(HIVE_COLUMNS_PROPERTY, EDGE_ID_1_MAPPED, EDGE_ID_2_MAPPED, tableV);
    }

    @Test(dependsOnMethods = "refFoundVertexUpdated")
    public void updateUsingPostProcessConsumer() throws IOException {
        // A separate cache that maps the rewritten ids back to the originals. It has its own name
        // so it does not shadow the shared field.
        MappedElementCache reverseCache = new MappedElementCache();
        BlockingQueue<Object> bc = new LinkedBlockingQueue<>();
        PostProcessManager.Consumer consumer = new PostProcessManager.Consumer(bc, tg, graphSONUtility,
                new String[] {HIVE_COLUMNS_PROPERTY}, reverseCache, 5);

        reverseCache.lruEdgeCache.put(EDGE_ID_1_MAPPED, EDGE_ID_1);
        reverseCache.lruEdgeCache.put(EDGE_ID_2_MAPPED, EDGE_ID_2);
        consumer.processItem(tableV.id());

        assertListProperty(HIVE_COLUMNS_PROPERTY, EDGE_ID_1, EDGE_ID_2, tableV);
    }

    /**
     * Asserts that {@code vertex} has {@code propertyName} set to a two-element list equal to
     * {@code [expectedFirst, expectedSecond]}.
     */
    private void assertListProperty(String propertyName, String expectedFirst, String expectedSecond, Vertex vertex) {
        assertTrue(vertex.property(propertyName).isPresent());
        List<?> list = (List<?>) vertex.property(propertyName).value();

        assertEquals(list.size(), 2);
        assertEquals(list.get(0), expectedFirst);
        assertEquals(list.get(1), expectedSecond);
    }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityTest.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityTest.java b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityTest.java new file mode 100644 index 0000000..794b547 --- /dev/null +++ b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/GraphSONUtilityTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.atlas.repository.graphdb.janus.migration;

import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;

import static org.apache.atlas.repository.Constants.EDGE_ID_IN_IMPORT_KEY;
import static org.apache.atlas.repository.Constants.VERTEX_ID_IN_IMPORT_KEY;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

/**
 * Unit tests for {@link GraphSONUtility}. They cover id extraction, property reading, and loading
 * vertices and edges from legacy GraphSON nodes into a {@link TinkerGraph}.
 */
public class GraphSONUtilityTest extends BaseUtils {

    @Test(dataProvider = "col1")
    public void idFetch(JsonNode node) {
        Object o = GraphSONUtility.getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));

        assertNotNull(o);
        assertEquals((int) o, 98336);
    }

    @Test(dataProvider = "col1")
    public void verifyReadProperties(JsonNode node) {
        Map<String, Object> props = GraphSONUtility.readProperties(node);

        // Check for null before dereferencing. Placed after the get() calls below, this assertion
        // could never fail, because a null map would already have thrown a NullPointerException.
        assertNotNull(props);

        assertEquals(props.get("__superTypeNames").getClass(), ArrayList.class);
        assertEquals(props.get("Asset.name").getClass(), String.class);
        assertEquals(props.get("hive_column.position").getClass(), Integer.class);
        assertEquals(props.get("__timestamp").getClass(), Long.class);
    }

    @Test(dataProvider = "col1")
    public void dataNodeReadAndVertexAddedToGraph(JsonNode entityNode) throws IOException {
        TinkerGraph tg = TinkerGraph.open();
        GraphSONUtility gu = new GraphSONUtility(emptyRelationshipCache);
        Map<String, Object> map = gu.vertexFromJson(tg, entityNode);

        assertNull(map);
        assertEquals((long) tg.traversal().V().count().next(), 1L);

        Vertex v = tg.vertices().next();
        assertTrue(v.property(VERTEX_ID_IN_IMPORT_KEY).isPresent());
    }

    @Test(dataProvider = "dbType")
    public void typeNodeReadAndVertexNotAddedToGraph(JsonNode entityNode) throws IOException {
        TinkerGraph tg = TinkerGraph.open();
        GraphSONUtility gu = new GraphSONUtility(emptyRelationshipCache);
        gu.vertexFromJson(tg, entityNode);

        assertEquals((long) tg.traversal().V().count().next(), 0L);
    }

    @Test
    public void edgeReadAndAddedToGraph() throws IOException {
        TinkerGraph tg = TinkerGraph.open();
        GraphSONUtility gu = new GraphSONUtility(emptyRelationshipCache);

        Map<String, Object> dbResult = gu.vertexFromJson(tg, (JsonNode) getDBV(null)[0][0]);
        assertNull(dbResult);

        Map<String, Object> tableResult = gu.vertexFromJson(tg, (JsonNode) getTableV(null)[0][0]);
        assertNull(tableResult);

        Map<String, Object> edgeResult = gu.edgeFromJson(tg, new MappedElementCache(), (JsonNode) getEdge(null)[0][0]);
        assertNull(edgeResult);

        assertEquals((long) tg.traversal().V().count().next(), 2L);
        assertEquals((long) tg.traversal().E().count().next(), 1L);

        Edge e = tg.edges().next();
        assertTrue(e.property(EDGE_ID_IN_IMPORT_KEY).isPresent());
    }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsersTest.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsersTest.java b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsersTest.java new file mode 100644 index
0000000..170b1af --- /dev/null +++ b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeParsersTest.java @@ -0,0 +1,122 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.atlas.repository.graphdb.janus.migration;

import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

/**
 * Tests for the {@link JsonNodeParsers} vertex and edge parsers. Each test parses a fixture,
 * looks the element up by mapped and original id, and updates its properties.
 *
 * <p>All assertions use {@code org.testng.Assert}, whose argument order is (actual, expected).
 */
public class JsonNodeParsersTest extends BaseUtils {

    @Test(dataProvider = "col1")
    public void parseVertex(JsonNode nd) {
        final int COL1_ORIGINAL_ID = 98336;

        Object nodeId = getId(nd);
        TinkerGraph tg = TinkerGraph.open();

        JsonNodeParsers.ParseElement pe = new JsonNodeParsers.ParseVertex();
        pe.setContext(graphSONUtility);
        pe.parse(tg, new MappedElementCache(), nd);

        Vertex v = tg.vertices().next();
        Vertex vUsingPe = (Vertex) pe.get(tg, nodeId);
        Vertex vUsingOriginalId = (Vertex) pe.getByOriginalId(tg, COL1_ORIGINAL_ID);
        Vertex vUsingOriginalId2 = (Vertex) pe.getByOriginalId(tg, nd);

        // NOTE(review): this passes the Vertex itself, whereas parseEdge passes the original id.
        // Confirm which argument ParseElement.update is meant to receive.
        updateParseElement(tg, pe, vUsingPe);

        assertNotNull(v);
        assertNotNull(vUsingPe);
        assertNotNull(vUsingOriginalId);

        assertEquals(vUsingPe.id(), v.id());
        assertEquals(pe.getId(nd), nodeId);
        assertFalse(pe.isTypeNode(nd));
        assertEquals(pe.getType(nd), "\"hive_column\"");
        assertEquals(vUsingOriginalId.id(), v.id());
        assertEquals(vUsingOriginalId2.id(), v.id());

        assertProperties(vUsingPe);
    }

    @Test(dataProvider = "edge")
    public void parseEdge(JsonNode nd) throws IOException {
        final String EDGE_ORIGINAL_ID = "8k5i-35tc-acyd-1eko";
        Object nodeId = getId(nd);

        TinkerGraph tg = TinkerGraph.open();
        MappedElementCache cache = new MappedElementCache();
        JsonNodeParsers.ParseElement peVertex = new JsonNodeParsers.ParseVertex();
        peVertex.setContext(graphSONUtility);

        // Both endpoint vertices must be present before the edge can be parsed.
        peVertex.parse(tg, cache, (JsonNode) getDBV(null)[0][0]);
        peVertex.parse(tg, cache, (JsonNode) getTableV(null)[0][0]);

        JsonNodeParsers.ParseElement pe = new JsonNodeParsers.ParseEdge();
        pe.setContext(graphSONUtility);
        pe.parse(tg, cache, (JsonNode) getEdge(null)[0][0]);

        updateParseElement(tg, pe, nodeId);

        Edge e = tg.edges().next();
        Edge eUsingPe = (Edge) pe.get(tg, nodeId);
        Edge eUsingOriginalId = (Edge) pe.getByOriginalId(tg, EDGE_ORIGINAL_ID);
        Edge eUsingOriginalId2 = (Edge) pe.getByOriginalId(tg, nd);

        assertNotNull(e);
        assertNotNull(eUsingPe);
        assertNotNull(eUsingOriginalId);

        assertEquals(eUsingPe.id(), e.id());
        assertEquals(pe.getId(nd), nodeId);
        assertFalse(pe.isTypeNode(nd));
        assertEquals(eUsingOriginalId.id(), e.id());
        assertEquals(eUsingOriginalId2.id(), e.id());

        assertProperties(e);
    }

    /** Applies the properties {@code k1=v1} and {@code k2=v2} through {@code pe.update}. */
    private void updateParseElement(TinkerGraph tg, JsonNodeParsers.ParseElement pe, Object nodeId) {
        Map<String, Object> props = new HashMap<>();
        props.put("k1", "v1");
        props.put("k2", "v2");
        pe.update(tg, nodeId, props);
    }

    /** Asserts that {@code element} carries both properties written by {@code updateParseElement}. */
    private void assertProperties(Element element) {
        assertNotNull(element);
        assertTrue(element.property("k1").isPresent());
        assertTrue(element.property("k2").isPresent());

        assertEquals(element.property("k1").value(), "v1");
        // Previously only k1's value was checked; verify k2's value as well.
        assertEquals(element.property("k2").value(), "v2");
    }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCacheTest.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCacheTest.java b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCacheTest.java new file mode 100644 index 0000000..cac09d2 --- /dev/null +++ b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/MappedElementCacheTest.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/

package org.apache.atlas.repository.graphdb.janus.migration;

import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.testng.annotations.Test;

import java.io.IOException;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

/**
 * Unit tests for {@link MappedElementCache} lookups against a {@link TinkerGraph}.
 *
 * <p>The {@code col1} data provider is inherited from {@link BaseUtils}. The identical copy
 * previously re-declared here was redundant.
 */
public class MappedElementCacheTest extends BaseUtils {

    @Test(dataProvider = "col1")
    public void vertexFetch(JsonNode node) {
        MappedElementCache cache = new MappedElementCache();
        TinkerGraph tg = TinkerGraph.open();

        addVertex(tg, node);

        Vertex vx = cache.getMappedVertex(tg, 98336);
        assertNotNull(vx);
        assertEquals(cache.lruVertexCache.size(), 1);
        assertEquals(cache.lruEdgeCache.size(), 0);
    }

    @Test
    public void edgeFetch() throws IOException {
        MappedElementCache cache = new MappedElementCache();
        TinkerGraph tg = TinkerGraph.open();

        addEdge(tg, cache);

        assertEquals(cache.lruVertexCache.size(), 2);
        assertEquals(cache.lruEdgeCache.size(), 0);
    }

    @Test
    public void nonExistentVertexReturnsNull() {
        TinkerGraph tg = TinkerGraph.open();
        MappedElementCache cache = new MappedElementCache();

        assertNull(cache.fetchVertex(tg, 1111));
        assertNull(cache.fetchEdge(tg, "abcd"));
    }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManagerTest.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManagerTest.java
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManagerTest.java new file mode 100644 index 0000000..8c3d157 --- /dev/null +++ b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/ReaderStatusManagerTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.atlas.repository.graphdb.janus.migration;

import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

/**
 * Tests for {@link ReaderStatusManager}. They cover creation of the migration-status vertex and
 * the {@link MigrationStatus} values read back through {@code ReaderStatusManager.updateFromVertex}.
 */
public class ReaderStatusManagerTest {
    @Test
    public void createsNewStatusNode() {
        TinkerGraph graph = TinkerGraph.open();
        ReaderStatusManager statusManager = newManager(graph);
        assertEquals(statusManager.getStartIndex(), 0L);

        // Constructing the manager is expected to have added the status vertex to the graph.
        assertNotNull(graph.traversal().V(statusManager.migrationStatusId).next());

        MigrationStatus status = readStatus(graph);
        assertEquals(status.getCurrentIndex(), 0L);
        assertEquals(status.getTotalCount(), 0L);
        assertEquals(status.getOperationStatus(), "NOT STARTED");
        assertNotNull(status.getStartTime());
        assertNotNull(status.getEndTime());
    }

    @Test
    public void verifyUpdates() {
        final long totalCount = 1001L;
        final String finalStatus = "SUCCESS";

        TinkerGraph graph = TinkerGraph.open();
        ReaderStatusManager statusManager = newManager(graph);

        statusManager.update(graph, 1000L, "IN PROGRESS");
        statusManager.end(graph, totalCount, finalStatus);

        // After end(), the current index is expected to equal the final total count.
        MigrationStatus status = readStatus(graph);
        assertEquals(status.getCurrentIndex(), totalCount);
        assertEquals(status.getTotalCount(), totalCount);
        assertEquals(status.getOperationStatus(), finalStatus);
    }

    /** Creates a manager that uses {@code graph} for both of its graph arguments. */
    private static ReaderStatusManager newManager(TinkerGraph graph) {
        return new ReaderStatusManager(graph, graph);
    }

    /** Reads the current migration status from {@code graph}, passing {@code null} as the second argument. */
    private static MigrationStatus readStatus(TinkerGraph graph) {
        return ReaderStatusManager.updateFromVertex(graph, null);
    }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemConsumerTest.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemConsumerTest.java b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemConsumerTest.java new file mode 100644 index
0000000..4cd4c2c --- /dev/null +++ b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemConsumerTest.java @@ -0,0 +1,96 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.atlas.repository.graphdb.janus.migration;

import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemConsumer;
import org.testng.annotations.Test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

/**
 * Tests {@link WorkItemConsumer#run()} using a spy subclass. The spy records whether
 * {@code commitDirty} and {@code updateCommitTime} were invoked.
 */
public class WorkItemConsumerTest {

    /** Consumer that does no real work and records which commit hooks were called. */
    private static class IntegerConsumerSpy extends WorkItemConsumer<Integer> {
        private boolean commitDirtyCalled = false;
        private boolean updateCommitTimeCalled = false;

        public IntegerConsumerSpy(BlockingQueue<Integer> queue) {
            super(queue);
        }

        @Override
        protected void doCommit() {
            // Intentionally empty: nothing to persist in this test.
        }

        @Override
        protected void processItem(Integer item) {
            // Intentionally empty: the test only observes the commit hooks.
        }

        @Override
        protected void commitDirty() {
            commitDirtyCalled = true;
            super.commitDirty();
        }

        @Override
        protected void updateCommitTime(long commitTime) {
            updateCommitTimeCalled = true;
        }

        public boolean isCommitDirtyCalled() {
            return commitDirtyCalled;
        }

        public boolean isUpdateCommitTimeCalled() {
            return updateCommitTimeCalled;
        }
    }

    // Renamed from callingRunOnEmptyQueueCallsDoesNotCallCommitDirty, which contradicted the
    // assertion below that commitDirty IS called.
    @Test
    public void runOnEmptyQueueCallsCommitDirtyButNotUpdateCommitTime() {
        BlockingQueue<Integer> bc = new LinkedBlockingQueue<>(5);

        IntegerConsumerSpy ic = new IntegerConsumerSpy(bc);
        ic.run();

        assertTrue(bc.isEmpty());
        assertTrue(ic.isCommitDirtyCalled());
        assertFalse(ic.isUpdateCommitTimeCalled());
    }

    @Test
    public void runOnQueueRemovesItemFromQueueCallsCommitDirty() {
        BlockingQueue<Integer> bc = new LinkedBlockingQueue<>(5);
        bc.add(1);

        IntegerConsumerSpy ic = new IntegerConsumerSpy(bc);
        ic.run();

        assertTrue(bc.isEmpty());
        assertTrue(ic.isCommitDirtyCalled());
        assertTrue(ic.isUpdateCommitTimeCalled());
    }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemManagerTest.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemManagerTest.java b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemManagerTest.java new file mode 100644 index 0000000..25488bd --- /dev/null +++ b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/migration/WorkItemManagerTest.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.atlas.repository.graphdb.janus.migration;

import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemBuilder;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemManager;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.SkipException;
import org.testng.annotations.Test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

import static org.testng.Assert.assertEquals;

/**
 * Tests for {@link WorkItemManager}. They check that every produced item is consumed and that a
 * single worker consumes items in production order.
 */
public class WorkItemManagerTest {
    private static final Logger LOG = LoggerFactory.getLogger(WorkItemManagerTest.class);

    /** Consumer that appends each item to a shared queue and sleeps in {@code doCommit} to simulate a slow commit. */
    private static class IntegerConsumer extends WorkItemConsumer<Integer> {
        private final ConcurrentLinkedQueue<Integer> target;

        public IntegerConsumer(BlockingQueue<Integer> queue, ConcurrentLinkedQueue<Integer> target) {
            super(queue);
            this.target = target;
        }

        @Override
        protected void doCommit() {
            try {
                // nextInt(10, 15) yields 10..14, so each simulated commit sleeps 200-280 ms.
                Thread.sleep(20 * RandomUtils.nextInt(10, 15));
            } catch (InterruptedException e) {
                // Restore the interrupt flag rather than swallowing it with printStackTrace().
                LOG.warn("doCommit interrupted", e);
                Thread.currentThread().interrupt();
            }
        }

        @Override
        protected void processItem(Integer item) {
            LOG.info("adding: {}: size: {}", item, target.size());
            target.add(item);
        }
    }

    /** Builds {@link IntegerConsumer}s that all append to this builder's {@code integers} queue. */
    private static class IntegerConsumerBuilder implements WorkItemBuilder<IntegerConsumer, Integer> {
        final ConcurrentLinkedQueue<Integer> integers = new ConcurrentLinkedQueue<>();

        @Override
        public IntegerConsumer build(BlockingQueue<Integer> queue) {
            return new IntegerConsumer(queue, integers);
        }
    }

    @Test
    public void oneWorkerSequences() {
        IntegerConsumerBuilder cb = new IntegerConsumerBuilder();
        int numberOfItems = 10;

        produceAndShutdown(cb, 1, numberOfItems);

        assertEquals(cb.integers.size(), numberOfItems);
        Integer[] ints = cb.integers.toArray(new Integer[0]);
        for (int i = 0; i < numberOfItems; i++) {
            // Compare as ints with a message. The previous assertEquals(ints[i], i, i) resolved to the
            // floating-point (actual, expected, delta) overload and tolerated an error of up to i.
            assertEquals(ints[i].intValue(), i, "unexpected item at position " + i);
        }
    }

    @Test
    public void multipleWorkersUnpredictableSequence() {
        IntegerConsumerBuilder cb = new IntegerConsumerBuilder();
        int numberOfItems = 100;

        produceAndShutdown(cb, 5, numberOfItems);

        assertEquals(cb.integers.size(), numberOfItems);
    }

    /**
     * Produces the integers {@code 0..numberOfItems-1} through a manager with {@code numWorkers}
     * workers, then shuts it down. The calling test is skipped if the thread is interrupted.
     */
    private void produceAndShutdown(IntegerConsumerBuilder cb, int numWorkers, int numberOfItems) {
        try {
            WorkItemManager<Integer, WorkItemConsumer> wi = getWorkItemManager(cb, numWorkers);
            for (int i = 0; i < numberOfItems; i++) {
                wi.produce(i);
            }

            wi.shutdown();
        } catch (InterruptedException e) {
            // Preserve the interrupt status and the cause instead of discarding both.
            Thread.currentThread().interrupt();
            throw new SkipException("Test skipped!", e);
        }
    }

    /** Creates a manager over {@code cb} with the batch-size argument 5 and {@code numWorkers} workers. */
    private WorkItemManager<Integer, WorkItemConsumer> getWorkItemManager(IntegerConsumerBuilder cb, int numWorkers) {
        return new WorkItemManager<>(cb, 5, numWorkers);
    }
}
