Repository: crunch Updated Branches: refs/heads/master 7157c0a1f -> ebb1b2e32
CRUNCH-494: Add Pipeline.union methods to avoid need to chain long unions of PCollections Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ebb1b2e3 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ebb1b2e3 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ebb1b2e3 Branch: refs/heads/master Commit: ebb1b2e32a901caaa2858e9d12536ce1095c1696 Parents: 7157c0a Author: Josh Wills <[email protected]> Authored: Thu Jan 29 23:21:03 2015 -0800 Committer: Josh Wills <[email protected]> Committed: Fri Jan 30 16:57:04 2015 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/crunch/Pipeline.java | 6 +++++ .../crunch/impl/dist/DistributedPipeline.java | 26 ++++++++++++++++++++ .../org/apache/crunch/impl/mem/MemPipeline.java | 20 +++++++++++++++ .../crunch/impl/mem/collect/MemCollection.java | 10 ++------ .../crunch/impl/mem/collect/MemTable.java | 11 +++------ 5 files changed, 57 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/ebb1b2e3/crunch-core/src/main/java/org/apache/crunch/Pipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java index ee11fee..cd4ce03 100644 --- a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java @@ -21,6 +21,8 @@ import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; +import java.util.List; + /** * Manages the state of a pipeline execution. * @@ -189,6 +191,10 @@ public interface Pipeline { */ <K, V> PTable<K, V> create(Iterable<Pair<K, V>> contents, PTableType<K, V> ptype, CreateOptions options); + <S> PCollection<S> union(List<PCollection<S>> collections); + + <K, V> PTable<K, V> unionTables(List<PTable<K, V>> tables); + /** * Executes the given {@code PipelineCallable} on the client after the {@code Targets} * that the PipelineCallable depends on (if any) have been created by other pipeline http://git-wip-us.apache.org/repos/asf/crunch/blob/ebb1b2e3/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java index 61c01f1..88da5a6 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java @@ -17,7 +17,9 @@ */ package org.apache.crunch.impl.dist; +import com.google.common.base.Function; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.crunch.CreateOptions; @@ -44,6 +46,7 @@ import org.apache.crunch.impl.dist.collect.EmptyPCollection; import org.apache.crunch.impl.dist.collect.EmptyPTable; import org.apache.crunch.impl.dist.collect.PCollectionImpl; import org.apache.crunch.impl.dist.collect.PCollectionFactory; +import org.apache.crunch.impl.dist.collect.PTableBase; import org.apache.crunch.io.From; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.ReadableSourceTarget; @@ -59,6 +62,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; @@ -126,6 +130,28 @@ public abstract class DistributedPipeline implements Pipeline { } @Override + public <S> PCollection<S> union(List<PCollection<S>> collections) { + return factory.createUnionCollection( + Lists.transform(collections, new Function<PCollection<S>, PCollectionImpl<S>>() { + @Override + public PCollectionImpl<S> apply(PCollection<S> in) { + return (PCollectionImpl<S>) in; + } + })); + } + + @Override + public <K, V> PTable<K, V> unionTables(List<PTable<K, V>> tables) { + return factory.createUnionTable( + Lists.transform(tables, new Function<PTable<K, V>, PTableBase<K, V>>() { + @Override + public PTableBase<K, V> apply(PTable<K, V> in) { + return (PTableBase<K, V>) in; + } + })); + } + + @Override public <Output> Output sequentialDo(PipelineCallable<Output> pipelineCallable) { allPipelineCallables.put(pipelineCallable, getDependencies(pipelineCallable)); PipelineCallable last = currentPipelineCallable; http://git-wip-us.apache.org/repos/asf/crunch/blob/ebb1b2e3/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index e61b6dc..4d4d5dd 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -18,6 +18,7 @@ package org.apache.crunch.impl.mem; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -27,6 +28,7 @@ import java.util.concurrent.TimeoutException; import com.google.common.base.Charsets; +import com.google.common.collect.Iterables; import org.apache.avro.file.DataFileWriter; import org.apache.avro.io.DatumWriter; import org.apache.crunch.CachingOptions; @@ -364,6 +366,24 @@ public class MemPipeline implements Pipeline { } @Override + public <S> PCollection<S> union(List<PCollection<S>> collections) { + List<S> output = Lists.newArrayList(); + for (PCollection<S> pcollect : collections) { + Iterables.addAll(output, pcollect.materialize()); + } + return new MemCollection<S>(output, collections.get(0).getPType()); + } + + @Override + public <K, V> PTable<K, V> unionTables(List<PTable<K, V>> tables) { + List<Pair<K, V>> values = Lists.newArrayList(); + for (PTable<K, V> table : tables) { + Iterables.addAll(values, table.materialize()); + } + return new MemTable<K, V>(values, tables.get(0).getPTableType(), null); + } + + @Override public <Output> Output sequentialDo(PipelineCallable<Output> callable) { Output out = callable.generateOutput(this); try { http://git-wip-us.apache.org/repos/asf/crunch/blob/ebb1b2e3/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java index eaaab59..55b7821 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -93,14 +93,8 @@ public class MemCollection<S> implements PCollection<S> { @Override public PCollection<S> union(PCollection<S>... collections) { - Collection<S> output = Lists.newArrayList(); - for (PCollection<S> pcollect : collections) { - for (S s : pcollect.materialize()) { - output.add(s); - } - } - output.addAll(collect); - return new MemCollection<S>(output, collections[0].getPType()); + return getPipeline().union( + ImmutableList.<PCollection<S>>builder().add(this).add(collections).build()); } private <S, T> DoFn<S, T> verifySerializable(String name, DoFn<S, T> doFn) { http://git-wip-us.apache.org/repos/asf/crunch/blob/ebb1b2e3/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java index 60279a9..3f3bd77 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import com.google.common.collect.ImmutableList; import org.apache.crunch.CachingOptions; import org.apache.crunch.FilterFn; import org.apache.crunch.GroupingOptions; @@ -62,14 +63,8 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable< @Override public PTable<K, V> union(PTable<K, V>... others) { - List<Pair<K, V>> values = Lists.newArrayList(); - values.addAll(getCollection()); - for (PTable<K, V> ptable : others) { - for (Pair<K, V> p : ptable.materialize()) { - values.add(p); - } - } - return new MemTable<K, V>(values, others[0].getPTableType(), null); + return getPipeline().unionTables( + ImmutableList.<PTable<K, V>>builder().add(this).add(others).build()); } @Override
