Repository: incubator-gobblin Updated Branches: refs/heads/master e06fe8cf2 -> 804622251
[GOBBLIN-621] Add utilities Closes #2488 from zxcware/tu Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/80462225 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/80462225 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/80462225 Branch: refs/heads/master Commit: 8046222510de38b8a418e7417c2269978ed2da37 Parents: e06fe8c Author: zhchen <[email protected]> Authored: Fri Oct 26 16:18:23 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Fri Oct 26 16:18:23 2018 -0700 ---------------------------------------------------------------------- .../org/apache/gobblin/util/FileListUtils.java | 36 ++++++++++++ .../apache/gobblin/util/test/TestIOUtils.java | 61 +++++++++++++++++++ .../apache/gobblin/util/FileListUtilsTest.java | 30 ++++++++++ .../gobblin/util/test/TestIOUtilsTest.java | 62 ++++++++++++++++++++ .../src/test/resources/test_data.avsc | 19 ++++++ .../src/test/resources/test_data.json | 2 + 6 files changed, 210 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/80462225/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java index 343da43..6971bd6 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java @@ -21,6 +21,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.Comparator; import java.util.List; +import java.util.Stack; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.fs.FileStatus; @@ -33,6 +34,8 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.primitives.Longs; +import org.apache.gobblin.util.filters.HiddenFilter; + /** * Utility class for listing files on a {@link FileSystem}. @@ -227,4 +230,37 @@ public class FileListUtils { } return files; } + + /** + * Get any data file, which is not hidden or a directory, from the given path + */ + public static FileStatus getAnyNonHiddenFile(FileSystem fs, Path path) + throws IOException { + HiddenFilter hiddenFilter = new HiddenFilter(); + + FileStatus root = fs.getFileStatus(path); + if (!root.isDirectory()) { + return hiddenFilter.accept(path) ? root : null; + } + + // DFS to get the first data file + Stack<FileStatus> folders = new Stack<>(); + folders.push(root); + while (!folders.empty()) { + FileStatus curFolder = folders.pop(); + try { + for (FileStatus status : fs.listStatus(curFolder.getPath(), hiddenFilter)) { + if (status.isDirectory()) { + folders.push(status); + } else { + return status; + } + } + } catch (FileNotFoundException exc) { + // continue + } + } + + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/80462225/gobblin-utility/src/main/java/org/apache/gobblin/util/test/TestIOUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/test/TestIOUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/test/TestIOUtils.java new file mode 100644 index 0000000..f9bae03 --- /dev/null +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/test/TestIOUtils.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.util.test; + +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; + + +/** + * Test utils to read from and write records from a file + */ +public class TestIOUtils { + /** + * Reads all records from a json file as {@link GenericRecord}s + */ + public static List<GenericRecord> readAllRecords(String jsonDataPath, String schemaPath) + throws Exception { + List<GenericRecord> records = new ArrayList<>(); + File jsonDataFile = new File(jsonDataPath); + File schemaFile = new File(schemaPath); + + Schema schema = new Schema.Parser().parse(schemaFile); + GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema); + + try (InputStream is = new FileInputStream(jsonDataFile)) { + Decoder decoder = DecoderFactory.get().jsonDecoder(schema, is); + while (true) { + records.add(datumReader.read(null, decoder)); + } + } catch (EOFException eof) { + // read all records + } + + return records; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/80462225/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java index e739e00..7151eaa 100644 --- a/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java @@ -17,6 +17,7 @@ package org.apache.gobblin.util; +import java.io.File; import java.io.IOException; import java.util.List; import java.util.Set; @@ -250,4 +251,33 @@ public class FileListUtilsTest { } } + @Test + public void testGetAnyNonHiddenFile() throws IOException { + final String file1 = "test1"; + + FileSystem localFs = FileSystem.getLocal(new Configuration()); + Path baseDir = new Path(FILE_UTILS_TEST_DIR, "anyFileDir"); + try { + if (localFs.exists(baseDir)) { + localFs.delete(baseDir, true); + } + localFs.mkdirs(baseDir); + Path emptySubDir = new Path(baseDir, "emptySubDir"); + localFs.mkdirs(emptySubDir); + + Path hiddenDir = new Path(baseDir, "_hidden"); + localFs.mkdirs(hiddenDir); + localFs.create(new Path(hiddenDir, file1)); + + Path dataDir = new Path(baseDir, "dataDir"); + localFs.mkdirs(dataDir); + File dataFile = new File(dataDir.toString(), file1); + localFs.create(new Path(dataDir, file1)); + FileStatus file = FileListUtils.getAnyNonHiddenFile(localFs, baseDir); + Assert.assertEquals(file.getPath().toString(), dataFile.toURI().toString()); + } finally { + localFs.delete(baseDir, true); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/80462225/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestIOUtilsTest.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestIOUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestIOUtilsTest.java new file mode 100644 index 0000000..6398538 --- /dev/null +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestIOUtilsTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.util.test; + +import java.util.Collection; +import java.util.List; + +import org.apache.avro.generic.GenericRecord; +import org.testng.Assert; +import org.testng.annotations.Test; + + +/** + * Test cases for {@link TestIOUtils} + */ +public class TestIOUtilsTest { + + @Test + public void testReadAllRecords() + throws Exception { + + List<GenericRecord> testData = TestIOUtils.readAllRecords( + getClass().getResource("/test_data.json").getPath(), + getClass().getResource("/test_data.avsc").getPath()); + + Assert.assertEquals(testData.size(), 2); + Assert.assertEquals(find(testData, "string1", "string1").toString(), + "{\"string1\": \"string1\", \"long1\": 1234, \"double1\": 1234.12}"); + Assert.assertEquals(find(testData, "string1", "string2").toString(), + "{\"string1\": \"string2\", \"long1\": 4567, \"double1\": 4567.89}"); + } + + private static GenericRecord find(Collection<GenericRecord> records, String field, String value) { + + for (GenericRecord record : records) { + if (null == record.getSchema().getField(field)) { + continue; + } + + if (null != record.get(field) && record.get(field).toString().equals(value)) { + return record; + } + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/80462225/gobblin-utility/src/test/resources/test_data.avsc ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/test/resources/test_data.avsc b/gobblin-utility/src/test/resources/test_data.avsc new file mode 100644 index 0000000..82a7d7e --- /dev/null +++ b/gobblin-utility/src/test/resources/test_data.avsc @@ -0,0 +1,19 @@ +{ + "name":"data", + "namespace":"org.apache.gobblin.test", + "type":"record", + "fields":[ + { + "name":"string1", + "type":"string" + }, + { + "name":"long1", + "type":"long" + }, + { + "name":"double1", + "type":"double" + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/80462225/gobblin-utility/src/test/resources/test_data.json ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/test/resources/test_data.json b/gobblin-utility/src/test/resources/test_data.json new file mode 100644 index 0000000..253210e --- /dev/null +++ b/gobblin-utility/src/test/resources/test_data.json @@ -0,0 +1,2 @@ +{"string1": "string1", "long1": 1234, "double1": 1234.12} +{"string1": "string2", "long1": 4567, "double1": 4567.89} \ No newline at end of file
