Repository: crunch
Updated Branches:
  refs/heads/master 4df441907 -> 5ef1c4ed2
CRUNCH-668: Support globbing patterns in From#avroFile Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5ef1c4ed Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5ef1c4ed Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5ef1c4ed Branch: refs/heads/master Commit: 5ef1c4ed2a09c2a4672450518be71db029f91433 Parents: 4df4419 Author: Clément MATHIEU <[email protected]> Authored: Tue Mar 27 17:55:15 2018 +0200 Committer: Josh Wills <[email protected]> Committed: Fri Mar 30 10:31:44 2018 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/crunch/io/From.java | 30 +++++++-- .../java/org/apache/crunch/io/FromTest.java | 69 ++++++++++++++++++++ 2 files changed, 95 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/5ef1c4ed/crunch-core/src/main/java/org/apache/crunch/io/From.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/From.java b/crunch-core/src/main/java/org/apache/crunch/io/From.java index f15f309..7b0c9dc 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/From.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/From.java @@ -347,19 +347,41 @@ public class From { DataFileReader reader = null; try { FileSystem fs = path.getFileSystem(conf); + if (!fs.isFile(path)) { - FileStatus[] fstat = fs.listStatus(path, new PathFilter() { + PathFilter ignoreHidden = new PathFilter() { @Override public boolean accept(Path path) { String name = path.getName(); return !name.startsWith("_") && !name.startsWith("."); } - }); - if (fstat == null || fstat.length == 0) { + }; + + FileStatus[] globStatus = fs.globStatus(path, ignoreHidden); + if (globStatus == null) { throw new IllegalArgumentException("No valid 
files found in directory: " + path); } - path = fstat[0].getPath(); + + Path newPath = null; + for (FileStatus status : globStatus) { + if (status.isFile()) { + newPath = status.getPath(); + break; + } else { + FileStatus[] listStatus = fs.listStatus(path, ignoreHidden); + if (listStatus != null && listStatus.length > 0) { + newPath = listStatus[0].getPath(); + break; + } + } + } + + if (newPath == null) { + throw new IllegalArgumentException("No valid files found in directory: " + path); + } + path = newPath; } + reader = new DataFileReader(new FsInput(path, conf), new GenericDatumReader<GenericRecord>()); return reader.getSchema(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/crunch/blob/5ef1c4ed/crunch-core/src/test/java/org/apache/crunch/io/FromTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/io/FromTest.java b/crunch-core/src/test/java/org/apache/crunch/io/FromTest.java index 06ef6cd..ca2f6e4 100644 --- a/crunch-core/src/test/java/org/apache/crunch/io/FromTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/io/FromTest.java @@ -17,15 +17,33 @@ */ package org.apache.crunch.io; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; + import com.google.common.collect.ImmutableList; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.crunch.Source; +import org.apache.crunch.types.avro.Avros; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; public class 
FromTest { + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + @Test(expected=IllegalArgumentException.class) public void testAvroFile_EmptyPathListNotAllowed() { From.avroFile(ImmutableList.<Path>of()); @@ -45,4 +63,55 @@ public class FromTest { public void testSequenceFile_EmptyPathListNotAllowed() { From.sequenceFile(ImmutableList.<Path>of(), LongWritable.class, Text.class); } + + @Test + public void testAvroFile_GlobWithSchemaInferenceIsSupported() throws IOException { + Schema schema = SchemaBuilder.record("record") + .fields() + .endRecord(); + + DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); + try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter)) { + writer.create(schema, tmp.newFile("1")); + writer.append(new GenericData.Record(schema)); + } + + Source<GenericData.Record> source = From.avroFile(new Path(tmp.getRoot().toString() + "/*")); + + assertEquals(source.getType(), Avros.generics(schema)); + } + + @Test + public void testAvroFile_DirectoryWithSchemaInferenceIsSupported() throws IOException { + Schema schema = SchemaBuilder.record("record") + .fields() + .endRecord(); + + DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); + try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter)) { + writer.create(schema, tmp.newFile("1")); + writer.append(new GenericData.Record(schema)); + } + + Source<GenericData.Record> source = From.avroFile(new Path(tmp.getRoot().toString())); + + assertEquals(source.getType(), Avros.generics(schema)); + } + + @Test + public void testAvroFile_FileWithSchemaInferenceIsSupported() throws IOException { + Schema schema = SchemaBuilder.record("record") + .fields() + .endRecord(); + + DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); + try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter)) { + writer.create(schema, tmp.newFile("1")); + writer.append(new 
GenericData.Record(schema)); + } + + Source<GenericData.Record> source = From.avroFile(new Path(tmp.getRoot().toString(), "1")); + + assertEquals(source.getType(), Avros.generics(schema)); + } } \ No newline at end of file
