[
https://issues.apache.org/jira/browse/BEAM-5922?focusedWorklogId=173049&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173049
]
ASF GitHub Bot logged work on BEAM-5922:
----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Dec/18 19:48
Start Date: 07/Dec/18 19:48
Worklog Time Spent: 10m
Work Description: jklukas closed pull request #6892: [BEAM-5922] Refactor
Map, FlatMap, and Filter on MapperBase
URL: https://github.com/apache/beam/pull/6892
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 782091de0979..191fbc9942f3 100644
---
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -81,14 +81,14 @@ public void debugBatchPipeline() {
"sparkContext.parallelize(Arrays.asList(...))\n"
+ "_.mapPartitions("
+ "new
org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
- + "_.mapPartitions(new
org.apache.beam.sdk.transforms.Contextful())\n"
+ + "_.mapPartitions(new
org.apache.beam.sdk.transforms.MapperBase$1())\n"
+ "_.combineByKey(..., new
org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n"
+ "_.groupByKey()\n"
+ "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n"
- + "_.mapPartitions(new
org.apache.beam.sdk.transforms.Contextful())\n"
+ + "_.mapPartitions(new
org.apache.beam.sdk.transforms.MapperBase$1())\n"
+ "sparkContext.union(...)\n"
+ "_.mapPartitions("
- + "new org.apache.beam.sdk.transforms.Contextful())\n"
+ + "new org.apache.beam.sdk.transforms.MapperBase$1())\n"
+ "_.<org.apache.beam.sdk.io.TextIO$Write>";
SparkRunnerDebugger.DebugSparkPipelineResult result =
@@ -139,11 +139,11 @@ public void debugStreamingPipeline() {
+ "_.map(new
org.apache.beam.sdk.transforms.windowing.FixedWindows())\n"
+ "_.mapPartitions(new org.apache.beam.runners.spark."
+ "SparkRunnerDebuggerTest$FormatKVFn())\n"
- + "_.mapPartitions(new
org.apache.beam.sdk.transforms.Contextful())\n"
+ + "_.mapPartitions(new
org.apache.beam.sdk.transforms.MapperBase$1())\n"
+ "_.groupByKey()\n"
+ "_.map(new
org.apache.beam.sdk.transforms.Combine$IterableCombineFn())\n"
+ "_.mapPartitions(new
org.apache.beam.sdk.transforms.Distinct$3())\n"
- + "_.mapPartitions(new
org.apache.beam.sdk.transforms.Contextful())\n"
+ + "_.mapPartitions(new
org.apache.beam.sdk.transforms.MapperBase$1())\n"
+ "_.<org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write>";
SparkRunnerDebugger.DebugSparkPipelineResult result =
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index 4bffeb6be3d0..45f870615144 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -17,8 +17,10 @@
*/
package org.apache.beam.sdk.transforms;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
/**
* {@code PTransform}s for filtering from a {@code PCollection} the elements
satisfying a predicate,
@@ -27,7 +29,7 @@
* @param <T> the type of the values in the input {@code PCollection}, and the
type of the elements
* in the output {@code PCollection}
*/
-public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
+public class Filter<T> extends MapperBase<T, Boolean, T> {
/**
* Returns a {@code PTransform} that takes an input {@code PCollection<T>}
and returns a {@code
@@ -180,10 +182,23 @@ private Filter(SerializableFunction<T, Boolean>
predicate) {
}
private Filter(SerializableFunction<T, Boolean> predicate, String
predicateDescription) {
+ super(
+ null, // name is null for backwards compatibility
+ Contextful.fn(predicate),
+ null, // Filter implements custom DisplayData logic, so no originalFn
needed
+ TypeDescriptors.inputOf(predicate),
+ TypeDescriptors.inputOf(predicate));
this.predicate = predicate;
this.predicateDescription = predicateDescription;
}
+ @Override
+ public void emitOutput(T inputElement, Boolean predicateSatisfied,
OutputReceiver<T> receiver) {
+ if (predicateSatisfied) {
+ receiver.output(inputElement);
+ }
+ }
+
/**
* Returns a new {@link Filter} {@link PTransform} that's like this {@link
PTransform} but with
* the specified description for {@link DisplayData}. Does not modify this
{@link PTransform}.
@@ -192,22 +207,6 @@ private Filter(SerializableFunction<T, Boolean> predicate,
String predicateDescr
return new Filter<>(predicate, description);
}
- @Override
- public PCollection<T> expand(PCollection<T> input) {
- return input
- .apply(
- ParDo.of(
- new DoFn<T, T>() {
- @ProcessElement
- public void processElement(@Element T element,
OutputReceiver<T> r) {
- if (predicate.apply(element)) {
- r.output(element);
- }
- }
- }))
- .setCoder(input.getCoder());
- }
-
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index edf255affb68..6095f7f877ff 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -17,14 +17,10 @@
*/
package org.apache.beam.sdk.transforms;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.Contextful.Fn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
@@ -34,22 +30,7 @@
* {@link PCollection} and merging the results.
*/
public class FlatMapElements<InputT, OutputT>
- extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
- @Nullable private final transient TypeDescriptor<InputT> inputType;
- @Nullable private final transient TypeDescriptor<OutputT> outputType;
- @Nullable private final transient Object originalFnForDisplayData;
- @Nullable private final Contextful<Fn<InputT, Iterable<OutputT>>> fn;
-
- private FlatMapElements(
- @Nullable Contextful<Fn<InputT, Iterable<OutputT>>> fn,
- @Nullable Object originalFnForDisplayData,
- @Nullable TypeDescriptor<InputT> inputType,
- TypeDescriptor<OutputT> outputType) {
- this.fn = fn;
- this.originalFnForDisplayData = originalFnForDisplayData;
- this.inputType = inputType;
- this.outputType = outputType;
- }
+ extends MapperBase<InputT, Iterable<OutputT>, OutputT> {
/**
* For a {@code SimpleFunction<InputT, ? extends Iterable<OutputT>>} {@code
fn}, return a {@link
@@ -125,53 +106,21 @@ private FlatMapElements(
fn, fn.getClosure(), TypeDescriptors.inputOf(fn.getClosure()),
outputType);
}
- @Override
- public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
- checkArgument(fn != null, ".via() is required");
- return input.apply(
- "FlatMap",
- ParDo.of(
- new DoFn<InputT, OutputT>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws
Exception {
- Iterable<OutputT> res =
- fn.getClosure().apply(c.element(),
Fn.Context.wrapProcessContext(c));
- for (OutputT output : res) {
- c.output(output);
- }
- }
-
- @Override
- public TypeDescriptor<InputT> getInputTypeDescriptor() {
- return inputType;
- }
+
///////////////////////////////////////////////////////////////////////////////
- @Override
- public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- checkState(
- outputType != null,
- "%s output type descriptor was null; "
- + "this probably means that
getOutputTypeDescriptor() was called after "
- + "serialization/deserialization, but it is only
available prior to "
- + "serialization, for constructing a pipeline and
inferring coders",
- FlatMapElements.class.getSimpleName());
- return outputType;
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder)
{
- builder.delegate(FlatMapElements.this);
- }
- })
- .withSideInputs(fn.getRequirements().getSideInputs()));
+ private FlatMapElements(
+ @Nullable Contextful<Fn<InputT, Iterable<OutputT>>> fn,
+ @Nullable Object originalFnForDisplayData,
+ @Nullable TypeDescriptor<InputT> inputType,
+ TypeDescriptor<OutputT> outputType) {
+ super("FlatMap", fn, originalFnForDisplayData, inputType, outputType);
}
@Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder.add(DisplayData.item("class",
originalFnForDisplayData.getClass()));
- if (originalFnForDisplayData instanceof HasDisplayData) {
- builder.include("fn", (HasDisplayData) originalFnForDisplayData);
+ public void emitOutput(
+ InputT inputElement, Iterable<OutputT> outputElements,
OutputReceiver<OutputT> receiver) {
+ for (OutputT outputElement : outputElements) {
+ receiver.output(outputElement);
}
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 41aac41b9488..994dc38a78f7 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -17,37 +17,18 @@
*/
package org.apache.beam.sdk.transforms;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
+import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.transforms.Contextful.Fn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
/** {@code PTransform}s for mapping a simple function over the elements of a
{@link PCollection}. */
-public class MapElements<InputT, OutputT>
- extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
- @Nullable private final transient TypeDescriptor<InputT> inputType;
- @Nullable private final transient TypeDescriptor<OutputT> outputType;
- @Nullable private final transient Object originalFnForDisplayData;
- @Nullable private final Contextful<Fn<InputT, OutputT>> fn;
-
- private MapElements(
- @Nullable Contextful<Fn<InputT, OutputT>> fn,
- @Nullable Object originalFnForDisplayData,
- @Nullable TypeDescriptor<InputT> inputType,
- TypeDescriptor<OutputT> outputType) {
- this.fn = fn;
- this.originalFnForDisplayData = originalFnForDisplayData;
- this.inputType = inputType;
- this.outputType = outputType;
- }
+public class MapElements<InputT, OutputT> extends MapperBase<InputT, OutputT,
OutputT> {
/**
* For a {@code SimpleFunction<InputT, OutputT>} {@code fn}, returns a
{@code PTransform} that
@@ -114,52 +95,32 @@ private MapElements(
fn, fn.getClosure(), TypeDescriptors.inputOf(fn.getClosure()),
outputType);
}
- @Override
- public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
- checkNotNull(fn, "Must specify a function on MapElements using .via()");
- return input.apply(
- "Map",
- ParDo.of(
- new DoFn<InputT, OutputT>() {
- @ProcessElement
- public void processElement(
- @Element InputT element, OutputReceiver<OutputT>
receiver, ProcessContext c)
- throws Exception {
- receiver.output(
- fn.getClosure().apply(element,
Fn.Context.wrapProcessContext(c)));
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder)
{
- builder.delegate(MapElements.this);
- }
-
- @Override
- public TypeDescriptor<InputT> getInputTypeDescriptor() {
- return inputType;
- }
+
///////////////////////////////////////////////////////////////////////////////
- @Override
- public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- checkState(
- outputType != null,
- "%s output type descriptor was null; "
- + "this probably means that
getOutputTypeDescriptor() was called after "
- + "serialization/deserialization, but it is only
available prior to "
- + "serialization, for constructing a pipeline and
inferring coders",
- MapElements.class.getSimpleName());
- return outputType;
- }
- })
- .withSideInputs(fn.getRequirements().getSideInputs()));
+ private MapElements(
+ @Nullable Contextful<Fn<InputT, OutputT>> fn,
+ @Nullable Object originalFnForDisplayData,
+ @Nullable TypeDescriptor<InputT> inputType,
+ TypeDescriptor<OutputT> outputType) {
+ super("Map", fn, originalFnForDisplayData, inputType, outputType);
}
@Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder.add(DisplayData.item("class",
originalFnForDisplayData.getClass()));
- if (originalFnForDisplayData instanceof HasDisplayData) {
- builder.include("fn", (HasDisplayData) originalFnForDisplayData);
+ public void emitOutput(
+ InputT inputElement, OutputT outputElement, OutputReceiver<OutputT>
receiver) {
+ receiver.output(outputElement);
+ }
+
+ @Nullable
+ private static <InputT, OutputT> Contextful<Fn<InputT, Iterable<OutputT>>>
wrapResultAsIterable(
+ Contextful<Fn<InputT, OutputT>> fn) {
+ if (fn == null) {
+ return null;
+ } else {
+ return Contextful.fn(
+ (InputT element, Fn.Context c) ->
+ Collections.singletonList(fn.getClosure().apply(element, c)),
+ fn.getRequirements());
}
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapperBase.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapperBase.java
new file mode 100644
index 000000000000..d2b3a1311ac8
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapperBase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.Contextful.Fn;
+import org.apache.beam.sdk.transforms.Contextful.Fn.Context;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Abstract class providing a base for {@link PTransform}s that map a simple
function over elements.
+ *
+ * <p>Each subclass accepts a user-defined function that takes in {@code
InputT} and returns {@code
+ * IntermediateT}. Each subclass also defines an {@code emitOutput} function
that outputs one or
+ * more {@code OutputT} elements based on the input element and result of
applying the user-defined
+ * function. Each subclass defines some {@code IntermediateT}, accepting a
function that transforms
+ * input elements to {@code IntermediateT} and overriding the {@code
emitOutput} function for
+ * handling what should be sent to the output receiver based on input and a
particular {@code
+ * IntermediateT} value.
+ */
+abstract class MapperBase<InputT, IntermediateT, OutputT>
+ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
+ @Nullable private final transient TypeDescriptor<InputT> inputType;
+ @Nullable final transient TypeDescriptor<OutputT> outputType;
+ @Nullable private final transient Object originalFnForDisplayData;
+ @Nullable private final Contextful<Fn<InputT, IntermediateT>> fn;
+
+ MapperBase(
+ @Nullable String name,
+ @Nullable Contextful<Fn<InputT, IntermediateT>> fn,
+ @Nullable Object originalFnForDisplayData,
+ @Nullable TypeDescriptor<InputT> inputType,
+ TypeDescriptor<OutputT> outputType) {
+ super(name);
+ this.fn = fn;
+ this.originalFnForDisplayData = originalFnForDisplayData;
+ this.inputType = inputType;
+ this.outputType = outputType;
+ }
+
+ public abstract void emitOutput(
+ InputT inputElement, IntermediateT intermediate, OutputReceiver<OutputT>
receiver);
+
+ @Override
+ public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
+ checkNotNull(
+ fn,
+ "Must specify a function on " + MapperBase.this.getClass().toString()
+ " using .via()");
+ return input.apply(
+ ParDo.of(
+ new DoFn<InputT, OutputT>() {
+ @ProcessElement
+ public void processElement(
+ @Element InputT element, OutputReceiver<OutputT>
receiver, ProcessContext c)
+ throws Exception {
+ IntermediateT intermediate =
+ fn.getClosure().apply(element,
Context.wrapProcessContext(c));
+ emitOutput(element, intermediate, receiver);
+ }
+
+ @Override
+ public TypeDescriptor<InputT> getInputTypeDescriptor() {
+ return inputType;
+ }
+
+ @Override
+ public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+ checkState(
+ outputType != null,
+ "%s output type descriptor was null; "
+ + "this probably means that
getOutputTypeDescriptor() was called after "
+ + "serialization/deserialization, but it is only
available prior to "
+ + "serialization, for constructing a pipeline and
inferring coders",
+ MapperBase.this.getClass().getSimpleName());
+ return outputType;
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder)
{
+ builder.delegate(MapperBase.this);
+ }
+ })
+ .withSideInputs(fn.getRequirements().getSideInputs()));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ // Subclasses can opt to set this null and implement their own display
data logic.
+ if (originalFnForDisplayData != null) {
+ builder.add(DisplayData.item("class",
originalFnForDisplayData.getClass()));
+ }
+ if (originalFnForDisplayData instanceof HasDisplayData) {
+ builder.include("fn", (HasDisplayData) originalFnForDisplayData);
+ }
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 173049)
Time Spent: 1h 40m (was: 1.5h)
> Common base class for FlatMapElements, MapElements, and Filter
> --------------------------------------------------------------
>
> Key: BEAM-5922
> URL: https://issues.apache.org/jira/browse/BEAM-5922
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Jeff Klukas
> Assignee: Jeff Klukas
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> By having FlatMapElements, MapElements, and Filter all inherit from a common
> MapperBase, we can reduce code duplication between these three classes for
> better consistency in future changes. There should be no change in
> functionality or public interface with this change.
> In particular, this change will simplify the implementation of failure
> handling
> across these transforms in https://issues.apache.org/jira/browse/BEAM-5638
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)