This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git


The following commit(s) were added to refs/heads/master by this push:
     new 39f670ac fix(loader): support file name with prefix for hdfs source 
(#571)
39f670ac is described below

commit 39f670ac21a72c25b6f43213aeb81761c23c954d
Author: YangJiaqi <[email protected]>
AuthorDate: Tue Mar 19 16:41:21 2024 +0800

    fix(loader): support file name with prefix for hdfs source (#571)
    
    * use iterator to scan files
    
    ---------
    
    Co-authored-by: jacky.yang <[email protected]>
    Co-authored-by: imbajin <[email protected]>
---
 .../loader/reader/hdfs/HDFSFileReader.java         | 41 ++++++++++++++++------
 .../loader/test/functional/HDFSLoadTest.java       | 27 ++++++++++++++
 .../resources/hdfs_file_with_prefix/core-site.xml  | 22 ++++++++++++
 .../resources/hdfs_file_with_prefix/schema.groovy  | 33 +++++++++++++++++
 .../hdfs_file_with_prefix/struct_hdfs.json         | 19 ++++++++++
 5 files changed, 132 insertions(+), 10 deletions(-)

diff --git 
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/hdfs/HDFSFileReader.java
 
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/hdfs/HDFSFileReader.java
index 93f8b545..26e769d6 100644
--- 
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/hdfs/HDFSFileReader.java
+++ 
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/hdfs/HDFSFileReader.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hugegraph.loader.constant.Constants;
 import org.apache.hugegraph.loader.exception.LoadException;
@@ -52,6 +52,8 @@ public class HDFSFileReader extends FileReader {
 
     private final FileSystem hdfs;
     private final Configuration conf;
+    private String prefix;
+    private String input_path;
 
     public HDFSFileReader(HDFSSource source) {
         super(source);
@@ -62,7 +64,22 @@ public class HDFSFileReader extends FileReader {
         } catch (IOException e) {
             throw new LoadException("Failed to create HDFS file system", e);
         }
-        Path path = new Path(source.path());
+
+        String input = source.path();
+        if (input.contains("*")) {
+            int lastSlashIndex = input.lastIndexOf('/');
+            if (lastSlashIndex != -1) {
+                input_path = input.substring(0, lastSlashIndex);
+                // TODO: support multiple prefixes in the URI?
+                prefix = input.substring(lastSlashIndex + 1, input.length() - 
1);
+            } else {
+                LOG.error("File path format error!");
+            }
+        } else {
+            input_path = input;
+        }
+
+        Path path = new Path(input_path);
         checkExist(this.hdfs, path);
     }
 
@@ -98,22 +115,26 @@ public class HDFSFileReader extends FileReader {
 
     @Override
     protected List<Readable> scanReadables() throws IOException {
-        Path path = new Path(this.source().path());
+        Path path = new Path(input_path);
         FileFilter filter = this.source().filter();
         List<Readable> paths = new ArrayList<>();
-        if (this.hdfs.isFile(path)) {
+        FileStatus status = this.hdfs.getFileStatus(path);
+
+        if (status.isFile()) {
             if (!filter.reserved(path.getName())) {
                 throw new LoadException("Please check path name and 
extensions, ensure that " +
                                         "at least one path is available for 
reading");
             }
             paths.add(new HDFSFile(this.hdfs, path));
         } else {
-            assert this.hdfs.isDirectory(path);
-            FileStatus[] statuses = this.hdfs.listStatus(path);
-            Path[] subPaths = FileUtil.stat2Paths(statuses);
-            for (Path subPath : subPaths) {
-                if (filter.reserved(subPath.getName())) {
-                    paths.add(new HDFSFile(this.hdfs, subPath));
+            assert status.isDirectory();
+            RemoteIterator<FileStatus> iter = 
this.hdfs.listStatusIterator(path);
+            while (iter.hasNext()) {
+                FileStatus subStatus = iter.next();
+                // check that the file/dir name starts with the prefix & passes the filter
+                if ((prefix == null || prefix.isEmpty() || 
subStatus.getPath().getName().startsWith(prefix)) &&
+                        filter.reserved(subStatus.getPath().getName())) {
+                    paths.add(new HDFSFile(this.hdfs, subStatus.getPath()));
                 }
             }
         }
diff --git 
a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java
 
b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java
index 5a51e17b..4a00c5bf 100644
--- 
a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java
+++ 
b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java
@@ -72,6 +72,33 @@ public class HDFSLoadTest extends FileLoadTest {
         Assert.assertEquals(5, vertices.size());
     }
 
+    @Test
+    public void testHDFSWithFilePrefix() {
+        ioUtil.write("vertex_person_0.csv",
+                "name,age,city",
+                "marko,29,Beijing");
+
+        ioUtil.write("vertex_person_1.csv",
+                "name,age,city",
+                "vadas,27,Hongkong",
+                "josh,32,Beijing",
+                "peter,35,Shanghai",
+                "\"li,nary\",26,\"Wu,han\"");
+
+        String[] args = new String[]{
+                "-f", structPath("hdfs_file_with_prefix/struct.json"),
+                "-s", configPath("hdfs_file_with_prefix/schema.groovy"),
+                "-g", GRAPH,
+                "-h", SERVER,
+                "--batch-insert-threads", "2",
+                "--test-mode", "true"
+        };
+        HugeGraphLoader loader = new HugeGraphLoader(args);
+        loader.load();
+        List<Vertex> vertices = CLIENT.graph().listVertices();
+        Assert.assertEquals(5, vertices.size());
+    }
+
     @Test
     public void testHDFSWithCoreSitePathEmpty() {
         ioUtil.write("vertex_person.csv",
diff --git 
a/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/core-site.xml 
b/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/core-site.xml
new file mode 100644
index 00000000..7d02c144
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/core-site.xml
@@ -0,0 +1,22 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with this
+  work for additional information regarding copyright ownership. The ASF
+  licenses this file to You under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+  License for the specific language governing permissions and limitations
+  under the License.
+  -->
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>hdfs://localhost:8020</value>
+    </property>
+</configuration>
diff --git 
a/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/schema.groovy 
b/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/schema.groovy
new file mode 100644
index 00000000..8571b435
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/schema.groovy
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asInt().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
+schema.propertyKey("weight").asDouble().ifNotExist().create();
+schema.propertyKey("lang").asText().ifNotExist().create();
+schema.propertyKey("date").asText().ifNotExist().create();
+schema.propertyKey("price").asDouble().ifNotExist().create();
+schema.propertyKey("feel").asText().valueList().ifNotExist().create();
+schema.propertyKey("time").asText().valueSet().ifNotExist().create();
+
+schema.vertexLabel("person").properties("name", "age", 
"city").primaryKeys("name").ifNotExist().create();
+schema.vertexLabel("software").properties("name", "lang", 
"price").primaryKeys("name").ifNotExist().create();
+
+schema.edgeLabel("knows").sourceLabel("person").targetLabel("person").properties("date",
 "weight").ifNotExist().create();
+schema.edgeLabel("created").sourceLabel("person").targetLabel("software").properties("date",
 "weight").ifNotExist().create();
+schema.edgeLabel("use").sourceLabel("person").targetLabel("software").properties("feel",
 "time").nullableKeys("feel", "time").ifNotExist().create();
diff --git 
a/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/struct_hdfs.json 
b/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/struct_hdfs.json
new file mode 100644
index 00000000..2b2d54d0
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/struct_hdfs.json
@@ -0,0 +1,19 @@
+{
+  "vertices": [
+    {
+      "label": "person",
+      "input": {
+        "type": "hdfs",
+        "path": "${store_path}/vertex_*",
+        "core_site_path": 
"src/test/resources/hdfs_with_core_site_path/core-site.xml",
+        "format": "CSV",
+        "charset": "UTF-8"
+      },
+      "field_mapping": {
+        "name": "name",
+        "age": "age",
+        "city": "city"
+      }
+    }
+  ]
+}

Reply via email to