Hi,
I encountered a situation where I need my CombineFn to behave differently
during the combine & reduce phases.
Basically, I have a collection of avro records that I need to combine.
For some of these, I have so many records with the same key that I need to
combine them first to make my job work (memory & timing constraints).
For others, I can't combine them, because I need all records together.
So, basically I would want to know in my function if it's combining or
reducing.
The only way to solve my problem in crunch right now seems to be to first
split my collection in 2 different collections, combine them separately &
union them again.
But this gives a lot of overhead for something that would be supported by
native M/R.
I looked in the code and it seems that crunch internally has a NodeContext
object to indicate COMBINE or REDUCE, but this context is not accessible in
the DoFn.
As the (RT)Node object is an internal crunch object, it's also not a clean
solution to expose the NodeContext.
So, as a better solution, it would be possible to create a new method:
combineValues(combineFn, reduceFn) on PGroupedTable. The existing
combineValues(combineFn) would in that case just be a convenience method for
most use cases, where the combineFn & reduceFn are the same function.
With this new method, I would be able to just create my combineFn twice &
pass a boolean in the constructor to indicate if it's combine or reduce.
I have already made a patch to add this method, but as the procedure says to
discuss the change first, I'm writing this mail to check what you think.
(I haven't tested my patch yet, although all unit tests & ITs still pass.)
Thanks
Stefan
diff --git a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
index 68085c6..71e627a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
@@ -35,6 +35,17 @@ public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
* @return A {@code PTable} where each key has a single value
*/
PTable<K, V> combineValues(CombineFn<K, V> combineFn);
+
+ /**
+ * Combines & Reduces the values of this grouping using the given {@code CombineFn}.
+ *
+ * @param combineFn
+ * The combiner function during combine phase
+ * @param reduceFn
+ * The combiner function during reduce phase
+ * @return A {@code PTable} where each key has a single value
+ */
+ PTable<K, V> combineValues(CombineFn<K, V> combineFn, CombineFn<K, V> reduceFn);
/**
* Combine the values in each group using the given {@link Aggregator}.diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index 12c17b6..1b2d663 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -106,9 +106,15 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
@Override
public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
- return parallelDo(combineFn, parent.getPTableType());
+ return combineValues(combineFn, combineFn);
}
-
+
+ @Override
+ public PTable<K, V> combineValues(CombineFn<K, V> combineFn, CombineFn<K, V> reduceFn) {
+ //return parallelDo(combineFn, parent.getPTableType()).groupByKey().parallelDo(reduceFn, parent.getPTableType());
+ return parallelDo(reduceFn, parent.getPTableType());
+ }
+
@Override
public PTable<K, V> combineValues(Aggregator<V> agg) {
return combineValues(Aggregators.<K, V>toCombineFn(agg));diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
index 28e2504..29ec1e9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
@@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableList;
public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V> {
private final PCollectionImpl<?> parent;
+ private final DoFn<?, Pair<K, V>> combineFn;
private final DoFn<?, Pair<K, V>> fn;
private final PTableType<K, V> type;
@@ -42,10 +43,27 @@ public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V>
<S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype,
ParallelDoOptions options) {
- super(name, options);
- this.parent = parent;
- this.fn = fn;
- this.type = ntype;
+ this(name, parent, asCombineFn(fn), fn, ntype, options);
+ }
+
+ private static <S, K, V> DoFn<S, Pair<K, V>> asCombineFn(final DoFn<S, Pair<K, V>> fn) {
+ if (fn instanceof CombineFn) {
+ return fn;
+ }
+ return null;
+ }
+ <S> DoTableImpl(final String name, final PCollectionImpl<S> parent, final DoFn<S, Pair<K, V>> combineFn,
+ final DoFn<S, Pair<K, V>> fn, final PTableType<K, V> ntype) {
+ this(name, parent, combineFn, fn, ntype, ParallelDoOptions.builder().build());
+ }
+
+ <S> DoTableImpl(final String name, final PCollectionImpl<S> parent, final DoFn<S, Pair<K, V>> combineFn,
+ final DoFn<S, Pair<K, V>> fn, final PTableType<K, V> ntype, final ParallelDoOptions options) {
+ super(name, options);
+ this.parent = parent;
+ this.combineFn = combineFn;
+ this.fn = fn;
+ this.type = ntype;
}
@Override
@@ -77,9 +95,13 @@ public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V>
public DoNode createDoNode() {
return DoNode.createFnNode(getName(), fn, type, doOptions);
}
-
+
+ public DoNode createCombineNode() {
+ return DoNode.createFnNode(getName(), combineFn, type, doOptions);
+ }
+
public boolean hasCombineFn() {
- return fn instanceof CombineFn;
+ return combineFn != null;
}
@Overridediff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
index 7eb2b09..95913f5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -87,8 +87,13 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
}
@Override
+ public PTable<K, V> combineValues(CombineFn<K, V> combineFn, CombineFn<K, V> reduceFn) {
+ return new DoTableImpl<K, V>("combine", getChainingCollection(), combineFn, reduceFn, parent.getPTableType());
+ }
+
+ @Override
public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
- return new DoTableImpl<K, V>("combine", getChainingCollection(), combineFn, parent.getPTableType());
+ return combineValues(combineFn, combineFn);
}
@Override
@@ -164,4 +169,5 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
// TODO This should be implemented in a cleaner way in the planner
return new PGroupedTableImpl<K, V>(parent, groupingOptions);
}
+
}diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index 0699db5..a192a22 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -188,7 +188,7 @@ class JobPrototype {
if (combineFnTable != null) {
job.setCombinerClass(CrunchCombiner.class);
DoNode combinerInputNode = group.createDoNode();
- DoNode combineNode = combineFnTable.createDoNode();
+ DoNode combineNode = combineFnTable.createCombineNode();
combineNode.addChild(group.getGroupingNode());
combinerInputNode.addChild(combineNode);
serialize(ImmutableList.of(combinerInputNode), conf, workingPath, NodeContext.COMBINE);