Replace ParDo with simpler transforms where possible. There are a number of places in the Java SDK where we use ParDo.of(DoFn) when MapElements or other higher-level composites are applicable and readable. This change alters a number of those.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/236945d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/236945d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/236945d2 Branch: refs/heads/master Commit: 236945d2504b73de91f7292219e0b15a53e062f5 Parents: 89367cf Author: Kenneth Knowles <[email protected]> Authored: Wed Jul 27 14:23:15 2016 -0700 Committer: bchambers <[email protected]> Committed: Wed Aug 17 16:09:01 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/transforms/Combine.java | 28 ++++++++++---------- .../org/apache/beam/sdk/transforms/Count.java | 8 +++--- .../beam/sdk/transforms/FlatMapElements.java | 4 +-- .../org/apache/beam/sdk/transforms/Flatten.java | 12 ++++----- .../org/apache/beam/sdk/transforms/Keys.java | 8 +++--- .../org/apache/beam/sdk/transforms/KvSwap.java | 9 +++---- .../apache/beam/sdk/transforms/MapElements.java | 16 ++++++++--- .../beam/sdk/transforms/RemoveDuplicates.java | 8 +++--- .../org/apache/beam/sdk/transforms/Values.java | 8 +++--- .../apache/beam/sdk/transforms/WithKeys.java | 9 +++---- .../beam/sdk/transforms/windowing/Window.java | 11 ++++---- .../java/org/apache/beam/sdk/PipelineTest.java | 12 ++++----- .../java/org/apache/beam/sdk/io/WriteTest.java | 4 ++- .../beam/sdk/transforms/MapElementsTest.java | 8 +++--- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 10 ++++--- 15 files changed, 81 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 6ba3f8a..56c0bc4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -2121,14 +2121,14 @@ public class Combine { inputCoder.getValueCoder())) .setWindowingStrategyInternal(preCombineStrategy) .apply("PreCombineHot", Combine.perKey(hotPreCombine)) - .apply("StripNonce", ParDo.of( - new DoFn<KV<KV<K, Integer>, AccumT>, - KV<K, InputOrAccum<InputT, AccumT>>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of( - c.element().getKey().getKey(), - InputOrAccum.<InputT, AccumT>accum(c.element().getValue()))); + .apply("StripNonce", MapElements.via( + new SimpleFunction<KV<KV<K, Integer>, AccumT>, + KV<K, InputOrAccum<InputT, AccumT>>>() { + @Override + public KV<K, InputOrAccum<InputT, AccumT>> apply(KV<KV<K, Integer>, AccumT> elem) { + return KV.of( + elem.getKey().getKey(), + InputOrAccum.<InputT, AccumT>accum(elem.getValue())); } })) .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder)) @@ -2137,12 +2137,12 @@ public class Combine { PCollection<KV<K, InputOrAccum<InputT, AccumT>>> preprocessedCold = split .get(cold) .setCoder(inputCoder) - .apply("PrepareCold", ParDo.of( - new DoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(c.element().getKey(), - InputOrAccum.<InputT, AccumT>input(c.element().getValue()))); + .apply("PrepareCold", MapElements.via( + new SimpleFunction<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() { + @Override + public KV<K, InputOrAccum<InputT, AccumT>> apply(KV<K, InputT> element) { + return KV.of(element.getKey(), + InputOrAccum.<InputT, AccumT>input(element.getValue())); } })) .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder)); 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java index ac59c76..195c5d1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java @@ -107,10 +107,10 @@ public class Count { public PCollection<KV<T, Long>> apply(PCollection<T> input) { return input - .apply("Init", ParDo.of(new DoFn<T, KV<T, Void>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(c.element(), (Void) null)); + .apply("Init", MapElements.via(new SimpleFunction<T, KV<T, Void>>() { + @Override + public KV<T, Void> apply(T element) { + return KV.of(element, (Void) null); } })) .apply(Count.<T, Void>perKey()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java index 6f9e3d8..2837c40 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java @@ -29,7 +29,7 @@ import java.lang.reflect.ParameterizedType; * {@link PCollection} and merging the results. */ public class FlatMapElements<InputT, OutputT> -extends PTransform<PCollection<InputT>, PCollection<OutputT>> { +extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { /** * For a {@code SerializableFunction<InputT, ? 
extends Iterable<OutputT>>} {@code fn}, * returns a {@link PTransform} that applies {@code fn} to every element of the input @@ -130,7 +130,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> { } @Override - public PCollection<OutputT> apply(PCollection<InputT> input) { + public PCollection<OutputT> apply(PCollection<? extends InputT> input) { return input.apply( "FlatMap", ParDo.of( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java index 7e09d7e..f3f4f88 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java @@ -173,13 +173,11 @@ public class Flatten { @SuppressWarnings("unchecked") Coder<T> elemCoder = ((IterableLikeCoder<T, ?>) inCoder).getElemCoder(); - return in.apply("FlattenIterables", ParDo.of( - new DoFn<Iterable<T>, T>() { - @ProcessElement - public void processElement(ProcessContext c) { - for (T i : c.element()) { - c.output(i); - } + return in.apply("FlattenIterables", FlatMapElements.via( + new SimpleFunction<Iterable<T>, Iterable<T>>() { + @Override + public Iterable<T> apply(Iterable<T> element) { + return element; } })) .setCoder(elemCoder); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java index 5ac1866..2405adf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java @@ -58,10 +58,10 @@ public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>, @Override public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) { return - in.apply("Keys", ParDo.of(new DoFn<KV<K, ?>, K>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().getKey()); + in.apply("Keys", MapElements.via(new SimpleFunction<KV<K, ?>, K>() { + @Override + public K apply(KV<K, ?> kv) { + return kv.getKey(); } })); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java index d4386d2..2b81ebf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java @@ -62,11 +62,10 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>, @Override public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) { return - in.apply("KvSwap", ParDo.of(new DoFn<KV<K, V>, KV<V, K>>() { - @ProcessElement - public void processElement(ProcessContext c) { - KV<K, V> e = c.element(); - c.output(KV.of(e.getValue(), e.getKey())); + in.apply("KvSwap", MapElements.via(new SimpleFunction<KV<K, V>, KV<V, K>>() { + @Override + public KV<V, K> apply(KV<K, V> kv) { + return KV.of(kv.getValue(), kv.getKey()); } })); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java index 17ad6e7..73e4359 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java @@ -25,7 +25,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; * {@code PTransform}s for mapping a simple function over the elements of a {@link PCollection}. */ public class MapElements<InputT, OutputT> -extends PTransform<PCollection<InputT>, PCollection<OutputT>> { +extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { /** * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor, @@ -44,8 +44,16 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> { * descriptor need not be provided. */ public static <InputT, OutputT> MissingOutputTypeDescriptor<InputT, OutputT> - via(SerializableFunction<InputT, OutputT> fn) { - return new MissingOutputTypeDescriptor<>(fn); + via(SerializableFunction<? super InputT, OutputT> fn) { + + // TypeDescriptor interacts poorly with the wildcards needed to correctly express + // covariance and contravariance in Java, so instead we cast it to an invariant + // function here. + @SuppressWarnings("unchecked") // safe covariant cast + SerializableFunction<InputT, OutputT> simplerFn = + (SerializableFunction<InputT, OutputT>) fn; + + return new MissingOutputTypeDescriptor<>(simplerFn); } /** @@ -103,7 +111,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> { } @Override - public PCollection<OutputT> apply(PCollection<InputT> input) { + public PCollection<OutputT> apply(PCollection<? 
extends InputT> input) { return input.apply( "Map", ParDo.of( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java index bba4b51..2744b14 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java @@ -85,10 +85,10 @@ public class RemoveDuplicates<T> extends PTransform<PCollection<T>, @Override public PCollection<T> apply(PCollection<T> in) { return in - .apply("CreateIndex", ParDo.of(new DoFn<T, KV<T, Void>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(c.element(), (Void) null)); + .apply("CreateIndex", MapElements.via(new SimpleFunction<T, KV<T, Void>>() { + @Override + public KV<T, Void> apply(T element) { + return KV.of(element, (Void) null); } })) .apply(Combine.<T, Void>perKey( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java index 34342db..d21d100 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java @@ -58,10 +58,10 @@ public class Values<V> extends PTransform<PCollection<? extends KV<?, V>>, @Override public PCollection<V> apply(PCollection<? 
extends KV<?, V>> in) { return - in.apply("Values", ParDo.of(new DoFn<KV<?, V>, V>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().getValue()); + in.apply("Values", MapElements.via(new SimpleFunction<KV<?, V>, V>() { + @Override + public V apply(KV<?, V> kv) { + return kv.getValue(); } })); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java index 2a44963..8b061f6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java @@ -113,11 +113,10 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>, @Override public PCollection<KV<K, V>> apply(PCollection<V> in) { PCollection<KV<K, V>> result = - in.apply("AddKeys", ParDo.of(new DoFn<V, KV<K, V>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(fn.apply(c.element()), - c.element())); + in.apply("AddKeys", MapElements.via(new SimpleFunction<V, KV<K, V>>() { + @Override + public KV<K, V> apply(V element) { + return KV.of(fn.apply(element), element); } })); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index c1b0237..9dd069c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -21,10 +21,10 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; @@ -645,10 +645,9 @@ public class Window { // We first apply a (trivial) transform to the input PCollection to produce a new // PCollection. This ensures that we don't modify the windowing strategy of the input // which may be used elsewhere. - .apply("Identity", ParDo.of(new DoFn<T, T>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element()); + .apply("Identity", MapElements.via(new SimpleFunction<T, T>() { + @Override public T apply(T element) { + return element; } })) // Then we modify the windowing strategy. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java index 8b86499..d7b3ac5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java @@ -36,10 +36,10 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -146,10 +146,10 @@ public class PipelineTest { private static PTransform<PCollection<? 
extends String>, PCollection<String>> addSuffix( final String suffix) { - return ParDo.of(new DoFn<String, String>() { - @ProcessElement - public void processElement(DoFn<String, String>.ProcessContext c) { - c.output(c.element() + suffix); + return MapElements.via(new SimpleFunction<String, String>() { + @Override + public String apply(String input) { + return input + suffix; } }); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java index f9bf472..b9ba53b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java @@ -91,7 +91,9 @@ public class WriteTest { // Static counts of the number of records per shard. 
private static List<Integer> recordsPerShard = new ArrayList<>(); - private static final MapElements<String, String> IDENTITY_MAP = + @SuppressWarnings("unchecked") // covariant cast + private static final PTransform<PCollection<String>, PCollection<String>> IDENTITY_MAP = + (PTransform) MapElements.via(new SimpleFunction<String, String>() { @Override public String apply(String input) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java index e86a128..7217bca 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java @@ -233,7 +233,7 @@ public class MapElementsTest implements Serializable { } @Test public void testSimpleFunctionDisplayData() { - SimpleFunction<?, ?> simpleFn = new SimpleFunction<Integer, Integer>() { + SimpleFunction<Integer, ?> simpleFn = new SimpleFunction<Integer, Integer>() { @Override public Integer apply(Integer input) { return input; @@ -255,17 +255,17 @@ public class MapElementsTest implements Serializable { @Test @Category(RunnableOnService.class) public void testPrimitiveDisplayData() { - SimpleFunction<?, ?> mapFn = new SimpleFunction<Integer, Integer>() { + SimpleFunction<Integer, ?> mapFn = new SimpleFunction<Integer, Integer>() { @Override public Integer apply(Integer input) { return input; } }; - MapElements<?, ?> map = MapElements.via(mapFn); + MapElements<Integer, ?> map = MapElements.via(mapFn); DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(map); + Set<DisplayData> displayData = 
evaluator.<Integer>displayDataForPrimitiveTransforms(map); assertThat("MapElements should include the mapFn in its primitive display data", displayData, hasItem(hasDisplayItem("mapFn", mapFn.getClass()))); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 2383105..8a0c788 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -34,9 +34,11 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.ExposedByteArrayInputStream; import org.apache.beam.sdk.values.KV; @@ -1314,10 +1316,10 @@ public class KafkaIO { public PDone apply(PCollection<V> input) { return input .apply("Kafka values with default key", - ParDo.of(new DoFn<V, KV<Void, V>>() { - @ProcessElement - public void processElement(ProcessContext ctx) throws Exception { - ctx.output(KV.<Void, V>of(null, ctx.element())); + MapElements.via(new SimpleFunction<V, KV<Void, V>>() { + @Override + public KV<Void, V> apply(V element) { + return KV.<Void, V>of(null, element); } })) .setCoder(KvCoder.of(VoidCoder.of(), kvWriteTransform.valueCoder))
