http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java index 8f99a0b..67c624d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java @@ -26,10 +26,9 @@ import com.google.common.collect.Maps; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.lang.builder.ReflectionToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; -import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; -import org.apache.crunch.impl.mr.collect.PCollectionImpl; -import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; +import org.apache.crunch.impl.dist.collect.BaseGroupedTable; +import org.apache.crunch.impl.dist.collect.PCollectionImpl; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -65,17 +64,6 @@ class Edge { return paths; } - private static boolean readWriteOutput(PCollectionImpl<?> pc, Map<PCollectionImpl<?>, Set<Target>> outputs) { - if (outputs.containsKey(pc)) { - for (Target t : outputs.get(pc)) { - if (t instanceof SourceTarget || t.asSourceTarget(pc.getPType()) != null) { - return true; - } - } - } - return false; - } - public Map<NodePath, PCollectionImpl> getSplitPoints(Map<PCollectionImpl<?>, Set<Target>> outputs) { List<NodePath> np = Lists.newArrayList(paths); List<PCollectionImpl<?>> smallestOverallPerPath = Lists.newArrayListWithExpectedSize(np.size()); @@ -86,7 +74,7 @@ class Edge { boolean breakpoint = false; PCollectionImpl<?> best = null; for (PCollectionImpl<?> pc : np.get(i)) { - if (!(pc instanceof PGroupedTableImpl)) { + if (!(pc instanceof BaseGroupedTable)) { if (pc.isBreakpoint()) { if (!breakpoint || pc.getSize() < bestSize) { best = pc;
http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java index ce0a847..220bf19 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java @@ -23,7 +23,7 @@ import java.util.Map; import java.util.Set; import org.apache.crunch.Pair; -import org.apache.crunch.impl.mr.collect.PCollectionImpl; +import org.apache.crunch.impl.dist.collect.PCollectionImpl; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java index 925c39a..a18cda0 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java @@ -17,12 +17,12 @@ */ package org.apache.crunch.impl.mr.plan; -import org.apache.crunch.impl.mr.collect.DoCollectionImpl; -import org.apache.crunch.impl.mr.collect.DoTableImpl; -import org.apache.crunch.impl.mr.collect.InputCollection; -import org.apache.crunch.impl.mr.collect.PCollectionImpl; -import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; -import org.apache.crunch.impl.mr.collect.UnionCollection; +import org.apache.crunch.impl.dist.collect.BaseDoCollection; +import org.apache.crunch.impl.dist.collect.BaseDoTable; +import org.apache.crunch.impl.dist.collect.BaseGroupedTable; +import org.apache.crunch.impl.dist.collect.BaseInputCollection; +import org.apache.crunch.impl.dist.collect.BaseUnionCollection; +import org.apache.crunch.impl.dist.collect.PCollectionImpl; /** * @@ -44,13 +44,13 @@ class GraphBuilder implements PCollectionImpl.Visitor { } @Override - public void visitInputCollection(InputCollection<?> collection) { + public void visitInputCollection(BaseInputCollection<?> collection) { Vertex v = graph.addVertex(collection, false); graph.getEdge(v, workingVertex).addNodePath(workingPath.close(collection)); } @Override - public void visitUnionCollection(UnionCollection<?> collection) { + public void visitUnionCollection(BaseUnionCollection<?> collection) { Vertex baseVertex = workingVertex; NodePath basePath = workingPath; for (PCollectionImpl<?> parent : collection.getParents()) { @@ -61,19 +61,19 @@ class GraphBuilder implements PCollectionImpl.Visitor { } @Override - public void visitDoFnCollection(DoCollectionImpl<?> collection) { + public void visitDoCollection(BaseDoCollection<?> collection) { workingPath.push(collection); processParent(collection.getOnlyParent()); } @Override - public void visitDoTable(DoTableImpl<?, ?> collection) { + public void visitDoTable(BaseDoTable<?, ?> collection) { workingPath.push(collection); processParent(collection.getOnlyParent()); } @Override - public void visitGroupedTable(PGroupedTableImpl<?, ?> collection) { + public void visitGroupedTable(BaseGroupedTable<?, ?> collection) { Vertex v = graph.addVertex(collection, false); graph.getEdge(v, workingVertex).addNodePath(workingPath.close(collection)); workingVertex = v; http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java index a192a22..e7a1e17 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java @@ -26,8 +26,9 @@ import java.util.Set; import org.apache.crunch.Pipeline; import org.apache.crunch.Target; import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; -import org.apache.crunch.impl.mr.collect.DoTableImpl; -import org.apache.crunch.impl.mr.collect.PCollectionImpl; +import org.apache.crunch.impl.dist.collect.PCollectionImpl; +import org.apache.crunch.impl.mr.collect.DoTable; +import org.apache.crunch.impl.dist.collect.MRCollection; import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; import org.apache.crunch.impl.mr.exec.CrunchJobHooks; import org.apache.crunch.impl.mr.run.CrunchCombiner; @@ -69,7 +70,7 @@ class JobPrototype { private HashMultimap<Target, NodePath> mapSideNodePaths; private HashMultimap<Target, NodePath> targetsToNodePaths; - private DoTableImpl<?, ?> combineFnTable; + private DoTable<?, ?> combineFnTable; private CrunchControlledJob job; @@ -200,7 +201,7 @@ class JobPrototype { Set<DoNode> mapNodes = Sets.newHashSet(mapSideNodes); for (NodePath nodePath : mapNodePaths) { // Advance these one step, since we've already configured - // the grouping node, and the PGroupedTableImpl is the tail + // the grouping node, and the BaseGroupedTable is the tail // of the NodePath. Iterator<PCollectionImpl<?>> iter = nodePath.descendingIterator(); iter.next(); @@ -256,11 +257,11 @@ class JobPrototype { PCollectionImpl<?> collect = iter.next(); if (combineFnTable != null && !(collect instanceof PGroupedTableImpl)) { combineFnTable = null; - } else if (collect instanceof DoTableImpl && ((DoTableImpl<?, ?>) collect).hasCombineFn()) { - combineFnTable = (DoTableImpl<?, ?>) collect; + } else if (collect instanceof DoTable && ((DoTable<?, ?>) collect).hasCombineFn()) { + combineFnTable = (DoTable<?, ?>) collect; } if (!nodes.containsKey(collect)) { - nodes.put(collect, collect.createDoNode()); + nodes.put(collect, ((MRCollection) collect).createDoNode()); } DoNode parent = nodes.get(collect); parent.addChild(working); http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index 96c9125..97ac866 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -26,9 +26,9 @@ import java.util.TreeMap; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; +import org.apache.crunch.impl.dist.collect.PCollectionImpl; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.impl.mr.collect.InputCollection; -import org.apache.crunch.impl.mr.collect.PCollectionImpl; import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; import org.apache.crunch.impl.mr.exec.MRExecutor; import org.apache.crunch.materialize.MaterializableIterable; @@ -57,7 +57,7 @@ public class MSCRPlanner { // Used to ensure that we always build pipelines starting from the deepest // outputs, which helps ensure that we handle intermediate outputs correctly. - private static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() { + static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() { @Override public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) { int cmp = right.getDepth() - left.getDepth(); http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java index a090d93..03d39af 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java @@ -20,7 +20,7 @@ package org.apache.crunch.impl.mr.plan; import java.util.Iterator; import java.util.LinkedList; -import org.apache.crunch.impl.mr.collect.PCollectionImpl; +import org.apache.crunch.impl.dist.collect.PCollectionImpl; import com.google.common.collect.Lists; @@ -41,7 +41,7 @@ class NodePath implements Iterable<PCollectionImpl<?>> { } public void push(PCollectionImpl<?> stage) { - this.path.push((PCollectionImpl<?>) stage); + this.path.push(stage); } public NodePath close(PCollectionImpl<?> head) { http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java index f4aa668..1a77e58 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java @@ -23,9 +23,9 @@ import java.util.Set; import org.apache.commons.lang.builder.ReflectionToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; import org.apache.crunch.Source; -import org.apache.crunch.impl.mr.collect.InputCollection; -import org.apache.crunch.impl.mr.collect.PCollectionImpl; -import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; +import org.apache.crunch.impl.dist.collect.BaseGroupedTable; +import org.apache.crunch.impl.dist.collect.BaseInputCollection; +import org.apache.crunch.impl.dist.collect.PCollectionImpl; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -51,11 +51,11 @@ class Vertex { } public boolean isInput() { - return impl instanceof InputCollection; + return impl instanceof BaseInputCollection; } public boolean isGBK() { - return impl instanceof PGroupedTableImpl; + return impl instanceof BaseGroupedTable; } public void setOutput() { @@ -68,7 +68,7 @@ class Vertex { public Source getSource() { if (isInput()) { - return ((InputCollection) impl).getSource(); + return ((BaseInputCollection) impl).getSource(); } return null; } @@ -92,10 +92,6 @@ class Vertex { return n; } - public Set<Edge> getAllEdges() { - return Sets.union(incoming, outgoing); - } - public Set<Edge> getIncomingEdges() { return incoming; } http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java index 1f542df..bda6f1a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java @@ -20,8 +20,8 @@ package org.apache.crunch.impl.mr.run; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; -import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; import org.apache.crunch.io.FormatBundle; import org.apache.hadoop.conf.Configurable; @@ -98,27 +98,40 @@ class CrunchInputSplit extends InputSplit implements Writable, Configurable { } public void readFields(DataInput in) throws IOException { + if (conf == null) { + conf = new Configuration(); + } nodeIndex = in.readInt(); bundle = new FormatBundle(); bundle.setConf(conf); bundle.readFields(in); bundle.configure(conf); // yay bootstrap! Class<? extends InputSplit> inputSplitClass = readClass(in); - inputSplit = ReflectionUtils.newInstance(inputSplitClass, conf); - SerializationFactory factory = new SerializationFactory(conf); - Deserializer deserializer = factory.getDeserializer(inputSplitClass); - deserializer.open((DataInputStream) in); - inputSplit = (InputSplit) deserializer.deserialize(inputSplit); + inputSplit = (InputSplit) ReflectionUtils.newInstance(inputSplitClass, conf); + if (inputSplit instanceof Writable) { + ((Writable) inputSplit).readFields(in); + } else { + SerializationFactory factory = new SerializationFactory(conf); + Deserializer deserializer = factory.getDeserializer(inputSplitClass); + deserializer.open((DataInputStream) in); + inputSplit = (InputSplit) deserializer.deserialize(inputSplit); + deserializer.close(); + } } public void write(DataOutput out) throws IOException { out.writeInt(nodeIndex); bundle.write(out); Text.writeString(out, inputSplit.getClass().getName()); - SerializationFactory factory = new SerializationFactory(conf); - Serializer serializer = factory.getSerializer(inputSplit.getClass()); - serializer.open((DataOutputStream) out); - serializer.serialize(inputSplit); + if (inputSplit instanceof Writable) { + ((Writable) inputSplit).write(out); + } else { + SerializationFactory factory = new SerializationFactory(conf); + Serializer serializer = factory.getSerializer(inputSplit.getClass()); + serializer.open((OutputStream) out); + serializer.serialize(inputSplit); + serializer.close(); + } } private Class readClass(DataInput in) throws IOException { http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java index 7472e3d..f3fd397 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java @@ -144,7 +144,11 @@ public class FileTargetImpl implements PathTarget { } protected Path getSourcePattern(Path workingPath, int index) { - return new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index + "-*"); + if (index < 0) { + return new Path(workingPath, "part-*"); + } else { + return new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index + "-*"); + } } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java b/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java index 32bff38..1c89cb9 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java @@ -103,6 +103,7 @@ public class SecondarySort { PTableType<K, Collection<Pair<V1, V2>>> out = ptf.tableOf(input.getKeyType(), ptf.collections(input.getValueType())); GroupingOptions.Builder gob = GroupingOptions.builder() + .requireSortedKeys() .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf)) .partitionerClass(JoinUtils.getPartitionerClass(ptf)); if (numReducers > 0) { http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java b/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java index 011d9cd..3d4f68b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java @@ -240,6 +240,7 @@ public class Sort { } else if (tf == AvroTypeFamily.getInstance()) { builder.conf("crunch.schema", ((AvroType<K>) ptype).getSchema().toString()); } + builder.requireSortedKeys(); configureReducers(builder, ptable, conf, numReducers); return builder.build(); } @@ -270,6 +271,7 @@ public class Sort { } else { throw new RuntimeException("Unrecognized type family: " + tf); } + builder.requireSortedKeys(); configureReducers(builder, ptable, conf, numReducers); return builder.build(); } http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java index bfc8ab3..580510b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java @@ -99,6 +99,7 @@ public class DefaultJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { }, ptt); GroupingOptions.Builder optionsBuilder = GroupingOptions.builder(); + optionsBuilder.requireSortedKeys(); optionsBuilder.partitionerClass(JoinUtils.getPartitionerClass(ptf)); if (numReducers > 0) { optionsBuilder.numReducers(numReducers); http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java index 3f031cb..c717ab7 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java @@ -91,10 +91,17 @@ public class JoinUtils { } } - public static class AvroIndexedRecordPartitioner<K, V> extends Partitioner<AvroKey<K>, AvroValue<V>> { + public static class AvroIndexedRecordPartitioner extends Partitioner<Object, Object> { @Override - public int getPartition(AvroKey<K> key, AvroValue<V> value, int numPartitions) { - IndexedRecord record = (IndexedRecord) key.datum(); + public int getPartition(Object key, Object value, int numPartitions) { + IndexedRecord record; + if (key instanceof AvroWrapper) { + record = (IndexedRecord) ((AvroWrapper) key).datum(); + } else if (key instanceof IndexedRecord) { + record = (IndexedRecord) key; + } else { + throw new UnsupportedOperationException("Unknown avro key type: " + key); + } return (Math.abs(record.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions; } } http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/util/DelegatingReadableData.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/util/DelegatingReadableData.java b/crunch-core/src/main/java/org/apache/crunch/util/DelegatingReadableData.java new file mode 100644 index 0000000..d852c4a --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/util/DelegatingReadableData.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.util; + +import org.apache.crunch.DoFn; +import org.apache.crunch.ReadableData; +import org.apache.crunch.SourceTarget; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Set; + +/** + * Implements the {@code ReadableData<T>} interface by delegating to an {@code ReadableData<S>} instance + * and passing its contents through a {@code DoFn<S, T>}. + */ +public class DelegatingReadableData<S, T> implements ReadableData<T> { + + private final ReadableData<S> delegate; + private final DoFn<S, T> fn; + + public DelegatingReadableData(ReadableData<S> delegate, DoFn<S, T> fn) { + this.delegate = delegate; + this.fn = fn; + } + + @Override + public Set<SourceTarget<?>> getSourceTargets() { + return delegate.getSourceTargets(); + } + + @Override + public void configure(Configuration conf) { + delegate.configure(conf); + fn.configure(conf); + } + + @Override + public Iterable<T> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException { + fn.setContext(context); + fn.initialize(); + final Iterable<S> delegateIterable = delegate.read(context); + return new Iterable<T>() { + @Override + public Iterator<T> iterator() { + return new DoFnIterator<S, T>(delegateIterable.iterator(), fn); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/util/DoFnIterator.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/util/DoFnIterator.java b/crunch-core/src/main/java/org/apache/crunch/util/DoFnIterator.java new file mode 100644 index 0000000..0877a8f --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/util/DoFnIterator.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.util; + +import com.google.common.collect.Lists; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; + +import java.util.Iterator; +import java.util.LinkedList; + +/** + * An {@code Iterator<T>} that combines a delegate {@code Iterator<S>} and a {@code DoFn<S, T>}, generating + * data by passing the contents of the iterator through the function. Note that the input {@code DoFn} should + * have both its {@code setContext} and {@code initialize} functions called <b>before</b> it is passed to + * the constructor. + * + * @param <S> The type of the delegate iterator + * @param <T> The returned type + */ +public class DoFnIterator<S, T> implements Iterator<T> { + + private final Iterator<S> iter; + private final DoFn<S, T> fn; + private CacheEmitter<T> cache; + private boolean cleanup; + + public DoFnIterator(Iterator<S> iter, DoFn<S, T> fn) { + this.iter = iter; + this.fn = fn; + this.cache = new CacheEmitter<T>(); + this.cleanup = false; + } + + @Override + public boolean hasNext() { + while (cache.isEmpty() && iter.hasNext()) { + fn.process(iter.next(), cache); + } + if (cache.isEmpty() && !cleanup) { + fn.cleanup(cache); + cleanup = true; + } + return !cache.isEmpty(); + } + + @Override + public T next() { + return cache.poll(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + private static class CacheEmitter<T> implements Emitter<T> { + + private final LinkedList<T> cache; + + private CacheEmitter() { + this.cache = Lists.newLinkedList(); + } + + public boolean isEmpty() { + return cache.isEmpty(); + } + + public T poll() { + return cache.poll(); + } + + @Override + public void emit(T emitted) { + cache.add(emitted); + } + + @Override + public void flush() { + // No-op + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/util/UnionReadableData.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/util/UnionReadableData.java b/crunch-core/src/main/java/org/apache/crunch/util/UnionReadableData.java new file mode 100644 index 0000000..7d6f65b --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/util/UnionReadableData.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.util; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.crunch.ReadableData; +import org.apache.crunch.SourceTarget; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +public class UnionReadableData<T> implements ReadableData<T> { + + private final List<ReadableData<T>> data; + + public UnionReadableData(List<ReadableData<T>> data) { + this.data = data; + } + + @Override + public Set<SourceTarget<?>> getSourceTargets() { + Set<SourceTarget<?>> srcTargets = Sets.newHashSet(); + for (ReadableData<T> rd: data) { + srcTargets.addAll(rd.getSourceTargets()); + } + return srcTargets; + } + + @Override + public void configure(Configuration conf) { + for (ReadableData<T> rd : data) { + rd.configure(conf); + } + } + + @Override + public Iterable<T> read(final TaskInputOutputContext<?, ?, ?, ?> context) throws IOException { + List<Iterable<T>> iterables = Lists.newArrayList(); + for (ReadableData<T> rd : data) { + iterables.add(rd.read(context)); + } + return Iterables.concat(iterables); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoCollectionTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoCollectionTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoCollectionTest.java new file mode 100644 index 0000000..75b2c2c --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoCollectionTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.dist.collect; + +import static org.junit.Assert.assertEquals; + +import java.util.List; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.ParallelDoOptions; +import org.apache.crunch.ReadableData; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.writable.Writables; +import org.junit.Test; + +public class DoCollectionTest { + + @Test + public void testGetSizeInternal_NoScaleFactor() { + runScaleTest(100L, 1.0f, 100L); + } + + @Test + public void testGetSizeInternal_ScaleFactorBelowZero() { + runScaleTest(100L, 0.5f, 50L); + } + + @Test + public void testGetSizeInternal_ScaleFactorAboveZero() { + runScaleTest(100L, 1.5f, 150L); + } + + private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) { + PCollectionImpl<String> parentCollection = new SizedPCollectionImpl("Sized collection", inputSize); + + BaseDoCollection<String> doCollection = new BaseDoCollection<String>("Scaled collection", parentCollection, + new ScaledFunction(scaleFactor), Writables.strings(), ParallelDoOptions.builder().build()); + + assertEquals(expectedScaledSize, doCollection.getSizeInternal()); + } + + static class ScaledFunction extends DoFn<String, String> { + + private float scaleFactor; + + public ScaledFunction(float scaleFactor) { + this.scaleFactor = scaleFactor; + } + + @Override + public void process(String input, Emitter<String> emitter) { + emitter.emit(input); + } + + @Override + public float scaleFactor() { + return scaleFactor; + } + + } + + static class SizedPCollectionImpl extends PCollectionImpl<String> { + + private long internalSize; + + public SizedPCollectionImpl(String name, long internalSize) { + super(name, null); + this.internalSize = internalSize; + } + + @Override + public PType getPType() { + return null; + } + + @Override + public List getParents() { + return null; + } + + @Override + protected void acceptInternal(Visitor visitor) { + } + + @Override + protected ReadableData<String> getReadableDataInternal() { + return null; + } + + @Override + protected long getSizeInternal() { + return internalSize; + } + + @Override + public long getLastModifiedAt() { + return -1; + } + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoTableImplTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoTableImplTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoTableImplTest.java new file mode 100644 index 0000000..0fd557f --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoTableImplTest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.dist.collect; + +import static org.apache.crunch.types.writable.Writables.strings; +import static org.apache.crunch.types.writable.Writables.tableOf; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.Pair; +import org.apache.crunch.ParallelDoOptions; +import org.junit.Test; + +public class DoTableImplTest { + + @Test + public void testGetSizeInternal_NoScaleFactor() { + runScaleTest(100L, 1.0f, 100L); + } + + @Test + public void testGetSizeInternal_ScaleFactorBelowZero() { + runScaleTest(100L, 0.5f, 50L); + } + + @Test + public void testGetSizeInternal_ScaleFactorAboveZero() { + runScaleTest(100L, 1.5f, 150L); + } + + private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) { + + @SuppressWarnings("unchecked") + PCollectionImpl<String> parentCollection = (PCollectionImpl<String>) mock(PCollectionImpl.class); + when(parentCollection.getPipeline()).thenReturn(null); + when(parentCollection.getSize()).thenReturn(inputSize); + + BaseDoTable<String, String> doTable = new BaseDoTable<String, String>("Scalled table collection", + parentCollection, new TableScaledFunction(scaleFactor), tableOf(strings(), strings()), + ParallelDoOptions.builder().build()); + + assertEquals(expectedScaledSize, doTable.getSizeInternal()); + verify(parentCollection).getPipeline(); + verify(parentCollection).getSize(); + + verifyNoMoreInteractions(parentCollection); + } + + static class TableScaledFunction extends DoFn<String, Pair<String, String>> { + + private float scaleFactor; + + public TableScaledFunction(float scaleFactor) { + this.scaleFactor = scaleFactor; + } + + @Override + public float scaleFactor() { + return scaleFactor; + } + + @Override + public void process(String input, Emitter<Pair<String, String>> emitter) { + emitter.emit(Pair.of(input, input)); + + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java index 9ed7a46..d04b62b 100644 --- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java @@ -25,7 +25,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; import org.apache.crunch.SourceTarget; -import org.apache.crunch.impl.mr.collect.PCollectionImpl; +import org.apache.crunch.impl.dist.collect.PCollectionImpl; import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.crunch.io.ReadableSourceTarget; import org.apache.crunch.types.avro.Avros; http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java deleted file mode 100644 index 4b607b1..0000000 --- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.collect; - -import static org.junit.Assert.assertEquals; - -import java.util.List; - -import org.apache.crunch.DoFn; -import org.apache.crunch.Emitter; -import org.apache.crunch.ReadableData; -import org.apache.crunch.impl.mr.plan.DoNode; -import org.apache.crunch.types.PType; -import org.apache.crunch.types.writable.Writables; -import org.junit.Test; - -public class DoCollectionImplTest { - - @Test - public void testGetSizeInternal_NoScaleFactor() { - runScaleTest(100L, 1.0f, 100L); - } - - @Test - public void testGetSizeInternal_ScaleFactorBelowZero() { - runScaleTest(100L, 0.5f, 50L); - } - - @Test - public void testGetSizeInternal_ScaleFactorAboveZero() { - runScaleTest(100L, 1.5f, 150L); - } - - private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) { - PCollectionImpl<String> parentCollection = new SizedPCollectionImpl("Sized collection", inputSize); - - DoCollectionImpl<String> doCollectionImpl = new DoCollectionImpl<String>("Scaled collection", parentCollection, - new ScaledFunction(scaleFactor), Writables.strings()); - - assertEquals(expectedScaledSize, doCollectionImpl.getSizeInternal()); - } - - static class ScaledFunction extends DoFn<String, String> { - - private float scaleFactor; - - public ScaledFunction(float scaleFactor) { - this.scaleFactor = scaleFactor; - } - - @Override - public void process(String input, Emitter<String> emitter) { - emitter.emit(input); - } - - @Override - public float scaleFactor() { - return scaleFactor; - } - - } - - static class SizedPCollectionImpl extends PCollectionImpl<String> { - - private long internalSize; - - public SizedPCollectionImpl(String name, long internalSize) { - super(name); - this.internalSize = internalSize; - } - - @Override - public PType getPType() { - return null; - } - - @Override - public DoNode createDoNode() { - return null; - } - - @Override - public List getParents() { - return null; - } - - @Override - protected void acceptInternal(Visitor visitor) { - } - - @Override - protected ReadableData<String> getReadableDataInternal() { - return null; - } - - @Override - protected long getSizeInternal() { - return internalSize; - } - - @Override - public long getLastModifiedAt() { - return -1; - } - } - -} http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java deleted file mode 100644 index 89b9944..0000000 --- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.collect; - -import static org.apache.crunch.types.writable.Writables.strings; -import static org.apache.crunch.types.writable.Writables.tableOf; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import org.apache.crunch.DoFn; -import org.apache.crunch.Emitter; -import org.apache.crunch.Pair; -import org.junit.Test; - -public class DoTableImplTest { - - @Test - public void testGetSizeInternal_NoScaleFactor() { - runScaleTest(100L, 1.0f, 100L); - } - - @Test - public void testGetSizeInternal_ScaleFactorBelowZero() { - runScaleTest(100L, 0.5f, 50L); - } - - @Test - public void testGetSizeInternal_ScaleFactorAboveZero() { - runScaleTest(100L, 1.5f, 150L); - } - - private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) { - - @SuppressWarnings("unchecked") - PCollectionImpl<String> parentCollection = (PCollectionImpl<String>) mock(PCollectionImpl.class); - - when(parentCollection.getSize()).thenReturn(inputSize); - - DoTableImpl<String, String> doTableImpl = new DoTableImpl<String, String>("Scalled table collection", - parentCollection, new TableScaledFunction(scaleFactor), tableOf(strings(), strings())); - - assertEquals(expectedScaledSize, doTableImpl.getSizeInternal()); - - verify(parentCollection).getSize(); - - verifyNoMoreInteractions(parentCollection); - } - - static class TableScaledFunction extends DoFn<String, Pair<String, String>> { - - private float scaleFactor; - - public TableScaledFunction(float scaleFactor) { - this.scaleFactor = scaleFactor; - } - - @Override - public float scaleFactor() { - return scaleFactor; - } - - @Override - public void process(String input, Emitter<Pair<String, String>> emitter) { - emitter.emit(Pair.of(input, input)); - - } - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java index e85419c..4b183ac 100644 --- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java @@ -27,8 +27,8 @@ import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.Source; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; +import org.apache.crunch.impl.dist.collect.PCollectionImpl; import org.apache.crunch.impl.mr.collect.InputCollection; -import org.apache.crunch.impl.mr.collect.PCollectionImpl; import org.apache.crunch.impl.mr.plan.DotfileWriter.MRTaskType; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java index 2428c16..96a9931 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java @@ -32,7 +32,7 @@ import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; -import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.lib.sort.TotalOrderPartitioner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -389,7 +389,7 @@ public final class HFileUtils { } }, tableOf(writables(KeyValue.class), nulls())); List <KeyValue> splitPoints = getSplitPoints(table); - Path partitionFile = new Path(((MRPipeline) kvs.getPipeline()).createTempPath(), "partition"); + Path partitionFile = new Path(((DistributedPipeline) kvs.getPipeline()).createTempPath(), "partition"); writePartitionInfo(conf, partitionFile, splitPoints); GroupingOptions options = GroupingOptions.builder() .partitionerClass(TotalOrderPartitioner.class)
