Updated Branches: refs/heads/master 655df3c45 -> 09624fe3c
CRUNCH-287: Switch internal APIs and integration tests to use ReadableData. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/09624fe3 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/09624fe3 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/09624fe3 Branch: refs/heads/master Commit: 09624fe3c79fe8bdd94b1449b394911e9d931e12 Parents: 655df3c Author: Josh Wills <[email protected]> Authored: Fri Oct 25 12:56:05 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Fri Oct 25 12:56:05 2013 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/DependentSourcesIT.java | 7 +- .../apache/crunch/LongPipelinePlannerIT.java | 4 +- .../lib/join/BloomFilterJoinStrategy.java | 80 +++++++------------- 3 files changed, 33 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/09624fe3/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java b/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java index 36bd7a7..ab2fed6 100644 --- a/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java @@ -23,6 +23,7 @@ import static org.apache.crunch.types.avro.Avros.tableOf; import java.util.List; import org.apache.crunch.fn.IdentityFn; +import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRJob; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.impl.mr.MRPipelineExecution; @@ -47,7 +48,7 @@ public class DependentSourcesIT { tmpDir.copyResourcePath("shakes.txt"), tmpDir.getFileName("out")); } - + public static void run(MRPipeline p, Path inputPath, String out) throws Exception { PCollection<String> in = p.read(From.textFile(inputPath)); PTable<String, String> op = in.parallelDo("op1", new DoFn<String, Pair<String, String>>() { @@ -59,10 +60,10 @@ public class DependentSourcesIT { } }, tableOf(strings(), strings())); - SourceTarget src = (SourceTarget)((MaterializableIterable<Pair<String, String>>) op.materialize()).getSource(); + ReadableData<Pair<String, String>> rd = op.asReadable(true); op = op.parallelDo("op2", IdentityFn.<Pair<String,String>>getInstance(), tableOf(strings(), strings()), - ParallelDoOptions.builder().sourceTargets(src).build()); + ParallelDoOptions.builder().sourceTargets(rd.getSourceTargets()).build()); PCollection<String> output = op.values(); output.write(To.textFile(out)); http://git-wip-us.apache.org/repos/asf/crunch/blob/09624fe3/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java b/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java index 2cd63f2..5e0b423 100644 --- a/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java @@ -68,8 +68,8 @@ public class LongPipelinePlannerIT { } }, strings()); - MaterializableIterable matIt = (MaterializableIterable)iso.materialize(); - ParallelDoOptions.Builder builder = ParallelDoOptions.builder().sourceTargets((SourceTarget)matIt.getSource()); + ReadableData<String> isoRD = iso.asReadable(true); + ParallelDoOptions.Builder builder = ParallelDoOptions.builder().sourceTargets(isoRD.getSourceTargets()); PTable<Integer, String> splitMap = keyedLower.parallelDo("split-map", new MapFn<Pair<Integer, String>, Pair<Integer, String>>() { http://git-wip-us.apache.org/repos/asf/crunch/blob/09624fe3/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java index 6faef56..a62c39e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java @@ -34,10 +34,8 @@ import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.ParallelDoOptions; -import org.apache.crunch.SourceTarget; +import org.apache.crunch.ReadableData; import org.apache.crunch.impl.mr.MRPipeline; -import org.apache.crunch.io.ReadableSourceTarget; -import org.apache.crunch.materialize.MaterializableIterable; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroType; @@ -46,9 +44,7 @@ import org.apache.crunch.types.avro.Avros; import org.apache.crunch.types.writable.WritableType; import org.apache.crunch.types.writable.WritableTypeFamily; import org.apache.crunch.types.writable.Writables; -import org.apache.crunch.util.DistCache; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.bloom.BloomFilter; @@ -127,40 +123,30 @@ public class BloomFilterJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { @Override public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) { - + if (joinType != JoinType.INNER_JOIN && joinType != JoinType.LEFT_OUTER_JOIN) { throw new IllegalStateException("JoinType " + joinType + " is not supported for BloomFilter joins"); } - + PTable<K,V> filteredRightSide; - if (left.getPipeline() instanceof MRPipeline) { - PType<BloomFilter> bloomFilterType = getBloomFilterType(left.getTypeFamily()); - PCollection<BloomFilter> bloomFilters = left.keys().parallelDo( - "Create bloom filters", - new CreateBloomFilterFn(vectorSize, nbHash, left.getKeyType()), - bloomFilterType); - - MaterializableIterable<BloomFilter> materializableIterable = (MaterializableIterable<BloomFilter>) bloomFilters.materialize(); - FilterKeysWithBloomFilterFn<K, V> filterKeysFn = new FilterKeysWithBloomFilterFn<K, V>( - materializableIterable.getPath().toString(), - vectorSize, nbHash, - left.getKeyType(), bloomFilterType); - - ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder(); - if (materializableIterable.isSourceTarget()) { - optionsBuilder.sourceTargets((SourceTarget) materializableIterable.getSource()); - } - - filteredRightSide = right.parallelDo("Filter right side with BloomFilters", - filterKeysFn, right.getPTableType(), optionsBuilder.build()); - - // TODO This shouldn't be necessary due to the ParallelDoOptions, but it seems to be needed somehow - left.getPipeline().run(); - } else { - LOG.warn("Not using Bloom filters outside of MapReduce context"); - filteredRightSide = right; - } - + PType<BloomFilter> bloomFilterType = getBloomFilterType(left.getTypeFamily()); + PCollection<BloomFilter> bloomFilters = left.keys().parallelDo( + "Create bloom filters", + new CreateBloomFilterFn(vectorSize, nbHash, left.getKeyType()), + bloomFilterType); + + ReadableData<BloomFilter> bloomData = bloomFilters.asReadable(true); + FilterKeysWithBloomFilterFn<K, V> filterKeysFn = new FilterKeysWithBloomFilterFn<K, V>( + bloomData, + vectorSize, nbHash, + left.getKeyType()); + + ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder(); + optionsBuilder.sourceTargets(bloomData.getSourceTargets()); + + filteredRightSide = right.parallelDo("Filter right side with BloomFilters", + filterKeysFn, right.getPTableType(), optionsBuilder.build()); + return delegateJoinStrategy.join(left, filteredRightSide, joinType); } @@ -206,50 +192,38 @@ public class BloomFilterJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { */ private static class FilterKeysWithBloomFilterFn<K,V> extends FilterFn<Pair<K, V>> { - private String inputPath; private int vectorSize; private int nbHash; private PType<K> keyType; private PType<BloomFilter> bloomFilterPType; private BloomFilter bloomFilter; private transient MapFn<K,byte[]> keyToBytesFn; - - FilterKeysWithBloomFilterFn(String inputPath, int vectorSize, int nbHash, PType<K> keyType, PType<BloomFilter> bloomFilterPtype) { - this.inputPath = inputPath; + private ReadableData<BloomFilter> bloomData; + + FilterKeysWithBloomFilterFn(ReadableData<BloomFilter> bloomData, int vectorSize, int nbHash, PType<K> keyType) { + this.bloomData = bloomData; this.vectorSize = vectorSize; this.nbHash = nbHash; this.keyType = keyType; - this.bloomFilterPType = bloomFilterPtype; } - private Path getCacheFilePath() { - Path local = DistCache.getPathToCacheFile(new Path(inputPath), getConfiguration()); - if (local == null) { - throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'"); - } - return local; - } - @Override public void configure(Configuration conf) { - DistCache.addCacheFile(new Path(inputPath), conf); + bloomData.configure(conf); } @Override public void initialize() { super.initialize(); - bloomFilterPType.initialize(getConfiguration()); keyType.initialize(getConfiguration()); keyToBytesFn = getKeyToBytesMapFn(keyType, getConfiguration()); - ReadableSourceTarget<BloomFilter> sourceTarget = bloomFilterPType.getDefaultFileSource( - getCacheFilePath()); Iterable<BloomFilter> iterable; try { - iterable = sourceTarget.read(getConfiguration()); + iterable = bloomData.read(getContext()); } catch (IOException e) { throw new CrunchRuntimeException("Error reading right-side of map side join: ", e); }
