http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/resources/col-legacy.json ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/test/resources/col-legacy.json b/graphdb/janus/src/test/resources/col-legacy.json new file mode 100644 index 0000000..75f6b38 --- /dev/null +++ b/graphdb/janus/src/test/resources/col-legacy.json @@ -0,0 +1,73 @@ +{ + "Asset.name": { + "type": "string", + "value": "col4" + }, + "hive_column.type": { + "type": "string", + "value": "string" + }, + "__modifiedBy": { + "type": "string", + "value": "anonymous" + }, + "__state": { + "type": "string", + "value": "ACTIVE" + }, + "entityText": { + "type": "string", + "value": "hive_column owner anonymous qualifiedName stocks.test_table.col4@cl1 name col4 position 0 type string table " + }, + "Referenceable.qualifiedName": { + "type": "string", + "value": "stocks.test_table.col4@cl1" + }, + "__guid": { + "type": "string", + "value": "0693682a-30ae-4fec-a533-179e572792ce" + }, + "__version": { + "type": "integer", + "value": 0 + }, + "__superTypeNames": { + "type": "list", + "value": [{ + "type": "string", + "value": "Asset" + }, { + "type": "string", + "value": "DataSet" + }, { + "type": "string", + "value": "Referenceable" + }] + }, + "__createdBy": { + "type": "string", + "value": "anonymous" + }, + "__typeName": { + "type": "string", + "value": "hive_column" + }, + "__modificationTimestamp": { + "type": "long", + "value": 1522693838471 + }, + "Asset.owner": { + "type": "string", + "value": "anonymous" + }, + "hive_column.position": { + "type": "integer", + "value": 0 + }, + "__timestamp": { + "type": "long", + "value": 1522693826849 + }, + "_id": 98336, + "_type": "vertex" +}
http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/resources/db-type-legacy.json ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/test/resources/db-type-legacy.json b/graphdb/janus/src/test/resources/db-type-legacy.json new file mode 100644 index 0000000..ed62171 --- /dev/null +++ b/graphdb/janus/src/test/resources/db-type-legacy.json @@ -0,0 +1,84 @@ +{ + "__type.name": { + "type": "string", + "value": "hive_db" + }, + "__type.hive_db.parameters": { + "type": "string", + "value": "{\"multiplicity\":\"{\\\"lower\\\":0,\\\"upper\\\":1,\\\"isUnique\\\":false}\",\"isIndexable\":false,\"isComposite\":false,\"reverseAttributeName\":null,\"dataType\":\"map<string,string>\",\"name\":\"parameters\",\"isUnique\":false}" + }, + "__modifiedBy": { + "type": "string", + "value": "root" + }, + "__type.hive_db": { + "type": "list", + "value": [{ + "type": "string", + "value": "clusterName" + }, { + "type": "string", + "value": "location" + }, { + "type": "string", + "value": "parameters" + }, { + "type": "string", + "value": "ownerType" + }] + }, + "__type.options": { + "type": "string", + "value": "null" + }, + "__guid": { + "type": "string", + "value": "b2685ea8-16c5-4d54-88f2-41b1d66bd1fb" + }, + "__version": { + "type": "long", + "value": 1 + }, + "__type.hive_db.clusterName": { + "type": "string", + "value": "{\"multiplicity\":\"{\\\"lower\\\":1,\\\"upper\\\":1,\\\"isUnique\\\":false}\",\"isIndexable\":true,\"isComposite\":false,\"reverseAttributeName\":null,\"dataType\":\"string\",\"name\":\"clusterName\",\"isUnique\":false}" + }, + "__type.category": { + "type": "string", + "value": "CLASS" + }, + "__type.version": { + "type": "string", + "value": "1.0" + }, + "__type.hive_db.location": { + "type": "string", + "value": "{\"multiplicity\":\"{\\\"lower\\\":0,\\\"upper\\\":1,\\\"isUnique\\\":false}\",\"isIndexable\":false,\"isComposite\":false,\"reverseAttributeName\":null,\"dataType\":\"string\",\"name\":\"location\",\"isUnique\":false}" + }, + "__createdBy": { + "type": "string", + "value": "root" + }, + "__modificationTimestamp": { + "type": "long", + "value": 1522693758158 + }, + "__type": { + "type": "string", + "value": "typeSystem" + }, + "__type.description": { + "type": "string", + "value": "hive_db" + }, + "__timestamp": { + "type": "long", + "value": 1522693758158 + }, + "__type.hive_db.ownerType": { + "type": "string", + "value": "{\"multiplicity\":\"{\\\"lower\\\":0,\\\"upper\\\":1,\\\"isUnique\\\":false}\",\"isIndexable\":false,\"isComposite\":false,\"reverseAttributeName\":null,\"dataType\":\"hive_principal_type\",\"name\":\"ownerType\",\"isUnique\":false}" + }, + "_id": 16392, + "_type": "vertex" +} http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/resources/db-v-65544.json ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/test/resources/db-v-65544.json b/graphdb/janus/src/test/resources/db-v-65544.json new file mode 100644 index 0000000..dc88e10 --- /dev/null +++ b/graphdb/janus/src/test/resources/db-v-65544.json @@ -0,0 +1,78 @@ +{ + "hive_db.parameters": { + "type": "list", + "value": [] + }, + "Asset.name": { + "type": "string", + "value": "stocks" + }, + "hive_db.ownerType": { + "type": "string", + "value": "USER" + }, + "__modifiedBy": { + "type": "string", + "value": "anonymous" + }, + "__state": { + "type": "string", + "value": "ACTIVE" + }, + "entityText": { + "type": "string", + "value": "hive_db owner anonymous ownerType USER qualifiedName stocks@cl1 clusterName cl1 name stocks location hdfs://localhost.localdomain:8020/apps/hive/warehouse/stocks.db " + }, + "Referenceable.qualifiedName": { + "type": "string", + "value": "stocks@cl1" + }, + "__guid": { + "type": "string", + "value": "229b7fd4-e46e-4338-9e44-18ce630eb5bf" + }, + "__version": { + "type": "integer", + "value": 0 + }, + "__superTypeNames": { + "type": "list", + "value": [{ + "type": "string", + "value": "Asset" + }, { + "type": "string", + "value": "Referenceable" + }] + }, + "__createdBy": { + "type": "string", + "value": "anonymous" + }, + "__typeName": { + "type": "string", + "value": "hive_db" + }, + "__modificationTimestamp": { + "type": "long", + "value": 1522693838471 + }, + "hive_db.clusterName": { + "type": "string", + "value": "cl1" + }, + "Asset.owner": { + "type": "string", + "value": "anonymous" + }, + "hive_db.location": { + "type": "string", + "value": "hdfs://localhost.localdomain:8020/apps/hive/warehouse/stocks.db" + }, + "__timestamp": { + "type": "long", + "value": 1522693806944 + }, + "_id": 65544, + "_type": "vertex" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/resources/edge-legacy.json ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/test/resources/edge-legacy.json b/graphdb/janus/src/test/resources/edge-legacy.json new file mode 100644 index 0000000..c370f55 --- /dev/null +++ b/graphdb/janus/src/test/resources/edge-legacy.json @@ -0,0 +1,27 @@ +{ + "__modifiedBy": { + "type": "string", + "value": "anonymous" + }, + "__state": { + "type": "string", + "value": "ACTIVE" + }, + "__createdBy": { + "type": "string", + "value": "anonymous" + }, + "__modificationTimestamp": { + "type": "long", + "value": 1522693835017 + }, + "__timestamp": { + "type": "long", + "value": 1522693835017 + }, + "_id": "8k5i-35tc-acyd-1eko", + "_type": "edge", + "_outV": 147504, + "_inV": 65544, + "_label": "__hive_table.db" +} http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/janus/src/test/resources/table-v-147504.json ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/test/resources/table-v-147504.json b/graphdb/janus/src/test/resources/table-v-147504.json new file mode 100644 index 0000000..898dce5 --- /dev/null +++ b/graphdb/janus/src/test/resources/table-v-147504.json @@ -0,0 +1,121 @@ +{ + "hive_table.createTime": { + "type": "long", + "value": 1522693834000 + }, + "hive_table.tableType": { + "type": "string", + "value": "VIRTUAL_VIEW" + }, + "Asset.name": { + "type": "string", + "value": "test_table_view" + }, + "__modifiedBy": { + "type": "string", + "value": "anonymous" + }, + "__state": { + "type": "string", + "value": "ACTIVE" + }, + "entityText": { + "type": "string", + "value": "hive_table owner anonymous temporary false lastAccessTime Mon Apr 02 11:30:34 PDT 2018 qualifiedName stocks.test_table_view@cl1 columns viewExpandedText select `test_table`.`col1`, `test_table`.`col2`, `test_table`.`col4` from `stocks`.`test_table` sd tableType VIRTUAL_VIEW createTime Mon Apr 02 11:30:34 PDT 2018 name test_table_view partitionKeys parameters transient_lastDdlTime 1522693834 db retention 0 viewOriginalText select col1, col2, col4 from test_table " + }, + "Referenceable.qualifiedName": { + "type": "string", + "value": "stocks.test_table_view@cl1" + }, + "hive_table.parameters.transient_lastDdlTime": { + "type": "string", + "value": "1522693834" + }, + "hive_table.parameters": { + "type": "list", + "value": [{ + "type": "string", + "value": "transient_lastDdlTime" + }] + }, + "hive_table.retention": { + "type": "integer", + "value": 0 + }, + "hive_table.partitionKeys": { + "type": "list", + "value": [{ + "type": "string", + "value": "8dty-35tc-amfp-23xs" + }] + }, + "__guid": { + "type": "string", + "value": "111091f1-2661-4946-b09b-64e28f10c109" + }, + "hive_table.temporary": { + "type": "boolean", + "value": false + }, + "__version": { + "type": "integer", + "value": 0 + }, + "__superTypeNames": { + "type": "list", + "value": [{ + "type": "string", + "value": "Asset" + }, { + "type": "string", + "value": "DataSet" + }, { + "type": "string", + "value": "Referenceable" + }] + }, + "hive_table.viewExpandedText": { + "type": "string", + "value": "select `test_table`.`col1`, `test_table`.`col2`, `test_table`.`col4` from `stocks`.`test_table`" + }, + "__createdBy": { + "type": "string", + "value": "anonymous" + }, + "__typeName": { + "type": "string", + "value": "hive_table" + }, + "__modificationTimestamp": { + "type": "long", + "value": 1522693838471 + }, + "Asset.owner": { + "type": "string", + "value": "anonymous" + }, + "hive_table.lastAccessTime": { + "type": "long", + "value": 1522693834000 + }, + "hive_table.viewOriginalText": { + "type": "string", + "value": "select col1, col2, col4 from test_table" + }, + "hive_table.columns": { + "type": "list", + "value": [{ + "type": "string", + "value": "816u-35tc-ao0l-47so" + }, { + "type": "string", + "value": "82rq-35tc-ao0l-2glc" + }] + }, + "__timestamp": { + "type": "long", + "value": 1522693835017 + }, + "_id": 147504, + "_type": "vertex" +} http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Graph.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Graph.java b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Graph.java index 82a800e..d191b55 100644 --- a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Graph.java +++ b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Graph.java @@ -35,6 +35,7 @@ import com.tinkerpop.pipes.util.structures.Row; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.groovy.GroovyExpression; +import org.apache.atlas.model.impexp.MigrationStatus; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraphManagement; @@ -419,6 +420,11 @@ public class Titan0Graph implements AtlasGraph<Titan0Vertex, Titan0Edge> { public void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException { } + @Override + public MigrationStatus getMigrationStatus() { + return new MigrationStatus(); + } + public void addMultiProperties(Set<String> names) { multiProperties.addAll(names); } http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java index 1daf371..9620c13 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java @@ -18,97 +18,58 @@ package org.apache.atlas.repository.impexp; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; +import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.annotation.AtlasService; import org.apache.atlas.model.impexp.MigrationStatus; -import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.inject.Inject; import javax.inject.Singleton; -import java.util.Date; -import java.util.Iterator; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; @AtlasService @Singleton public class MigrationProgressService { private static final Logger LOG = LoggerFactory.getLogger(MigrationProgressService.class); + public static final String MIGRATION_QUERY_CACHE_TTL = "atlas.migration.query.cache.ttlInSecs"; - private static final String MIGRATION_STATUS_TYPE_NAME = "__MigrationStatus"; - private static final String CURRENT_INDEX_PROPERTY = "currentIndex"; - private static final String OPERATION_STATUS_PROPERTY = "operationStatus"; - private static final String START_TIME_PROPERTY = "startTime"; - private static final String END_TIME_PROPERTY = "endTime"; - private static final String TOTAL_COUNT_PROPERTY = "totalCount"; - private static final String MIGRATION_STATUS_KEY = "1"; + @VisibleForTesting + static long DEFAULT_CACHE_TTL_IN_SECS = 30 * 1000; // 30 secs + private final long cacheValidity; private final AtlasGraph graph; - private final MigrationStatus defaultStatus = new MigrationStatus(); - private LoadingCache<String, MigrationStatus> cache; + private MigrationStatus cachedStatus; + private long cacheExpirationTime = 0; @Inject - public MigrationProgressService(AtlasGraph graph) { + public MigrationProgressService(Configuration configuration, AtlasGraph graph) { this.graph = graph; + this.cacheValidity = (configuration != null) ? + configuration.getLong(MIGRATION_QUERY_CACHE_TTL, DEFAULT_CACHE_TTL_IN_SECS) : + DEFAULT_CACHE_TTL_IN_SECS; } public MigrationStatus getStatus() { - try { - if (cache == null) { - initCache(); - cache.get(MIGRATION_STATUS_KEY); - } - - if(cache.size() > 0) { - return cache.get(MIGRATION_STATUS_KEY); - } - - return defaultStatus; - } catch (ExecutionException e) { - return defaultStatus; - } + return fetchStatus(); } - private void initCache() { - this.cache = CacheBuilder.newBuilder().refreshAfterWrite(30, TimeUnit.SECONDS). - build(new CacheLoader<String, MigrationStatus>() { - @Override - public MigrationStatus load(String key) { - try { - return from(fetchStatusVertex()); - } catch (Exception e) { - LOG.error("Error retrieving status.", e); - return defaultStatus; - } - } - - private MigrationStatus from(AtlasVertex vertex) { - if (vertex == null) { - return null; - } - - MigrationStatus ms = new MigrationStatus(); + private MigrationStatus fetchStatus() { + long currentTime = System.currentTimeMillis(); + if(resetCache(currentTime)) { + cachedStatus = graph.getMigrationStatus(); + } - ms.setStartTime(GraphHelper.getSingleValuedProperty(vertex, START_TIME_PROPERTY, Date.class)); - ms.setEndTime(GraphHelper.getSingleValuedProperty(vertex, END_TIME_PROPERTY, Date.class)); - ms.setCurrentIndex(GraphHelper.getSingleValuedProperty(vertex, CURRENT_INDEX_PROPERTY, Long.class)); - ms.setOperationStatus(GraphHelper.getSingleValuedProperty(vertex, OPERATION_STATUS_PROPERTY, String.class)); - ms.setTotalCount(GraphHelper.getSingleValuedProperty(vertex, TOTAL_COUNT_PROPERTY, Long.class)); + return cachedStatus; + } - return ms; - } + private boolean resetCache(long currentTime) { + boolean ret = cachedStatus == null || currentTime > cacheExpirationTime; + if(ret) { + cacheExpirationTime = currentTime + cacheValidity; + } - private AtlasVertex fetchStatusVertex() { - Iterator<AtlasVertex> itr = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, MIGRATION_STATUS_TYPE_NAME).vertices().iterator(); - return itr.hasNext() ? itr.next() : null; - } - }); + return ret; } } http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java index 2fad333..a4f7a2f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java +++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationService.java @@ -120,10 +120,12 @@ public class DataMigrationService implements Service { @VisibleForTesting void processIncomingTypesDef(File typesDefFile) throws AtlasBaseException { try { + AtlasImportResult result = new AtlasImportResult(); String jsonStr = FileUtils.readFileToString(typesDefFile); AtlasTypesDef typesDef = AtlasType.fromJson(jsonStr, AtlasTypesDef.class); ImportTypeDefProcessor processor = new ImportTypeDefProcessor(typeDefStore, typeRegistry); - processor.processTypes(typesDef, new AtlasImportResult()); + processor.processTypes(typesDef, result); + LOG.info(" types migrated: {}", result.getMetrics()); } catch (IOException e) { LOG.error("processIncomingTypesDef: Could not process file: {}! Imported data may not be usable.", typesDefFile.getName()); } http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/repository/src/test/java/org/apache/atlas/repository/migration/HiveParititionTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/migration/HiveParititionTest.java b/repository/src/test/java/org/apache/atlas/repository/migration/HiveParititionTest.java index 382c288..8e4a2f6 100644 --- a/repository/src/test/java/org/apache/atlas/repository/migration/HiveParititionTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/migration/HiveParititionTest.java @@ -43,11 +43,16 @@ public class HiveParititionTest extends MigrationBaseAsserts { @Test public void fileImporterTest() throws IOException, AtlasBaseException { + final int EXPECTED_TOTAL_COUNT = 140; + final int EXPECTED_DB_COUNT = 1; + final int EXPECTED_TABLE_COUNT = 2; + final int EXPECTED_COLUMN_COUNT = 7; + runFileImporter("parts_db"); assertPartitionKeyProperty(getVertex("hive_table", "t1"), 1); assertPartitionKeyProperty(getVertex("hive_table", "tv1"), 1); - assertHiveVertices(1, 2, 7); + assertHiveVertices(EXPECTED_DB_COUNT, EXPECTED_TABLE_COUNT, EXPECTED_COLUMN_COUNT); assertTypeCountNameGuid("hive_db", 1, "parts_db", "ae30d78b-51b4-42ab-9436-8d60c8f68b95"); assertTypeCountNameGuid("hive_process", 1, "", ""); @@ -55,7 +60,7 @@ public class HiveParititionTest extends MigrationBaseAsserts { assertEdges("hive_table", "t1", AtlasEdgeDirection.OUT, 1, 1, "hive_db_tables"); assertEdges("hive_table", "tv1", AtlasEdgeDirection.OUT, 1, 1, "hive_db_tables"); - assertMigrationStatus(136); + assertMigrationStatus(EXPECTED_TOTAL_COUNT); } private void assertPartitionKeyProperty(AtlasVertex vertex, int expectedCount) { http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/repository/src/test/java/org/apache/atlas/repository/migration/HiveStocksTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/migration/HiveStocksTest.java b/repository/src/test/java/org/apache/atlas/repository/migration/HiveStocksTest.java index 25c72a4..dc47172 100644 --- a/repository/src/test/java/org/apache/atlas/repository/migration/HiveStocksTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/migration/HiveStocksTest.java @@ -37,9 +37,14 @@ public class HiveStocksTest extends MigrationBaseAsserts { @Test public void migrateStocks() throws AtlasBaseException, IOException { + final int EXPECTED_TOTAL_COUNT = 187; + final int EXPECTED_DB_COUNT = 1; + final int EXPECTED_TABLE_COUNT = 1; + final int EXPECTED_COLUMN_COUNT = 7; + runFileImporter("stocks_db"); - assertHiveVertices(1, 1, 7); + assertHiveVertices(EXPECTED_DB_COUNT, EXPECTED_TABLE_COUNT, EXPECTED_COLUMN_COUNT); assertTypeCountNameGuid("hive_db", 1, "stocks", "4e13b36b-9c54-4616-9001-1058221165d0"); assertTypeCountNameGuid("hive_table", 1, "stocks_daily", "5cfc2540-9947-40e0-8905-367e07481774"); assertTypeAttribute("hive_table", 7, "stocks_daily", "5cfc2540-9947-40e0-8905-367e07481774", "hive_table.columns"); @@ -58,6 +63,6 @@ public class HiveStocksTest extends MigrationBaseAsserts { assertEdges(getVertex("hive_table", "stocks_daily").getEdges(AtlasEdgeDirection.OUT).iterator(), 1, 1, "hive_db_tables"); assertEdges(getVertex("hive_column", "high").getEdges(AtlasEdgeDirection.OUT).iterator(), 1,1, "hive_table_columns"); - assertMigrationStatus(187); + assertMigrationStatus(EXPECTED_TOTAL_COUNT); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/repository/src/test/java/org/apache/atlas/repository/migration/MigrationBaseAsserts.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/migration/MigrationBaseAsserts.java b/repository/src/test/java/org/apache/atlas/repository/migration/MigrationBaseAsserts.java index 5639b43..ec6e64a 100644 --- a/repository/src/test/java/org/apache/atlas/repository/migration/MigrationBaseAsserts.java +++ b/repository/src/test/java/org/apache/atlas/repository/migration/MigrationBaseAsserts.java @@ -40,7 +40,7 @@ import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -import static org.testng.AssertJUnit.assertTrue; +import static org.testng.Assert.assertTrue; public class MigrationBaseAsserts { protected static final String ASSERT_NAME_PROPERTY = "Asset.name"; @@ -76,6 +76,7 @@ public class MigrationBaseAsserts { private void loadTypesFromJson() throws IOException, AtlasBaseException { loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry); + loadModelFromJson("1000-Hadoop/1020-fs_model.json", typeDefStore, typeRegistry); loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry); } http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/repository/src/test/java/org/apache/atlas/repository/migration/MigrationProgressServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/migration/MigrationProgressServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/migration/MigrationProgressServiceTest.java new file mode 100644 index 0000000..a0e2e03 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/migration/MigrationProgressServiceTest.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.migration; + +import org.apache.atlas.model.impexp.MigrationStatus; +import org.apache.atlas.repository.graphdb.*; +import org.apache.atlas.repository.graphdb.janus.migration.ReaderStatusManager; +import org.apache.atlas.repository.impexp.MigrationProgressService; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; +import org.testng.annotations.Test; + +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.*; + +public class MigrationProgressServiceTest { + + private final long currentIndex = 100l; + private final long totalIndex = 1000l; + private final long increment = 1001l; + private final String statusSuccess = ReaderStatusManager.STATUS_SUCCESS; + + private static class AtlasTinkerGraph { + + public static AtlasGraph create(TinkerGraph tg) { + AtlasGraph g = mock(AtlasGraph.class); + when(g.getMigrationStatus()).thenAnswer(invocation -> ReaderStatusManager.get(tg)); + return g; + } + + public static AtlasGraph create() { + return create(TinkerGraph.open()); + } + } + + @Test + public void absentStatusNodeReturnsDefaultStatus() { + MigrationProgressService mps = getMigrationStatusForTest(null, null); + MigrationStatus ms = mps.getStatus(); + + assertNotNull(ms); + assertTrue(StringUtils.isEmpty(ms.getOperationStatus())); + assertEquals(ms.getCurrentIndex(), 0); + assertEquals(ms.getTotalCount(), 0); + } + + @Test + public void existingStatusNodeRetrurnStatus() { + final long currentIndex = 100l; + final long totalIndex = 1000l; + final String status = ReaderStatusManager.STATUS_SUCCESS; + + TinkerGraph tg = createUpdateStatusNode(null, currentIndex, totalIndex, status); + MigrationProgressService mps = getMigrationStatusForTest(null, tg); + MigrationStatus ms = mps.getStatus(); + + assertMigrationStatus(totalIndex, status, ms); + } + + @Test + public void cachedStatusReturnedIfQueriedBeforeCacheExpiration() { + TinkerGraph tg = createUpdateStatusNode(null, currentIndex, totalIndex, statusSuccess); + + MigrationProgressService mps = getMigrationStatusForTest(null, tg); + MigrationStatus ms = mps.getStatus(); + + createUpdateStatusNode(tg, currentIndex + increment, totalIndex + increment, ReaderStatusManager.STATUS_FAILED); + MigrationStatus ms2 = mps.getStatus(); + + assertEquals(ms.hashCode(), ms2.hashCode()); + assertMigrationStatus(totalIndex, statusSuccess, ms); + } + + private MigrationProgressService getMigrationStatusForTest(Configuration cfg, TinkerGraph tg) { + return new MigrationProgressService(cfg, AtlasTinkerGraph.create(tg)); + } + + @Test + public void cachedUpdatedIfQueriedAfterCacheExpiration() throws InterruptedException { + final String statusFailed = ReaderStatusManager.STATUS_FAILED; + + TinkerGraph tg = createUpdateStatusNode(null, currentIndex, totalIndex, statusSuccess); + long cacheTTl = 100l; + MigrationProgressService mps = getMigrationStatusForTest(getStubConfiguration(cacheTTl), tg); + MigrationStatus ms = mps.getStatus(); + + assertMigrationStatus(totalIndex, statusSuccess, ms); + + createUpdateStatusNode(tg, currentIndex + increment, totalIndex + increment, ReaderStatusManager.STATUS_FAILED); + Thread.sleep(2 * cacheTTl); + + MigrationStatus ms2 = mps.getStatus(); + + assertNotEquals(ms.hashCode(), ms2.hashCode()); + + assertMigrationStatus(totalIndex + increment, statusFailed, ms2); + } + + private Configuration getStubConfiguration(long ttl) { + Configuration cfg = mock(Configuration.class); + when(cfg.getLong(anyString(), anyLong())).thenReturn(ttl); + return cfg; + } + + private TinkerGraph createUpdateStatusNode(TinkerGraph tg, long currentIndex, long totalIndex, String status) { + if(tg == null) { + tg = TinkerGraph.open(); + } + + ReaderStatusManager rsm = new ReaderStatusManager(tg, tg); + rsm.update(tg, currentIndex); + rsm.end(tg, totalIndex, status); + return tg; + } + + private void assertMigrationStatus(long totalIndex, String status, MigrationStatus ms) { + assertNotNull(ms); + assertEquals(ms.getOperationStatus(), status); + assertEquals(ms.getCurrentIndex(), totalIndex); + assertEquals(ms.getTotalCount(), totalIndex); + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/repository/src/test/java/org/apache/atlas/repository/migration/PathTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/migration/PathTest.java b/repository/src/test/java/org/apache/atlas/repository/migration/PathTest.java new file mode 100644 index 0000000..43357e6 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/migration/PathTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.migration; + +import com.google.inject.Inject; +import org.apache.atlas.TestModules; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasBuiltInTypes; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Iterator; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +@Guice(modules = TestModules.TestOnlyModule.class) +public class PathTest extends MigrationBaseAsserts { + @Inject + public PathTest(AtlasGraph graph) { + super(graph); + } + + @Test + public void migrationImport() throws IOException, AtlasBaseException { + runFileImporter("path_db"); + + AtlasVertex v = assertHdfsPathVertices(1); + assertVertexProperties(v); + assertMigrationStatus(88); + } + + private void assertVertexProperties(AtlasVertex v) { + final String HASH_CODE_PROPERTY = "hdfs_path.hashCode"; + final String RETENTION_PROPERTY = "hdfs_path.retention"; + + AtlasBuiltInTypes.AtlasBigIntegerType bitRef = new AtlasBuiltInTypes.AtlasBigIntegerType(); + AtlasBuiltInTypes.AtlasBigDecimalType bdtRef = new AtlasBuiltInTypes.AtlasBigDecimalType(); + + BigInteger bitExpected = bitRef.getNormalizedValue(612361213421234L); + BigDecimal bdtExpected = bdtRef.getNormalizedValue(125353); + + BigInteger bit = GraphHelper.getSingleValuedProperty(v, HASH_CODE_PROPERTY, BigInteger.class); + BigDecimal bdt = GraphHelper.getSingleValuedProperty(v, RETENTION_PROPERTY, BigDecimal.class); + + assertEquals(bit, bitExpected); + assertEquals(bdt.compareTo(bdtExpected), 0); + } + + protected AtlasVertex assertHdfsPathVertices(int expectedCount) { + int i = 0; + + AtlasVertex vertex = null; + Iterator<AtlasVertex> results = getVertices("hdfs_path", null); + for (Iterator<AtlasVertex> it = results; it.hasNext(); i++) { + vertex = it.next(); + assertNotNull(vertex); + } + + assertEquals(i, expectedCount); + return vertex; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/15967a93/repository/src/test/java/org/apache/atlas/repository/migration/RelationshipMappingTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/migration/RelationshipMappingTest.java b/repository/src/test/java/org/apache/atlas/repository/migration/RelationshipMappingTest.java index 0be4be8..48e5391 100644 --- a/repository/src/test/java/org/apache/atlas/repository/migration/RelationshipMappingTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/migration/RelationshipMappingTest.java @@ -29,6 +29,7 @@ import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasRelationshipType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.lang.StringUtils; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; import org.jcodings.util.Hash; import org.testng.annotations.BeforeClass; import org.testng.annotations.Guice;
