Repository: incubator-gobblin
Updated Branches:
  refs/heads/master e06fe8cf2 -> 804622251


[GOBBLIN-621] Add utilities

Closes #2488 from zxcware/tu


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/80462225
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/80462225
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/80462225

Branch: refs/heads/master
Commit: 8046222510de38b8a418e7417c2269978ed2da37
Parents: e06fe8c
Author: zhchen <[email protected]>
Authored: Fri Oct 26 16:18:23 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Fri Oct 26 16:18:23 2018 -0700

----------------------------------------------------------------------
 .../org/apache/gobblin/util/FileListUtils.java  | 36 ++++++++++++
 .../apache/gobblin/util/test/TestIOUtils.java   | 61 +++++++++++++++++++
 .../apache/gobblin/util/FileListUtilsTest.java  | 30 ++++++++++
 .../gobblin/util/test/TestIOUtilsTest.java      | 62 ++++++++++++++++++++
 .../src/test/resources/test_data.avsc           | 19 ++++++
 .../src/test/resources/test_data.json           |  2 +
 6 files changed, 210 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/80462225/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
index 343da43..6971bd6 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
@@ -21,6 +21,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Stack;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.fs.FileStatus;
@@ -33,6 +34,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
 
+import org.apache.gobblin.util.filters.HiddenFilter;
+
 
 /**
  * Utility class for listing files on a {@link FileSystem}.
@@ -227,4 +230,37 @@ public class FileListUtils {
     }
     return files;
   }
+
+  /**
+   * Gets any data file, i.e. a path that is neither hidden nor a directory, under the given path.
+   *
+   * <p>If {@code path} is a file, it is returned when {@link HiddenFilter} accepts it, and
+   * {@code null} is returned otherwise. If {@code path} is a directory, the tree below it is
+   * searched depth-first with hidden entries (and everything beneath hidden directories) skipped,
+   * and the first file encountered is returned. Which file that is depends on the listing order of
+   * {@code fs}, so callers must not rely on getting a particular one. Note that a directory
+   * {@code path} is searched even if its own name would be considered hidden.
+   *
+   * @param fs the file system to search
+   * @param path the file or directory to start from
+   * @return a non-hidden data file, or {@code null} if there is none
+   * @throws FileNotFoundException if {@code path} itself does not exist
+   * @throws IOException if the file system fails to stat or list a path
+   */
+  public static FileStatus getAnyNonHiddenFile(FileSystem fs, Path path)
+      throws IOException {
+    HiddenFilter hiddenFilter = new HiddenFilter();
+
+    FileStatus root = fs.getFileStatus(path);
+    if (!root.isDirectory()) {
+      return hiddenFilter.accept(path) ? root : null;
+    }
+
+    // Iterative depth-first search over the directories that still have to be listed.
+    Stack<FileStatus> folders = new Stack<>();
+    folders.push(root);
+    while (!folders.empty()) {
+      FileStatus curFolder = folders.pop();
+      FileStatus[] children;
+      try {
+        children = fs.listStatus(curFolder.getPath(), hiddenFilter);
+      } catch (FileNotFoundException ignored) {
+        // The directory no longer exists (e.g. removed after it was discovered); skip it rather
+        // than failing the whole search.
+        continue;
+      }
+      for (FileStatus status : children) {
+        if (status.isDirectory()) {
+          folders.push(status);
+        } else {
+          return status;
+        }
+      }
+    }
+
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/80462225/gobblin-utility/src/main/java/org/apache/gobblin/util/test/TestIOUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/test/TestIOUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/test/TestIOUtils.java
new file mode 100644
index 0000000..f9bae03
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/test/TestIOUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.test;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+
+
+/**
+ * Test utilities for reading records from files.
+ */
+public class TestIOUtils {
+  /**
+   * Reads all records from a JSON-encoded Avro data file as {@link GenericRecord}s.
+   *
+   * <p>The schema is parsed from {@code schemaPath}, and the data file is then decoded with Avro's
+   * JSON decoder against that schema. Records are read one after another until the decoder
+   * signals end of input with an {@link EOFException}.
+   *
+   * @param jsonDataPath local path of the file holding the JSON-encoded records
+   * @param schemaPath local path of the Avro schema describing each record
+   * @return all decoded records, in file order
+   * @throws Exception if either file cannot be read or parsed
+   */
+  public static List<GenericRecord> readAllRecords(String jsonDataPath, String schemaPath)
+      throws Exception {
+    Schema schema = new Schema.Parser().parse(new File(schemaPath));
+    GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
+    List<GenericRecord> records = new ArrayList<>();
+
+    try (InputStream is = new FileInputStream(new File(jsonDataPath))) {
+      Decoder decoder = DecoderFactory.get().jsonDecoder(schema, is);
+      try {
+        while (true) {
+          records.add(datumReader.read(null, decoder));
+        }
+      } catch (EOFException ignored) {
+        // End of input: every record has been read.
+      }
+    }
+
+    return records;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/80462225/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java
index e739e00..7151eaa 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.util;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
@@ -250,4 +251,33 @@ public class FileListUtilsTest {
     }
   }
 
+  /**
+   * Checks that getAnyNonHiddenFile skips empty and hidden sub-directories and finds the data
+   * file, and that it returns null for a directory containing no data file.
+   */
+  @Test
+  public void testGetAnyNonHiddenFile() throws IOException {
+    final String file1 = "test1";
+
+    FileSystem localFs = FileSystem.getLocal(new Configuration());
+    Path baseDir = new Path(FILE_UTILS_TEST_DIR, "anyFileDir");
+    try {
+      if (localFs.exists(baseDir)) {
+        localFs.delete(baseDir, true);
+      }
+      localFs.mkdirs(baseDir);
+      Path emptySubDir = new Path(baseDir, "emptySubDir");
+      localFs.mkdirs(emptySubDir);
+
+      // A file under a hidden directory must be skipped when searching from baseDir.
+      Path hiddenDir = new Path(baseDir, "_hidden");
+      localFs.mkdirs(hiddenDir);
+      // create() returns an open output stream; close it right away so no file handle leaks.
+      localFs.create(new Path(hiddenDir, file1)).close();
+
+      Path dataDir = new Path(baseDir, "dataDir");
+      localFs.mkdirs(dataDir);
+      File dataFile = new File(dataDir.toString(), file1);
+      localFs.create(new Path(dataDir, file1)).close();
+
+      FileStatus file = FileListUtils.getAnyNonHiddenFile(localFs, baseDir);
+      Assert.assertNotNull(file, "Expected a data file under " + baseDir);
+      Assert.assertEquals(file.getPath().toString(), dataFile.toURI().toString());
+
+      // A directory tree without any data file yields null.
+      Assert.assertNull(FileListUtils.getAnyNonHiddenFile(localFs, emptySubDir));
+    } finally {
+      localFs.delete(baseDir, true);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/80462225/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestIOUtilsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestIOUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestIOUtilsTest.java
new file mode 100644
index 0000000..6398538
--- /dev/null
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestIOUtilsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.test;
+
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.avro.generic.GenericRecord;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Test cases for {@link TestIOUtils}.
+ */
+public class TestIOUtilsTest {
+
+  @Test
+  public void testReadAllRecords()
+      throws Exception {
+
+    List<GenericRecord> testData = TestIOUtils.readAllRecords(
+        resourcePath("/test_data.json"),
+        resourcePath("/test_data.avsc"));
+
+    Assert.assertEquals(testData.size(), 2);
+
+    GenericRecord record1 = find(testData, "string1", "string1");
+    Assert.assertNotNull(record1, "No record with string1 == \"string1\"");
+    Assert.assertEquals(record1.toString(),
+        "{\"string1\": \"string1\", \"long1\": 1234, \"double1\": 1234.12}");
+
+    GenericRecord record2 = find(testData, "string1", "string2");
+    Assert.assertNotNull(record2, "No record with string1 == \"string2\"");
+    Assert.assertEquals(record2.toString(),
+        "{\"string1\": \"string2\", \"long1\": 4567, \"double1\": 4567.89}");
+  }
+
+  /**
+   * Resolves a classpath resource to a local file system path.
+   *
+   * <p>{@link URL#getPath()} returns the percent-encoded form (a space becomes {@code %20}), which
+   * names no existing file when the checkout path contains such characters; going through
+   * {@link URL#toURI()} decodes it.
+   */
+  private static String resourcePath(String resource) throws Exception {
+    URL url = TestIOUtilsTest.class.getResource(resource);
+    Assert.assertNotNull(url, "Missing test resource " + resource);
+    return Paths.get(url.toURI()).toString();
+  }
+
+  /**
+   * Returns the first record whose schema defines {@code field} and whose value for it has the
+   * string form {@code value}, or {@code null} if there is no such record.
+   */
+  private static GenericRecord find(Collection<GenericRecord> records, String field, String value) {
+    for (GenericRecord record : records) {
+      if (null == record.getSchema().getField(field)) {
+        continue;
+      }
+
+      if (null != record.get(field) && record.get(field).toString().equals(value)) {
+        return record;
+      }
+    }
+
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/80462225/gobblin-utility/src/test/resources/test_data.avsc
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/resources/test_data.avsc b/gobblin-utility/src/test/resources/test_data.avsc
new file mode 100644
index 0000000..82a7d7e
--- /dev/null
+++ b/gobblin-utility/src/test/resources/test_data.avsc
@@ -0,0 +1,19 @@
+{
+  "name":"data",
+  "namespace":"org.apache.gobblin.test",
+  "type":"record",
+  "fields":[
+    {
+      "name":"string1",
+      "type":"string"
+    },
+    {
+      "name":"long1",
+      "type":"long"
+    },
+    {
+      "name":"double1",
+      "type":"double"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/80462225/gobblin-utility/src/test/resources/test_data.json
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/resources/test_data.json b/gobblin-utility/src/test/resources/test_data.json
new file mode 100644
index 0000000..253210e
--- /dev/null
+++ b/gobblin-utility/src/test/resources/test_data.json
@@ -0,0 +1,2 @@
+{"string1": "string1", "long1": 1234, "double1": 1234.12}
+{"string1": "string2", "long1": 4567, "double1": 4567.89}
\ No newline at end of file

Reply via email to