ATLAS-2869: Requested hdfs_path entities are created if absent before proceeding with export.

Signed-off-by: Ashutosh Mestry <ames...@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/bf240459
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/bf240459
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/bf240459

Branch: refs/heads/branch-1.0
Commit: bf240459b2e39aa9471f42e77dd83b2f22a091db
Parents: e441415
Author: Ashutosh Mestry <ames...@hortonworks.com>
Authored: Tue Sep 11 15:29:02 2018 -0700
Committer: Ashutosh Mestry <ames...@hortonworks.com>
Committed: Thu Nov 1 15:42:55 2018 -0700

----------------------------------------------------------------------
 .../atlas/repository/impexp/ExportService.java  |  12 +-
 .../impexp/HdfsPathEntityCreator.java           | 131 +++++++++++++++++++
 .../impexp/HdfsPathEntityCreatorTest.java       |  81 ++++++++++++
 3 files changed, 221 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/bf240459/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index d3cff78..f10d615 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -81,14 +81,16 @@ public class ExportService {
     private final EntityGraphRetriever      entityGraphRetriever;
     private final AtlasGremlinQueryProvider gremlinQueryProvider;
     private       ExportTypeProcessor       exportTypeProcessor;
-
+    private final HdfsPathEntityCreator     hdfsPathEntityCreator;
     @Inject
-    public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph 
atlasGraph, AuditsWriter auditsWriter) {
+    public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph 
atlasGraph,
+                         AuditsWriter auditsWriter, HdfsPathEntityCreator 
hdfsPathEntityCreator) {
         this.typeRegistry         = typeRegistry;
         this.entityGraphRetriever = new 
EntityGraphRetriever(this.typeRegistry);
         this.atlasGraph           = atlasGraph;
         this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
-        this.auditsWriter = auditsWriter;
+        this.auditsWriter         = auditsWriter;
+        this.hdfsPathEntityCreator = hdfsPathEntityCreator;
     }
 
     public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest 
request, String userName, String hostName,
@@ -244,6 +246,10 @@ public class ExportService {
     private List<String> getStartingEntity(AtlasObjectId item, ExportContext 
context) throws AtlasBaseException {
         List<String> ret = null;
 
+        
if(item.getTypeName().equalsIgnoreCase(HdfsPathEntityCreator.HDFS_PATH_TYPE)) {
+            hdfsPathEntityCreator.getCreateEntity(item);
+        }
+
         if (StringUtils.isNotEmpty(item.getGuid())) {
             ret = Collections.singletonList(item.getGuid());
         } else if (StringUtils.equalsIgnoreCase(context.matchType, 
MATCH_TYPE_FOR_TYPE) && StringUtils.isNotEmpty(item.getTypeName())) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/bf240459/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java
new file mode 100644
index 0000000..fddd60b
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
+import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.atlas.repository.impexp.AuditsWriter.getCurrentClusterName;
+
+/**
+ * Looks up an {@code hdfs_path} entity by its qualified name and, when the lookup
+ * finds nothing, creates the entity through the entity store.
+ *
+ * The qualified name is built as {@code <path>@<clusterName>}; the path is always
+ * normalized to end with a trailing "/" before it is used.
+ */
+@Component
+public class HdfsPathEntityCreator {
+    protected static final Logger LOG = LoggerFactory.getLogger(HdfsPathEntityCreator.class);
+
+    public static final String HDFS_PATH_TYPE = "hdfs_path";
+    public static final String HDFS_PATH_ATTRIBUTE_NAME_NAME = "name";
+    public static final String HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME = "clusterName";
+    public static final String HDFS_PATH_ATTRIBUTE_NAME_PATH = "path";
+    public static final String HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+
+    private static final String QUALIFIED_NAME_FORMAT = "%s@%s";
+    private static final String PATH_SEPARATOR = "/";
+
+    private final AtlasTypeRegistry typeRegistry;
+    private final AtlasEntityStoreV1 entityStore;
+
+    @Inject
+    public HdfsPathEntityCreator(AtlasTypeRegistry typeRegistry, AtlasEntityStoreV1 entityStore) {
+        this.typeRegistry = typeRegistry;
+        this.entityStore = entityStore;
+    }
+
+    /**
+     * Gets or creates the hdfs_path entity named by the "path" unique attribute of {@code item}.
+     *
+     * @param item object id whose unique attributes are inspected
+     * @return the existing or newly created entity; {@code null} when {@code item} is null,
+     *         carries no unique attributes, has no String "path" attribute, or nothing was created
+     * @throws AtlasBaseException if the hdfs_path type cannot be resolved or the store rejects the create
+     */
+    public AtlasEntity.AtlasEntityWithExtInfo getCreateEntity(AtlasObjectId item) throws AtlasBaseException {
+        if (item == null || item.getUniqueAttributes() == null) {
+            return null;
+        }
+
+        // Single lookup; a missing key, a null value and a non-String value are all "no path given".
+        Object path = item.getUniqueAttributes().get(HDFS_PATH_ATTRIBUTE_NAME_PATH);
+        if (!(path instanceof String)) {
+            return null;
+        }
+
+        return getCreateEntity((String) path);
+    }
+
+    /**
+     * Gets or creates the hdfs_path entity for {@code path} using the cluster name
+     * returned by {@code AuditsWriter.getCurrentClusterName()}.
+     *
+     * @param path HDFS path, with or without a trailing "/"
+     * @return the existing or newly created entity, or {@code null} (see the two-argument overload)
+     * @throws AtlasBaseException if the hdfs_path type cannot be resolved or the store rejects the create
+     */
+    public AtlasEntity.AtlasEntityWithExtInfo getCreateEntity(String path) throws AtlasBaseException {
+        return getCreateEntity(path, getCurrentClusterName());
+    }
+
+    /**
+     * Gets the hdfs_path entity identified by {@code path} and {@code clusterName},
+     * creating it first when it cannot be found.
+     *
+     * @param path        HDFS path, with or without a trailing "/"
+     * @param clusterName cluster name used as the suffix of the qualified name
+     * @return the existing or newly created entity; {@code null} when {@code path} is null
+     *         or the store reports no created entity
+     * @throws AtlasBaseException if the hdfs_path type cannot be resolved or the store rejects the create
+     */
+    public AtlasEntity.AtlasEntityWithExtInfo getCreateEntity(String path, String clusterName) throws AtlasBaseException {
+        if (path == null) {
+            return null;
+        }
+
+        String pathWithTrailingSeparator = getPathWithTrailingSeparator(path);
+        AtlasEntityType hdfsPathEntityType = getHdfsPathEntityType();
+
+        AtlasEntity.AtlasEntityWithExtInfo existing = getHDFSPathEntity(hdfsPathEntityType, pathWithTrailingSeparator, clusterName);
+        if (existing != null) {
+            return existing;
+        }
+
+        AtlasEntity entity = createHDFSPathEntity(hdfsPathEntityType, pathWithTrailingSeparator, clusterName);
+        EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(entity), false);
+
+        // The created-entities list can be absent altogether, not just empty.
+        if (response == null || response.getCreatedEntities() == null || response.getCreatedEntities().isEmpty()) {
+            return null;
+        }
+
+        // Re-read so the caller receives the stored entity rather than the in-memory draft.
+        return getHDFSPathEntity(hdfsPathEntityType, pathWithTrailingSeparator, clusterName);
+    }
+
+    /** Builds an unsaved hdfs_path entity with qualifiedName, path, name and clusterName set. */
+    private AtlasEntity createHDFSPathEntity(AtlasEntityType hdfsPathEntityType, String path, String clusterName) {
+        AtlasEntity entity = hdfsPathEntityType.createDefaultValue();
+
+        entity.setAttribute(HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(path, clusterName));
+        entity.setAttribute(HDFS_PATH_ATTRIBUTE_NAME_PATH, path);
+        entity.setAttribute(HDFS_PATH_ATTRIBUTE_NAME_NAME, path);
+        entity.setAttribute(HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME, clusterName);
+
+        return entity;
+    }
+
+    /** Fetches the entity by qualified name; any lookup failure is reported as {@code null}. */
+    private AtlasEntity.AtlasEntityWithExtInfo getHDFSPathEntity(AtlasEntityType hdfsPathEntityType, String path, String clusterName) {
+        try {
+            return entityStore.getByUniqueAttributes(hdfsPathEntityType, getUniqueAttributes(path, clusterName));
+        } catch (AtlasBaseException e) {
+            // Callers treat a failed lookup as "does not exist"; keep the cause visible at debug level.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("hdfs_path lookup failed for qualifiedName={}", getQualifiedName(path, clusterName), e);
+            }
+
+            return null;
+        }
+    }
+
+    /** Resolves the hdfs_path entity type from the type registry. */
+    private AtlasEntityType getHdfsPathEntityType() throws AtlasBaseException {
+        return (AtlasEntityType) typeRegistry.getType(HDFS_PATH_TYPE);
+    }
+
+    /** Returns the unique-attribute map (qualifiedName only) used for lookups. */
+    private Map<String,Object> getUniqueAttributes(String path, String clusterName) {
+        Map<String,Object>  ret = new HashMap<String, Object>();
+        ret.put(HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(path, clusterName));
+        return ret;
+    }
+
+    /** Appends "/" to {@code path} unless it already ends with it. */
+    private String getPathWithTrailingSeparator(String path) {
+        if(path.endsWith(PATH_SEPARATOR)) {
+            return path;
+        }
+
+        return path + PATH_SEPARATOR;
+    }
+
+    /**
+     * Formats the qualified name of an hdfs_path entity.
+     *
+     * @param path        path component (used as given; no separator is appended here)
+     * @param clusterName cluster component
+     * @return {@code path + "@" + clusterName}
+     */
+    public static String getQualifiedName(String path, String clusterName) {
+        return String.format(QUALIFIED_NAME_FORMAT, path, clusterName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/bf240459/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java
----------------------------------------------------------------------
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java
 
b/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java
new file mode 100644
index 0000000..1863b8d
--- /dev/null
+++ 
b/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+
+import static 
org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME;
+import static 
org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_NAME_NAME;
+import static 
org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME;
+import static 
org.apache.atlas.repository.impexp.HdfsPathEntityCreator.getQualifiedName;
+import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadFsModel;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Exercises {@link HdfsPathEntityCreator#getCreateEntity(String, String)}: the first
+ * test checks the attributes of the returned entity, the second repeats the call
+ * after the first test has run.
+ */
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class HdfsPathEntityCreatorTest extends ExportImportTestBase {
+
+    private static final String expectedPath = "hdfs://server-name/warehouse/hr";
+    private static final String expectedClusterName = "cl1";
+
+    @Inject
+    AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @Inject
+    private HdfsPathEntityCreator hdfsPathEntityCreator;
+
+    /** Runs the shared basicSetup, then loadFsModel, against the injected type store and registry. */
+    @BeforeClass
+    public void setup() throws IOException, AtlasBaseException {
+        basicSetup(typeDefStore, typeRegistry);
+        loadFsModel(typeDefStore, typeRegistry);
+    }
+
+    /** The returned entity must carry the path with a trailing "/" in path, name and qualifiedName. */
+    @Test
+    public void verifyCreate() throws AtlasBaseException {
+        final String normalizedPath = expectedPath + "/";
+        final String qualifiedName = getQualifiedName(normalizedPath, expectedClusterName);
+
+        AtlasEntity.AtlasEntityWithExtInfo created = hdfsPathEntityCreator.getCreateEntity(expectedPath, expectedClusterName);
+        assertNotNull(created);
+
+        AtlasEntity hdfsPath = created.getEntity();
+        assertEquals(hdfsPath.getAttribute(HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_NAME_PATH), normalizedPath);
+        assertEquals(hdfsPath.getAttribute(HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME), qualifiedName);
+        assertEquals(hdfsPath.getAttribute(HDFS_PATH_ATTRIBUTE_NAME_NAME), normalizedPath);
+        assertEquals(hdfsPath.getAttribute(HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME), expectedClusterName);
+    }
+
+    /** Repeating the call for the same path and cluster must still yield an entity. */
+    @Test(dependsOnMethods = "verifyCreate")
+    public void verifyGet() throws AtlasBaseException {
+        AtlasEntity.AtlasEntityWithExtInfo fetched = hdfsPathEntityCreator.getCreateEntity(expectedPath, expectedClusterName);
+
+        assertNotNull(fetched);
+    }
+}

Reply via email to