This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 921f487 ATLAS-3362: Updated logic for storing repl key for
table-level replication.
921f487 is described below
commit 921f487b043f801e70427e98e2d4ae9a8ff877dd
Author: Ashutosh Mestry <[email protected]>
AuthorDate: Fri Aug 9 11:53:51 2019 -0700
ATLAS-3362: Updated logic for storing repl key for table-level replication.
Signed-off-by: nixonrodrigues <[email protected]>
---
.../atlas/repository/impexp/AuditsWriter.java | 65 ++++++++++++++++++++--
.../impexp/ReplicationEntityAttributeTest.java | 9 +++
2 files changed, 68 insertions(+), 6 deletions(-)
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index 9bf30f1..f2d36ed 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -18,6 +18,7 @@
package org.apache.atlas.repository.impexp;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
@@ -28,9 +29,13 @@ import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.impexp.ExportImportAuditEntry;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
-import org.apache.commons.collections.MapUtils;
+import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +43,8 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.inject.Inject;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
@Component
public class AuditsWriter {
@@ -47,6 +52,8 @@ public class AuditsWriter {
private static final String CLUSTER_NAME_DEFAULT = "default";
private static final String DC_SERVER_NAME_SEPARATOR = "$";
+ private AtlasTypeRegistry typeRegistry;
+ private AtlasEntityStore entityStore;
private AtlasServerService atlasServerService;
private ExportImportAuditService auditService;
@@ -54,7 +61,9 @@ public class AuditsWriter {
private ImportAudits auditForImport = new ImportAudits();
@Inject
- public AuditsWriter(AtlasServerService atlasServerService,
ExportImportAuditService auditService) {
+ public AuditsWriter(AtlasTypeRegistry typeRegistry, AtlasEntityStore
entityStore, AtlasServerService atlasServerService, ExportImportAuditService
auditService) {
+ this.typeRegistry = typeRegistry;
+ this.entityStore = entityStore;
this.atlasServerService = atlasServerService;
this.auditService = auditService;
}
@@ -80,7 +89,9 @@ public class AuditsWriter {
return;
}
- AtlasServer server = saveServer(serverName, serverFullName,
exportedGuids.get(0), lastModifiedTimestamp);
+ String candidateGuid = exportedGuids.get(0);
+ String replGuidKey = ReplKeyGuidFinder.get(typeRegistry, entityStore,
candidateGuid);
+ AtlasServer server = saveServer(serverName, serverFullName,
replGuidKey, lastModifiedTimestamp);
atlasServerService.updateEntitiesWithServer(server, exportedGuids,
attrNameReplicated);
}
@@ -126,6 +137,50 @@ public class AuditsWriter {
atlasServerService.getCreateAtlasServer(getCurrentClusterName(),
getCurrentClusterName());
}
+    /**
+     * Resolves the GUID under which the replication key is stored.
+     *
+     * For a hive_table or hive_column entity, the GUID of the hive_db entity
+     * whose qualifiedName is derived from the child's qualifiedName is used.
+     * For every other case (different type, entity not found, lookup failure)
+     * the candidate GUID itself is returned.
+     */
+    static class ReplKeyGuidFinder {
+        private static final String ENTITY_TYPE_HIVE_DB = "hive_db";
+        private static final String ENTITY_TYPE_HIVE_TABLE = "hive_table";
+        private static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column";
+
+        // Static helpers only; not meant to be instantiated.
+        private ReplKeyGuidFinder() {
+        }
+
+        /**
+         * Returns the GUID to use as the replication key for the given entity.
+         *
+         * @param typeRegistry  registry used to look up the hive_db entity type
+         * @param entityStore   store used to load the candidate entity and to resolve the parent GUID
+         * @param candidateGuid GUID of the exported entity
+         * @return the parent hive_db GUID when one can be resolved, otherwise {@code candidateGuid}
+         */
+        public static String get(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, String candidateGuid) {
+            String guid = null;
+            try {
+                guid = getParentEntityGuid(typeRegistry, entityStore, candidateGuid);
+            } catch (AtlasBaseException e) {
+                // Best effort: fall back to the candidate GUID below, but keep the cause in the log.
+                LOG.error("Error fetching parent guid for child entity: {}", candidateGuid, e);
+            }
+
+            if (StringUtils.isEmpty(guid)) {
+                guid = candidateGuid;
+            }
+
+            return guid;
+        }
+
+        /**
+         * Looks up the GUID of the hive_db that owns the given hive_table / hive_column.
+         *
+         * @return the hive_db GUID, or {@code null} when the entity is missing, is not a
+         *         hive_table / hive_column, has no qualifiedName, or hive_db is not a registered type
+         * @throws AtlasBaseException propagated from the entity store calls
+         */
+        private static String getParentEntityGuid(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, String defaultGuid) throws AtlasBaseException {
+            AtlasEntity.AtlasEntityWithExtInfo extInfo = entityStore.getById(defaultGuid);
+            if (extInfo == null || extInfo.getEntity() == null) {
+                return null;
+            }
+
+            // Constant-first comparison so a missing type name cannot cause a NullPointerException.
+            String typeName = extInfo.getEntity().getTypeName();
+            if (!ENTITY_TYPE_HIVE_TABLE.equals(typeName) && !ENTITY_TYPE_HIVE_COLUMN.equals(typeName)) {
+                return null;
+            }
+
+            // Without a qualifiedName the derived key would be "null@null"; skip the lookup instead.
+            String qualifiedName = (String) extInfo.getEntity().getAttribute(EntityGraphRetriever.QUALIFIED_NAME);
+            if (StringUtils.isEmpty(qualifiedName)) {
+                return null;
+            }
+
+            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(ENTITY_TYPE_HIVE_DB);
+            if (entityType == null) {
+                return null;
+            }
+
+            String hiveDBQualifiedName = extractHiveDBQualifiedName(qualifiedName);
+            return entityStore.getGuidByUniqueAttributes(entityType, Collections.singletonMap(EntityGraphRetriever.QUALIFIED_NAME, hiveDBQualifiedName));
+        }
+
+        /**
+         * Builds "<text before the first '.'>@<text after the first '@'>" from a qualifiedName,
+         * e.g. "largedb.testtable_0.col101@cl1" becomes "largedb@cl1".
+         */
+        @VisibleForTesting
+        static String extractHiveDBQualifiedName(String qualifiedName) {
+            return String.format("%s@%s",
+                    StringUtils.substringBefore(qualifiedName, "."),
+                    StringUtils.substringAfter(qualifiedName, "@"));
+        }
+    }
+
private class ExportAudits {
private AtlasExportRequest request;
private String targetServerName;
@@ -159,13 +214,11 @@ public class AuditsWriter {
private AtlasImportRequest request;
private boolean replicationOptionState;
private String sourceServerName;
- private String optionKeyReplicatedFrom;
private String sourceServerFullName;
public void add(String userName, AtlasImportResult result,
long startTime, long endTime,
List<String> entityGuids) throws AtlasBaseException {
- optionKeyReplicatedFrom =
AtlasImportRequest.OPTION_KEY_REPLICATED_FROM;
request = result.getRequest();
replicationOptionState = request.isReplicationOptionSet();
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
index 829390b..92d4fb0 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
@@ -33,6 +33,7 @@ import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityChangeNotifier;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
+import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasEntityType;
@@ -146,6 +147,14 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_FROM);
}
+    // Both a column-level and a table-level qualifiedName must resolve to the same hive_db qualifiedName.
+    @Test
+    public void replKeyGuidFinder() {
+        final String dbQualifiedName = "largedb@cl1";
+        final String[] childQualifiedNames = {
+                "largedb.testtable_0.col101@cl1",
+                "largedb.testtable_0@cl1"
+        };
+
+        for (String childQualifiedName : childQualifiedNames) {
+            assertEquals(AuditsWriter.ReplKeyGuidFinder.extractHiveDBQualifiedName(childQualifiedName), dbQualifiedName);
+        }
+    }
+
private void assertReplicationAttribute(String attrNameReplication) throws
AtlasBaseException {
pauseForIndexCreation();
AtlasEntity.AtlasEntitiesWithExtInfo entities =
entityStore.getByIds(ImmutableList.of(DB_GUID, TABLE_GUID));