Updated Branches: refs/heads/master 7a8af2865 -> d2a979ca6
CRUNCH-274: Add extra configuration arguments for ParallelDoOptions Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9b5e1080 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9b5e1080 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9b5e1080 Branch: refs/heads/master Commit: 9b5e10808a1203fe873f38a9a9224d261e95b530 Parents: 7a8af28 Author: Josh Wills <[email protected]> Authored: Sun Oct 6 12:59:26 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Sun Oct 6 15:44:22 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/ConfigurationIT.java | 70 ++++++++++++++++++++ .../org/apache/crunch/ParallelDoOptions.java | 40 +++++++++-- .../impl/mr/collect/DoCollectionImpl.java | 2 +- .../crunch/impl/mr/collect/DoTableImpl.java | 2 +- .../crunch/impl/mr/collect/PCollectionImpl.java | 8 +-- .../impl/mr/collect/PGroupedTableImpl.java | 2 +- .../org/apache/crunch/impl/mr/plan/DoNode.java | 22 +++--- 7 files changed, 124 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/9b5e1080/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java b/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java new file mode 100644 index 0000000..0f65d8f --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java @@ -0,0 +1,70 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ +package org.apache.crunch; + +import junit.framework.Assert; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.junit.Rule; +import org.junit.Test; + +public class ConfigurationIT { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + private static final String KEY = "key"; + + private static DoFn<String, String> CONFIG_FN = new DoFn<String, String>() { + private String value; + + @Override + public void configure(Configuration conf) { + this.value = conf.get(KEY, "none"); + } + + @Override + public void process(String input, Emitter<String> emitter) { + emitter.emit(value); + } + }; + + @Test + public void testRun() throws Exception { + run(new MRPipeline(ConfigurationIT.class, tmpDir.getDefaultConfiguration()), + tmpDir.copyResourceFileName("set1.txt"), "testapalooza"); + } + + private static void run(Pipeline p, String input, String expected) throws Exception { + Iterable<String> mat = p.read(From.textFile(input)) + .parallelDo("conf", CONFIG_FN, Writables.strings(), ParallelDoOptions.builder().conf(KEY, expected).build()) + .materialize(); + for (String v : mat) { + if (!expected.equals(v)) { + Assert.fail("Unexpected value: " + v); + } + } + p.done(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/9b5e1080/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java b/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java index b02fc9c..4c5411d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java +++ b/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java @@ -19,9 +19,12 @@ package org.apache.crunch; import java.util.Collection; import java.util.Collections; +import java.util.Map; import java.util.Set; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.hadoop.conf.Configuration; /** * Container class that includes optional information about a {@code parallelDo} operation @@ -31,24 +34,39 @@ import com.google.common.collect.Sets; */ public class ParallelDoOptions { private final Set<SourceTarget<?>> sourceTargets; - - private ParallelDoOptions(Set<SourceTarget<?>> sourceTargets) { + private final Map<String, String> extraConf; + + private ParallelDoOptions(Set<SourceTarget<?>> sourceTargets, Map<String, String> extraConf) { this.sourceTargets = sourceTargets; + this.extraConf = extraConf; } public Set<SourceTarget<?>> getSourceTargets() { return sourceTargets; } - + + /** + * Applies the key-value pairs that were associated with this instance to the given {@code Configuration} + * object. This is called just before the {@code configure} method on the {@code DoFn} corresponding to this + * instance is called, so it is possible for the {@code DoFn} to see (and possibly override) these settings. + */ + public void configure(Configuration conf) { + for (Map.Entry<String, String> e : extraConf.entrySet()) { + conf.set(e.getKey(), e.getValue()); + } + } + public static Builder builder() { return new Builder(); } public static class Builder { private Set<SourceTarget<?>> sourceTargets; - + private Map<String, String> extraConf; + public Builder() { this.sourceTargets = Sets.newHashSet(); + this.extraConf = Maps.newHashMap(); } public Builder sourceTargets(SourceTarget<?>... sourceTargets) { @@ -61,8 +79,20 @@ public class ParallelDoOptions { return this; } + /** + * Specifies key-value pairs that should be added to the {@code Configuration} object associated with the + * {@code Job} that includes these options. + * @param confKey The key + * @param confValue The value + * @return This {@code Builder} instance + */ + public Builder conf(String confKey, String confValue) { + this.extraConf.put(confKey, confValue); + return this; + } + public ParallelDoOptions build() { - return new ParallelDoOptions(sourceTargets); + return new ParallelDoOptions(sourceTargets, extraConf); } } } http://git-wip-us.apache.org/repos/asf/crunch/blob/9b5e1080/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java index 917ef65..50afb75 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java @@ -66,7 +66,7 @@ public class DoCollectionImpl<S> extends PCollectionImpl<S> { @Override public DoNode createDoNode() { - return DoNode.createFnNode(getName(), fn, ntype); + return DoNode.createFnNode(getName(), fn, ntype, doOptions); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/9b5e1080/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java index 5329c7a..28e2504 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java @@ -75,7 +75,7 @@ public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V> @Override public DoNode createDoNode() { - return DoNode.createFnNode(getName(), fn, type); + return DoNode.createFnNode(getName(), fn, type, doOptions); } public boolean hasCombineFn() { http://git-wip-us.apache.org/repos/asf/crunch/blob/9b5e1080/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java index b5f1cef..43711fc 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java @@ -55,15 +55,15 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { private final String name; protected MRPipeline pipeline; protected SourceTarget<S> materializedAt; - private final ParallelDoOptions options; + protected final ParallelDoOptions doOptions; public PCollectionImpl(String name) { this(name, ParallelDoOptions.builder().build()); } - public PCollectionImpl(String name, ParallelDoOptions options) { + public PCollectionImpl(String name, ParallelDoOptions doOptions) { this.name = name; - this.options = options; + this.doOptions = doOptions; } @Override @@ -234,7 +234,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { } public Set<SourceTarget<?>> getTargetDependencies() { - Set<SourceTarget<?>> targetDeps = options.getSourceTargets(); + Set<SourceTarget<?>> targetDeps = doOptions.getSourceTargets(); for (PCollectionImpl<?> parent : getParents()) { targetDeps = Sets.union(targetDeps, parent.getTargetDependencies()); } http://git-wip-us.apache.org/repos/asf/crunch/blob/9b5e1080/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java index c385e16..e0e24ed 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java @@ -146,7 +146,7 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V> @Override public DoNode createDoNode() { - return DoNode.createFnNode(getName(), ptype.getInputMapFn(), ptype); + return DoNode.createFnNode(getName(), ptype.getInputMapFn(), ptype, doOptions); } public DoNode getGroupingNode() { http://git-wip-us.apache.org/repos/asf/crunch/blob/9b5e1080/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java index 87d0a5b..7c64ab4 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.crunch.DoFn; +import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.Source; import org.apache.crunch.impl.mr.run.NodeContext; import org.apache.crunch.impl.mr.run.RTNode; @@ -42,16 +43,18 @@ public class DoNode { private final List<DoNode> children; private final Converter outputConverter; private final Source<?> source; + private final ParallelDoOptions options; private String outputName; private DoNode(DoFn fn, String name, PType<?> ptype, List<DoNode> children, Converter outputConverter, - Source<?> source) { + Source<?> source, ParallelDoOptions options) { this.fn = fn; this.name = name; this.ptype = ptype; this.children = children; this.outputConverter = outputConverter; this.source = source; + this.options = options; } private static List<DoNode> allowsChildren() { @@ -60,26 +63,22 @@ public class DoNode { public static <K, V> DoNode createGroupingNode(String name, PGroupedTableType<K, V> ptype) { DoFn<?, ?> fn = ptype.getOutputMapFn(); - return new DoNode(fn, name, ptype, NO_CHILDREN, ptype.getGroupingConverter(), null); + return new DoNode(fn, name, ptype, NO_CHILDREN, ptype.getGroupingConverter(), null, null); } public static DoNode createOutputNode(String name, Converter outputConverter, PType<?> ptype) { DoFn<?, ?> fn = ptype.getOutputMapFn(); - return new DoNode(fn, name, ptype, NO_CHILDREN, outputConverter, null); + return new DoNode(fn, name, ptype, NO_CHILDREN, outputConverter, null, null); } - public static DoNode createFnNode(String name, DoFn<?, ?> function, PType<?> ptype) { - return new DoNode(function, name, ptype, allowsChildren(), null, null); + public static DoNode createFnNode(String name, DoFn<?, ?> function, PType<?> ptype, ParallelDoOptions options) { + return new DoNode(function, name, ptype, allowsChildren(), null, null, options); } public static <S> DoNode createInputNode(Source<S> source) { PType<?> ptype = source.getType(); DoFn<?, ?> fn = ptype.getInputMapFn(); - return new DoNode(fn, source.toString(), ptype, allowsChildren(), null, source); - } - - public boolean isInputNode() { - return source != null; + return new DoNode(fn, source.toString(), ptype, allowsChildren(), null, source, null); } public boolean isOutputNode() { @@ -126,6 +125,9 @@ public class DoNode { public RTNode toRTNode(boolean inputNode, Configuration conf, NodeContext nodeContext) { List<RTNode> childRTNodes = Lists.newArrayList(); + if (options != null) { + options.configure(conf); + } fn.configure(conf); for (DoNode child : children) { childRTNodes.add(child.toRTNode(false, conf, nodeContext));
