[ https://issues.apache.org/jira/browse/BEAM-6150?focusedWorklogId=175569&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-175569 ]
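The change logged here adds `ProcessFunction`, a serializable functional interface whose `apply` may throw any `Exception`. It also turns `SerializableFunction` into a subinterface that overrides `apply` without a `throws` clause.

The standalone sketch below re-creates that relationship using hypothetical local copies of the two interfaces. These are not the Beam classes, and no Beam dependency is assumed. It also adds a hypothetical `FilterSketch.filter` helper. Like the patched `Filter` DoFn, the helper accepts a `ProcessFunction` predicate and lets checked exceptions propagate.

```java
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring the shape of ProcessFunction in the diff (not the Beam class).
@FunctionalInterface
interface ProcessFunction<InputT, OutputT> extends Serializable {
  // The general contract: implementations may throw any checked exception.
  OutputT apply(InputT input) throws Exception;
}

// Hypothetical stand-in for the retrofitted SerializableFunction (not the Beam class).
@FunctionalInterface
interface SerializableFunction<InputT, OutputT> extends ProcessFunction<InputT, OutputT> {
  // Redeclaring apply without a throws clause narrows the contract, so lambdas and
  // method references targeting this interface cannot throw checked exceptions.
  @Override
  OutputT apply(InputT input);
}

final class FilterSketch {
  private FilterSketch() {}

  // Accepts the general ProcessFunction, so both SerializableFunction instances and
  // predicates that declare checked exceptions can be passed. Like a DoFn whose
  // @ProcessElement method is declared "throws Exception", it simply propagates them.
  static <T> List<T> filter(List<T> input, ProcessFunction<T, Boolean> predicate)
      throws Exception {
    List<T> out = new ArrayList<>();
    for (T element : input) {
      if (predicate.apply(element)) {
        out.add(element);
      }
    }
    return out;
  }

  // Hypothetical predicate that declares a checked exception; it can be used as a
  // ProcessFunction but not as a SerializableFunction.
  static Boolean isEvenOrFail(Integer x) throws IOException {
    if (x < 0) {
      throw new IOException("negative input: " + x);
    }
    return x % 2 == 0;
  }
}
```

Where a `ProcessFunction<Integer, Boolean>` is expected, the method reference `FilterSketch::isEvenOrFail` is accepted even though it declares `IOException`. Assigning the same method reference to `SerializableFunction<Integer, Boolean>` fails to compile, because the narrowed `apply` declares no checked exceptions. An ordinary lambda such as `x -> x > 3` satisfies both interfaces. Because `SerializableFunction` is a subtype of `ProcessFunction`, existing callers that pass a `SerializableFunction` keep working.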
ASF GitHub Bot logged work on BEAM-6150:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Dec/18 20:13
            Start Date: 14/Dec/18 20:13
    Worklog Time Spent: 10m
      Work Description: kennknowles closed pull request #7160: [BEAM-6150] Superinterface for SerializableFunction allowing declared exceptions
URL: https://github.com/apache/beam/pull/7160

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java index 7e788cf05866..97a994f3727e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java @@ -104,11 +104,11 @@ public String toString() { } /** - * Wraps a {@link SerializableFunction} as a {@link Contextful} of {@link Fn} with empty {@link + * Wraps a {@link ProcessFunction} as a {@link Contextful} of {@link Fn} with empty {@link * Requirements}.
*/ public static <InputT, OutputT> Contextful<Fn<InputT, OutputT>> fn( - final SerializableFunction<InputT, OutputT> fn) { + final ProcessFunction<InputT, OutputT> fn) { return new Contextful<>((element, c) -> fn.apply(element), Requirements.empty()); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java index 4bffeb6be3d0..aa9d2cd38100 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java @@ -32,7 +32,7 @@ /** * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code * PCollection<T>} with elements that satisfy the given predicate. The predicate must be a {@code - * SerializableFunction<T, Boolean>}. + * ProcessFunction<T, Boolean>}. * * <p>Example of use: * @@ -46,7 +46,7 @@ * #greaterThanEq}, which return elements satisfying various inequalities with the specified value * based on the elements' natural ordering. */ - public static <T, PredicateT extends SerializableFunction<T, Boolean>> Filter<T> by( + public static <T, PredicateT extends ProcessFunction<T, Boolean>> Filter<T> by( PredicateT predicate) { return new Filter<>(predicate); } @@ -71,7 +71,7 @@ * <p>See also {@link #by}, which returns elements that satisfy the given predicate. */ public static <T extends Comparable<T>> Filter<T> lessThan(final T value) { - return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) < 0) + return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) < 0) .described(String.format("x < %s", value)); } @@ -95,7 +95,7 @@ * <p>See also {@link #by}, which returns elements that satisfy the given predicate. 
*/ public static <T extends Comparable<T>> Filter<T> greaterThan(final T value) { - return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) > 0) + return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) > 0) .described(String.format("x > %s", value)); } @@ -119,7 +119,7 @@ * <p>See also {@link #by}, which returns elements that satisfy the given predicate. */ public static <T extends Comparable<T>> Filter<T> lessThanEq(final T value) { - return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) <= 0) + return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) <= 0) .described(String.format("x ≤ %s", value)); } @@ -143,7 +143,7 @@ * <p>See also {@link #by}, which returns elements that satisfy the given predicate. */ public static <T extends Comparable<T>> Filter<T> greaterThanEq(final T value) { - return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) >= 0) + return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) >= 0) .described(String.format("x ≥ %s", value)); } @@ -166,20 +166,20 @@ * <p>See also {@link #by}, which returns elements that satisfy the given predicate. 
*/ public static <T extends Comparable<T>> Filter<T> equal(final T value) { - return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) == 0) + return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) == 0) .described(String.format("x == %s", value)); } /////////////////////////////////////////////////////////////////////////////// - private SerializableFunction<T, Boolean> predicate; + private ProcessFunction<T, Boolean> predicate; private String predicateDescription; - private Filter(SerializableFunction<T, Boolean> predicate) { + private Filter(ProcessFunction<T, Boolean> predicate) { this(predicate, "Filter.predicate"); } - private Filter(SerializableFunction<T, Boolean> predicate, String predicateDescription) { + private Filter(ProcessFunction<T, Boolean> predicate, String predicateDescription) { this.predicate = predicate; this.predicateDescription = predicateDescription; } @@ -199,7 +199,8 @@ private Filter(SerializableFunction<T, Boolean> predicate, String predicateDescr ParDo.of( new DoFn<T, T>() { @ProcessElement - public void processElement(@Element T element, OutputReceiver<T> r) { + public void processElement(@Element T element, OutputReceiver<T> r) + throws Exception { if (predicate.apply(element)) { r.output(element); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java index edf255affb68..93fc85ac43bf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java @@ -52,29 +52,29 @@ private FlatMapElements( } /** - * For a {@code SimpleFunction<InputT, ? 
extends Iterable<OutputT>>} {@code fn}, return a {@link - * PTransform} that applies {@code fn} to every element of the input {@code PCollection<InputT>} - * and outputs all of the elements to the output {@code PCollection<OutputT>}. + * For a {@code InferableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, return a + * {@link PTransform} that applies {@code fn} to every element of the input {@code + * PCollection<InputT>} and outputs all of the elements to the output {@code + * PCollection<OutputT>}. * - * <p>This overload is intended primarily for use in Java 7. In Java 8, the overload {@link - * #via(SerializableFunction)} supports use of lambda for greater concision. + * <p>{@link InferableFunction} has the advantage of providing type descriptor information, but it + * is generally more convenient to specify output type via {@link #into(TypeDescriptor)}, and + * provide the mapping as a lambda expression to {@link #via(ProcessFunction)}. * - * <p>Example of use in Java 7: + * <p>Example usage: * * <pre>{@code * PCollection<String> lines = ...; * PCollection<String> words = lines.apply(FlatMapElements.via( - * new SimpleFunction<String, List<String>>() { - * public Integer apply(String line) { + * new InferableFunction<String, List<String>>() { + * public Integer apply(String line) throws Exception { * return Arrays.asList(line.split(" ")); * } * }); * }</pre> - * - * <p>To use a Java 8 lambda, see {@link #via(SerializableFunction)}. */ public static <InputT, OutputT> FlatMapElements<InputT, OutputT> via( - SimpleFunction<? super InputT, ? extends Iterable<OutputT>> fn) { + InferableFunction<? super InputT, ? 
extends Iterable<OutputT>> fn) { Contextful<Fn<InputT, Iterable<OutputT>>> wrapped = (Contextful) Contextful.fn(fn); TypeDescriptor<OutputT> outputType = TypeDescriptors.extractFromTypeParameters( @@ -87,7 +87,7 @@ private FlatMapElements( /** * Returns a new {@link FlatMapElements} transform with the given type descriptor for the output - * type, but the mapping function yet to be specified using {@link #via(SerializableFunction)}. + * type, but the mapping function yet to be specified using {@link #via(ProcessFunction)}. */ public static <OutputT> FlatMapElements<?, OutputT> into( final TypeDescriptor<OutputT> outputType) { @@ -95,29 +95,25 @@ private FlatMapElements( } /** - * For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, returns a - * {@link PTransform} that applies {@code fn} to every element of the input {@code - * PCollection<InputT>} and outputs all of the elements to the output {@code - * PCollection<OutputT>}. + * For a {@code ProcessFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, returns a {@link + * PTransform} that applies {@code fn} to every element of the input {@code PCollection<InputT>} + * and outputs all of the elements to the output {@code PCollection<OutputT>}. * - * <p>Example of use in Java 8: + * <p>Example usage: * * <pre>{@code * PCollection<String> words = lines.apply( * FlatMapElements.into(TypeDescriptors.strings()) * .via((String line) -> Arrays.asList(line.split(" "))) * }</pre> - * - * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type - * descriptor need not be provided. */ public <NewInputT> FlatMapElements<NewInputT, OutputT> via( - SerializableFunction<NewInputT, ? extends Iterable<OutputT>> fn) { + ProcessFunction<NewInputT, ? 
extends Iterable<OutputT>> fn) { return new FlatMapElements<>( (Contextful) Contextful.fn(fn), fn, TypeDescriptors.inputOf(fn), outputType); } - /** Like {@link #via(SerializableFunction)}, but allows access to additional context. */ + /** Like {@link #via(ProcessFunction)}, but allows access to additional context. */ @Experimental(Experimental.Kind.CONTEXTFUL) public <NewInputT> FlatMapElements<NewInputT, OutputT> via( Contextful<Fn<NewInputT, Iterable<OutputT>>> fn) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/InferableFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/InferableFunction.java new file mode 100644 index 000000000000..d9dc86431177 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/InferableFunction.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import java.lang.reflect.Method; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * A {@link ProcessFunction} which is not a <i>functional interface</i>. Concrete subclasses allow + * us to infer type information, which in turn aids {@link org.apache.beam.sdk.coders.Coder Coder} + * inference. + * + * <p>See {@link SimpleFunction} for providing robust type information where a {@link + * SerializableFunction} is required. + */ +public abstract class InferableFunction<InputT, OutputT> + implements ProcessFunction<InputT, OutputT>, HasDisplayData { + + @Nullable private final ProcessFunction<InputT, OutputT> fn; + + protected InferableFunction() { + this.fn = null; + // A subclass must override apply if using this constructor. Check that via + // reflection. + try { + Method methodThatMustBeOverridden = + InferableFunction.class.getDeclaredMethod("apply", new Class[] {Object.class}); + Method methodOnSubclass = getClass().getMethod("apply", new Class[] {Object.class}); + + if (methodOnSubclass.equals(methodThatMustBeOverridden)) { + throw new IllegalStateException( + "Subclass of InferableFunction must override 'apply' method" + + " or pass a ProcessFunction to the constructor," + + " usually via a lambda or method reference."); + } + + } catch (NoSuchMethodException exc) { + throw new RuntimeException("Impossible state: missing 'apply' method entirely", exc); + } + } + + protected InferableFunction(ProcessFunction<InputT, OutputT> fn) { + this.fn = fn; + } + + @Override + public OutputT apply(InputT input) throws Exception { + return fn.apply(input); + } + + public static <InputT, OutputT> + InferableFunction<InputT, OutputT> fromProcessFunctionWithOutputType( + ProcessFunction<InputT, OutputT> fn, TypeDescriptor<OutputT> outputType) { + return new 
InferableFunctionWithOutputType<>(fn, outputType); + } + + /** + * Returns a {@link TypeDescriptor} capturing what is known statically about the input type of + * this {@link InferableFunction} instance's most-derived class. + * + * <p>See {@link #getOutputTypeDescriptor} for more discussion. + */ + public TypeDescriptor<InputT> getInputTypeDescriptor() { + return new TypeDescriptor<InputT>(this) {}; + } + + /** + * Returns a {@link TypeDescriptor} capturing what is known statically about the output type of + * this {@link InferableFunction} instance's most-derived class. + * + * <p>In the normal case of a concrete {@link InferableFunction} subclass with no generic type + * parameters of its own (including anonymous inner classes), this will be a complete non-generic + * type, which is good for choosing a default output {@code Coder<OutputT>} for the output {@code + * PCollection<OutputT>}. + */ + public TypeDescriptor<OutputT> getOutputTypeDescriptor() { + return new TypeDescriptor<OutputT>(this) {}; + } + + /** + * {@inheritDoc} + * + * <p>By default, does not register any display data. Implementors may override this method to + * provide their own display data. + */ + @Override + public void populateDisplayData(DisplayData.Builder builder) {} + + /** + * A {@link InferableFunction} built from a {@link ProcessFunction}, having a known output type + * that is explicitly set. 
+ */ + private static class InferableFunctionWithOutputType<InputT, OutputT> + extends InferableFunction<InputT, OutputT> { + + private final TypeDescriptor<OutputT> outputType; + + public InferableFunctionWithOutputType( + ProcessFunction<InputT, OutputT> fn, TypeDescriptor<OutputT> outputType) { + super(fn); + this.outputType = outputType; + } + + @Override + public TypeDescriptor<OutputT> getOutputTypeDescriptor() { + return outputType; + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java index 41aac41b9488..dc73cf8d9bd5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java @@ -50,64 +50,58 @@ private MapElements( } /** - * For a {@code SimpleFunction<InputT, OutputT>} {@code fn}, returns a {@code PTransform} that + * For {@code InferableFunction<InputT, OutputT>} {@code fn}, returns a {@code PTransform} that * takes an input {@code PCollection<InputT>} and returns a {@code PCollection<OutputT>} * containing {@code fn.apply(v)} for every element {@code v} in the input. * - * <p>This overload is intended primarily for use in Java 7. In Java 8, the overload {@link - * #via(SerializableFunction)} supports use of lambda for greater concision. + * <p>{@link InferableFunction} has the advantage of providing type descriptor information, but it + * is generally more convenient to specify output type via {@link #into(TypeDescriptor)}, and + * provide the mapping as a lambda expression to {@link #via(ProcessFunction)}. 
* - * <p>Example of use in Java 7: + * <p>Example usage: * * <pre>{@code * PCollection<String> words = ...; * PCollection<Integer> wordsPerLine = words.apply(MapElements.via( - * new SimpleFunction<String, Integer>() { - * public Integer apply(String word) { + * new InferableFunction<String, Integer>() { + * public Integer apply(String word) throws Exception { * return word.length(); * } * })); * }</pre> */ public static <InputT, OutputT> MapElements<InputT, OutputT> via( - final SimpleFunction<InputT, OutputT> fn) { + final InferableFunction<InputT, OutputT> fn) { return new MapElements<>( Contextful.fn(fn), fn, fn.getInputTypeDescriptor(), fn.getOutputTypeDescriptor()); } /** * Returns a new {@link MapElements} transform with the given type descriptor for the output type, - * but the mapping function yet to be specified using {@link #via(SerializableFunction)}. + * but the mapping function yet to be specified using {@link #via(ProcessFunction)}. */ public static <OutputT> MapElements<?, OutputT> into(final TypeDescriptor<OutputT> outputType) { return new MapElements<>(null, null, null, outputType); } /** - * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor, - * returns a {@code PTransform} that takes an input {@code PCollection<InputT>} and returns a - * {@code PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in the - * input. + * For a {@code ProcessFunction<InputT, OutputT>} {@code fn} and output type descriptor, returns a + * {@code PTransform} that takes an input {@code PCollection<InputT>} and returns a {@code + * PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in the input. 
* - * <p>Example of use in Java 8: + * <p>Example usage: * * <pre>{@code * PCollection<Integer> wordLengths = words.apply( * MapElements.into(TypeDescriptors.integers()) * .via((String word) -> word.length())); * }</pre> - * - * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type - * descriptor need not be provided. */ - public <NewInputT> MapElements<NewInputT, OutputT> via( - SerializableFunction<NewInputT, OutputT> fn) { + public <NewInputT> MapElements<NewInputT, OutputT> via(ProcessFunction<NewInputT, OutputT> fn) { return new MapElements<>(Contextful.fn(fn), fn, TypeDescriptors.inputOf(fn), outputType); } - /** - * Like {@link #via(SerializableFunction)}, but supports access to context, such as side inputs. - */ + /** Like {@link #via(ProcessFunction)}, but supports access to context, such as side inputs. */ @Experimental(Kind.CONTEXTFUL) public <NewInputT> MapElements<NewInputT, OutputT> via(Contextful<Fn<NewInputT, OutputT>> fn) { return new MapElements<>( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ProcessFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ProcessFunction.java new file mode 100644 index 000000000000..b0e8807f860f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ProcessFunction.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.io.Serializable; + +/** + * A function that computes an output value of type {@code OutputT} from an input value of type + * {@code InputT} and is {@link Serializable}. + * + * <p>This is the most general function type provided in this SDK, allowing arbitrary {@code + * Exception}s to be thrown, and matching Java's expectations of a <i>functional interface</i> that + * can be supplied as a lambda expression or method reference. It is named {@code ProcessFunction} + * because it is particularly appropriate anywhere a user needs to provide code that will eventually + * be executed as part of a {@link DoFn} {@link org.apache.beam.sdk.transforms.DoFn.ProcessElement + * ProcessElement} function, which is allowed to declare throwing {@code Exception}. If you need to + * execute user code in a context where arbitrary checked exceptions should not be allowed, require + * that users implement the subinterface {@link SerializableFunction} instead. + * + * <p>For more robust {@link org.apache.beam.sdk.coders.Coder Coder} inference, consider extending + * {@link InferableFunction} rather than implementing this interface directly. + * + * @param <InputT> input value type + * @param <OutputT> output value type + */ +@FunctionalInterface +public interface ProcessFunction<InputT, OutputT> extends Serializable { + /** Returns the result of invoking this function on the given input. 
*/ + OutputT apply(InputT input) throws Exception; +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java index b2ac9ede135b..a1dba9e688fe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java @@ -21,12 +21,19 @@ /** * A function that computes an output value of type {@code OutputT} from an input value of type - * {@code InputT} and is {@link Serializable}. + * {@code InputT}, is {@link Serializable}, and does not allow checked exceptions to be declared. + * + * <p>To allow checked exceptions, implement the superinterface {@link ProcessFunction} instead. To + * allow more robust {@link org.apache.beam.sdk.coders.Coder Coder} inference, see {@link + * InferableFunction}. * * @param <InputT> input value type * @param <OutputT> output value type */ -public interface SerializableFunction<InputT, OutputT> extends Serializable { +@FunctionalInterface +public interface SerializableFunction<InputT, OutputT> + extends ProcessFunction<InputT, OutputT>, Serializable { /** Returns the result of invoking this function on the given input. 
*/ + @Override OutputT apply(InputT input); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java index e3f3cc86350b..0e192721e191 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java @@ -19,8 +19,6 @@ import java.lang.reflect.Method; import javax.annotation.Nullable; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.values.TypeDescriptor; /** @@ -28,8 +26,8 @@ * allow us to infer type information, which in turn aids {@link org.apache.beam.sdk.coders.Coder * Coder} inference. */ -public abstract class SimpleFunction<InputT, OutputT> - implements SerializableFunction<InputT, OutputT>, HasDisplayData { +public abstract class SimpleFunction<InputT, OutputT> extends InferableFunction<InputT, OutputT> + implements SerializableFunction<InputT, OutputT> { @Nullable private final SerializableFunction<InputT, OutputT> fn; @@ -69,38 +67,6 @@ public OutputT apply(InputT input) { return new SimpleFunctionWithOutputType<>(fn, outputType); } - /** - * Returns a {@link TypeDescriptor} capturing what is known statically about the input type of - * this {@link SimpleFunction} instance's most-derived class. - * - * <p>See {@link #getOutputTypeDescriptor} for more discussion. - */ - public TypeDescriptor<InputT> getInputTypeDescriptor() { - return new TypeDescriptor<InputT>(this) {}; - } - - /** - * Returns a {@link TypeDescriptor} capturing what is known statically about the output type of - * this {@link SimpleFunction} instance's most-derived class. 
- * - * <p>In the normal case of a concrete {@link SimpleFunction} subclass with no generic type - * parameters of its own (including anonymous inner classes), this will be a complete non-generic - * type, which is good for choosing a default output {@code Coder<OutputT>} for the output {@code - * PCollection<OutputT>}. - */ - public TypeDescriptor<OutputT> getOutputTypeDescriptor() { - return new TypeDescriptor<OutputT>(this) {}; - } - - /** - * {@inheritDoc} - * - * <p>By default, does not register any display data. Implementors may override this method to - * provide their own display data. - */ - @Override - public void populateDisplayData(DisplayData.Builder builder) {} - /** * A {@link SimpleFunction} built from a {@link SerializableFunction}, having a known output type * that is explicitly set. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java index 5f214f3b7093..ad11cd013d49 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java @@ -27,7 +27,7 @@ * PCollection<Iterable<?>>} to a {@link PCollection PCollection<String>}. 
* * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own - * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} + * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)} */ public final class ToString { private ToString() { @@ -96,7 +96,7 @@ private ToString() { * }</pre> * * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own - * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} + * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)} */ private static final class Elements extends PTransform<PCollection<?>, PCollection<String>> { @Override @@ -125,7 +125,7 @@ public String apply(Object input) { * }</pre> * * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own - * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} + * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)} */ private static final class KVs extends PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> { @@ -160,7 +160,7 @@ public String apply(KV<?, ?> input) { * }</pre> * * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own - * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} + * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)} */ private static final class Iterables extends PTransform<PCollection<? 
extends Iterable<?>>, PCollection<String>> { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java index d125356356a3..e605f7cdbd4d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java @@ -25,7 +25,7 @@ import java.util.Map; import java.util.Set; import org.apache.beam.sdk.transforms.Contextful; -import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.ProcessFunction; /** * A utility class for creating {@link TypeDescriptor} objects for different types, such as Java @@ -339,16 +339,16 @@ * * <pre>{@code * class Foo<BarT> { - * private SerializableFunction<BarT, String> fn; + * private ProcessFunction<BarT, String> fn; * * TypeDescriptor<BarT> inferBarTypeDescriptorFromFn() { * return TypeDescriptors.extractFromTypeParameters( * fn, - * SerializableFunction.class, + * ProcessFunction.class, * // The actual type of "fn" is matched against the input type of the extractor, * // and the obtained values of type variables of the superclass are substituted * // into the output type of the extractor. - * new TypeVariableExtractor<SerializableFunction<BarT, String>, BarT>() {}); + * new TypeVariableExtractor<ProcessFunction<BarT, String>, BarT>() {}); * } * } * }</pre> @@ -374,20 +374,20 @@ public static <T, V> TypeDescriptor<V> extractFromTypeParameters( TypeDescriptor<T> type, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) { // Get the type signature of the extractor, e.g. 
- // TypeVariableExtractor<SerializableFunction<BarT, String>, BarT> + // TypeVariableExtractor<ProcessFunction<BarT, String>, BarT> TypeDescriptor<TypeVariableExtractor<T, V>> extractorSupertype = (TypeDescriptor<TypeVariableExtractor<T, V>>) TypeDescriptor.of(extractor.getClass()).getSupertype(TypeVariableExtractor.class); - // Get the actual type argument, e.g. SerializableFunction<BarT, String> + // Get the actual type argument, e.g. ProcessFunction<BarT, String> Type inputT = ((ParameterizedType) extractorSupertype.getType()).getActualTypeArguments()[0]; // Get the actual supertype of the type being analyzed, hopefully with all type parameters - // resolved, e.g. SerializableFunction<Integer, String> + // resolved, e.g. ProcessFunction<Integer, String> TypeDescriptor supertypeDescriptor = type.getSupertype(supertype); // Substitute actual supertype into the extractor, e.g. - // TypeVariableExtractor<SerializableFunction<Integer, String>, Integer> + // TypeVariableExtractor<ProcessFunction<Integer, String>, Integer> TypeDescriptor<TypeVariableExtractor<T, V>> extractorT = extractorSupertype.where(inputT, supertypeDescriptor.getType()); @@ -397,30 +397,30 @@ } /** - * Returns a type descriptor for the input of the given {@link SerializableFunction}, subject to - * Java type erasure: may contain unresolved type variables if the type was erased. + * Returns a type descriptor for the input of the given {@link ProcessFunction}, subject to Java + * type erasure: may contain unresolved type variables if the type was erased. 
*/ public static <InputT, OutputT> TypeDescriptor<InputT> inputOf( - SerializableFunction<InputT, OutputT> fn) { + ProcessFunction<InputT, OutputT> fn) { return extractFromTypeParameters( fn, - SerializableFunction.class, - new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, InputT>() {}); + ProcessFunction.class, + new TypeVariableExtractor<ProcessFunction<InputT, OutputT>, InputT>() {}); } /** - * Returns a type descriptor for the output of the given {@link SerializableFunction}, subject to - * Java type erasure: may contain unresolved type variables if the type was erased. + * Returns a type descriptor for the output of the given {@link ProcessFunction}, subject to Java + * type erasure: may contain unresolved type variables if the type was erased. */ public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf( - SerializableFunction<InputT, OutputT> fn) { + ProcessFunction<InputT, OutputT> fn) { return extractFromTypeParameters( fn, - SerializableFunction.class, - new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, OutputT>() {}); + ProcessFunction.class, + new TypeVariableExtractor<ProcessFunction<InputT, OutputT>, OutputT>() {}); } - /** Like {@link #inputOf(SerializableFunction)} but for {@link Contextful.Fn}. */ + /** Like {@link #inputOf(ProcessFunction)} but for {@link Contextful.Fn}. */ public static <InputT, OutputT> TypeDescriptor<InputT> inputOf( Contextful.Fn<InputT, OutputT> fn) { return TypeDescriptors.extractFromTypeParameters( @@ -429,7 +429,7 @@ new TypeDescriptors.TypeVariableExtractor<Contextful.Fn<InputT, OutputT>, InputT>() {}); } - /** Like {@link #outputOf(SerializableFunction)} but for {@link Contextful.Fn}. */ + /** Like {@link #outputOf(ProcessFunction)} but for {@link Contextful.Fn}. 
*/ public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf( Contextful.Fn<InputT, OutputT> fn) { return TypeDescriptors.extractFromTypeParameters( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java index a05b7ebea5cd..091c28fc240a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java @@ -58,6 +58,13 @@ public Boolean apply(Integer elem) { } } + static class EvenProcessFn implements ProcessFunction<Integer, Boolean> { + @Override + public Boolean apply(Integer elem) throws Exception { + return elem % 2 == 0; + } + } + @Rule public final TestPipeline p = TestPipeline.create(); @Rule public transient ExpectedException thrown = ExpectedException.none(); @@ -93,6 +100,16 @@ public void testFilterByPredicate() { p.run(); } + @Test + @Category(NeedsRunner.class) + public void testFilterByProcessFunction() { + PCollection<Integer> output = + p.apply(Create.of(1, 2, 3, 4, 5, 6, 7)).apply(Filter.by(new EvenProcessFn())); + + PAssert.that(output).containsInAnyOrder(2, 4, 6); + p.run(); + } + @Test @Category(NeedsRunner.class) public void testFilterLessThan() { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java index 061765aa5b2d..a927bf90d260 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java @@ -58,12 +58,12 @@ /** Basic test of {@link FlatMapElements} with a {@link SimpleFunction}. 
*/ @Test @Category(NeedsRunner.class) - public void testFlatMapBasic() throws Exception { + public void testFlatMapSimpleFunction() throws Exception { PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3)) - // Note that FlatMapElements takes a SimpleFunction<InputT, ? extends Iterable<OutputT>> + // Note that FlatMapElements takes an InferableFunction<InputT, ? extends Iterable<OutputT>> // so the use of List<Integer> here (as opposed to Iterable<Integer>) deliberately exercises // the use of an upper bound. .apply( @@ -79,6 +79,26 @@ public void testFlatMapBasic() throws Exception { pipeline.run(); } + /** Basic test of {@link FlatMapElements} with an {@link InferableFunction}. */ + @Test + @Category(NeedsRunner.class) + public void testFlatMapInferableFunction() throws Exception { + PCollection<Integer> output = + pipeline + .apply(Create.of(1, 2, 3)) + .apply( + FlatMapElements.via( + new InferableFunction<Integer, List<Integer>>() { + @Override + public List<Integer> apply(Integer input) throws Exception { + return ImmutableList.of(-input, input); + } + })); + + PAssert.that(output).containsInAnyOrder(1, -2, -1, -3, 2, 3); + pipeline.run(); + } + /** Basic test of {@link FlatMapElements} with a {@link Fn} and a side input. 
*/ @Test @Category(NeedsRunner.class) @@ -181,6 +201,20 @@ public void testSimpleFunctionClassDisplayData() { assertThat(DisplayData.from(simpleMap), hasDisplayItem("class", simpleFn.getClass())); } + @Test + public void testInferableFunctionClassDisplayData() { + InferableFunction<Integer, List<Integer>> inferableFn = + new InferableFunction<Integer, List<Integer>>() { + @Override + public List<Integer> apply(Integer input) { + return Collections.emptyList(); + } + }; + + FlatMapElements<?, ?> inferableMap = FlatMapElements.via(inferableFn); + assertThat(DisplayData.from(inferableMap), hasDisplayItem("class", inferableFn.getClass())); + } + @Test public void testSimpleFunctionDisplayData() { SimpleFunction<Integer, List<Integer>> simpleFn = @@ -201,6 +235,26 @@ public void populateDisplayData(DisplayData.Builder builder) { assertThat(DisplayData.from(simpleFlatMap), hasDisplayItem("foo", "baz")); } + @Test + public void testInferableFunctionDisplayData() { + InferableFunction<Integer, List<Integer>> inferableFn = + new InferableFunction<Integer, List<Integer>>() { + @Override + public List<Integer> apply(Integer input) { + return Collections.emptyList(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "baz")); + } + }; + + FlatMapElements<?, ?> inferableFlatMap = FlatMapElements.via(inferableFn); + assertThat(DisplayData.from(inferableFlatMap), hasDisplayItem("class", inferableFn.getClass())); + assertThat(DisplayData.from(inferableFlatMap), hasDisplayItem("foo", "baz")); + } + @Test @Category(NeedsRunner.class) public void testVoidValues() throws Exception { @@ -230,7 +284,7 @@ public void testVoidValues() throws Exception { /** * Basic test of {@link FlatMapElements} with a lambda (which is instantiated as a {@link - * SerializableFunction}). + * ProcessFunction}). 
*/ @Test @Category(NeedsRunner.class) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java index c10b06e0fa61..952885a68485 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java @@ -76,10 +76,33 @@ public T apply(T input) { } } + /** + * An {@link InferableFunction} to test that the coder registry can propagate coders that are + * bound to type variables. + */ + private static class PolymorphicInferableFunction<T> extends InferableFunction<T, T> { + @Override + public T apply(T input) throws Exception { + return input; + } + } + + /** + * An {@link InferableFunction} to test that the coder registry can propagate coders that are + * bound to type variables, when the variable appears nested in the output. + */ + private static class NestedPolymorphicInferableFunction<T> + extends InferableFunction<T, KV<T, String>> { + @Override + public KV<T, String> apply(T input) throws Exception { + return KV.of(input, "hello"); + } + } + /** Basic test of {@link MapElements} with a {@link SimpleFunction}. */ @Test @Category(NeedsRunner.class) - public void testMapBasic() throws Exception { + public void testMapSimpleFunction() throws Exception { PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3)) @@ -96,6 +119,26 @@ public Integer apply(Integer input) { pipeline.run(); } + /** Basic test of {@link MapElements} with an {@link InferableFunction}. 
*/ + @Test + @Category(NeedsRunner.class) + public void testMapInferableFunction() throws Exception { + PCollection<Integer> output = + pipeline + .apply(Create.of(1, 2, 3)) + .apply( + MapElements.via( + new InferableFunction<Integer, Integer>() { + @Override + public Integer apply(Integer input) throws Exception { + return -input; + } + })); + + PAssert.that(output).containsInAnyOrder(-2, -1, -3); + pipeline.run(); + } + /** Basic test of {@link MapElements} with a {@link Fn} and a side input. */ @Test @Category(NeedsRunner.class) @@ -139,6 +182,28 @@ public Integer apply(Integer input) { })); } + /** + * Basic test of {@link MapElements} coder propagation with a parametric {@link + * InferableFunction}. + */ + @Test + public void testPolymorphicInferableFunction() throws Exception { + pipeline.enableAbandonedNodeEnforcement(false); + + pipeline + .apply(Create.of(1, 2, 3)) + .apply("Polymorphic Identity", MapElements.via(new PolymorphicInferableFunction<>())) + .apply( + "Test Consumer", + MapElements.via( + new InferableFunction<Integer, Integer>() { + @Override + public Integer apply(Integer input) throws Exception { + return input; + } + })); + } + /** * Test of {@link MapElements} coder propagation with a parametric {@link SimpleFunction} where * the type variable occurs nested within other concrete type constructors. @@ -166,12 +231,31 @@ public Integer apply(KV<Integer, String> input) { } /** - * Basic test of {@link MapElements} with a {@link SerializableFunction}. This style is generally - * discouraged in Java 7, in favor of {@link SimpleFunction}. + * Test of {@link MapElements} coder propagation with a parametric {@link InferableFunction} where + * the type variable occurs nested within other concrete type constructors. 
*/ @Test + public void testNestedPolymorphicInferableFunction() throws Exception { + pipeline.enableAbandonedNodeEnforcement(false); + + pipeline + .apply(Create.of(1, 2, 3)) + .apply("Polymorphic Identity", MapElements.via(new NestedPolymorphicInferableFunction<>())) + .apply( + "Test Consumer", + MapElements.via( + new InferableFunction<KV<Integer, String>, Integer>() { + @Override + public Integer apply(KV<Integer, String> input) throws Exception { + return 42; + } + })); + } + + /** Basic test of {@link MapElements} with a {@link ProcessFunction}. */ + @Test @Category(NeedsRunner.class) - public void testMapBasicSerializableFunction() throws Exception { + public void testMapBasicProcessFunction() throws Exception { PCollection<Integer> output = pipeline.apply(Create.of(1, 2, 3)).apply(MapElements.into(integers()).via(input -> -input)); @@ -208,6 +292,35 @@ public String apply(String input) { pipeline.run(); } + /** + * Tests that when built with a concrete subclass of {@link InferableFunction}, the type + * descriptor of the output reflects its static type. 
+ */ + @Test + @Category(NeedsRunner.class) + public void testInferableFunctionOutputTypeDescriptor() throws Exception { + PCollection<String> output = + pipeline + .apply(Create.of("hello")) + .apply( + MapElements.via( + new InferableFunction<String, String>() { + @Override + public String apply(String input) throws Exception { + return input; + } + })); + assertThat( + output.getTypeDescriptor(), + equalTo((TypeDescriptor<String>) new TypeDescriptor<String>() {})); + assertThat( + pipeline.getCoderRegistry().getCoder(output.getTypeDescriptor()), + equalTo(pipeline.getCoderRegistry().getCoder(new TypeDescriptor<String>() {}))); + + // Make sure the pipeline runs too + pipeline.run(); + } + @Test @Category(NeedsRunner.class) public void testVoidValues() throws Exception { @@ -228,6 +341,14 @@ public void testSerializableFunctionDisplayData() { DisplayData.from(serializableMap), hasDisplayItem("class", serializableFn.getClass())); } + @Test + public void testProcessFunctionDisplayData() { + ProcessFunction<Integer, Integer> processFn = input -> input; + + MapElements<?, ?> processMap = MapElements.into(integers()).via(processFn); + assertThat(DisplayData.from(processMap), hasDisplayItem("class", processFn.getClass())); + } + @Test public void testSimpleFunctionClassDisplayData() { SimpleFunction<?, ?> simpleFn = @@ -242,6 +363,20 @@ public Integer apply(Integer input) { assertThat(DisplayData.from(simpleMap), hasDisplayItem("class", simpleFn.getClass())); } + @Test + public void testInferableFunctionClassDisplayData() { + InferableFunction<?, ?> inferableFn = + new InferableFunction<Integer, Integer>() { + @Override + public Integer apply(Integer input) throws Exception { + return input; + } + }; + + MapElements<?, ?> inferableMap = MapElements.via(inferableFn); + assertThat(DisplayData.from(inferableMap), hasDisplayItem("class", inferableFn.getClass())); + } + @Test public void testSimpleFunctionDisplayData() { SimpleFunction<Integer, ?> simpleFn = @@ -262,6 
+397,26 @@ public void populateDisplayData(DisplayData.Builder builder) { assertThat(DisplayData.from(simpleMap), hasDisplayItem("foo", "baz")); } + @Test + public void testInferableFunctionDisplayData() { + InferableFunction<Integer, ?> inferableFn = + new InferableFunction<Integer, Integer>() { + @Override + public Integer apply(Integer input) { + return input; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "baz")); + } + }; + + MapElements<?, ?> inferableMap = MapElements.via(inferableFn); + assertThat(DisplayData.from(inferableMap), hasDisplayItem("class", inferableFn.getClass())); + assertThat(DisplayData.from(inferableMap), hasDisplayItem("foo", "baz")); + } + @Test @Category(ValidatesRunner.class) public void testPrimitiveDisplayData() { diff --git a/website/src/contribute/ptransform-style-guide.md b/website/src/contribute/ptransform-style-guide.md index 4cdcb7b98330..d07529cdb58b 100644 --- a/website/src/contribute/ptransform-style-guide.md +++ b/website/src/contribute/ptransform-style-guide.md @@ -395,8 +395,8 @@ If the transform has an aspect of behavior to be customized by a user's code, ma Do: -* If possible, just use PTransform composition as an extensibility device - i.e. if the same effect can be achieved by the user applying the transform in their pipeline and composing it with another `PTransform`, then the transform itself should not be extensible. E.g., a transform that writes JSON objects to a third-party system should take a `PCollection<JsonObject>` (assuming it is possible to provide a `Coder` for `JsonObject`), rather than taking a generic `PCollection<T>` and a `SerializableFunction<T, JsonObject>` (anti-example that should be fixed: `TextIO`). 
-* If extensibility by user code is necessary inside the transform, pass the user code as a `SerializableFunction` or define your own serializable function-like type (ideally single-method, for interoperability with Java 8 lambdas). Because Java erases the types of lambdas, you should be sure to have adequate type information even if a raw-type `SerializableFunction` is provided by the user. See `MapElements` and `FlatMapElements` for examples of how to use `SimpleFunction` and `SerializableFunction` in tandem to support Java 7 and Java 8 well. +* If possible, just use PTransform composition as an extensibility device - i.e. if the same effect can be achieved by the user applying the transform in their pipeline and composing it with another `PTransform`, then the transform itself should not be extensible. E.g., a transform that writes JSON objects to a third-party system should take a `PCollection<JsonObject>` (assuming it is possible to provide a `Coder` for `JsonObject`), rather than taking a generic `PCollection<T>` and a `ProcessFunction<T, JsonObject>` (anti-example that should be fixed: `TextIO`). +* If extensibility by user code is necessary inside the transform, pass the user code as a `ProcessFunction` or define your own serializable function-like type (ideally single-method, for interoperability with Java 8 lambdas). Because Java erases the types of lambdas, you should be sure to have adequate type information even if a raw-type `ProcessFunction` is provided by the user. See `MapElements` and `FlatMapElements` for examples of how to use `ProcessFunction` and `InferableFunction` in tandem to provide good support for both lambdas and concrete subclasses with type information. Do not:
Issue Time Tracking
-------------------

Worklog Id: (was: 175569)
Time Spent: 1.5h (was: 1h 20m)

> Provide alternatives to SerializableFunction and SimpleFunction that may
> declare exceptions
> -------------------------------------------------------------------------------------------
>
> Key: BEAM-6150
> URL: https://issues.apache.org/jira/browse/BEAM-6150
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Jeff Klukas
> Assignee: Jeff Klukas
> Priority: Minor
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Contextful.Fn allows subclasses to declare checked exceptions, but neither
> SerializableFunction nor SimpleFunction do. We want to add a new entry in
> each of those hierarchies where checked exceptions are allowed. We can then
> change existing method signatures to accept the new superinterfaces in
> contexts where allowing user code to throw checked exceptions is acceptable,
> such as in ProcessElement methods.
> Discussed on the dev mailing list:
> https://lists.apache.org/thread.html/eecd8dea8b47710098ec67d73b87cf9b4e2926c444c3fee1a6b9a743@%3Cdev.beam.apache.org%3E
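The design described in the issue (a new entry in each hierarchy where `apply` may declare checked exceptions) can be sketched without a Beam dependency. The snippet below is a minimal, self-contained illustration under stated assumptions: `ProcessFunction`, `SerializableFunction`, `InferableFunction` and `SimpleFunction` are local stand-ins named after the Beam types, not Beam's actual sources. `filter` (loosely modeled on `Filter.by`) and `outputType` are hypothetical helpers, and `outputType` only handles direct parameterized subclasses.

```java
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ProcessFunctionSketch {

  // Stand-in for the new superinterface: apply may declare checked exceptions.
  @FunctionalInterface
  public interface ProcessFunction<InputT, OutputT> extends Serializable {
    OutputT apply(InputT input) throws Exception;
  }

  // Stand-in for SerializableFunction: same method, narrowed to throw no checked exceptions.
  @FunctionalInterface
  public interface SerializableFunction<InputT, OutputT> extends ProcessFunction<InputT, OutputT> {
    @Override
    OutputT apply(InputT input);
  }

  // Stand-in for InferableFunction: a concrete subclass keeps its type arguments, which
  // can be read back reflectively (simplified: only direct parameterized subclasses work).
  public abstract static class InferableFunction<InputT, OutputT>
      implements ProcessFunction<InputT, OutputT> {
    public Type outputType() {
      ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
      return superType.getActualTypeArguments()[1];
    }
  }

  // Stand-in for SimpleFunction: narrows apply so it throws no checked exceptions.
  public abstract static class SimpleFunction<InputT, OutputT>
      extends InferableFunction<InputT, OutputT> {
    @Override
    public abstract OutputT apply(InputT input);
  }

  // Hypothetical helper loosely modeled on Filter.by: it accepts the wider ProcessFunction
  // type and rethrows any exception from user code wrapped in a RuntimeException.
  public static <T> List<T> filter(List<T> input, ProcessFunction<T, Boolean> predicate) {
    List<T> out = new ArrayList<>();
    for (T element : input) {
      try {
        if (predicate.apply(element)) {
          out.add(element);
        }
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7);

    // With ProcessFunction as the target type, a lambda may throw a checked exception.
    ProcessFunction<Integer, Boolean> even =
        x -> {
          if (x < 0) {
            throw new IOException("negative input: " + x);
          }
          return x % 2 == 0;
        };
    System.out.println(filter(input, even)); // [2, 4, 6]

    // Existing SerializableFunction values still fit, since they are ProcessFunctions too.
    SerializableFunction<Integer, Boolean> odd = x -> x % 2 != 0;
    System.out.println(filter(input, odd)); // [1, 3, 5, 7]

    // SimpleFunction narrows apply, so calling it directly needs no try/catch.
    SimpleFunction<String, Integer> length =
        new SimpleFunction<String, Integer>() {
          @Override
          public Integer apply(String s) {
            return s.length();
          }
        };
    System.out.println(length.apply("abc")); // 3
    System.out.println(length.outputType().getTypeName()); // java.lang.Integer
  }
}
```

Because `SerializableFunction` extends `ProcessFunction` in this sketch, widening a parameter type (as the diff does for `Filter.by` and `Contextful.fn`) keeps existing callers compiling. Code that holds a `SimpleFunction` or `SerializableFunction` directly can still call `apply` without a `try` block.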