ATLAS-2886: Support for fully qualified server name Signed-off-by: Madhan Neethiraj <mad...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/8639ada6 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/8639ada6 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/8639ada6 Branch: refs/heads/master Commit: 8639ada6a74cdaa32b0b493aaff1168733487eef Parents: 31c3bea Author: Ashutosh Mestry <ames...@hortonworks.com> Authored: Sun Sep 23 08:50:06 2018 -0700 Committer: Ashutosh Mestry <ames...@hortonworks.com> Committed: Thu Oct 11 17:21:25 2018 -0700 ---------------------------------------------------------------------- addons/models/0010-base_model.json | 335 +++++++++++++++++++ .../apache/atlas/entitytransform/Condition.java | 2 - .../atlas/repository/impexp/AuditsWriter.java | 44 ++- .../impexp/ExportImportAuditServiceTest.java | 2 +- .../IncrementalExportEntityProviderTest.java | 2 - .../impexp/ReplicationEntityAttributeTest.java | 14 +- .../stocksDB-Entities/export-replicatedTo.json | 2 +- .../import-replicatedFrom.json | 2 +- 8 files changed, 377 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/8639ada6/addons/models/0010-base_model.json ---------------------------------------------------------------------- diff --git a/addons/models/0010-base_model.json b/addons/models/0010-base_model.json new file mode 100644 index 0000000..1bfbf2f --- /dev/null +++ b/addons/models/0010-base_model.json @@ -0,0 +1,335 @@ +{ + "enumDefs": [], + "structDefs": [], + "classificationDefs": [ + { + "name": "TaxonomyTerm", + "superTypes": [], + "typeVersion": "1.0", + "attributeDefs": [ + { + "name": "atlas.taxonomy", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": false, + "isOptional": true, + "isUnique": false + } + ] + } + ], + "entityDefs": [ + { + "name": "Referenceable", + "superTypes": [], + "typeVersion": "1.0", + "attributeDefs": [ + { + "name": "qualifiedName", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": false, + "isUnique": true + } + ] + }, + { + "name": "__internal", + "superTypes": [], + "typeVersion": "1.0", + "attributeDefs": [] + }, + { + "name": "Asset", + "superTypes": [], + "typeVersion": "1.0", + "attributeDefs": [ + { + "name": "name", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": false, + "isUnique": false + }, + { + "name": "description", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": false, + "isOptional": true, + "isUnique": false + }, + { + "name": "owner", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": true, + "isUnique": false + } + ] + }, + { + "name": "DataSet", + "superTypes": [ + "Referenceable", + "Asset" + ], + "typeVersion": "1.0", + "attributeDefs": [] + }, + { + "name": "Infrastructure", + "superTypes": [ + "Referenceable", + "Asset" + ], + "typeVersion": "1.0", + "attributeDefs": [] + }, + { + "name": "Process", + "superTypes": [ + "Referenceable", + "Asset" + ], + "typeVersion": "1.0", + "attributeDefs": [ + { + "name": "inputs", + "typeName": "array<DataSet>", + "cardinality": "SINGLE", + "isIndexable": false, + "isOptional": true, + "isUnique": false + }, + { + "name": "outputs", + "typeName": "array<DataSet>", + "cardinality": "SINGLE", + "isIndexable": false, + "isOptional": true, + "isUnique": false + } + ] + }, + { + "name": "AtlasServer", + "typeVersion": "1.0", + "superTypes": [ + ], + "attributeDefs": [ + { + "name": "name", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": false, + "isUnique": false + }, + { + "name": "displayName", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": false, + "isUnique": false + }, + { + "name": "fullName", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": false, + "isUnique": true + }, + { + "name": "urls", + "typeName": "array<string>", + "cardinality": "SINGLE", + "isIndexable": false, + "isOptional": true, + "isUnique": false + }, + { + "name": "additionalInfo", + "typeName": "map<string,string>", + "cardinality": "SINGLE", + "isIndexable": false, + "isOptional": true, + "isUnique": false + } + ] + }, + { + "name": "__AtlasUserProfile", + "superTypes": [ + "__internal" + ], + "typeVersion": "1.0", + "attributeDefs": [ + { + "name": "name", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": false, + "isUnique": true + }, + { + "name": "fullName", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": false, + "isOptional": true, + "isUnique": false + }, + { + "name": "savedSearches", + "typeName": "array<__AtlasUserSavedSearch>", + "cardinality": "LIST", + "isIndexable": false, + "isOptional": true, + "isUnique": false, + "constraints": [ + { + "type": "ownedRef" + } + ] + } + ] + }, + { + "name": "__AtlasUserSavedSearch", + "superTypes": [ + "__internal" + ], + "typeVersion": "1.0", + "attributeDefs": [ + { + "name": "name", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": false, + "isOptional": false, + "isUnique": false + }, + { + "name": "ownerName", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": false, + "isOptional": false, + "isUnique": false + }, + { + "name": "uniqueName", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": false, + "isUnique": true + }, + { + "name": "searchType", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": false, + "isUnique": false + }, + { + "name": "searchParameters", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": false, + "isOptional": false, + "isUnique": false + }, + { + "name": "searchParameters", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": false, + "isOptional": true, + "isUnique": false + } + ] + }, + { + "name": "__ExportImportAuditEntry", + "typeVersion": "1.0", + "superTypes": [ + "__internal" + ], + "attributeDefs": [ + { + "name": "userName", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": false, + "isOptional": true, + "isUnique": false + }, + { + "name": "operation", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": false, + "isUnique": false + }, + { + "name": "sourceServerName", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": true, + "isUnique": false + }, + { + "name": "targetServerName", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": true, + "isUnique": false + }, + { + "name": "operationParams", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": true, + "isUnique": false + }, + { + "name": "operationStartTime", + "typeName": "long", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": false, + "isUnique": false + }, + { + "name": "operationEndTime", + "typeName": "long", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": true, + "isUnique": false + }, + { + "name": "resultSummary", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": false, + "isOptional": true, + "isUnique": false + } + ] + } + ] +} http://git-wip-us.apache.org/repos/asf/atlas/blob/8639ada6/intg/src/main/java/org/apache/atlas/entitytransform/Condition.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/Condition.java b/intg/src/main/java/org/apache/atlas/entitytransform/Condition.java index 3bf49f0..b834f46 100644 --- a/intg/src/main/java/org/apache/atlas/entitytransform/Condition.java +++ b/intg/src/main/java/org/apache/atlas/entitytransform/Condition.java @@ -17,7 +17,6 @@ */ package org.apache.atlas.entitytransform; -import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.entitytransform.BaseEntityHandler.AtlasTransformableEntity; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.instance.AtlasEntity; @@ -226,7 +225,6 @@ public abstract class Condition { } } - @VisibleForTesting void addObjectId(AtlasObjectId objId) { this.objectIds.add(objId); } http://git-wip-us.apache.org/repos/asf/atlas/blob/8639ada6/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java index 407b406..f72de56 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java @@ -44,6 +44,7 @@ import java.util.Map; public class AuditsWriter { private static final Logger LOG = LoggerFactory.getLogger(AuditsWriter.class); private static final String CLUSTER_NAME_DEFAULT = "default"; + private static final String DC_SERVER_NAME_SEPARATOR = "$"; private AtlasServerService atlasServerService; private ExportImportAuditService auditService; @@ -74,7 +75,7 @@ public class AuditsWriter { } private void updateReplicationAttribute(boolean isReplicationSet, - String serverName, + String serverName, String serverFullName, List<String> exportedGuids, String attrNameReplicated, long lastModifiedTimestamp) throws AtlasBaseException { @@ -82,7 +83,7 @@ public class AuditsWriter { return; } - AtlasServer server = saveServer(serverName, exportedGuids.get(0), lastModifiedTimestamp); + AtlasServer server = saveServer(serverName, serverFullName, exportedGuids.get(0), lastModifiedTimestamp); atlasServerService.updateEntitiesWithServer(server, exportedGuids, attrNameReplicated); } @@ -92,15 +93,16 @@ public class AuditsWriter { : StringUtils.EMPTY; } - private AtlasServer saveServer(String name) throws AtlasBaseException { - return atlasServerService.save(new AtlasServer(name, name)); + private AtlasServer saveServer(String name, String serverFullName) { + AtlasServer cluster = new AtlasServer(name, serverFullName); + return atlasServerService.save(cluster); } - private AtlasServer saveServer(String name, + private AtlasServer saveServer(String name, String serverFullName, String entityGuid, - long lastModifiedTimestamp) throws AtlasBaseException { + long lastModifiedTimestamp) { - AtlasServer server = new AtlasServer(name, name); + AtlasServer server = new AtlasServer(name, serverFullName); server.setAdditionalInfoRepl(entityGuid, lastModifiedTimestamp); if (LOG.isDebugEnabled()) { @@ -120,11 +122,20 @@ public class AuditsWriter { return StringUtils.EMPTY; } + static String getServerNameFromFullName(String fullName) { + if (StringUtils.isEmpty(fullName) || !fullName.contains(DC_SERVER_NAME_SEPARATOR)) { + return fullName; + } + + return StringUtils.split(fullName, "$")[1]; + } + private class ExportAudits { private AtlasExportRequest request; private String targetServerName; private String optionKeyReplicatedTo; private boolean replicationOptionState; + private String targetServerFullName; public void add(String userName, AtlasExportResult result, long startTime, long endTime, @@ -143,16 +154,17 @@ public class AuditsWriter { return; } - updateReplicationAttribute(replicationOptionState, targetServerName, + updateReplicationAttribute(replicationOptionState, targetServerName, targetServerFullName, entityGuids, Constants.ATTR_NAME_REPLICATED_TO, result.getChangeMarker()); } private void saveServers() throws AtlasBaseException { - saveServer(getCurrentClusterName()); + saveServer(getCurrentClusterName(), getCurrentClusterName()); - targetServerName = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo); + targetServerFullName = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo); + targetServerName = getServerNameFromFullName(targetServerFullName); if(StringUtils.isNotEmpty(targetServerName)) { - saveServer(targetServerName); + saveServer(targetServerName, targetServerFullName); } } } @@ -162,6 +174,7 @@ public class AuditsWriter { private boolean replicationOptionState; private String sourceServerName; private String optionKeyReplicatedFrom; + private String sourceServerFullName; public void add(String userName, AtlasImportResult result, long startTime, long endTime, @@ -181,16 +194,17 @@ public class AuditsWriter { return; } - updateReplicationAttribute(replicationOptionState, this.sourceServerName, entityGuids, + updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids, Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker()); } private void saveServers() throws AtlasBaseException { - saveServer(getCurrentClusterName()); + saveServer(getCurrentClusterName(), getCurrentClusterName()); - sourceServerName = getClusterNameFromOptionsState(); + sourceServerFullName = getClusterNameFromOptionsState(); + sourceServerName = getServerNameFromFullName(sourceServerFullName); if(StringUtils.isNotEmpty(sourceServerName)) { - saveServer(sourceServerName); + saveServer(sourceServerName, sourceServerFullName); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/8639ada6/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java index 16fd39d..ba7a8a0 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java @@ -61,7 +61,7 @@ public class ExportImportAuditServiceTest extends ExportImportTestBase { } @Test - public void saveLogEntry() throws AtlasBaseException, InterruptedException { + public void saveLogEntry() throws AtlasBaseException { final String source1 = "clx"; final String target1 = "cly"; ExportImportAuditEntry entry = saveAndGet(source1, ExportImportAuditEntry.OPERATION_EXPORT, target1); http://git-wip-us.apache.org/repos/asf/atlas/blob/8639ada6/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java index de0a8f8..85ed5f9 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java @@ -23,7 +23,6 @@ import org.apache.atlas.TestModules; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2; -import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.util.UniqueList; import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasTypeRegistry; @@ -64,7 +63,6 @@ public class IncrementalExportEntityProviderTest extends ExportImportTestBase { verifyCreatedEntities(entityStore, entityGuids, 2); gremlinScriptEngine = atlasGraph.getGremlinScriptEngine(); - EntityGraphRetriever entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry); incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, gremlinScriptEngine); } http://git-wip-us.apache.org/repos/asf/atlas/blob/8639ada6/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java index 79a5e05..94483f5 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java @@ -116,7 +116,10 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase { assertNotNull(zipSource.getCreationOrder()); assertEquals(zipSource.getCreationOrder().size(), expectedEntityCount); - assertCluster(REPLICATED_TO_CLUSTER_NAME, null); + assertCluster( + AuditsWriter.getServerNameFromFullName(REPLICATED_TO_CLUSTER_NAME), + REPLICATED_TO_CLUSTER_NAME, null); + assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_TO); } @@ -125,7 +128,9 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase { AtlasImportRequest request = getImportRequestWithReplicationOption(); AtlasImportResult importResult = runImportWithParameters(importService, request, zipSource); - assertCluster(REPLICATED_FROM_CLUSTER_NAME, importResult); + assertCluster( + AuditsWriter.getServerNameFromFullName(REPLICATED_FROM_CLUSTER_NAME), + REPLICATED_FROM_CLUSTER_NAME, importResult); assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_FROM); } @@ -141,11 +146,12 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase { } } - private void assertCluster(String name, AtlasImportResult importResult) throws AtlasBaseException { - AtlasServer actual = atlasServerService.get(new AtlasServer(name, name)); + private void assertCluster(String name, String fullName, AtlasImportResult importResult) throws AtlasBaseException { + AtlasServer actual = atlasServerService.get(new AtlasServer(name, fullName)); assertNotNull(actual); assertEquals(actual.getName(), name); + assertEquals(actual.getFullName(), fullName); if(importResult != null) { assertClusterAdditionalInfo(actual, importResult); http://git-wip-us.apache.org/repos/asf/atlas/blob/8639ada6/repository/src/test/resources/json/stocksDB-Entities/export-replicatedTo.json ---------------------------------------------------------------------- diff --git a/repository/src/test/resources/json/stocksDB-Entities/export-replicatedTo.json b/repository/src/test/resources/json/stocksDB-Entities/export-replicatedTo.json index a69fe9e..a6fec6c 100644 --- a/repository/src/test/resources/json/stocksDB-Entities/export-replicatedTo.json +++ b/repository/src/test/resources/json/stocksDB-Entities/export-replicatedTo.json @@ -6,6 +6,6 @@ ], "options": { "fetchType": "full", - "replicatedTo": "clTarget" + "replicatedTo": "dc2$clTarget" } } http://git-wip-us.apache.org/repos/asf/atlas/blob/8639ada6/repository/src/test/resources/json/stocksDB-Entities/import-replicatedFrom.json ---------------------------------------------------------------------- diff --git a/repository/src/test/resources/json/stocksDB-Entities/import-replicatedFrom.json b/repository/src/test/resources/json/stocksDB-Entities/import-replicatedFrom.json index 1ce00ad..29268ef 100644 --- a/repository/src/test/resources/json/stocksDB-Entities/import-replicatedFrom.json +++ b/repository/src/test/resources/json/stocksDB-Entities/import-replicatedFrom.json @@ -1,5 +1,5 @@ { "options": { - "replicatedFrom": "clSource" + "replicatedFrom": "dc1$clSource" } }