ATLAS-2814: Cluster stores replication details.
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/b9aa6d5d Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/b9aa6d5d Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/b9aa6d5d Branch: refs/heads/branch-0.8 Commit: b9aa6d5d36191544174cdd7ee97cda4474aa2ca8 Parents: 9bdbb31 Author: Ashutosh Mestry <ames...@hortonworks.com> Authored: Thu Aug 16 12:11:39 2018 -0700 Committer: Ashutosh Mestry <ames...@hortonworks.com> Committed: Thu Aug 16 22:07:23 2018 -0700 ---------------------------------------------------------------------- .../004-base_model_replication_attributes.json | 10 +- .../java/org/apache/atlas/AtlasBaseClient.java | 10 +- .../atlas/model/clusterinfo/AtlasCluster.java | 115 -------------- .../apache/atlas/model/impexp/AtlasCluster.java | 155 +++++++++++++++++++ .../atlas/model/impexp/AtlasExportResult.java | 16 ++ .../atlas/model/impexp/AtlasImportResult.java | 9 ++ .../graph/GraphToTypedInstanceMapper.java | 18 ++- .../atlas/repository/impexp/AuditsWriter.java | 82 +++++----- .../atlas/repository/impexp/ClusterService.java | 81 ++++++---- .../impexp/ExportImportAuditService.java | 77 ++++++++- .../atlas/repository/impexp/ExportService.java | 46 +++--- .../atlas/repository/impexp/ImportService.java | 12 ++ .../repository/impexp/ImportTransformer.java | 24 +++ .../atlas/repository/ogm/AtlasClusterDTO.java | 4 +- .../apache/atlas/repository/ogm/DataAccess.java | 16 ++ .../ogm/ExportImportAuditEntryDTO.java | 41 +++-- .../store/graph/v1/EntityGraphMapper.java | 9 ++ .../repository/impexp/ClusterServiceTest.java | 58 +++---- .../impexp/ExportImportAuditServiceTest.java | 28 ++-- .../repository/impexp/ExportImportTestBase.java | 22 ++- .../impexp/ExportIncrementalTest.java | 3 - .../repository/impexp/ImportServiceTest.java | 2 +- .../repository/impexp/ImportTransformsTest.java | 16 ++ .../impexp/ReplicationEntityAttributeTest.java | 59 ++++--- .../stocksDB-Entities/replicationAttrs.json | 5 +- .../typesystem/types/AttributeDefinition.java | 5 + .../atlas/typesystem/types/AttributeInfo.java | 23 ++- .../atlas/web/resources/AdminResource.java | 28 ++-- .../web/resources/AdminExportImportTestIT.java | 47 +++++- .../test/resources/json/export-incremental.json | 4 +- webapp/src/test/resources/stocks-base.zip | Bin 13166 -> 17706 bytes 31 files changed, 692 insertions(+), 333 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/addons/models/patches/004-base_model_replication_attributes.json ---------------------------------------------------------------------- diff --git a/addons/models/patches/004-base_model_replication_attributes.json b/addons/models/patches/004-base_model_replication_attributes.json index bee3718..1b49ff7 100644 --- a/addons/models/patches/004-base_model_replication_attributes.json +++ b/addons/models/patches/004-base_model_replication_attributes.json @@ -13,7 +13,10 @@ "cardinality": "SET", "isIndexable": false, "isOptional": true, - "isUnique": false + "isUnique": false, + "options": { + "isSoftReference": "true" + } }, { "name": "replicatedToCluster", @@ -21,7 +24,10 @@ "cardinality": "SET", "isIndexable": false, "isOptional": true, - "isUnique": false + "isUnique": false, + "options": { + "isSoftReference": "true" + } } ] } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java ---------------------------------------------------------------------- diff --git a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java index f73ba2c..a529380 100644 --- a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java +++ b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java @@ -27,7 +27,6 @@ import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter; import com.sun.jersey.api.json.JSONConfiguration; import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; -import com.sun.jersey.core.util.MultivaluedMapImpl; import com.sun.jersey.multipart.BodyPart; import com.sun.jersey.multipart.FormDataBodyPart; import com.sun.jersey.multipart.FormDataMultiPart; @@ -35,7 +34,7 @@ import com.sun.jersey.multipart.MultiPart; import com.sun.jersey.multipart.file.FileDataBodyPart; import com.sun.jersey.multipart.file.StreamDataBodyPart; import com.sun.jersey.multipart.impl.MultiPartWriter; -import org.apache.atlas.model.clusterinfo.AtlasCluster; +import org.apache.atlas.model.impexp.AtlasCluster; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; @@ -78,7 +77,7 @@ public abstract class AtlasBaseClient { public static final String ADMIN_METRICS = "admin/metrics"; public static final String ADMIN_IMPORT = "admin/import"; public static final String ADMIN_EXPORT = "admin/export"; - public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled"; + public static final String ADMIN_CLUSTER_TEMPLATE = "%sadmin/cluster/%s"; public static final String QUERY = "query"; public static final String LIMIT = "limit"; @@ -511,6 +510,11 @@ public abstract class AtlasBaseClient { return new FormDataBodyPart(IMPORT_REQUEST_PARAMTER, AtlasType.toJson(request), MediaType.APPLICATION_JSON_TYPE); } + public AtlasCluster getCluster(String clusterName) throws AtlasServiceException { + API api = new API(String.format(ADMIN_CLUSTER_TEMPLATE, BASE_URI, clusterName), HttpMethod.GET, Response.Status.OK); + return callAPI(api, AtlasCluster.class, null); + } + boolean isRetryableException(ClientHandlerException che) { return che.getCause().getClass().equals(IOException.class) || che.getCause().getClass().equals(ConnectException.class); http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/intg/src/main/java/org/apache/atlas/model/clusterinfo/AtlasCluster.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/clusterinfo/AtlasCluster.java b/intg/src/main/java/org/apache/atlas/model/clusterinfo/AtlasCluster.java deleted file mode 100644 index 3ce50e3..0000000 --- a/intg/src/main/java/org/apache/atlas/model/clusterinfo/AtlasCluster.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.model.clusterinfo; - -import org.apache.atlas.model.AtlasBaseModelObject; -import org.codehaus.jackson.annotate.JsonAutoDetect; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.codehaus.jackson.map.annotate.JsonSerialize; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE; -import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY; - -@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) -@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) -@JsonIgnoreProperties(ignoreUnknown = true) -public class AtlasCluster extends AtlasBaseModelObject implements Serializable { - private static final long serialVersionUID = 1L; - - public static final String SYNC_INFO_KEY = "syncInfo"; - public static final String OPERATION = "operation"; - public static final String NEXT_MODIFIED_TIMESTAMP = "nextModifiedTimestamp"; - - private String name; - private String qualifiedName; - private Map<String, String> additionalInfo; - private List<String> urls; - - public AtlasCluster() { - urls = new ArrayList<>(); - } - - public AtlasCluster(String name, String qualifiedName) { - this.name = name; - this.qualifiedName = qualifiedName; - } - - public void setName(String name) { - this.name = name; - } - - public String getName() { - return this.name; - } - - public void setAdditionalInfo(Map<String, String> additionalInfo) { - if(this.additionalInfo == null) { - this.additionalInfo = new HashMap<>(); - } - - this.additionalInfo = additionalInfo; - } - - public void setAdditionalInfo(String key, String value) { - if(this.additionalInfo == null) { - this.additionalInfo = new HashMap<>(); - } - - additionalInfo.put(key, value); - } - - public Map<String, String> getAdditionalInfo() { - return this.additionalInfo; - } - - public String getAdditionalInfo(String key) { - return additionalInfo.get(key); - } - - public String getQualifiedName() { - return qualifiedName; - } - - public void setQualifiedName(String qualifiedName) { - this.qualifiedName = qualifiedName; - } - - public void setUrls(List<String> urls) { - this.urls = urls; - } - - public List<String> getUrls() { - return this.urls; - } - - @Override - public StringBuilder toString(StringBuilder sb) { - sb.append(", name=").append(name); - sb.append(", qualifiedName=").append(getQualifiedName()); - sb.append(", urls=").append(urls); - sb.append(", additionalInfo=").append(additionalInfo); - sb.append("}"); - return sb; - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/intg/src/main/java/org/apache/atlas/model/impexp/AtlasCluster.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasCluster.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasCluster.java new file mode 100644 index 0000000..320c0c7 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasCluster.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.model.impexp; + +import org.apache.atlas.model.AtlasBaseModelObject; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.type.AtlasType; +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE; +import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class AtlasCluster extends AtlasBaseModelObject implements Serializable { + private static final long serialVersionUID = 1L; + + public static final String KEY_REPLICATION_DETAILS = "REPL_DETAILS"; + + private String name; + private String qualifiedName; + private Map<String, String> additionalInfo; + private List<String> urls; + + public AtlasCluster() { + urls = new ArrayList<>(); + additionalInfo = new HashMap<>(); + } + + public AtlasCluster(String name, String qualifiedName) { + this(); + this.name = name; + this.qualifiedName = qualifiedName; + } + + public void setName(String name) { + this.name = name; + } + + public String getName() { + return this.name; + } + + public void setAdditionalInfo(Map<String, String> additionalInfo) { + this.additionalInfo = additionalInfo; + } + + public void setAdditionalInfo(String key, String value) { + if(additionalInfo == null) { + additionalInfo = new HashMap<>(); + } + + additionalInfo.put(key, value); + } + + public void setAdditionalInfoRepl(String guid, long modifiedTimestamp) { + Map<String, Object> replicationDetailsMap = null; + + if(additionalInfo != null && additionalInfo.containsKey(KEY_REPLICATION_DETAILS)) { + replicationDetailsMap = AtlasType.fromJson(getAdditionalInfo().get(KEY_REPLICATION_DETAILS), Map.class); + } + + if(replicationDetailsMap == null) { + replicationDetailsMap = new HashMap<>(); + } + + if(modifiedTimestamp == 0) { + replicationDetailsMap.remove(guid); + } else { + replicationDetailsMap.put(guid, modifiedTimestamp); + } + + updateReplicationMap(replicationDetailsMap); + } + + private void updateReplicationMap(Map<String, Object> replicationDetailsMap) { + String json = AtlasType.toJson(replicationDetailsMap); + setAdditionalInfo(KEY_REPLICATION_DETAILS, json); + } + + + public Object getAdditionalInfoRepl(String guid) { + if(additionalInfo == null || !additionalInfo.containsKey(KEY_REPLICATION_DETAILS)) { + return null; + } + + String key = guid; + String mapJson = additionalInfo.get(KEY_REPLICATION_DETAILS); + + Map<String, String> replicationDetailsMap = AtlasType.fromJson(mapJson, Map.class); + if(!replicationDetailsMap.containsKey(key)) { + return null; + } + + return replicationDetailsMap.get(key); + } + + public Map<String, String> getAdditionalInfo() { + return this.additionalInfo; + } + + public String getAdditionalInfo(String key) { + return additionalInfo.get(key); + } + + public String getQualifiedName() { + return qualifiedName; + } + + public void setQualifiedName(String qualifiedName) { + this.qualifiedName = qualifiedName; + } + + public void setUrls(List<String> urls) { + this.urls = urls; + } + + public List<String> getUrls() { + return this.urls; + } + + @Override + public StringBuilder toString(StringBuilder sb) { + sb.append(", name=").append(name); + sb.append(", qualifiedName=").append(getQualifiedName()); + sb.append(", urls=").append(urls); + sb.append(", additionalInfo=").append(additionalInfo); + sb.append("}"); + return sb; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java index 85a606c..fd68712 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java @@ -169,6 +169,22 @@ public class AtlasExportResult implements Serializable { metrics.put(key, currentValue + incrementBy); } + public AtlasExportResult shallowCopy() { + AtlasExportResult result = new AtlasExportResult(); + + result.setRequest(getRequest()); + result.setUserName(getUserName()); + result.setClientIpAddress(getClientIpAddress()); + result.setHostName(getHostName()); + result.setTimeStamp(getTimeStamp()); + result.setMetrics(getMetrics()); + result.setOperationStatus(getOperationStatus()); + result.setSourceClusterName(getSourceClusterName()); + result.setLastModifiedTimestamp(getLastModifiedTimestamp()); + + return result; + } + public StringBuilder toString(StringBuilder sb) { if (sb == null) { sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java index bfb7637..f066688 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java @@ -54,6 +54,7 @@ public class AtlasImportResult { private Map<String, Integer> metrics; private List<String> processedEntities; private OperationStatus operationStatus; + private AtlasExportResult exportResultWithoutData; public AtlasImportResult() { this(null, null, null, null, System.currentTimeMillis()); @@ -141,6 +142,14 @@ public class AtlasImportResult { public List<String> getProcessedEntities() { return this.processedEntities; } + public AtlasExportResult getExportResult() { + return exportResultWithoutData; + } + + public void setExportResult(AtlasExportResult exportResult) { + this.exportResultWithoutData = exportResult.shallowCopy(); + } + public StringBuilder toString(StringBuilder sb) { if (sb == null) { sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java index 55f7076..78ea1c9 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java @@ -251,11 +251,21 @@ public final class GraphToTypedInstanceMapper { return; } - String edgeLabel = GraphHelper.EDGE_LABEL_PREFIX + propertyName; ArrayList values = new ArrayList(); - for (Object aList : list) { - values.add(mapVertexToCollectionEntry(instanceVertex, attributeInfo, elementType, aList, - edgeLabel)); + if(!attributeInfo.isSoftRef) { + String edgeLabel = GraphHelper.EDGE_LABEL_PREFIX + propertyName; + for (Object aList : list) { + values.add(mapVertexToCollectionEntry(instanceVertex, attributeInfo, elementType, aList, + edgeLabel)); + } + } else { + for (Object o : list) { + if(o == null) { + continue; + } + + values.add(o); + } } if (values.size() > 0) { http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java index 6a3fbec..467d383 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java @@ -22,19 +22,24 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasConstants; import org.apache.atlas.AtlasException; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.clusterinfo.AtlasCluster; +import org.apache.atlas.model.impexp.AtlasCluster; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.ExportImportAuditEntry; +import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.repository.Constants; import org.apache.atlas.type.AtlasType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import javax.inject.Inject; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -55,7 +60,9 @@ public class AuditsWriter { this.auditService = auditService; } - public void write(String userName, AtlasExportResult result, long startTime, long endTime, List<String> entityCreationOrder) throws AtlasBaseException { + public void write(String userName, AtlasExportResult result, + long startTime, long endTime, + List<String> entityCreationOrder) throws AtlasBaseException { auditForExport.add(userName, result, startTime, endTime, entityCreationOrder); } @@ -67,15 +74,17 @@ public class AuditsWriter { return options.containsKey(replicatedKey); } - private void updateReplicationAttribute(boolean isReplicationSet, String clusterName, + private void updateReplicationAttribute(boolean isReplicationSet, + String clusterName, List<String> exportedGuids, - String attrNameReplicated) throws AtlasBaseException { - if (!isReplicationSet) { + String attrNameReplicated, + long lastModifiedTimestamp) throws AtlasBaseException { + if (!isReplicationSet || CollectionUtils.isEmpty(exportedGuids)) { return; } - AtlasCluster cluster = saveCluster(clusterName); - clusterService.updateEntityWithCluster(cluster, exportedGuids, attrNameReplicated); + AtlasCluster cluster = saveCluster(clusterName, exportedGuids.get(0), lastModifiedTimestamp); + clusterService.updateEntitiesWithCluster(cluster, exportedGuids, attrNameReplicated); } private String getClusterNameFromOptions(Map options, String key) { @@ -84,27 +93,14 @@ public class AuditsWriter { : ""; } - private void addAuditEntry(String userName, String sourceCluster, String targetCluster, String operation, - String result, long startTime, long endTime, boolean hasData) throws AtlasBaseException { - if(!hasData) return; - - ExportImportAuditEntry entry = new ExportImportAuditEntry(); - - entry.setUserName(userName); - entry.setSourceClusterName(sourceCluster); - entry.setTargetClusterName(targetCluster); - entry.setOperation(operation); - entry.setResultSummary(result); - entry.setStartTime(startTime); - entry.setEndTime(endTime); - - auditService.save(entry); - LOG.info("addAuditEntry: user: {}, source: {}, target: {}, operation: {}", entry.getUserName(), - entry.getSourceClusterName(), entry.getTargetClusterName(), entry.getOperation()); + private AtlasCluster saveCluster(String clusterName) throws AtlasBaseException { + AtlasCluster cluster = new AtlasCluster(clusterName, clusterName); + return clusterService.save(cluster); } - private AtlasCluster saveCluster(String clusterName) throws AtlasBaseException { + private AtlasCluster saveCluster(String clusterName, String entityGuid, long lastModifiedTimestamp) throws AtlasBaseException { AtlasCluster cluster = new AtlasCluster(clusterName, clusterName); + cluster.setAdditionalInfoRepl(entityGuid, lastModifiedTimestamp); return clusterService.save(cluster); } @@ -128,22 +124,25 @@ public class AuditsWriter { public void add(String userName, AtlasExportResult result, long startTime, long endTime, List<String> entitityGuids) throws AtlasBaseException { optionKeyReplicatedTo = AtlasExportRequest.OPTION_KEY_REPLICATED_TO; request = result.getRequest(); - cluster = saveCluster(getCurrentClusterName()); replicationOptionState = isReplicationOptionSet(request.getOptions(), optionKeyReplicatedTo); targetClusterName = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo); - addAuditEntry(userName, - cluster.getName(), targetClusterName, + cluster = saveCluster(getCurrentClusterName()); + + auditService.add(userName, getCurrentClusterName(), targetClusterName, ExportImportAuditEntry.OPERATION_EXPORT, AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty()); - updateReplicationAttributeForExport(entitityGuids, request); + updateReplicationAttributeForExport(request, entitityGuids); } - private void updateReplicationAttributeForExport(List<String> entityGuids, AtlasExportRequest request) throws AtlasBaseException { - if(!replicationOptionState) return; + private void updateReplicationAttributeForExport(AtlasExportRequest request, List<String> entityGuids) throws AtlasBaseException { + if(!replicationOptionState) { + return; + } - updateReplicationAttribute(replicationOptionState, targetClusterName, entityGuids, Constants.ATTR_NAME_REPLICATED_TO_CLUSTER); + updateReplicationAttribute(replicationOptionState, targetClusterName, + entityGuids, Constants.ATTR_NAME_REPLICATED_TO_CLUSTER, 0L); } } @@ -159,12 +158,13 @@ public class AuditsWriter { request = result.getRequest(); optionKeyReplicatedFrom = AtlasImportRequest.OPTION_KEY_REPLICATED_FROM; replicationOptionState = isReplicationOptionSet(request.getOptions(), optionKeyReplicatedFrom); - cluster = saveCluster(getClusterNameFromOptionsState()); - String sourceCluster = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedFrom); - addAuditEntry(userName, - sourceCluster, cluster.getName(), - ExportImportAuditEntry.OPERATION_EXPORT, AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty()); + String sourceCluster = getClusterNameFromOptionsState(); + cluster = saveCluster(sourceCluster); + + auditService.add(userName, + sourceCluster, getCurrentClusterName(), + ExportImportAuditEntry.OPERATION_IMPORT, AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty()); updateReplicationAttributeForImport(entitityGuids); } @@ -173,13 +173,17 @@ public class AuditsWriter { if(!replicationOptionState) return; String targetClusterName = cluster.getName(); - updateReplicationAttribute(replicationOptionState, targetClusterName, entityGuids, Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER); + + updateReplicationAttribute(replicationOptionState, targetClusterName, + entityGuids, + Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER, + result.getExportResult().getLastModifiedTimestamp()); } private String getClusterNameFromOptionsState() { return replicationOptionState ? getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedFrom) - : getCurrentClusterName(); + : ""; } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java index fd8e2bf..950850e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java @@ -18,11 +18,12 @@ package org.apache.atlas.repository.impexp; +import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContextV1; import org.apache.atlas.annotation.AtlasService; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.clusterinfo.AtlasCluster; +import org.apache.atlas.model.impexp.AtlasCluster; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.repository.Constants; @@ -31,10 +32,14 @@ import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.ogm.DataAccess; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; +import org.apache.atlas.repository.store.graph.v1.AtlasEntityStreamForImport; import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1; import org.apache.atlas.repository.store.graph.v1.EntityGraphMapper; import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType; import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,43 +48,49 @@ import javax.inject.Inject; import java.util.ArrayList; import java.util.List; +import static org.apache.atlas.repository.Constants.ATTR_NAME_REFERENCEABLE; + @AtlasService public class ClusterService { private static final Logger LOG = LoggerFactory.getLogger(ClusterService.class); private final DataAccess dataAccess; private final AtlasEntityStore entityStore; + private final AtlasTypeRegistry typeRegistry; + private final EntityGraphRetriever entityGraphRetriever; @Inject - public ClusterService(DataAccess dataAccess, AtlasEntityStore entityStore) { + public ClusterService(DataAccess dataAccess, AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever) { this.dataAccess = dataAccess; this.entityStore = entityStore; + this.typeRegistry = typeRegistry; + this.entityGraphRetriever = entityGraphRetriever; } - public AtlasCluster get(AtlasCluster cluster) { + public AtlasCluster get(AtlasCluster cluster) throws AtlasBaseException { try { return dataAccess.load(cluster); } catch (AtlasBaseException e) { LOG.error("dataAccess", e); + throw e; } - - return null; } @GraphTransaction - public AtlasCluster save(AtlasCluster clusterInfo) throws AtlasBaseException { - return dataAccess.save(clusterInfo); + public AtlasCluster save(AtlasCluster cluster) throws AtlasBaseException { + return dataAccess.save(cluster); } @GraphTransaction - public void updateEntityWithCluster(AtlasCluster cluster, List<String> guids, String attributeName) throws AtlasBaseException { - if(cluster != null && StringUtils.isEmpty(cluster.getGuid())) return; + public void updateEntitiesWithCluster(AtlasCluster cluster, List<String> entityGuids, String attributeName) throws AtlasBaseException { + if (cluster != null && StringUtils.isEmpty(cluster.getGuid())) { + return; + } AtlasObjectId objectId = getObjectId(cluster); - for (String guid : guids) { + for (String guid : entityGuids) { AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = entityStore.getById(guid); updateAttribute(entityWithExtInfo, attributeName, objectId); - entityStore.createOrUpdate(new AtlasEntityStream(entityWithExtInfo), true); } } @@ -92,33 +103,49 @@ public class ClusterService { * Attribute passed by name is updated with the value passed. * @param entityWithExtInfo Entity to be updated * @param propertyName attribute name - * @param value Value to be set for attribute + * @param objectId Value to be set for attribute */ - private void updateAttribute(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, String propertyName, Object value) { + private void updateAttribute(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, + String propertyName, + AtlasObjectId objectId) { + String value = EntityGraphMapper.getSoftRefFormattedValue(objectId); updateAttribute(entityWithExtInfo.getEntity(), propertyName, value); for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { updateAttribute(e, propertyName, value); } } - private void updateAttribute(AtlasEntity e, String propertyName, Object value) { - if(e.hasAttribute(propertyName) == false) return; - - Object oVal = e.getAttribute(propertyName); - if (oVal != null && !(oVal instanceof List)) return; + private void updateAttribute(AtlasEntity entity, String attributeName, Object value) { + if(entity.hasAttribute(attributeName) == false) return; - List list; + try { + AtlasVertex vertex = entityGraphRetriever.getEntityVertex(entity.getGuid()); + if(vertex == null) { + return; + } + + String qualifiedFieldName = getVertexPropertyName(entity, attributeName); + List list = vertex.getListProperty(qualifiedFieldName); + if (list == null) { + list = new ArrayList(); + } + + if (!list.contains(value)) { + list.add(value); + vertex.setListProperty(qualifiedFieldName, list); + } - if (oVal == null) { - list = new ArrayList(); - } else { - list = (List) oVal; } - - if (!list.contains(value)) { - list.add(value); + catch (AtlasBaseException ex) { + LOG.error("error retrieving vertex from guid: {}", entity.getGuid(), ex); + } catch (AtlasException ex) { + LOG.error("error setting property to vertex with guid: {}", entity.getGuid(), ex); } + } - e.setAttribute(propertyName, list); + private String getVertexPropertyName(AtlasEntity entity, String attributeName) throws AtlasBaseException { + AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(entity.getTypeName()); + AtlasStructType.AtlasAttribute attribute = type.getAttribute(attributeName); + return attribute.getVertexPropertyName(); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/main/java/org/apache/atlas/repository/impexp/ExportImportAuditService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportImportAuditService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportImportAuditService.java index e90b6b9..8bd52e6 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportImportAuditService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportImportAuditService.java @@ -27,14 +27,18 @@ import org.apache.atlas.model.discovery.SearchParameters; import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria; import org.apache.atlas.model.discovery.SearchParameters.Operator; import org.apache.atlas.model.impexp.ExportImportAuditEntry; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.repository.ogm.DataAccess; import org.apache.atlas.repository.ogm.ExportImportAuditEntryDTO; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; import javax.inject.Inject; import java.util.ArrayList; +import java.util.List; +import java.util.Set; @AtlasService public class ExportImportAuditService { @@ -61,16 +65,41 @@ public class ExportImportAuditService { return dataAccess.load(entry); } - public AtlasSearchResult get(String userName, String operation, String sourceCluster, String targetCluster, - String startTime, String endTime, - int limit, int offset) throws AtlasBaseException { + public List<ExportImportAuditEntry> get(String userName, String operation, String cluster, + String startTime, String endTime, + int limit, int offset) throws AtlasBaseException { FilterCriteria criteria = new FilterCriteria(FilterCriteria.Condition.AND, new ArrayList<FilterCriteria>()); - addSearchParameters(criteria, userName, operation, sourceCluster, targetCluster, startTime, endTime); + addSearchParameters(criteria, userName, operation, cluster, startTime, endTime); SearchParameters searchParameters = getSearchParameters(limit, offset, criteria); + searchParameters.setAttributes(getAuditEntityAttributes()); - return discoveryService.searchWithParameters(searchParameters); + AtlasSearchResult result = discoveryService.searchWithParameters(searchParameters); + return toExportImportAuditEntry(result); + } + + private Set<String> getAuditEntityAttributes() { + return ExportImportAuditEntryDTO.getAttributes(); + } + + private List<ExportImportAuditEntry> toExportImportAuditEntry(AtlasSearchResult result) { + List<ExportImportAuditEntry> ret = new ArrayList<>(); + if(CollectionUtils.isEmpty(result.getEntities())) { + return ret; + } + + for (AtlasEntityHeader entityHeader : result.getEntities()) { + ExportImportAuditEntry entry = ExportImportAuditEntryDTO.from(entityHeader.getGuid(), + entityHeader.getAttributes()); + if(entry == null) { + continue; + } + + ret.add(entry); + } + + return ret; } private SearchParameters getSearchParameters(int limit, int offset, FilterCriteria criteria) { @@ -84,13 +113,26 @@ public class ExportImportAuditService { } private void addSearchParameters(FilterCriteria criteria, String userName, String operation, - String sourceCluster, String targetCluster, String startTime, String endTime) { + String cluster, String startTime, String endTime) { addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_USER_NAME, userName); addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_OPERATION, operation); - addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_SOURCE_CLUSTER_NAME, sourceCluster); - addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_TARGET_CLUSTER_NAME, targetCluster); addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_START_TIME, startTime); addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_END_TIME, endTime); + + addClusterFilterCriteria(criteria, cluster); + } + + private void addClusterFilterCriteria(FilterCriteria parentCriteria, String cluster) { + if (StringUtils.isEmpty(cluster)) { + return; + } + + FilterCriteria criteria = new FilterCriteria(FilterCriteria.Condition.OR, new ArrayList<FilterCriteria>()); + + addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_SOURCE_CLUSTER_NAME, cluster); + addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_TARGET_CLUSTER_NAME, cluster); + + parentCriteria.getCriterion().add(criteria); } private void addParameterIfValueNotEmpty(FilterCriteria criteria, String attributeName, String value) { @@ -100,4 +142,23 @@ public class ExportImportAuditService { criteria.getCriterion().add(new FilterCriteria(attributeName, Operator.EQ, value)); } + + public void add(String userName, String sourceCluster, String targetCluster, String operation, + String result, long startTime, long endTime, boolean hasData) throws AtlasBaseException { + if(!hasData) return; + + ExportImportAuditEntry entry = new ExportImportAuditEntry(); + + entry.setUserName(userName); + entry.setSourceClusterName(sourceCluster); + entry.setTargetClusterName(targetCluster); + entry.setOperation(operation); + entry.setResultSummary(result); + entry.setStartTime(startTime); + entry.setEndTime(endTime); + + save(entry); + LOG.info("addAuditEntry: user: {}, source: {}, target: {}, operation: {}", entry.getUserName(), + entry.getSourceClusterName(), entry.getTargetClusterName(), entry.getOperation()); + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java index b15f828..b507002 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java @@ -117,11 +117,11 @@ public class ExportService { context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed); context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); context.sink.setTypesDef(context.result.getData().getTypesDef()); - auditsWriter.write(userName, context.result, startTime, endTime, context.result.getData().getEntityCreationOrder()); - clearContextData(context); + context.result.setLastModifiedTimestamp(context.newestLastModifiedTimestamp); context.result.setOperationStatus(getOverallOperationStatus(statuses)); context.result.incrementMeticsCounter("duration", duration); - context.result.setLastModifiedTimestamp(context.newestLastModifiedTimestamp); + auditsWriter.write(userName, context.result, startTime, endTime, context.result.getData().getEntityCreationOrder()); + clearContextData(context); context.sink.setResult(context.result); } @@ -186,9 +186,7 @@ public class ExportService { } private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> processObjectId({})", item); - } + debugLog("==> processObjectId({})", item); try { List<String> entityGuids = getStartingEntity(item, context); @@ -217,11 +215,16 @@ public class ExportService { return AtlasExportResult.OperationStatus.FAIL; } - if (LOG.isDebugEnabled()) { - LOG.debug("<== processObjectId({})", item); + debugLog("<== processObjectId({})", item); + return AtlasExportResult.OperationStatus.SUCCESS; + } + + private void debugLog(String s, Object... params) { + if (!LOG.isDebugEnabled()) { + return; } - return AtlasExportResult.OperationStatus.SUCCESS; + LOG.debug(s, params); } private List<String> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException { @@ -322,9 +325,7 @@ public class ExportService { } private void processEntity(String guid, ExportContext context) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> processEntity({})", guid); - } + debugLog("==> processEntity({})", guid); if (!context.guidsProcessed.contains(guid)) { TraversalDirection direction = context.guidDirection.get(guid); @@ -350,9 +351,7 @@ public class ExportService { } } - if (LOG.isDebugEnabled()) { - LOG.debug("<== processEntity({})", guid); - } + debugLog("<== processEntity({})", guid); } private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { @@ -395,8 +394,8 @@ public class ExportService { for (TraversalDirection direction : directions) { String query = getQueryForTraversalDirection(direction); - if (LOG.isDebugEnabled()) { - LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query); + if(LOG.isDebugEnabled()) { + debugLog("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query); } context.bindings.clear(); @@ -425,8 +424,8 @@ public class ExportService { } } - if (LOG.isDebugEnabled()) { - LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); + if(LOG.isDebugEnabled()) { + debugLog("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); } } } @@ -443,8 +442,8 @@ public class ExportService { } private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); + if(LOG.isDebugEnabled()) { + debugLog("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); } String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL); @@ -469,8 +468,9 @@ public class ExportService { } } - if (LOG.isDebugEnabled()) { - LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); + if(LOG.isDebugEnabled()) { + debugLog("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", + entity.getGuid(), result.size(), context.guidsToProcess.size()); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index 98ef389..8a184fa 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -26,6 +26,7 @@ import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.store.graph.BulkImporter; import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.collections.MapUtils; import org.apache.commons.io.FileUtils; @@ -111,6 +112,16 @@ public class ImportService { updateTransformsWithSubTypes(importTransform); source.setImportTransform(importTransform); + + if(LOG.isDebugEnabled()) { + debugLog(" => transforms: {}", AtlasType.toJson(importTransform)); + } + } + + private void debugLog(String s, Object... params) { + if(!LOG.isDebugEnabled()) return; + + LOG.debug(s, params); } private void updateTransformsWithSubTypes(ImportTransforms importTransforms) throws AtlasBaseException { @@ -189,6 +200,7 @@ public class ImportService { endTimestamp = System.currentTimeMillis(); result.incrementMeticsCounter("duration", getDuration(this.endTimestamp, this.startTimestamp)); + result.setExportResult(importSource.getExportResult()); auditsWriter.write(userName, result, startTimestamp, endTimestamp, importSource.getCreationOrder()); } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java index 348bcd2..dc71c2a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java @@ -36,6 +36,7 @@ public abstract class ImportTransformer { private static final String TRANSFORMER_NAME_UPPERCASE = "uppercase"; private static final String TRANSFORMER_NAME_REMOVE_CLASSIFICATION = "removeClassification"; private static final String TRANSFORMER_NAME_REPLACE = "replace"; + private static final String TRANSFORMER_SET_DELETED = "setDeleted"; private final String transformType; @@ -65,6 +66,8 @@ public abstract class ImportTransformer { } else if (key.equals(TRANSFORMER_NAME_CLEAR_ATTR)) { String name = (params == null || params.length < 1) ? "" : StringUtils.join(params, ":", 1, params.length); ret = new ClearAttributes(name); + } else if (key.equals(TRANSFORMER_SET_DELETED)) { + ret = new SetDeleted(); } else { throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error creating ImportTransformer. Unknown transformer: {}.", transformerSpec); } @@ -291,4 +294,25 @@ public abstract class ImportTransformer { return entity; } } + + static class SetDeleted extends ImportTransformer { + protected SetDeleted() { + super(TRANSFORMER_SET_DELETED); + } + + @Override + public Object apply(Object o) { + if (o == null) { + return o; + } + + if (!(o instanceof AtlasEntity)) { + return o; + } + + AtlasEntity entity = (AtlasEntity) o; + entity.setStatus(AtlasEntity.Status.DELETED); + return entity; + } + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasClusterDTO.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasClusterDTO.java b/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasClusterDTO.java index 424fb88..a96ca49 100644 --- a/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasClusterDTO.java +++ b/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasClusterDTO.java @@ -18,8 +18,7 @@ package org.apache.atlas.repository.ogm; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.clusterinfo.AtlasCluster; +import org.apache.atlas.model.impexp.AtlasCluster; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.type.AtlasTypeRegistry; @@ -60,6 +59,7 @@ public class AtlasClusterDTO extends AbstractDataTransferObject<AtlasCluster> { entity.setAttribute(PROPERTY_CLUSTER_NAME, obj.getName()); entity.setAttribute(PROPERTY_QUALIFIED_NAME, obj.getQualifiedName()); entity.setAttribute(PROPERTY_ADDITIONAL_INFO, obj.getAdditionalInfo()); + entity.setAttribute(PROPERTY_URLS, obj.getUrls()); return entity; } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/main/java/org/apache/atlas/repository/ogm/DataAccess.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/ogm/DataAccess.java b/repository/src/main/java/org/apache/atlas/repository/ogm/DataAccess.java index b7e943f..63b345e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/ogm/DataAccess.java +++ b/repository/src/main/java/org/apache/atlas/repository/ogm/DataAccess.java @@ -71,6 +71,22 @@ public class DataAccess { return dto.from(entityWithExtInfo); } + public <T extends AtlasBaseModelObject> T load(String guid, Class<? extends AtlasBaseModelObject> clazz) throws AtlasBaseException { + DataTransferObject<T> dto = (DataTransferObject<T>)dtoRegistry.get(clazz); + + AtlasEntityWithExtInfo entityWithExtInfo = null; + + if (StringUtils.isNotEmpty(guid)) { + entityWithExtInfo = entityStore.getById(guid); + } + + if(entityWithExtInfo == null) { + return null; + } + + return dto.from(entityWithExtInfo); + } + public void deleteUsingGuid(String guid) throws AtlasBaseException { entityStore.deleteById(guid); } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/main/java/org/apache/atlas/repository/ogm/ExportImportAuditEntryDTO.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/ogm/ExportImportAuditEntryDTO.java b/repository/src/main/java/org/apache/atlas/repository/ogm/ExportImportAuditEntryDTO.java index 8d1aebf..963ca51 100644 --- a/repository/src/main/java/org/apache/atlas/repository/ogm/ExportImportAuditEntryDTO.java +++ b/repository/src/main/java/org/apache/atlas/repository/ogm/ExportImportAuditEntryDTO.java @@ -19,11 +19,14 @@ package org.apache.atlas.repository.ogm; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.impexp.ExportImportAuditEntry; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.type.AtlasTypeRegistry; +import java.util.Arrays; +import java.util.HashSet; import java.util.Map; +import java.util.Set; public class ExportImportAuditEntryDTO extends AbstractDataTransferObject<ExportImportAuditEntry> { @@ -36,28 +39,42 @@ public class ExportImportAuditEntryDTO extends AbstractDataTransferObject<Export public static final String PROPERTY_SOURCE_CLUSTER_NAME = "sourceClusterName"; public static final String PROPERTY_TARGET_CLUSTER_NAME = "targetClusterName"; + private static final Set<String> ATTRIBUTE_NAMES = new HashSet<>(Arrays.asList(PROPERTY_USER_NAME, + PROPERTY_OPERATION, PROPERTY_OPERATION_PARAMS, + PROPERTY_START_TIME, PROPERTY_END_TIME, + PROPERTY_RESULT_SUMMARY, + PROPERTY_SOURCE_CLUSTER_NAME, PROPERTY_TARGET_CLUSTER_NAME)); + protected ExportImportAuditEntryDTO(AtlasTypeRegistry typeRegistry) { super(typeRegistry, ExportImportAuditEntry.class); } - @Override - public ExportImportAuditEntry from(AtlasEntity entity) { + public static Set<String> getAttributes() { + return ATTRIBUTE_NAMES; + } + + public static ExportImportAuditEntry from(String guid, Map<String,Object> attributes) { ExportImportAuditEntry entry = new ExportImportAuditEntry(); - setGuid(entry, entity); - entry.setUserName((String) entity.getAttribute(PROPERTY_USER_NAME)); - entry.setOperation((String) entity.getAttribute(PROPERTY_OPERATION)); - entry.setOperationParams((String) entity.getAttribute(PROPERTY_OPERATION_PARAMS)); - entry.setStartTime((long) entity.getAttribute(PROPERTY_START_TIME)); - entry.setEndTime((long) entity.getAttribute(PROPERTY_END_TIME)); - entry.setSourceClusterName((String) entity.getAttribute(PROPERTY_SOURCE_CLUSTER_NAME)); - entry.setTargetClusterName((String) entity.getAttribute(PROPERTY_TARGET_CLUSTER_NAME)); - entry.setResultSummary((String) entity.getAttribute(PROPERTY_RESULT_SUMMARY)); + entry.setGuid(guid); + entry.setUserName((String) attributes.get(PROPERTY_USER_NAME)); + entry.setOperation((String) attributes.get(PROPERTY_OPERATION)); + entry.setOperationParams((String) attributes.get(PROPERTY_OPERATION_PARAMS)); + entry.setStartTime((long) attributes.get(PROPERTY_START_TIME)); + entry.setEndTime((long) attributes.get(PROPERTY_END_TIME)); + entry.setSourceClusterName((String) attributes.get(PROPERTY_SOURCE_CLUSTER_NAME)); + entry.setTargetClusterName((String) attributes.get(PROPERTY_TARGET_CLUSTER_NAME)); + entry.setResultSummary((String) attributes.get(PROPERTY_RESULT_SUMMARY)); return entry; } @Override + public ExportImportAuditEntry from(AtlasEntity entity) { + return from(entity.getGuid(), entity.getAttributes()); + } + + @Override public ExportImportAuditEntry from(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { return from(entityWithExtInfo.getEntity()); } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java index fcdd379..ac2d4c9 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java @@ -959,6 +959,7 @@ public class EntityGraphMapper { for (AtlasClassification classification : classifications) { addClassificationWithNoMetadataUpdate(context, instanceVertex, entityType, classification); } + updateModificationMetadata(instanceVertex); } } @@ -1100,4 +1101,12 @@ public class EntityGraphMapper { return ret; } + + public static String getSoftRefFormattedValue(AtlasObjectId objectId) { + return getSoftRefFormattedString(objectId.getTypeName(), objectId.getGuid()); + } + + private static String getSoftRefFormattedString(String typeName, String resolvedGuid) { + return String.format(SOFT_REF_FORMAT, typeName, resolvedGuid); + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java index cfd272f..6f624c3 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java @@ -20,10 +20,10 @@ package org.apache.atlas.repository.impexp; import org.apache.atlas.TestModules; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.clusterinfo.AtlasCluster; -import org.apache.atlas.repository.impexp.ClusterService; +import org.apache.atlas.model.impexp.AtlasCluster; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.repository.Constants; import org.apache.atlas.store.AtlasTypeDefStore; -import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.testng.annotations.BeforeClass; import org.testng.annotations.Guice; @@ -34,22 +34,24 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import static org.apache.atlas.repository.Constants.ATTR_NAME_REFERENCEABLE; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; @Guice(modules = TestModules.TestOnlyModule.class) public class ClusterServiceTest { private final String TOP_LEVEL_ENTITY_NAME = "db1@cl1"; private final String CLUSTER_NAME = "testCl1"; private final String TARGET_CLUSTER_NAME = "testCl2"; + private final String QUALIFIED_NAME_STOCKS = "stocks@cl1"; + private final String TYPE_HIVE_DB = "hive_db"; private AtlasTypeDefStore typeDefStore; private AtlasTypeRegistry typeRegistry; private ClusterService clusterService; + private String topLevelEntityGuid = "AAA-BBB-CCC"; @Inject public void UserProfileServiceTest(AtlasTypeRegistry typeRegistry, @@ -67,8 +69,8 @@ public class ClusterServiceTest { @Test public void saveAndRetrieveClusterInfo() throws AtlasBaseException { - AtlasCluster expected = getCluster(CLUSTER_NAME, TOP_LEVEL_ENTITY_NAME, "EXPORT", 0l, TARGET_CLUSTER_NAME); - AtlasCluster expected2 = getCluster(TARGET_CLUSTER_NAME, TOP_LEVEL_ENTITY_NAME, "IMPORT", 0L, TARGET_CLUSTER_NAME); + AtlasCluster expected = getCluster(CLUSTER_NAME + "_1", TOP_LEVEL_ENTITY_NAME, "EXPORT", 0l, TARGET_CLUSTER_NAME); + AtlasCluster expected2 = getCluster(TARGET_CLUSTER_NAME + "_1", TOP_LEVEL_ENTITY_NAME, "IMPORT", 0L, TARGET_CLUSTER_NAME); AtlasCluster expected3 = getCluster(TARGET_CLUSTER_NAME + "_3", TOP_LEVEL_ENTITY_NAME, "IMPORT", 0, TARGET_CLUSTER_NAME); AtlasCluster actual = clusterService.save(expected); @@ -86,36 +88,38 @@ public class ClusterServiceTest { assertEquals(actual.getName(), expected.getName()); assertEquals(actual.getQualifiedName(), expected.getQualifiedName()); - assertEquals(getAdditionalInfo(actual, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.OPERATION), - getAdditionalInfo(expected, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.OPERATION)); - - assertEquals(getAdditionalInfo(actual, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.NEXT_MODIFIED_TIMESTAMP), - getAdditionalInfo(expected, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.NEXT_MODIFIED_TIMESTAMP)); } - private AtlasCluster getCluster(String name, String topLevelEntity, String operation, long nextModifiedTimestamp, String targetClusterName) { - AtlasCluster cluster = new AtlasCluster(name, name); + private AtlasCluster getCluster(String clusterName, String topLevelEntity, String operation, long nextModifiedTimestamp, String targetClusterName) { + AtlasCluster cluster = new AtlasCluster(clusterName, clusterName); + + Map<String, String> syncMap = new HashMap<>(); - Map<String, Object> syncMap = new HashMap<>(); + syncMap.put("topLevelEntity", topLevelEntity); syncMap.put("operation", operation); - syncMap.put("nextModifiedTimestamp", nextModifiedTimestamp); + syncMap.put("nextModifiedTimestamp", Long.toString(nextModifiedTimestamp)); syncMap.put("targetCluster", targetClusterName); - String syncMapJson = AtlasType.toJson(syncMap); - String topLevelEntitySpecificKey = getTopLevelEntitySpecificKey(topLevelEntity); - cluster.setAdditionalInfo(topLevelEntitySpecificKey, syncMapJson); + cluster.setAdditionalInfo(syncMap); + return cluster; } - private Map<String, Object> getAdditionalInfo(AtlasCluster cluster, String topLevelEntityName) { - String topLevelEntitySpecificKey = getTopLevelEntitySpecificKey(topLevelEntityName); - assertTrue(cluster.getAdditionalInfo().containsKey(topLevelEntitySpecificKey)); + @Test + public void verifyAdditionalInfo() throws AtlasBaseException { + final long expectedLastModifiedTimestamp = 200L; + + AtlasCluster expectedCluster = new AtlasCluster(CLUSTER_NAME, CLUSTER_NAME); - String json = cluster.getAdditionalInfo(topLevelEntitySpecificKey); - return AtlasType.fromJson(json, Map.class); - } + String qualifiedNameAttr = Constants.QUALIFIED_NAME.replace(ATTR_NAME_REFERENCEABLE, ""); + AtlasObjectId objectId = new AtlasObjectId(TYPE_HIVE_DB, qualifiedNameAttr, QUALIFIED_NAME_STOCKS); + expectedCluster.setAdditionalInfoRepl(topLevelEntityGuid, expectedLastModifiedTimestamp); + + AtlasCluster actualCluster = clusterService.save(expectedCluster); + assertEquals(actualCluster.getName(), expectedCluster.getName()); + + int actualModifiedTimestamp = (int) actualCluster.getAdditionalInfoRepl(topLevelEntityGuid); - private String getTopLevelEntitySpecificKey(String topLevelEntity) { - return String.format("%s:%s", AtlasCluster.SYNC_INFO_KEY, topLevelEntity); + assertEquals(actualModifiedTimestamp, expectedLastModifiedTimestamp); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java index d0188dd..13277a3 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java @@ -20,9 +20,7 @@ package org.apache.atlas.repository.impexp; import org.apache.atlas.TestModules; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.impexp.ExportImportAuditEntry; -import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; @@ -32,14 +30,16 @@ import org.testng.annotations.Test; import javax.inject.Inject; import java.io.IOException; +import java.util.List; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; @Guice(modules = TestModules.TestOnlyModule.class) -public class ExportImportAuditServiceTest { +public class ExportImportAuditServiceTest extends ExportImportTestBase { @Inject AtlasTypeRegistry typeRegistry; @@ -70,7 +70,7 @@ public class ExportImportAuditServiceTest { String target2 = "clx1"; ExportImportAuditEntry entry2 = saveAndGet(source2, ExportImportAuditEntry.OPERATION_EXPORT, target2); - Thread.sleep(1000); + pauseForIndexCreation(); ExportImportAuditEntry actualEntry = retrieveEntry(entry); ExportImportAuditEntry actualEntry2 = retrieveEntry(entry2); @@ -81,7 +81,7 @@ public class ExportImportAuditServiceTest { assertEquals(actualEntry.getOperation(), entry.getOperation()); } - @Test(enabled = false) + @Test public void numberOfSavedEntries_Retrieved() throws AtlasBaseException, InterruptedException { final String source1 = "cluster1"; final String target1 = "cly"; @@ -91,18 +91,20 @@ public class ExportImportAuditServiceTest { saveAndGet(source1, ExportImportAuditEntry.OPERATION_EXPORT, target1); } - Thread.sleep(1000); - AtlasSearchResult results = auditService.get(source1, ExportImportAuditEntry.OPERATION_EXPORT, "", "", "", "", 10, 0); - assertEquals(results.getEntities().size(), MAX_ENTRIES); + pauseForIndexCreation(); + List<ExportImportAuditEntry> results = auditService.get("", + ExportImportAuditEntry.OPERATION_EXPORT, + "", "", "", 10, 0); + assertTrue(results.size() > 0); } - private ExportImportAuditEntry retrieveEntry(ExportImportAuditEntry entry) throws AtlasBaseException { - AtlasSearchResult result = auditService.get(entry.getUserName(), entry.getOperation(), entry.getSourceClusterName(), - entry.getTargetClusterName(), Long.toString(entry.getStartTime()), "", 10, 0); + List<ExportImportAuditEntry> result = auditService.get(entry.getUserName(), entry.getOperation(), + entry.getSourceClusterName(), + Long.toString(entry.getStartTime()), "", 10, 0); assertNotNull(result); - assertEquals(result.getEntities().size(), 1); - entry.setGuid(result.getEntities().get(0).getGuid()); + assertEquals(result.size(), 1); + entry.setGuid(result.get(0).getGuid()); return auditService.get(entry); } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java index fcf90d3..37c0443 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java @@ -23,8 +23,10 @@ import org.apache.atlas.AtlasConstants; import org.apache.atlas.AtlasException; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.impexp.ExportImportAuditEntry; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.repository.store.graph.v1.AtlasEntityChangeNotifier; import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1; import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1; @@ -34,6 +36,7 @@ import org.testng.SkipException; import scala.actors.threadpool.Arrays; import java.io.IOException; +import java.util.List; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createAtlasEntity; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel; @@ -54,6 +57,7 @@ public class ExportImportTestBase { protected static final String COLUMN_GUID_HIGH = "f87a5320-1529-4369-8d63-b637ebdf2c1c"; protected DeleteHandlerV1 deleteHandler = mock(SoftDeleteHandlerV1.class); + protected AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class); protected void basicSetup(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException { loadBaseModel(typeDefStore, typeRegistry); @@ -77,20 +81,28 @@ public class ExportImportTestBase { } } - protected void assertAuditEntry(ExportImportAuditService auditService) { - AtlasSearchResult result = null; + protected void assertAuditEntry(ExportImportAuditService auditService) throws InterruptedException { + pauseForIndexCreation(); + List<ExportImportAuditEntry> result = null; try { - result = auditService.get("", "", "", "", "", "", 10, 0); + result = auditService.get("", "", "", "", "", 10, 0); } catch (AtlasBaseException e) { fail("auditService.get: failed!"); } assertNotNull(result); - assertNotNull(result.getEntities()); - assertTrue(result.getEntities().size() > 0); + assertTrue(result.size() > 0); } private String getCurrentCluster() throws AtlasException { return ApplicationProperties.get().getString(AtlasConstants.CLUSTER_NAME_KEY, "default"); } + + protected void pauseForIndexCreation() { + try { + Thread.sleep(5000); + } catch (InterruptedException ex) { + throw new SkipException("pause interrupted."); + } + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java index 86ab222..ed4fc37 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java @@ -60,9 +60,6 @@ public class ExportIncrementalTest extends ExportImportTestBase { ExportService exportService; @Inject - ClusterService clusterService; - - @Inject private AtlasEntityStoreV1 entityStore; private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental"; http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java index dc25e92..08bbcd2 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java @@ -80,7 +80,7 @@ public class ImportServiceTest extends ExportImportTestBase { } @AfterTest - public void postTest() { + public void postTest() throws InterruptedException { assertAuditEntry(auditService); } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java index 9b38922..b241dda 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java @@ -45,6 +45,7 @@ public class ImportTransformsTest { private final String jsonReplaceAndAddAttrValue = "{ \"hive_table\": { \"qualifiedName\":[ \"replace:@%s:@%s\"], \"*\":[ \"add:%s=list:%s\" ] } }"; private final String jsonSingleClearAttrValue = "{ \"hive_table\": { \"*\":[ \"clearAttrValue:replicatedToCluster\", \"clearAttrValue:replicatedFromCluster\" ] } }"; private final String jsonMultipleClearAttrValue = "{ \"hive_table\": { \"*\":[ \"clearAttrValue:replicatedToCluster,replicatedFromCluster\" ] } }"; + private final String jsonSetDeleted = "{ \"hive_table\": { \"*\":[ \"setDeleted\" ] } }"; private ImportTransforms transform; private String HIVE_TABLE_ATTR_SYNC_INFO = "hive_table.syncInfo"; @@ -178,6 +179,20 @@ public class ImportTransformsTest { assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_TO)); } + @Test + public void setDeleted_SetsStatusToDeleted() throws AtlasBaseException { + AtlasEntity entity = getHiveTableAtlasEntity(); + assertEquals(entity.getStatus(), AtlasEntity.Status.ACTIVE); + ImportTransforms t = ImportTransforms.fromJson(jsonSetDeleted); + + assertTrue(t.getTransforms().size() > 0); + + t.apply(entity); + assertNotNull(t); + assertEquals(entity.getStatus(), AtlasEntity.Status.DELETED); + } + + private String[] getExtEntityExpectedValues(AtlasEntityWithExtInfo entityWithExtInfo) { String[] ret = new String[entityWithExtInfo.getReferredEntities().size()]; @@ -205,6 +220,7 @@ public class ImportTransformsTest { private AtlasEntity getHiveTableAtlasEntity() { AtlasEntity entity = new AtlasEntity("hive_table"); + entity.setStatus(AtlasEntity.Status.ACTIVE); Map<String, Object> attributes = new HashMap<>(); attributes.put(ATTR_NAME_QUALIFIED_NAME, "TABLE1.default" + lowerCaseCL1); http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java index 881368c..90232b7 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java @@ -24,15 +24,22 @@ import org.apache.atlas.RequestContextV1; import org.apache.atlas.TestModules; import org.apache.atlas.TestUtilsV2; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.clusterinfo.AtlasCluster; +import org.apache.atlas.model.impexp.AtlasCluster; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.store.graph.v1.AtlasEntityChangeNotifier; import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1; import org.apache.atlas.repository.store.graph.v1.EntityGraphMapper; import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.TestResourceFileUtils; import org.testng.SkipException; @@ -46,15 +53,13 @@ import java.io.IOException; import java.util.List; import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_KEY_REPLICATED_TO; -import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createAtlasEntity; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel; -import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadEntity; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadHiveModel; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters; -import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; @Guice(modules = TestModules.TestOnlyModule.class) public class ReplicationEntityAttributeTest extends ExportImportTestBase { @@ -86,29 +91,18 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase { @Inject ClusterService clusterService; - private AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class); private AtlasEntityStoreV1 entityStore; private ZipSource zipSource; @BeforeClass public void setup() throws IOException, AtlasBaseException { - loadBaseModel(typeDefStore, typeRegistry); - loadHiveModel(typeDefStore, typeRegistry); - createEntities(); - } - - private void createEntities() { + basicSetup(typeDefStore, typeRegistry); entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry, mockChangeNotifier, graphMapper); + createEntities(entityStore, ENTITIES_SUB_DIR, new String[]{"db", "table-columns"}); - createAtlasEntity(entityStore, loadEntity(ENTITIES_SUB_DIR,"db")); - createAtlasEntity(entityStore, loadEntity(ENTITIES_SUB_DIR, "table-columns")); - - try { - AtlasEntity.AtlasEntitiesWithExtInfo entities = entityStore.getByIds(ImmutableList.of(DB_GUID, TABLE_GUID)); - assertEquals(entities.getEntities().size(), 2); - } catch (AtlasBaseException e) { - throw new SkipException(String.format("getByIds: could not load '%s' & '%s'.", DB_GUID, TABLE_GUID)); - } + AtlasType refType = typeRegistry.getType("Referenceable"); + AtlasEntityDef entityDef = (AtlasEntityDef) typeDefStore.getByName(refType.getTypeName()); + assertNotNull(entityDef); } @BeforeMethod @@ -128,20 +122,21 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase { assertNotNull(zipSource.getCreationOrder()); assertEquals(zipSource.getCreationOrder().size(), expectedEntityCount); - assertClusterInfo(REPLICATED_TO_CLUSTER_NAME); + assertCluster(REPLICATED_TO_CLUSTER_NAME, null); assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_TO_CLUSTER); } @Test(dependsOnMethods = "exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute") public void importWithReplicationFromOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException { AtlasImportRequest request = getImportRequestWithReplicationOption(); - runImportWithParameters(importService, request, zipSource); + AtlasImportResult importResult = runImportWithParameters(importService, request, zipSource); - assertClusterInfo(REPLICATED_FROM_CLUSTER_NAME); + assertCluster(REPLICATED_FROM_CLUSTER_NAME, importResult); assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER); } private void assertReplicationAttribute(String attrNameReplication) throws AtlasBaseException { + pauseForIndexCreation(); AtlasEntity.AtlasEntitiesWithExtInfo entities = entityStore.getByIds(ImmutableList.of(DB_GUID, TABLE_GUID)); for (AtlasEntity e : entities.getEntities()) { Object ex = e.getAttribute(attrNameReplication); @@ -152,11 +147,25 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase { } } - private void assertClusterInfo(String name) { + private void assertCluster(String name, AtlasImportResult importResult) throws AtlasBaseException { AtlasCluster actual = clusterService.get(new AtlasCluster(name, name)); assertNotNull(actual); assertEquals(actual.getName(), name); + + if(importResult != null) { + assertClusterAdditionalInfo(actual, importResult); + } + } + + private void assertClusterAdditionalInfo(AtlasCluster cluster, AtlasImportResult importResult) throws AtlasBaseException { + AtlasExportRequest request = importResult.getExportResult().getRequest(); + AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(request.getItemsToExport().get(0).getTypeName()); + AtlasEntity.AtlasEntityWithExtInfo entity = entityStore.getByUniqueAttributes(type, request.getItemsToExport().get(0).getUniqueAttributes()); + long actualLastModifiedTimestamp = (long) cluster.getAdditionalInfoRepl(entity.getEntity().getGuid()); + + assertTrue(cluster.getAdditionalInfo().size() > 0); + assertEquals(actualLastModifiedTimestamp, importResult.getExportResult().getLastModifiedTimestamp()); } private AtlasExportRequest getUpdateMetaInfoUpdateRequest() { http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/repository/src/test/resources/json/stocksDB-Entities/replicationAttrs.json ---------------------------------------------------------------------- diff --git a/repository/src/test/resources/json/stocksDB-Entities/replicationAttrs.json b/repository/src/test/resources/json/stocksDB-Entities/replicationAttrs.json index 8282638..4441036 100644 --- a/repository/src/test/resources/json/stocksDB-Entities/replicationAttrs.json +++ b/repository/src/test/resources/json/stocksDB-Entities/replicationAttrs.json @@ -4,5 +4,8 @@ "cardinality": "SET", "isIndexable": false, "isOptional": true, - "isUnique": false + "isUnique": false, + "options": { + "isSoftReference": "true" + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/b9aa6d5d/typesystem/src/main/java/org/apache/atlas/typesystem/types/AttributeDefinition.java ---------------------------------------------------------------------- diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/AttributeDefinition.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/AttributeDefinition.java index 5561f0b..370d43d 100755 --- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/AttributeDefinition.java +++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/AttributeDefinition.java @@ -37,6 +37,7 @@ public final class AttributeDefinition { * that this refers to. */ public final String reverseAttributeName; + public boolean isSoftRef; public AttributeDefinition(String name, String dataTypeName, Multiplicity multiplicity, boolean isComposite, String reverseAttributeName) { @@ -78,4 +79,8 @@ public final class AttributeDefinition { public String toString() { return name; } + + public void setSoftRef(boolean isSoftRef) { + this.isSoftRef = isSoftRef; + } }