Repository: atlas Updated Branches: refs/heads/master 831ad0142 -> b37154f87
ATLAS-2811: Skip Lineage Export option. Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/4ff57281 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/4ff57281 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/4ff57281 Branch: refs/heads/master Commit: 4ff572815318772be6842ec93331f07c7e006bbd Parents: 831ad01 Author: Ashutosh Mestry <[email protected]> Authored: Mon Aug 6 13:57:05 2018 -0700 Committer: Ashutosh Mestry <[email protected]> Committed: Tue Oct 9 10:48:52 2018 -0700 ---------------------------------------------------------------------- .../atlas/model/impexp/AtlasExportRequest.java | 1 + .../atlas/repository/impexp/ExportService.java | 104 +++++------- .../repository/impexp/ExportTypeProcessor.java | 159 +++++++++++++++++++ .../atlas/repository/util/UniqueList.java | 73 +++++++++ .../clusterinfo/ClusterServiceTest.java | 118 -------------- .../repository/impexp/ClusterServiceTest.java | 118 ++++++++++++++ .../repository/impexp/ExportImportTestBase.java | 26 ++- .../impexp/ExportSkipLineageTest.java | 125 +++++++++++++++ .../impexp/ReplicationEntityAttributeTest.java | 17 -- .../atlas/repository/impexp/UniqueListTest.java | 11 +- .../impexp/ZipFileResourceTestUtils.java | 17 ++ 11 files changed, 562 insertions(+), 207 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/4ff57281/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java index 23d474c..93be953 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java @@ -48,6 +48,7 @@ public class AtlasExportRequest implements Serializable { public static final String OPTION_FETCH_TYPE = "fetchType"; public static final String OPTION_ATTR_MATCH_TYPE = "matchType"; + public static final String OPTION_SKIP_LINEAGE = "skipLineage"; public static final String OPTION_KEY_REPLICATED_TO = "replicatedTo"; public static final String FETCH_TYPE_FULL = "full"; public static final String FETCH_TYPE_CONNECTED = "connected"; http://git-wip-us.apache.org/repos/asf/atlas/blob/4ff57281/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java index 3ab7964..f74b3cd 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java @@ -38,6 +38,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.util.UniqueList; import org.apache.atlas.type.AtlasArrayType; import org.apache.atlas.type.AtlasClassificationType; import org.apache.atlas.type.AtlasEntityType; @@ -68,11 +69,17 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.*; public class ExportService { private static final Logger LOG = LoggerFactory.getLogger(ExportService.class); + private static final String PROPERTY_GUID = "__guid"; + private static final String PROPERTY_IS_PROCESS = "isProcess"; + + private final AtlasTypeRegistry typeRegistry; - private AuditsWriter auditsWriter; + private final String QUERY_BINDING_START_GUID = "startGuid"; + private AuditsWriter auditsWriter; private final AtlasGraph atlasGraph; private final EntityGraphRetriever entityGraphRetriever; private final AtlasGremlinQueryProvider gremlinQueryProvider; + private ExportTypeProcessor exportTypeProcessor; @Inject public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AuditsWriter auditsWriter) { @@ -87,7 +94,8 @@ public class ExportService { String requestingIP) throws AtlasBaseException { long startTime = System.currentTimeMillis(); AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, hostName, startTime); - ExportContext context = new ExportContext(result, exportSink); + ExportContext context = new ExportContext(atlasGraph, result, exportSink); + exportTypeProcessor = new ExportTypeProcessor(typeRegistry, context); try { LOG.info("==> export(user={}, from={})", userName, requestingIP); @@ -333,14 +341,14 @@ public class ExportService { } addEntity(entityWithExtInfo, context); - addTypes(entityWithExtInfo.getEntity(), context); + exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context); context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid()); getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction); if(entityWithExtInfo.getReferredEntities() != null) { for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { - addTypes(e, context); + exportTypeProcessor.addTypes(e, context); getConntedEntitiesBasedOnOption(e, context, direction); } @@ -377,7 +385,7 @@ public class ExportService { } } - private boolean isProcessEntity(AtlasEntity entity) throws AtlasBaseException { + private boolean isProcessEntity(AtlasEntity entity) { String typeName = entity.getTypeName(); AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); @@ -397,7 +405,7 @@ public class ExportService { } context.bindings.clear(); - context.bindings.put("startGuid", entity.getGuid()); + context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid()); List<Map<String, Object>> result = executeGremlinQuery(query, context); @@ -405,10 +413,12 @@ public class ExportService { continue; } - for (Map<String, Object> map : result) { - String guid = (String) map.get("__guid"); + for (Map<String, Object> hashMap : result) { + String guid = (String) hashMap.get(PROPERTY_GUID); TraversalDirection currentDirection = context.guidDirection.get(guid); - boolean isLineage = (boolean) map.get("isProcess"); + boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS); + + if(context.skipLineage && isLineage) continue; if (currentDirection == null) { context.addToBeProcessed(isLineage, guid, direction); @@ -445,7 +455,7 @@ public class ExportService { String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL); context.bindings.clear(); - context.bindings.put("startGuid", entity.getGuid()); + context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid()); List<Map<String, Object>> result = executeGremlinQuery(query, context); @@ -453,9 +463,11 @@ public class ExportService { return; } - for (Map<String, Object> map : result) { - String guid = (String) map.get("__guid"); - boolean isLineage = (boolean) map.get("isProcess"); + for (Map<String, Object> hashMap : result) { + String guid = (String) hashMap.get(PROPERTY_GUID); + boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS); + + if(context.getSkipLineage() && isLineage) continue; if (!context.guidsProcessed.contains(guid)) { context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH); @@ -642,58 +654,7 @@ public class ExportService { } } - public static class UniqueList<T> { - private final List<T> list = new ArrayList<>(); - private final Set<T> set = new HashSet<>(); - - public void add(T e) { - if(set.contains(e)) { - return; - } - - list.add(e); - set.add(e); - } - - public void addAll(UniqueList<T> uniqueList) { - for (T item : uniqueList.list) { - if(set.contains(item)) continue; - - set.add(item); - list.add(item); - } - } - - public T remove(int index) { - T e = list.remove(index); - set.remove(e); - return e; - } - - public boolean contains(T e) { - return set.contains(e); - } - - public int size() { - return list.size(); - } - - public boolean isEmpty() { - return list.isEmpty(); - } - - public void clear() { - list.clear(); - set.clear(); - } - - public List<T> getList() { - return list; - } - } - - - private class ExportContext { + static class ExportContext { final Set<String> guidsProcessed = new HashSet<>(); final UniqueList<String> guidsToProcess = new UniqueList<>(); final UniqueList<String> lineageToProcess = new UniqueList<>(); @@ -710,10 +671,11 @@ public class ExportService { private final Map<String, Object> bindings; private final ExportFetchType fetchType; private final String matchType; + private final boolean skipLineage; private int progressReportCount = 0; - ExportContext(AtlasExportResult result, ZipSink sink) throws AtlasBaseException { + ExportContext(AtlasGraph atlasGraph, AtlasExportResult result, ZipSink sink) throws AtlasBaseException { this.result = result; this.sink = sink; @@ -721,6 +683,7 @@ public class ExportService { bindings = new HashMap<>(); fetchType = getFetchType(result.getRequest()); matchType = getMatchType(result.getRequest()); + skipLineage = getOptionSkipLineage(result.getRequest()); } private ExportFetchType getFetchType(AtlasExportRequest request) { @@ -747,6 +710,11 @@ public class ExportService { return matchType; } + private boolean getOptionSkipLineage(AtlasExportRequest request) { + return request.getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE) && + (boolean) request.getOptions().get(AtlasExportRequest.OPTION_SKIP_LINEAGE); + } + public void clear() { guidsToProcess.clear(); guidsProcessed.clear(); @@ -773,5 +741,9 @@ public class ExportService { LOG.info("export(): in progress.. number of entities exported: {}", this.guidsProcessed.size()); } } + + public boolean getSkipLineage() { + return skipLineage; + } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/4ff57281/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java new file mode 100644 index 0000000..6b5db61 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.TypeCategory; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.type.AtlasArrayType; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasEnumType; +import org.apache.atlas.type.AtlasMapType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ExportTypeProcessor { + private static final Logger LOG = LoggerFactory.getLogger(ExportTypeProcessor.class); + + private AtlasTypeRegistry typeRegistry; + private final ExportService.ExportContext context; + + ExportTypeProcessor(AtlasTypeRegistry typeRegistry, ExportService.ExportContext context) { + this.typeRegistry = typeRegistry; + this.context = context; + } + + public void addTypes(AtlasEntity entity, ExportService.ExportContext context) { + addEntityType(entity.getTypeName(), context); + + if(CollectionUtils.isNotEmpty(entity.getClassifications())) { + for (AtlasClassification c : entity.getClassifications()) { + addClassificationType(c.getTypeName(), context); + } + } + } + + private void addType(String typeName, ExportService.ExportContext context) { + AtlasType type = null; + + try { + type = typeRegistry.getType(typeName); + + addType(type, context); + } catch (AtlasBaseException excp) { + LOG.error("unknown type {}", typeName); + } + } + + private void addEntityType(String typeName, ExportService.ExportContext context) { + if (!context.entityTypes.contains(typeName)) { + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + + addEntityType(entityType, context); + } + } + + private void addClassificationType(String typeName, ExportService.ExportContext context) { + if (!context.classificationTypes.contains(typeName)) { + AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(typeName); + + addClassificationType(classificationType, context); + } + } + + private void addType(AtlasType type, ExportService.ExportContext context) { + if (type.getTypeCategory() == TypeCategory.PRIMITIVE) { + return; + } + + if (type instanceof AtlasArrayType) { + AtlasArrayType arrayType = (AtlasArrayType)type; + + addType(arrayType.getElementType(), context); + } else if (type instanceof AtlasMapType) { + AtlasMapType mapType = (AtlasMapType)type; + + addType(mapType.getKeyType(), context); + addType(mapType.getValueType(), context); + } else if (type instanceof AtlasEntityType) { + addEntityType((AtlasEntityType)type, context); + } else if (type instanceof AtlasClassificationType) { + addClassificationType((AtlasClassificationType)type, context); + } else if (type instanceof AtlasStructType) { + addStructType((AtlasStructType)type, context); + } else if (type instanceof AtlasEnumType) { + addEnumType((AtlasEnumType)type, context); + } + } + + private void addEntityType(AtlasEntityType entityType, ExportService.ExportContext context) { + if (!context.entityTypes.contains(entityType.getTypeName())) { + context.entityTypes.add(entityType.getTypeName()); + + addAttributeTypes(entityType, context); + + if (CollectionUtils.isNotEmpty(entityType.getAllSuperTypes())) { + for (String superType : entityType.getAllSuperTypes()) { + addEntityType(superType, context); + } + } + } + } + + private void addClassificationType(AtlasClassificationType classificationType, ExportService.ExportContext context) { + if (!context.classificationTypes.contains(classificationType.getTypeName())) { + context.classificationTypes.add(classificationType.getTypeName()); + + addAttributeTypes(classificationType, context); + + if (CollectionUtils.isNotEmpty(classificationType.getAllSuperTypes())) { + for (String superType : classificationType.getAllSuperTypes()) { + addClassificationType(superType, context); + } + } + } + } + + private void addStructType(AtlasStructType structType, ExportService.ExportContext context) { + if (!context.structTypes.contains(structType.getTypeName())) { + context.structTypes.add(structType.getTypeName()); + + addAttributeTypes(structType, context); + } + } + + private void addEnumType(AtlasEnumType enumType, ExportService.ExportContext context) { + if (!context.enumTypes.contains(enumType.getTypeName())) { + context.enumTypes.add(enumType.getTypeName()); + } + } + + private void addAttributeTypes(AtlasStructType structType, ExportService.ExportContext context) { + for (AtlasStructDef.AtlasAttributeDef attributeDef : structType.getStructDef().getAttributeDefs()) { + addType(attributeDef.getTypeName(), context); + } + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/4ff57281/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java b/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java new file mode 100644 index 0000000..9148ce0 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.util; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class UniqueList<T> { + private final List<T> list = new ArrayList<>(); + private final Set<T> set = new HashSet<>(); + + public void add(T e) { + if(set.contains(e)) { + return; + } + + list.add(e); + set.add(e); + } + + public void addAll(UniqueList<T> uniqueList) { + for (T item : uniqueList.list) { + if(set.contains(item)) continue; + + set.add(item); + list.add(item); + } + } + + public T remove(int index) { + T e = list.remove(index); + set.remove(e); + return e; + } + + public boolean contains(T e) { + return set.contains(e); + } + + public int size() { + return list.size(); + } + + public boolean isEmpty() { + return list.isEmpty(); + } + + public void clear() { + list.clear(); + set.clear(); + } + + public List<T> getList() { + return list; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/4ff57281/repository/src/test/java/org/apache/atlas/repository/clusterinfo/ClusterServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/clusterinfo/ClusterServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/clusterinfo/ClusterServiceTest.java deleted file mode 100644 index 77e3339..0000000 --- a/repository/src/test/java/org/apache/atlas/repository/clusterinfo/ClusterServiceTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.repository.clusterinfo; - -import org.apache.atlas.TestModules; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.clusterinfo.AtlasCluster; -import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.atlas.repository.impexp.ClusterService; -import org.apache.atlas.store.AtlasTypeDefStore; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Guice; -import org.testng.annotations.Test; - -import javax.inject.Inject; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; - -@Guice(modules = TestModules.TestOnlyModule.class) -public class ClusterServiceTest { - private final String TOP_LEVEL_ENTITY_NAME = "db1@cl1"; - private final String CLUSTER_NAME = "testCl1"; - private final String TARGET_CLUSTER_NAME = "testCl2"; - - @Inject - private AtlasTypeDefStore typeDefStore; - - @Inject - private AtlasTypeRegistry typeRegistry; - - @Inject - private ClusterService clusterService; - - @BeforeClass - public void setup() throws IOException, AtlasBaseException { - loadBaseModel(typeDefStore, typeRegistry); - } - - @Test - public void saveAndRetrieveClusterInfo() throws AtlasBaseException { - AtlasCluster expected = getCluster(CLUSTER_NAME, TOP_LEVEL_ENTITY_NAME, "EXPORT", 0l, TARGET_CLUSTER_NAME); - AtlasCluster expected2 = getCluster(TARGET_CLUSTER_NAME, TOP_LEVEL_ENTITY_NAME, "IMPORT", 0L, TARGET_CLUSTER_NAME); - AtlasCluster expected3 = getCluster(TARGET_CLUSTER_NAME + "_3", TOP_LEVEL_ENTITY_NAME, "IMPORT", 0, TARGET_CLUSTER_NAME); - - AtlasCluster actual = clusterService.save(expected); - AtlasCluster actual2 = clusterService.save(expected2); - AtlasCluster actual3 = clusterService.save(expected3); - AtlasCluster actual2x = clusterService.get(expected2); - - assertNotNull(actual.getGuid()); - assertNotNull(actual2.getGuid()); - assertNotEquals(actual.getGuid(), actual2.getGuid()); - assertNotEquals(actual2.getGuid(), actual3.getGuid()); - - assertEquals(actual2.getGuid(), actual2x.getGuid()); - - - assertEquals(actual.getName(), expected.getName()); - assertEquals(actual.getQualifiedName(), expected.getQualifiedName()); - assertEquals(getAdditionalInfo(actual, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.OPERATION), - getAdditionalInfo(expected, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.OPERATION)); - - assertEquals(getAdditionalInfo(actual, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.NEXT_MODIFIED_TIMESTAMP), - getAdditionalInfo(expected, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.NEXT_MODIFIED_TIMESTAMP)); - } - - private AtlasCluster getCluster(String name, String topLevelEntity, String operation, long nextModifiedTimestamp, String targetClusterName) { - AtlasCluster cluster = new AtlasCluster(name, name); - - Map<String, Object> syncMap = new HashMap<>(); - syncMap.put("operation", operation); - syncMap.put("nextModifiedTimestamp", nextModifiedTimestamp); - syncMap.put("targetCluster", targetClusterName); - - String syncMapJson = AtlasType.toJson(syncMap); - String topLevelEntitySpecificKey = getTopLevelEntitySpecificKey(topLevelEntity); - cluster.setAdditionalInfo(topLevelEntitySpecificKey, syncMapJson); - return cluster; - } - - private Map<String, Object> getAdditionalInfo(AtlasCluster cluster, String topLevelEntityName) { - String topLevelEntitySpecificKey = getTopLevelEntitySpecificKey(topLevelEntityName); - assertTrue(cluster.getAdditionalInfo().containsKey(topLevelEntitySpecificKey)); - - String json = cluster.getAdditionalInfo(topLevelEntitySpecificKey); - return AtlasType.fromJson(json, Map.class); - } - - private String getTopLevelEntitySpecificKey(String topLevelEntity) { - return String.format("%s:%s", AtlasCluster.SYNC_INFO_KEY, topLevelEntity); - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/4ff57281/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java new file mode 100644 index 0000000..c931e74 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.TestModules; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.clusterinfo.AtlasCluster; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.impexp.ClusterService; +import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import javax.inject.Inject; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +@Guice(modules = TestModules.TestOnlyModule.class) +public class ClusterServiceTest { + private final String TOP_LEVEL_ENTITY_NAME = "db1@cl1"; + private final String CLUSTER_NAME = "testCl1"; + private final String TARGET_CLUSTER_NAME = "testCl2"; + + @Inject + private AtlasTypeDefStore typeDefStore; + + @Inject + private AtlasTypeRegistry typeRegistry; + + @Inject + private ClusterService clusterService; + + @BeforeClass + public void setup() throws IOException, AtlasBaseException { + loadBaseModel(typeDefStore, typeRegistry); + } + + @Test + public void saveAndRetrieveClusterInfo() throws AtlasBaseException { + AtlasCluster expected = getCluster(CLUSTER_NAME, TOP_LEVEL_ENTITY_NAME, "EXPORT", 0l, TARGET_CLUSTER_NAME); + AtlasCluster expected2 = getCluster(TARGET_CLUSTER_NAME, TOP_LEVEL_ENTITY_NAME, "IMPORT", 0L, TARGET_CLUSTER_NAME); + AtlasCluster expected3 = getCluster(TARGET_CLUSTER_NAME + "_3", TOP_LEVEL_ENTITY_NAME, "IMPORT", 0, TARGET_CLUSTER_NAME); + + AtlasCluster actual = clusterService.save(expected); + AtlasCluster actual2 = clusterService.save(expected2); + AtlasCluster actual3 = clusterService.save(expected3); + AtlasCluster actual2x = clusterService.get(expected2); + + assertNotNull(actual.getGuid()); + assertNotNull(actual2.getGuid()); + assertNotEquals(actual.getGuid(), actual2.getGuid()); + assertNotEquals(actual2.getGuid(), actual3.getGuid()); + + assertEquals(actual2.getGuid(), actual2x.getGuid()); + + + assertEquals(actual.getName(), expected.getName()); + assertEquals(actual.getQualifiedName(), expected.getQualifiedName()); + assertEquals(getAdditionalInfo(actual, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.OPERATION), + getAdditionalInfo(expected, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.OPERATION)); + + assertEquals(getAdditionalInfo(actual, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.NEXT_MODIFIED_TIMESTAMP), + getAdditionalInfo(expected, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.NEXT_MODIFIED_TIMESTAMP)); + } + + private AtlasCluster getCluster(String name, String topLevelEntity, String operation, long nextModifiedTimestamp, String targetClusterName) { + AtlasCluster cluster = new AtlasCluster(name, name); + + Map<String, Object> syncMap = new HashMap<>(); + syncMap.put("operation", operation); + syncMap.put("nextModifiedTimestamp", nextModifiedTimestamp); + syncMap.put("targetCluster", targetClusterName); + + String syncMapJson = AtlasType.toJson(syncMap); + String topLevelEntitySpecificKey = getTopLevelEntitySpecificKey(topLevelEntity); + cluster.setAdditionalInfo(topLevelEntitySpecificKey, syncMapJson); + return cluster; + } + + private Map<String, Object> getAdditionalInfo(AtlasCluster cluster, String topLevelEntityName) { + String topLevelEntitySpecificKey = getTopLevelEntitySpecificKey(topLevelEntityName); + assertTrue(cluster.getAdditionalInfo().containsKey(topLevelEntitySpecificKey)); + + String json = cluster.getAdditionalInfo(topLevelEntitySpecificKey); + return AtlasType.fromJson(json, Map.class); + } + + private String getTopLevelEntitySpecificKey(String topLevelEntity) { + return String.format("%s:%s", AtlasCluster.SYNC_INFO_KEY, topLevelEntity); + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/4ff57281/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java index 41c8486..d5984df 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java @@ -18,16 +18,23 @@ package org.apache.atlas.repository.impexp; -import com.google.inject.Inject; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasConstants; import org.apache.atlas.AtlasException; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1; import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2; +import org.testng.SkipException; +import java.util.Arrays; + +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createAtlasEntity; +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadEntity; import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -36,6 +43,23 @@ public class ExportImportTestBase { protected DeleteHandlerV1 deleteHandler = mock(SoftDeleteHandlerV1.class); + protected int createEntities(AtlasEntityStoreV2 entityStore, String subDir, String entityFileNames[]) { + for (String fileName : entityFileNames) { + createAtlasEntity(entityStore, loadEntity(subDir, fileName)); + } + + return entityFileNames.length; + } + + protected void verifyCreatedEntities(AtlasEntityStoreV2 entityStore, Object[] entityGuids, int expectedNumberOfEntitiesCreated) { + try { + AtlasEntity.AtlasEntitiesWithExtInfo entities = entityStore.getByIds(Arrays.asList((String[]) entityGuids)); + assertEquals(entities.getEntities().size(), expectedNumberOfEntitiesCreated); + } catch (AtlasBaseException e) { + throw new SkipException(String.format("getByIds: could not load '%s'", entityGuids.toString())); + } + } + protected void assertAuditEntry(ExportImportAuditService auditService) { AtlasSearchResult result = null; try { http://git-wip-us.apache.org/repos/asf/atlas/blob/4ff57281/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java new file mode 100644 index 0000000..edd4f63 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.impexp; + + +import org.apache.atlas.RequestContext; +import org.apache.atlas.TestModules; +import org.apache.atlas.TestUtilsV2; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1; +import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityChangeNotifier; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.TestResourceFileUtils; +import org.testng.SkipException; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import javax.inject.Inject; +import java.io.IOException; +import java.util.Map; + +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel; +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadHiveModel; +import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.AssertJUnit.fail; + +@Guice(modules = TestModules.TestOnlyModule.class) +public class ExportSkipLineageTest extends ExportImportTestBase { + private final String ENTITIES_SUB_DIR = "stocksDB-Entities"; + private final String DB_GUID = "1637a33e-6512-447b-ade7-249c8cb5344b"; + private final String TABLE_GUID = "df122fc3-5555-40f8-a30f-3090b8a622f8"; + private final String TABLE_TABLE_GUID = "6f3b305a-c459-4ae4-b651-aee0deb0685f"; + private final String TABLE_VIEW_GUID = "56415119-7cb0-40dd-ace8-1e50efd54991"; + + @Inject + AtlasTypeRegistry typeRegistry; + + @Inject + private AtlasTypeDefStore typeDefStore; + + @Inject + private EntityGraphMapper graphMapper; + + @Inject + ExportService exportService; + + private DeleteHandlerV1 deleteHandler = mock(SoftDeleteHandlerV1.class); + private AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class); + private AtlasEntityStoreV2 entityStore; + + @BeforeClass + public void setup() throws IOException, AtlasBaseException { + loadBaseModel(typeDefStore, typeRegistry); + loadHiveModel(typeDefStore, typeRegistry); + + entityStore = new AtlasEntityStoreV2(deleteHandler, typeRegistry, mockChangeNotifier, graphMapper); + createEntities(entityStore, ENTITIES_SUB_DIR, new String[]{"db", "table-columns", "table-view", "table-table-lineage"}); + final Object[] entityGuids = new Object[]{DB_GUID, TABLE_GUID, TABLE_TABLE_GUID, TABLE_VIEW_GUID}; + verifyCreatedEntities(entityStore, entityGuids, 4); + } + + @BeforeMethod + public void setupTest() { + RequestContext.clear(); + RequestContext.get().setUser(TestUtilsV2.TEST_USER, null); + } + + @Test + public void exportWithoutLineage() { + final int expectedEntityCount = 3; + + AtlasExportRequest request = getRequest(); + ZipSource source = runExportWithParameters(exportService, request); + AtlasEntity.AtlasEntityWithExtInfo entities = ZipFileResourceTestUtils.getEntities(source, expectedEntityCount); + + int count = 0; + for (Map.Entry<String, AtlasEntity> entry : entities.getReferredEntities().entrySet()) { + assertNotNull(entry.getValue()); + if(entry.getValue().getTypeName().equals("hive_process")) { + fail("Process entities should not be part of export!"); + } + count++; + } + + assertEquals(count, expectedEntityCount); + } + + private AtlasExportRequest getRequest() { + final String filename = "export-skip-lineage"; + try { + AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, filename, AtlasExportRequest.class); + + return request; + } catch (IOException e) { + throw new SkipException(String.format("getRequest: '%s' could not be laoded.", filename)); + } + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/4ff57281/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java index 34bb9f7..bbf3f63 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java @@ -166,23 +166,6 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase { return request; } - private AtlasEntity.AtlasEntityWithExtInfo getEntities(ZipSource source, int expectedCount) { - AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo(); - try { - int count = 0; - for(String s : source.getCreationOrder()) { - AtlasEntity entity = source.getByGuid(s); - entityWithExtInfo.addReferredEntity(s, entity); - count++; - } - - assertEquals(count, expectedCount); - return entityWithExtInfo; - } catch (AtlasBaseException e) { - throw new SkipException("getEntities: failed!"); - } - } - private AtlasExportRequest getExportRequestWithReplicationOption() { try { AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_FILE, AtlasExportRequest.class); http://git-wip-us.apache.org/repos/asf/atlas/blob/4ff57281/repository/src/test/java/org/apache/atlas/repository/impexp/UniqueListTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/UniqueListTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/UniqueListTest.java index 93aa518..2118df9 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/UniqueListTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/UniqueListTest.java @@ -17,6 +17,7 @@ */ package org.apache.atlas.repository.impexp; +import org.apache.atlas.repository.util.UniqueList; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -24,11 +25,11 @@ import static org.testng.Assert.assertEquals; public class UniqueListTest { private final String firstElement = "firstElement"; - private ExportService.UniqueList<String> uniqueList; + private UniqueList<String> uniqueList; @BeforeClass public void setup() { - uniqueList = new ExportService.UniqueList(); + uniqueList = new UniqueList(); uniqueList.add(firstElement); uniqueList.add("def"); uniqueList.add("firstElement"); @@ -42,7 +43,7 @@ public class UniqueListTest { @Test public void addAllList_ListHas2() { - ExportService.UniqueList<String> uniqueList2 = new ExportService.UniqueList<>(); + UniqueList<String> uniqueList2 = new UniqueList<>(); uniqueList2.addAll(uniqueList); assertEquals(3, uniqueList2.size()); @@ -50,7 +51,7 @@ public class UniqueListTest { @Test public void attemptClear_SizeIsZero() { - ExportService.UniqueList<String> uniqueList2 = new ExportService.UniqueList<>(); + UniqueList<String> uniqueList2 = new UniqueList<>(); uniqueList2.addAll(uniqueList); uniqueList2.clear(); @@ -59,7 +60,7 @@ public class UniqueListTest { @Test public void attemptOneRemove_SizeIsReduced() { - ExportService.UniqueList<String> uniqueList2 = new ExportService.UniqueList<>(); + UniqueList<String> uniqueList2 = new UniqueList<>(); uniqueList2.addAll(uniqueList); String removedElement = uniqueList2.remove(0); http://git-wip-us.apache.org/repos/asf/atlas/blob/4ff57281/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java index fe2868d..b5c67f0 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java @@ -253,6 +253,23 @@ public class ZipFileResourceTestUtils { return r; } + public static AtlasEntity.AtlasEntityWithExtInfo getEntities(ZipSource source, int expectedCount) { + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo(); + try { + int count = 0; + for(String s : source.getCreationOrder()) { + AtlasEntity entity = source.getByGuid(s); + entityWithExtInfo.addReferredEntity(s, entity); + count++; + } + + assertEquals(count, expectedCount); + return entityWithExtInfo; + } catch (AtlasBaseException e) { + throw new SkipException("getEntities: failed!"); + } + } + public static void loadModelFromJson(String fileName, AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException { AtlasTypesDef typesFromJson = getAtlasTypesDefFromFile(fileName); addReplicationAttributes(typesFromJson);
