Updated Branches: refs/heads/master 6b994e3bb -> 8d47b3b50
CRUNCH-256: Cache intermediate file IDs for the sequential naming scheme, which is now a singleton. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8d47b3b5 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8d47b3b5 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8d47b3b5 Branch: refs/heads/master Commit: 8d47b3b5097a82c7d0aac62d4ed6859b89cf85cf Parents: 6b994e3 Author: Josh Wills <[email protected]> Authored: Thu Aug 22 15:05:09 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Fri Aug 23 10:57:06 2013 -0700 ---------------------------------------------------------------------- .../crunch/io/SequentialFileNamingScheme.java | 31 +++++++++++++++++--- .../src/main/java/org/apache/crunch/io/To.java | 2 +- .../crunch/io/avro/AvroFileSourceTarget.java | 4 +-- .../apache/crunch/io/avro/AvroFileTarget.java | 2 +- .../io/avro/trevni/TrevniKeySourceTarget.java | 2 +- .../crunch/io/avro/trevni/TrevniKeyTarget.java | 2 +- .../io/impl/TableSourcePathTargetImpl.java | 2 +- .../crunch/io/seq/SeqFileSourceTarget.java | 2 +- .../crunch/io/seq/SeqFileTableSourceTarget.java | 2 +- .../org/apache/crunch/io/seq/SeqFileTarget.java | 2 +- .../crunch/io/text/TextFileSourceTarget.java | 2 +- .../io/text/TextFileTableSourceTarget.java | 2 +- .../apache/crunch/io/text/TextFileTarget.java | 2 +- .../io/SequentialFileNamingSchemeTest.java | 2 +- .../org/apache/crunch/io/hbase/HFileTarget.java | 2 +- 15 files changed, 42 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/8d47b3b5/crunch-core/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java b/crunch-core/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java index bdda8e6..55eb9fc 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java @@ -18,7 +18,9 @@ package org.apache.crunch.io; import java.io.IOException; +import java.util.Map; +import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -29,6 +31,18 @@ import org.apache.hadoop.fs.Path; */ public class SequentialFileNamingScheme implements FileNamingScheme { + private static final SequentialFileNamingScheme INSTANCE = new SequentialFileNamingScheme(); + + public static SequentialFileNamingScheme getInstance() { + return INSTANCE; + } + + private final Map<Path, Integer> cache; + + private SequentialFileNamingScheme() { + this.cache = Maps.newHashMap(); + } + @Override public String getMapOutputName(Configuration configuration, Path outputDirectory) throws IOException { return getSequentialFileName(configuration, outputDirectory, "m"); @@ -42,10 +56,19 @@ public class SequentialFileNamingScheme implements FileNamingScheme { private String getSequentialFileName(Configuration configuration, Path outputDirectory, String jobTypeName) throws IOException { - FileSystem fileSystem = outputDirectory.getFileSystem(configuration); - int fileSequenceNumber = fileSystem.listStatus(outputDirectory).length; - - return String.format("part-%s-%05d", jobTypeName, fileSequenceNumber); + return String.format("part-%s-%05d", jobTypeName, getSequenceNumber(configuration, outputDirectory)); } + private synchronized int getSequenceNumber(Configuration conf, Path outputDirectory) throws IOException { + if (cache.containsKey(outputDirectory)) { + int next = cache.get(outputDirectory); + cache.put(outputDirectory, next + 1); + return next; + } else { + FileSystem fileSystem = outputDirectory.getFileSystem(conf); + int next = fileSystem.listStatus(outputDirectory).length; + cache.put(outputDirectory, next + 1); + return next; + } + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/8d47b3b5/crunch-core/src/main/java/org/apache/crunch/io/To.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/To.java b/crunch-core/src/main/java/org/apache/crunch/io/To.java index d62d294..3bc5412 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/To.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/To.java @@ -82,7 +82,7 @@ public class To { */ public static <K extends Writable, V extends Writable> Target formattedFile( Path path, Class<? extends FileOutputFormat<K, V>> formatClass) { - return new FileTargetImpl(path, formatClass, new SequentialFileNamingScheme()); + return new FileTargetImpl(path, formatClass, SequentialFileNamingScheme.getInstance()); } /** http://git-wip-us.apache.org/repos/asf/crunch/blob/8d47b3b5/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java index 9aa650a..eec600f 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java @@ -26,11 +26,11 @@ import org.apache.hadoop.fs.Path; public class AvroFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> { public AvroFileSourceTarget(Path path, AvroType<T> atype) { - this(path, atype, new SequentialFileNamingScheme()); + this(path, atype, SequentialFileNamingScheme.getInstance()); } public AvroFileSourceTarget(Path path, AvroType<T> atype, DatumReader<T> reader) { - this(path, atype, reader, new SequentialFileNamingScheme()); + this(path, atype, reader, SequentialFileNamingScheme.getInstance()); } public AvroFileSourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) { http://git-wip-us.apache.org/repos/asf/crunch/blob/8d47b3b5/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java index 3a9e42c..ea0179f 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java @@ -39,7 +39,7 @@ public class AvroFileTarget extends FileTargetImpl { } public AvroFileTarget(Path path) { - this(path, new SequentialFileNamingScheme()); + this(path, SequentialFileNamingScheme.getInstance()); } public AvroFileTarget(Path path, FileNamingScheme fileNamingScheme) { http://git-wip-us.apache.org/repos/asf/crunch/blob/8d47b3b5/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java index 72a0fd3..376d2ba 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path; public class TrevniKeySourceTarget<T> extends ReadableSourcePathTargetImpl<T> { public TrevniKeySourceTarget(Path path, AvroType<T> atype) { - this(path, atype, new SequentialFileNamingScheme()); + this(path, atype, SequentialFileNamingScheme.getInstance()); } public TrevniKeySourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) { http://git-wip-us.apache.org/repos/asf/crunch/blob/8d47b3b5/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java index 2fefa59..e1f2ab1 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java @@ -52,7 +52,7 @@ public class TrevniKeyTarget extends FileTargetImpl { } public TrevniKeyTarget(Path path) { - this(path, new SequentialFileNamingScheme()); + this(path, SequentialFileNamingScheme.getInstance()); } public TrevniKeyTarget(Path path, FileNamingScheme fileNamingScheme) { http://git-wip-us.apache.org/repos/asf/crunch/blob/8d47b3b5/crunch-core/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java index a8ff639..c3a1fdc 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java @@ -27,7 +27,7 @@ import org.apache.crunch.types.PTableType; public class TableSourcePathTargetImpl<K, V> extends SourcePathTargetImpl<Pair<K, V>> implements TableSource<K, V> { public TableSourcePathTargetImpl(TableSource<K, V> source, PathTarget target) { - this(source, target, new SequentialFileNamingScheme()); + this(source, target, SequentialFileNamingScheme.getInstance()); } public TableSourcePathTargetImpl(TableSource<K, V> source, PathTarget target, FileNamingScheme fileNamingScheme) { http://git-wip-us.apache.org/repos/asf/crunch/blob/8d47b3b5/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java index adc739f..8e020cd 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java @@ -30,7 +30,7 @@ public class SeqFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> { } public SeqFileSourceTarget(Path path, PType<T> ptype) { - this(path, ptype, new SequentialFileNamingScheme()); + this(path, ptype, SequentialFileNamingScheme.getInstance()); } public SeqFileSourceTarget(Path path, PType<T> ptype, FileNamingScheme fileNamingScheme) { http://git-wip-us.apache.org/repos/asf/crunch/blob/8d47b3b5/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java index ebdf319..3cfd8cf 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java @@ -34,7 +34,7 @@ public class SeqFileTableSourceTarget<K, V> extends ReadableSourcePathTargetImpl } public SeqFileTableSourceTarget(Path path, PTableType<K, V> tableType) { - this(path, tableType, new SequentialFileNamingScheme()); + this(path, tableType, SequentialFileNamingScheme.getInstance()); } public SeqFileTableSourceTarget(Path path, PTableType<K, V> tableType, FileNamingScheme fileNamingScheme) { http://git-wip-us.apache.org/repos/asf/crunch/blob/8d47b3b5/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java index 60e4739..b23f358 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java @@ -32,7 +32,7 @@ public class SeqFileTarget extends FileTargetImpl { } public SeqFileTarget(Path path) { - this(path, new SequentialFileNamingScheme()); + this(path, SequentialFileNamingScheme.getInstance()); } public SeqFileTarget(Path path, FileNamingScheme fileNamingScheme) { http://git-wip-us.apache.org/repos/asf/crunch/blob/8d47b3b5/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java index 1d1211e..59f9270 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java @@ -30,7 +30,7 @@ public class TextFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> { } public TextFileSourceTarget(Path path, PType<T> ptype) { - this(path, ptype, new SequentialFileNamingScheme()); + this(path, ptype, SequentialFileNamingScheme.getInstance()); } public TextFileSourceTarget(Path path, PType<T> ptype, FileNamingScheme fileNamingScheme) { http://git-wip-us.apache.org/repos/asf/crunch/blob/8d47b3b5/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java index dec97e5..746b57c 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSourceTarget.java @@ -41,7 +41,7 @@ public class TextFileTableSourceTarget<K, V> extends ReadableSourcePathTargetImp } public TextFileTableSourceTarget(Path path, PTableType<K, V> tableType) { - this(path, tableType, new SequentialFileNamingScheme()); + this(path, tableType, SequentialFileNamingScheme.getInstance()); } public TextFileTableSourceTarget(Path path, PTableType<K, V> tableType, http://git-wip-us.apache.org/repos/asf/crunch/blob/8d47b3b5/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java index 0c3e6a4..17ae7a6 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java @@ -50,7 +50,7 @@ public class TextFileTarget extends FileTargetImpl { } public <T> TextFileTarget(Path path) { - this(path, new SequentialFileNamingScheme()); + this(path, SequentialFileNamingScheme.getInstance()); } public <T> TextFileTarget(Path path, FileNamingScheme fileNamingScheme) { http://git-wip-us.apache.org/repos/asf/crunch/blob/8d47b3b5/crunch-core/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java b/crunch-core/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java index 467da15..e429c18 100644 --- a/crunch-core/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java @@ -44,7 +44,7 @@ public class SequentialFileNamingSchemeTest { @Before public void setUp() throws IOException { configuration = new Configuration(); - namingScheme = new SequentialFileNamingScheme(); + namingScheme = SequentialFileNamingScheme.getInstance(); } @Test http://git-wip-us.apache.org/repos/asf/crunch/blob/8d47b3b5/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java index bc51b2c..1cef4fa 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java @@ -44,7 +44,7 @@ public class HFileTarget extends FileTargetImpl { } public HFileTarget(Path path, HColumnDescriptor hcol) { - super(path, HFileOutputFormatForCrunch.class, new SequentialFileNamingScheme()); + super(path, HFileOutputFormatForCrunch.class, SequentialFileNamingScheme.getInstance()); this.hcol = Preconditions.checkNotNull(hcol); }
