Updated Branches: refs/heads/master b7781ca08 -> 92888ff42
CRUNCH-258: Support multiple output channels from a DoFn. Contributed by Brandon Inman. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/92888ff4 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/92888ff4 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/92888ff4 Branch: refs/heads/master Commit: 92888ff4260d1417baba66323e2dd2779f0105e4 Parents: b7781ca Author: Josh Wills <[email protected]> Authored: Mon Sep 2 09:22:32 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Mon Sep 2 09:22:32 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/lib/Channels.java | 102 +++++++++++++++++++ .../org/apache/crunch/lib/ChannelsTest.java | 59 +++++++++++ 2 files changed, 161 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/92888ff4/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java b/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java new file mode 100644 index 0000000..568ca20 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pair; +import org.apache.crunch.lib.Channels.FirstEmittingDoFn.SecondEmittingDoFn; +import org.apache.crunch.types.PType; + +/** + * Utilities for splitting {@link Pair} instances emitted by {@link DoFn} into + * separate {@link PCollection} instances. A typical motivation for this might + * be to separate standard output from error output of a DoFn. + * + * @author Brandon Inman + * + */ +public class Channels { + + /** + * Splits a {@link PCollection} of any {@link Pair} of objects into a Pair of + * PCollection}, to allow for the output of a DoFn to be handled using + * separate channels. + * + * @param pCollection The {@code PCollection} to split + */ + public static <T, U> Pair<PCollection<T>, PCollection<U>> split(PCollection<Pair<T, U>> pCollection) { + PType<Pair<T, U>> pt = pCollection.getPType(); + return split(pCollection, pt.getSubTypes().get(0), pt.getSubTypes().get(1)); + } + + /** + * Splits a {@link PCollection} of any {@link Pair} of objects into a Pair of + * PCollection}, to allow for the output of a DoFn to be handled using + * separate channels. 
+ * + * @param pCollection The {@code PCollection} to split + * @param firstPType The {@code PType} for the first collection + * @param secondPType The {@code PType} for the second collection + * @return {@link Pair} of {@link PCollection} + */ + public static <T, U> Pair<PCollection<T>, PCollection<U>> split(PCollection<Pair<T, U>> pCollection, + PType<T> firstPType, PType<U> secondPType) { + + PCollection<T> first = pCollection.parallelDo(new FirstEmittingDoFn<T, U>(), firstPType); + PCollection<U> second = pCollection.parallelDo(new SecondEmittingDoFn<T, U>(), secondPType); + return Pair.of(first, second); + } + + /** + * DoFn that emits non-null first values in a {@link Pair}. + * + * @author Brandon Inman + * @param <T> + * @param <U> + */ + static class FirstEmittingDoFn<T, U> extends DoFn<Pair<T, U>, T> { + + @Override + public void process(Pair<T, U> input, Emitter<T> emitter) { + T first = input.first(); + if (first != null) { + emitter.emit(first); + } + } + + /** + * DoFn that emits non-null second values in a {@link Pair}. + * + * @author Brandon Inman + * @param <T> + * @param <U> + */ + static class SecondEmittingDoFn<T, U> extends DoFn<Pair<T, U>, U> { + + @Override + public void process(Pair<T, U> input, Emitter<U> emitter) { + U second = input.second(); + if (second != null) { + emitter.emit(second); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/92888ff4/crunch-core/src/test/java/org/apache/crunch/lib/ChannelsTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/ChannelsTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/ChannelsTest.java new file mode 100644 index 0000000..f278278 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/lib/ChannelsTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Collection; + +import org.apache.crunch.PCollection; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.types.writable.Writables; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; + +public class ChannelsTest { + + /** + * Verifies that the non-null values of a {@code PCollection<Pair<>>} are routed to the matching + * side of a {@code Pair<PCollection<>, PCollection<>>} and that null values are dropped. + */ + @Test + public void split() { + // One pair per combination: (value, null), (null, value), (value, value) and (null, null) + final PCollection<Pair<String, String>> pCollection = MemPipeline.typedCollectionOf( + Writables.pairs(Writables.strings(), Writables.strings()), + ImmutableList.of(Pair.of("One", (String) null), Pair.of((String) null, "Two"), Pair.of("Three", "Four"), + Pair.of((String) null, (String) null))); + + final Pair<PCollection<String>, PCollection<String>> splitPCollection = Channels.split(pCollection); + + final Collection<String> firstCollection = splitPCollection.first().asCollection().getValue(); + assertEquals(2, firstCollection.size()); + assertTrue(firstCollection.contains("One")); + 
assertTrue(firstCollection.contains("Three")); + + final Collection<String> secondCollection = splitPCollection.second().asCollection().getValue(); + assertEquals(2, secondCollection.size()); + assertTrue(secondCollection.contains("Two")); + assertTrue(secondCollection.contains("Four")); + } +}
