[ 
https://issues.apache.org/jira/browse/BEAM-6150?focusedWorklogId=175569&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-175569
 ]

ASF GitHub Bot logged work on BEAM-6150:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Dec/18 20:13
            Start Date: 14/Dec/18 20:13
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #7160: [BEAM-6150] Superinterface for SerializableFunction allowing declared exceptions
URL: https://github.com/apache/beam/pull/7160
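
For orientation, the relationship this pull request introduces can be sketched without the Beam SDK. The two nested interfaces below are local stand-ins that mirror the ProcessFunction and SerializableFunction declarations shown in the diff; the class name ProcessFunctionSketch and the helpers applyChecked, applyUnchecked and parseStrict are hypothetical and exist only for illustration.

```java
import java.io.Serializable;

public class ProcessFunctionSketch {

  /** Local stand-in for the new superinterface: apply may declare checked exceptions. */
  @FunctionalInterface
  public interface ProcessFunction<InputT, OutputT> extends Serializable {
    OutputT apply(InputT input) throws Exception;
  }

  /** Local stand-in for the subinterface: the override drops the throws clause. */
  @FunctionalInterface
  public interface SerializableFunction<InputT, OutputT>
      extends ProcessFunction<InputT, OutputT> {
    @Override
    OutputT apply(InputT input);
  }

  /** Hypothetical caller that accepts the general type and so must propagate Exception. */
  public static <InputT, OutputT> OutputT applyChecked(
      ProcessFunction<InputT, OutputT> fn, InputT input) throws Exception {
    return fn.apply(input);
  }

  /** Hypothetical caller that requires the narrower type and has nothing to catch. */
  public static <InputT, OutputT> OutputT applyUnchecked(
      SerializableFunction<InputT, OutputT> fn, InputT input) {
    return fn.apply(input);
  }

  /** Hypothetical user code that declares a checked exception. */
  public static Integer parseStrict(String text) throws Exception {
    if (text.isEmpty()) {
      throw new Exception("empty input");
    }
    return Integer.valueOf(text);
  }

  public static void main(String[] args) throws Exception {
    // A method that throws a checked exception only fits the general type.
    ProcessFunction<String, Integer> parse = ProcessFunctionSketch::parseStrict;
    SerializableFunction<String, Integer> length = String::length;

    System.out.println(applyChecked(parse, "42"));
    System.out.println(applyUnchecked(length, "beam"));
    // Every SerializableFunction is also a ProcessFunction, so existing code keeps working.
    System.out.println(applyChecked(length, "beam"));
  }
}
```

Because the subinterface only narrows the throws clause, widening a parameter from SerializableFunction to ProcessFunction, as the diff does for MapElements, FlatMapElements and Filter, should remain source compatible for callers that pass lambdas or existing SerializableFunction instances.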
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java
index 7e788cf05866..97a994f3727e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java
@@ -104,11 +104,11 @@ public String toString() {
   }
 
   /**
-   * Wraps a {@link SerializableFunction} as a {@link Contextful} of {@link Fn} with empty {@link
+   * Wraps a {@link ProcessFunction} as a {@link Contextful} of {@link Fn} with empty {@link
    * Requirements}.
    */
   public static <InputT, OutputT> Contextful<Fn<InputT, OutputT>> fn(
-      final SerializableFunction<InputT, OutputT> fn) {
+      final ProcessFunction<InputT, OutputT> fn) {
     return new Contextful<>((element, c) -> fn.apply(element), Requirements.empty());
   }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index 4bffeb6be3d0..aa9d2cd38100 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -32,7 +32,7 @@
   /**
    * Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
    * PCollection<T>} with elements that satisfy the given predicate. The predicate must be a {@code
-   * SerializableFunction<T, Boolean>}.
+   * ProcessFunction<T, Boolean>}.
    *
    * <p>Example of use:
    *
@@ -46,7 +46,7 @@
    * #greaterThanEq}, which return elements satisfying various inequalities with the specified value
    * based on the elements' natural ordering.
    */
-  public static <T, PredicateT extends SerializableFunction<T, Boolean>> Filter<T> by(
+  public static <T, PredicateT extends ProcessFunction<T, Boolean>> Filter<T> by(
       PredicateT predicate) {
     return new Filter<>(predicate);
   }
@@ -71,7 +71,7 @@
    * <p>See also {@link #by}, which returns elements that satisfy the given predicate.
    */
   public static <T extends Comparable<T>> Filter<T> lessThan(final T value) {
-    return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) < 0)
+    return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) < 0)
         .described(String.format("x < %s", value));
   }
 
@@ -95,7 +95,7 @@
    * <p>See also {@link #by}, which returns elements that satisfy the given predicate.
    */
   public static <T extends Comparable<T>> Filter<T> greaterThan(final T value) {
-    return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) > 0)
+    return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) > 0)
         .described(String.format("x > %s", value));
   }
 
@@ -119,7 +119,7 @@
    * <p>See also {@link #by}, which returns elements that satisfy the given predicate.
    */
   public static <T extends Comparable<T>> Filter<T> lessThanEq(final T value) {
-    return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) <= 0)
+    return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) <= 0)
         .described(String.format("x ≤ %s", value));
   }
 
@@ -143,7 +143,7 @@
    * <p>See also {@link #by}, which returns elements that satisfy the given predicate.
    */
   public static <T extends Comparable<T>> Filter<T> greaterThanEq(final T value) {
-    return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) >= 0)
+    return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) >= 0)
         .described(String.format("x ≥ %s", value));
   }
 
@@ -166,20 +166,20 @@
    * <p>See also {@link #by}, which returns elements that satisfy the given predicate.
    */
   public static <T extends Comparable<T>> Filter<T> equal(final T value) {
-    return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) == 0)
+    return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) == 0)
         .described(String.format("x == %s", value));
   }
 
   ///////////////////////////////////////////////////////////////////////////////
 
-  private SerializableFunction<T, Boolean> predicate;
+  private ProcessFunction<T, Boolean> predicate;
   private String predicateDescription;
 
-  private Filter(SerializableFunction<T, Boolean> predicate) {
+  private Filter(ProcessFunction<T, Boolean> predicate) {
     this(predicate, "Filter.predicate");
   }
 
-  private Filter(SerializableFunction<T, Boolean> predicate, String predicateDescription) {
+  private Filter(ProcessFunction<T, Boolean> predicate, String predicateDescription) {
     this.predicate = predicate;
     this.predicateDescription = predicateDescription;
   }
@@ -199,7 +199,8 @@ private Filter(SerializableFunction<T, Boolean> predicate, String predicateDescr
             ParDo.of(
                 new DoFn<T, T>() {
                   @ProcessElement
-                  public void processElement(@Element T element, OutputReceiver<T> r) {
+                  public void processElement(@Element T element, OutputReceiver<T> r)
+                      throws Exception {
                     if (predicate.apply(element)) {
                       r.output(element);
                     }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index edf255affb68..93fc85ac43bf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -52,29 +52,29 @@ private FlatMapElements(
   }
 
   /**
-   * For a {@code SimpleFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, return a {@link
-   * PTransform} that applies {@code fn} to every element of the input {@code PCollection<InputT>}
-   * and outputs all of the elements to the output {@code PCollection<OutputT>}.
+   * For a {@code InferableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, return a
+   * {@link PTransform} that applies {@code fn} to every element of the input {@code
+   * PCollection<InputT>} and outputs all of the elements to the output {@code
+   * PCollection<OutputT>}.
    *
-   * <p>This overload is intended primarily for use in Java 7. In Java 8, the overload {@link
-   * #via(SerializableFunction)} supports use of lambda for greater concision.
+   * <p>{@link InferableFunction} has the advantage of providing type descriptor information, but it
+   * is generally more convenient to specify output type via {@link #into(TypeDescriptor)}, and
+   * provide the mapping as a lambda expression to {@link #via(ProcessFunction)}.
    *
-   * <p>Example of use in Java 7:
+   * <p>Example usage:
    *
    * <pre>{@code
    * PCollection<String> lines = ...;
    * PCollection<String> words = lines.apply(FlatMapElements.via(
-   *     new SimpleFunction<String, List<String>>() {
-   *       public Integer apply(String line) {
+   *     new InferableFunction<String, List<String>>() {
+   *       public Integer apply(String line) throws Exception {
    *         return Arrays.asList(line.split(" "));
    *       }
    *     });
    * }</pre>
-   *
-   * <p>To use a Java 8 lambda, see {@link #via(SerializableFunction)}.
    */
   public static <InputT, OutputT> FlatMapElements<InputT, OutputT> via(
-      SimpleFunction<? super InputT, ? extends Iterable<OutputT>> fn) {
+      InferableFunction<? super InputT, ? extends Iterable<OutputT>> fn) {
     Contextful<Fn<InputT, Iterable<OutputT>>> wrapped = (Contextful) Contextful.fn(fn);
     TypeDescriptor<OutputT> outputType =
         TypeDescriptors.extractFromTypeParameters(
@@ -87,7 +87,7 @@ private FlatMapElements(
 
   /**
    * Returns a new {@link FlatMapElements} transform with the given type descriptor for the output
-   * type, but the mapping function yet to be specified using {@link #via(SerializableFunction)}.
+   * type, but the mapping function yet to be specified using {@link #via(ProcessFunction)}.
    */
   public static <OutputT> FlatMapElements<?, OutputT> into(
       final TypeDescriptor<OutputT> outputType) {
@@ -95,29 +95,25 @@ private FlatMapElements(
   }
 
   /**
-   * For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, returns a
-   * {@link PTransform} that applies {@code fn} to every element of the input {@code
-   * PCollection<InputT>} and outputs all of the elements to the output {@code
-   * PCollection<OutputT>}.
+   * For a {@code ProcessFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, returns a {@link
+   * PTransform} that applies {@code fn} to every element of the input {@code PCollection<InputT>}
+   * and outputs all of the elements to the output {@code PCollection<OutputT>}.
    *
-   * <p>Example of use in Java 8:
+   * <p>Example usage:
    *
    * <pre>{@code
    * PCollection<String> words = lines.apply(
    *     FlatMapElements.into(TypeDescriptors.strings())
    *                    .via((String line) -> Arrays.asList(line.split(" ")))
    * }</pre>
-   *
-   * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
-   * descriptor need not be provided.
    */
   public <NewInputT> FlatMapElements<NewInputT, OutputT> via(
-      SerializableFunction<NewInputT, ? extends Iterable<OutputT>> fn) {
+      ProcessFunction<NewInputT, ? extends Iterable<OutputT>> fn) {
     return new FlatMapElements<>(
         (Contextful) Contextful.fn(fn), fn, TypeDescriptors.inputOf(fn), outputType);
   }
 
-  /** Like {@link #via(SerializableFunction)}, but allows access to additional context. */
+  /** Like {@link #via(ProcessFunction)}, but allows access to additional context. */
   @Experimental(Experimental.Kind.CONTEXTFUL)
   public <NewInputT> FlatMapElements<NewInputT, OutputT> via(
       Contextful<Fn<NewInputT, Iterable<OutputT>>> fn) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/InferableFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/InferableFunction.java
new file mode 100644
index 000000000000..d9dc86431177
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/InferableFunction.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.lang.reflect.Method;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link ProcessFunction} which is not a <i>functional interface</i>. Concrete subclasses allow
+ * us to infer type information, which in turn aids {@link org.apache.beam.sdk.coders.Coder Coder}
+ * inference.
+ *
+ * <p>See {@link SimpleFunction} for providing robust type information where a {@link
+ * SerializableFunction} is required.
+ */
+public abstract class InferableFunction<InputT, OutputT>
+    implements ProcessFunction<InputT, OutputT>, HasDisplayData {
+
+  @Nullable private final ProcessFunction<InputT, OutputT> fn;
+
+  protected InferableFunction() {
+    this.fn = null;
+    // A subclass must override apply if using this constructor. Check that via
+    // reflection.
+    try {
+      Method methodThatMustBeOverridden =
+          InferableFunction.class.getDeclaredMethod("apply", new Class[] {Object.class});
+      Method methodOnSubclass = getClass().getMethod("apply", new Class[] {Object.class});
+
+      if (methodOnSubclass.equals(methodThatMustBeOverridden)) {
+        throw new IllegalStateException(
+            "Subclass of InferableFunction must override 'apply' method"
+                + " or pass a ProcessFunction to the constructor,"
+                + " usually via a lambda or method reference.");
+      }
+
+    } catch (NoSuchMethodException exc) {
+      throw new RuntimeException("Impossible state: missing 'apply' method entirely", exc);
+    }
+  }
+
+  protected InferableFunction(ProcessFunction<InputT, OutputT> fn) {
+    this.fn = fn;
+  }
+
+  @Override
+  public OutputT apply(InputT input) throws Exception {
+    return fn.apply(input);
+  }
+
+  public static <InputT, OutputT>
+      InferableFunction<InputT, OutputT> fromProcessFunctionWithOutputType(
+          ProcessFunction<InputT, OutputT> fn, TypeDescriptor<OutputT> outputType) {
+    return new InferableFunctionWithOutputType<>(fn, outputType);
+  }
+
+  /**
+   * Returns a {@link TypeDescriptor} capturing what is known statically about the input type of
+   * this {@link InferableFunction} instance's most-derived class.
+   *
+   * <p>See {@link #getOutputTypeDescriptor} for more discussion.
+   */
+  public TypeDescriptor<InputT> getInputTypeDescriptor() {
+    return new TypeDescriptor<InputT>(this) {};
+  }
+
+  /**
+   * Returns a {@link TypeDescriptor} capturing what is known statically about the output type of
+   * this {@link InferableFunction} instance's most-derived class.
+   *
+   * <p>In the normal case of a concrete {@link InferableFunction} subclass with no generic type
+   * parameters of its own (including anonymous inner classes), this will be a complete non-generic
+   * type, which is good for choosing a default output {@code Coder<OutputT>} for the output {@code
+   * PCollection<OutputT>}.
+   */
+  public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+    return new TypeDescriptor<OutputT>(this) {};
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>By default, does not register any display data. Implementors may override this method to
+   * provide their own display data.
+   */
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {}
+
+  /**
+   * A {@link InferableFunction} built from a {@link ProcessFunction}, having a known output type
+   * that is explicitly set.
+   */
+  private static class InferableFunctionWithOutputType<InputT, OutputT>
+      extends InferableFunction<InputT, OutputT> {
+
+    private final TypeDescriptor<OutputT> outputType;
+
+    public InferableFunctionWithOutputType(
+        ProcessFunction<InputT, OutputT> fn, TypeDescriptor<OutputT> outputType) {
+      super(fn);
+      this.outputType = outputType;
+    }
+
+    @Override
+    public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+      return outputType;
+    }
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 41aac41b9488..dc73cf8d9bd5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -50,64 +50,58 @@ private MapElements(
   }
 
   /**
-   * For a {@code SimpleFunction<InputT, OutputT>} {@code fn}, returns a {@code PTransform} that
+   * For {@code InferableFunction<InputT, OutputT>} {@code fn}, returns a {@code PTransform} that
    * takes an input {@code PCollection<InputT>} and returns a {@code PCollection<OutputT>}
    * containing {@code fn.apply(v)} for every element {@code v} in the input.
    *
-   * <p>This overload is intended primarily for use in Java 7. In Java 8, the overload {@link
-   * #via(SerializableFunction)} supports use of lambda for greater concision.
+   * <p>{@link InferableFunction} has the advantage of providing type descriptor information, but it
+   * is generally more convenient to specify output type via {@link #into(TypeDescriptor)}, and
+   * provide the mapping as a lambda expression to {@link #via(ProcessFunction)}.
    *
-   * <p>Example of use in Java 7:
+   * <p>Example usage:
    *
    * <pre>{@code
    * PCollection<String> words = ...;
    * PCollection<Integer> wordsPerLine = words.apply(MapElements.via(
-   *     new SimpleFunction<String, Integer>() {
-   *       public Integer apply(String word) {
+   *     new InferableFunction<String, Integer>() {
+   *       public Integer apply(String word) throws Exception {
    *         return word.length();
    *       }
    *     }));
    * }</pre>
    */
   public static <InputT, OutputT> MapElements<InputT, OutputT> via(
-      final SimpleFunction<InputT, OutputT> fn) {
+      final InferableFunction<InputT, OutputT> fn) {
     return new MapElements<>(
         Contextful.fn(fn), fn, fn.getInputTypeDescriptor(), fn.getOutputTypeDescriptor());
   }
 
   /**
    * Returns a new {@link MapElements} transform with the given type descriptor for the output type,
-   * but the mapping function yet to be specified using {@link #via(SerializableFunction)}.
+   * but the mapping function yet to be specified using {@link #via(ProcessFunction)}.
    */
   public static <OutputT> MapElements<?, OutputT> into(final TypeDescriptor<OutputT> outputType) {
     return new MapElements<>(null, null, null, outputType);
   }
 
   /**
-   * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor,
-   * returns a {@code PTransform} that takes an input {@code PCollection<InputT>} and returns a
-   * {@code PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in the
-   * input.
+   * For a {@code ProcessFunction<InputT, OutputT>} {@code fn} and output type descriptor, returns a
+   * {@code PTransform} that takes an input {@code PCollection<InputT>} and returns a {@code
+   * PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in the input.
    *
-   * <p>Example of use in Java 8:
+   * <p>Example usage:
    *
    * <pre>{@code
    * PCollection<Integer> wordLengths = words.apply(
    *     MapElements.into(TypeDescriptors.integers())
    *                .via((String word) -> word.length()));
    * }</pre>
-   *
-   * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
-   * descriptor need not be provided.
    */
-  public <NewInputT> MapElements<NewInputT, OutputT> via(
-      SerializableFunction<NewInputT, OutputT> fn) {
+  public <NewInputT> MapElements<NewInputT, OutputT> via(ProcessFunction<NewInputT, OutputT> fn) {
     return new MapElements<>(Contextful.fn(fn), fn, TypeDescriptors.inputOf(fn), outputType);
   }
 
-  /**
-   * Like {@link #via(SerializableFunction)}, but supports access to context, such as side inputs.
-   */
+  /** Like {@link #via(ProcessFunction)}, but supports access to context, such as side inputs. */
   @Experimental(Kind.CONTEXTFUL)
   public <NewInputT> MapElements<NewInputT, OutputT> via(Contextful<Fn<NewInputT, OutputT>> fn) {
     return new MapElements<>(
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ProcessFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ProcessFunction.java
new file mode 100644
index 000000000000..b0e8807f860f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ProcessFunction.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.io.Serializable;
+
+/**
+ * A function that computes an output value of type {@code OutputT} from an input value of type
+ * {@code InputT} and is {@link Serializable}.
+ *
+ * <p>This is the most general function type provided in this SDK, allowing arbitrary {@code
+ * Exception}s to be thrown, and matching Java's expectations of a <i>functional interface</i> that
+ * can be supplied as a lambda expression or method reference. It is named {@code ProcessFunction}
+ * because it is particularly appropriate anywhere a user needs to provide code that will eventually
+ * be executed as part of a {@link DoFn} {@link org.apache.beam.sdk.transforms.DoFn.ProcessElement
+ * ProcessElement} function, which is allowed to declare throwing {@code Exception}. If you need to
+ * execute user code in a context where arbitrary checked exceptions should not be allowed, require
+ * that users implement the subinterface {@link SerializableFunction} instead.
+ *
+ * <p>For more robust {@link org.apache.beam.sdk.coders.Coder Coder} inference, consider extending
+ * {@link InferableFunction} rather than implementing this interface directly.
+ *
+ * @param <InputT> input value type
+ * @param <OutputT> output value type
+ */
+@FunctionalInterface
+public interface ProcessFunction<InputT, OutputT> extends Serializable {
+  /** Returns the result of invoking this function on the given input. */
+  OutputT apply(InputT input) throws Exception;
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java
index b2ac9ede135b..a1dba9e688fe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java
@@ -21,12 +21,19 @@
 
 /**
  * A function that computes an output value of type {@code OutputT} from an input value of type
- * {@code InputT} and is {@link Serializable}.
+ * {@code InputT}, is {@link Serializable}, and does not allow checked exceptions to be declared.
+ *
+ * <p>To allow checked exceptions, implement the superinterface {@link ProcessFunction} instead. To
+ * allow more robust {@link org.apache.beam.sdk.coders.Coder Coder} inference, see {@link
+ * InferableFunction}.
  *
  * @param <InputT> input value type
  * @param <OutputT> output value type
  */
-public interface SerializableFunction<InputT, OutputT> extends Serializable {
+@FunctionalInterface
+public interface SerializableFunction<InputT, OutputT>
+    extends ProcessFunction<InputT, OutputT>, Serializable {
   /** Returns the result of invoking this function on the given input. */
+  @Override
   OutputT apply(InputT input);
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
index e3f3cc86350b..0e192721e191 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
@@ -19,8 +19,6 @@
 
 import java.lang.reflect.Method;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -28,8 +26,8 @@
  * allow us to infer type information, which in turn aids {@link org.apache.beam.sdk.coders.Coder
  * Coder} inference.
  */
-public abstract class SimpleFunction<InputT, OutputT>
-    implements SerializableFunction<InputT, OutputT>, HasDisplayData {
+public abstract class SimpleFunction<InputT, OutputT> extends InferableFunction<InputT, OutputT>
+    implements SerializableFunction<InputT, OutputT> {
 
   @Nullable private final SerializableFunction<InputT, OutputT> fn;
 
@@ -69,38 +67,6 @@ public OutputT apply(InputT input) {
     return new SimpleFunctionWithOutputType<>(fn, outputType);
   }
 
-  /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically about the input type of
-   * this {@link SimpleFunction} instance's most-derived class.
-   *
-   * <p>See {@link #getOutputTypeDescriptor} for more discussion.
-   */
-  public TypeDescriptor<InputT> getInputTypeDescriptor() {
-    return new TypeDescriptor<InputT>(this) {};
-  }
-
-  /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically about the output type of
-   * this {@link SimpleFunction} instance's most-derived class.
-   *
-   * <p>In the normal case of a concrete {@link SimpleFunction} subclass with no generic type
-   * parameters of its own (including anonymous inner classes), this will be a complete non-generic
-   * type, which is good for choosing a default output {@code Coder<OutputT>} for the output {@code
-   * PCollection<OutputT>}.
-   */
-  public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-    return new TypeDescriptor<OutputT>(this) {};
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>By default, does not register any display data. Implementors may override this method to
-   * provide their own display data.
-   */
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {}
-
   /**
    * A {@link SimpleFunction} built from a {@link SerializableFunction}, having a known output type
    * that is explicitly set.
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
index 5f214f3b7093..ad11cd013d49 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
@@ -27,7 +27,7 @@
  * PCollection&lt;Iterable&lt;?&gt;&gt;} to a {@link PCollection PCollection&lt;String&gt;}.
  *
  * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
- * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
+ * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)}
  */
 public final class ToString {
   private ToString() {
@@ -96,7 +96,7 @@ private ToString() {
    * }</pre>
    *
    * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
-   * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
+   * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)}
    */
   private static final class Elements extends PTransform<PCollection<?>, PCollection<String>> {
     @Override
@@ -125,7 +125,7 @@ public String apply(Object input) {
    * }</pre>
    *
    * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
-   * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
+   * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)}
    */
   private static final class KVs
       extends PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> {
@@ -160,7 +160,7 @@ public String apply(KV<?, ?> input) {
    * }</pre>
    *
    * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
-   * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
+   * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)}
    */
   private static final class Iterables
       extends PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
index d125356356a3..e605f7cdbd4d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
@@ -25,7 +25,7 @@
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.transforms.Contextful;
-import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.ProcessFunction;
 
 /**
  * A utility class for creating {@link TypeDescriptor} objects for different types, such as Java
@@ -339,16 +339,16 @@
    *
    * <pre>{@code
    * class Foo<BarT> {
-   *   private SerializableFunction<BarT, String> fn;
+   *   private ProcessFunction<BarT, String> fn;
    *
    *   TypeDescriptor<BarT> inferBarTypeDescriptorFromFn() {
    *     return TypeDescriptors.extractFromTypeParameters(
    *       fn,
-   *       SerializableFunction.class,
+   *       ProcessFunction.class,
    *       // The actual type of "fn" is matched against the input type of the extractor,
    *       // and the obtained values of type variables of the superclass are substituted
    *       // into the output type of the extractor.
-   *       new TypeVariableExtractor<SerializableFunction<BarT, String>, BarT>() {});
+   *       new TypeVariableExtractor<ProcessFunction<BarT, String>, BarT>() {});
    *   }
    * }
    * }</pre>
@@ -374,20 +374,20 @@
   public static <T, V> TypeDescriptor<V> extractFromTypeParameters(
       TypeDescriptor<T> type, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) {
     // Get the type signature of the extractor, e.g.
-    // TypeVariableExtractor<SerializableFunction<BarT, String>, BarT>
+    // TypeVariableExtractor<ProcessFunction<BarT, String>, BarT>
     TypeDescriptor<TypeVariableExtractor<T, V>> extractorSupertype =
         (TypeDescriptor<TypeVariableExtractor<T, V>>)
             TypeDescriptor.of(extractor.getClass()).getSupertype(TypeVariableExtractor.class);
 
-    // Get the actual type argument, e.g. SerializableFunction<BarT, String>
+    // Get the actual type argument, e.g. ProcessFunction<BarT, String>
     Type inputT = ((ParameterizedType) extractorSupertype.getType()).getActualTypeArguments()[0];
 
     // Get the actual supertype of the type being analyzed, hopefully with all type parameters
-    // resolved, e.g. SerializableFunction<Integer, String>
+    // resolved, e.g. ProcessFunction<Integer, String>
     TypeDescriptor supertypeDescriptor = type.getSupertype(supertype);
 
     // Substitute actual supertype into the extractor, e.g.
-    // TypeVariableExtractor<SerializableFunction<Integer, String>, Integer>
+    // TypeVariableExtractor<ProcessFunction<Integer, String>, Integer>
     TypeDescriptor<TypeVariableExtractor<T, V>> extractorT =
         extractorSupertype.where(inputT, supertypeDescriptor.getType());
 
@@ -397,30 +397,30 @@
   }
 
   /**
-   * Returns a type descriptor for the input of the given {@link SerializableFunction}, subject to
-   * Java type erasure: may contain unresolved type variables if the type was erased.
+   * Returns a type descriptor for the input of the given {@link ProcessFunction}, subject to Java
+   * type erasure: may contain unresolved type variables if the type was erased.
    */
   public static <InputT, OutputT> TypeDescriptor<InputT> inputOf(
-      SerializableFunction<InputT, OutputT> fn) {
+      ProcessFunction<InputT, OutputT> fn) {
     return extractFromTypeParameters(
         fn,
-        SerializableFunction.class,
-        new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, InputT>() {});
+        ProcessFunction.class,
+        new TypeVariableExtractor<ProcessFunction<InputT, OutputT>, InputT>() {});
   }
 
   /**
-   * Returns a type descriptor for the output of the given {@link SerializableFunction}, subject to
-   * Java type erasure: may contain unresolved type variables if the type was erased.
+   * Returns a type descriptor for the output of the given {@link ProcessFunction}, subject to Java
+   * type erasure: may contain unresolved type variables if the type was erased.
    */
   public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf(
-      SerializableFunction<InputT, OutputT> fn) {
+      ProcessFunction<InputT, OutputT> fn) {
     return extractFromTypeParameters(
         fn,
-        SerializableFunction.class,
-        new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, OutputT>() {});
+        ProcessFunction.class,
+        new TypeVariableExtractor<ProcessFunction<InputT, OutputT>, OutputT>() {});
   }
 
-  /** Like {@link #inputOf(SerializableFunction)} but for {@link Contextful.Fn}. */
+  /** Like {@link #inputOf(ProcessFunction)} but for {@link Contextful.Fn}. */
   public static <InputT, OutputT> TypeDescriptor<InputT> inputOf(
       Contextful.Fn<InputT, OutputT> fn) {
     return TypeDescriptors.extractFromTypeParameters(
@@ -429,7 +429,7 @@
         new TypeDescriptors.TypeVariableExtractor<Contextful.Fn<InputT, OutputT>, InputT>() {});
   }
 
-  /** Like {@link #outputOf(SerializableFunction)} but for {@link Contextful.Fn}. */
+  /** Like {@link #outputOf(ProcessFunction)} but for {@link Contextful.Fn}. */
   public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf(
       Contextful.Fn<InputT, OutputT> fn) {
     return TypeDescriptors.extractFromTypeParameters(
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
index a05b7ebea5cd..091c28fc240a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
@@ -58,6 +58,13 @@ public Boolean apply(Integer elem) {
     }
   }
 
+  static class EvenProcessFn implements ProcessFunction<Integer, Boolean> {
+    @Override
+    public Boolean apply(Integer elem) throws Exception {
+      return elem % 2 == 0;
+    }
+  }
+
   @Rule public final TestPipeline p = TestPipeline.create();
 
   @Rule public transient ExpectedException thrown = ExpectedException.none();
@@ -93,6 +100,16 @@ public void testFilterByPredicate() {
     p.run();
   }
 
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFilterByProcessFunction() {
+    PCollection<Integer> output =
+        p.apply(Create.of(1, 2, 3, 4, 5, 6, 7)).apply(Filter.by(new EvenProcessFn()));
+
+    PAssert.that(output).containsInAnyOrder(2, 4, 6);
+    p.run();
+  }
+
   @Test
   @Category(NeedsRunner.class)
   public void testFilterLessThan() {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
index 061765aa5b2d..a927bf90d260 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
@@ -58,12 +58,12 @@
   /** Basic test of {@link FlatMapElements} with a {@link SimpleFunction}. */
   @Test
   @Category(NeedsRunner.class)
-  public void testFlatMapBasic() throws Exception {
+  public void testFlatMapSimpleFunction() throws Exception {
     PCollection<Integer> output =
         pipeline
             .apply(Create.of(1, 2, 3))
 
-            // Note that FlatMapElements takes a SimpleFunction<InputT, ? extends Iterable<OutputT>>
+            // Note that FlatMapElements takes an InferableFunction<InputT, ? extends Iterable<OutputT>>
             // so the use of List<Integer> here (as opposed to Iterable<Integer>) deliberately exercises
             // the use of an upper bound.
             .apply(
@@ -79,6 +79,26 @@ public void testFlatMapBasic() throws Exception {
     pipeline.run();
   }
 
+  /** Basic test of {@link FlatMapElements} with an {@link InferableFunction}. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFlatMapInferableFunction() throws Exception {
+    PCollection<Integer> output =
+        pipeline
+            .apply(Create.of(1, 2, 3))
+            .apply(
+                FlatMapElements.via(
+                    new InferableFunction<Integer, List<Integer>>() {
+                      @Override
+                      public List<Integer> apply(Integer input) throws Exception {
+                        return ImmutableList.of(-input, input);
+                      }
+                    }));
+
+    PAssert.that(output).containsInAnyOrder(1, -2, -1, -3, 2, 3);
+    pipeline.run();
+  }
+
   /** Basic test of {@link FlatMapElements} with a {@link Fn} and a side input. */
   @Test
   @Category(NeedsRunner.class)
@@ -181,6 +201,20 @@ public void testSimpleFunctionClassDisplayData() {
     assertThat(DisplayData.from(simpleMap), hasDisplayItem("class", simpleFn.getClass()));
   }
 
+  @Test
+  public void testInferableFunctionClassDisplayData() {
+    InferableFunction<Integer, List<Integer>> inferableFn =
+        new InferableFunction<Integer, List<Integer>>() {
+          @Override
+          public List<Integer> apply(Integer input) {
+            return Collections.emptyList();
+          }
+        };
+
+    FlatMapElements<?, ?> inferableMap = FlatMapElements.via(inferableFn);
+    assertThat(DisplayData.from(inferableMap), hasDisplayItem("class", inferableFn.getClass()));
+  }
+
   @Test
   public void testSimpleFunctionDisplayData() {
     SimpleFunction<Integer, List<Integer>> simpleFn =
@@ -201,6 +235,26 @@ public void populateDisplayData(DisplayData.Builder builder) {
     assertThat(DisplayData.from(simpleFlatMap), hasDisplayItem("foo", "baz"));
   }
 
+  @Test
+  public void testInferableFunctionDisplayData() {
+    InferableFunction<Integer, List<Integer>> inferableFn =
+        new InferableFunction<Integer, List<Integer>>() {
+          @Override
+          public List<Integer> apply(Integer input) {
+            return Collections.emptyList();
+          }
+
+          @Override
+          public void populateDisplayData(DisplayData.Builder builder) {
+            builder.add(DisplayData.item("foo", "baz"));
+          }
+        };
+
+    FlatMapElements<?, ?> inferableFlatMap = FlatMapElements.via(inferableFn);
+    assertThat(DisplayData.from(inferableFlatMap), hasDisplayItem("class", inferableFn.getClass()));
+    assertThat(DisplayData.from(inferableFlatMap), hasDisplayItem("foo", "baz"));
+  }
+
   @Test
   @Category(NeedsRunner.class)
   public void testVoidValues() throws Exception {
@@ -230,7 +284,7 @@ public void testVoidValues() throws Exception {
 
   /**
    * Basic test of {@link FlatMapElements} with a lambda (which is instantiated as a {@link
-   * SerializableFunction}).
+   * ProcessFunction}).
    */
   @Test
   @Category(NeedsRunner.class)
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index c10b06e0fa61..952885a68485 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -76,10 +76,33 @@ public T apply(T input) {
     }
   }
 
+  /**
+   * An {@link InferableFunction} to test that the coder registry can propagate coders that are
+   * bound to type variables.
+   */
+  private static class PolymorphicInferableFunction<T> extends InferableFunction<T, T> {
+    @Override
+    public T apply(T input) throws Exception {
+      return input;
+    }
+  }
+
+  /**
+   * An {@link InferableFunction} to test that the coder registry can propagate coders that are
+   * bound to type variables, when the variable appears nested in the output.
+   */
+  private static class NestedPolymorphicInferableFunction<T>
+      extends InferableFunction<T, KV<T, String>> {
+    @Override
+    public KV<T, String> apply(T input) throws Exception {
+      return KV.of(input, "hello");
+    }
+  }
+
   /** Basic test of {@link MapElements} with a {@link SimpleFunction}. */
   @Test
   @Category(NeedsRunner.class)
-  public void testMapBasic() throws Exception {
+  public void testMapSimpleFunction() throws Exception {
     PCollection<Integer> output =
         pipeline
             .apply(Create.of(1, 2, 3))
@@ -96,6 +119,26 @@ public Integer apply(Integer input) {
     pipeline.run();
   }
 
+  /** Basic test of {@link MapElements} with an {@link InferableFunction}. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMapInferableFunction() throws Exception {
+    PCollection<Integer> output =
+        pipeline
+            .apply(Create.of(1, 2, 3))
+            .apply(
+                MapElements.via(
+                    new InferableFunction<Integer, Integer>() {
+                      @Override
+                      public Integer apply(Integer input) throws Exception {
+                        return -input;
+                      }
+                    }));
+
+    PAssert.that(output).containsInAnyOrder(-2, -1, -3);
+    pipeline.run();
+  }
+
   /** Basic test of {@link MapElements} with a {@link Fn} and a side input. */
   @Test
   @Category(NeedsRunner.class)
@@ -139,6 +182,28 @@ public Integer apply(Integer input) {
                 }));
   }
 
+  /**
+   * Basic test of {@link MapElements} coder propagation with a parametric {@link
+   * InferableFunction}.
+   */
+  @Test
+  public void testPolymorphicInferableFunction() throws Exception {
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply("Polymorphic Identity", MapElements.via(new PolymorphicInferableFunction<>()))
+        .apply(
+            "Test Consumer",
+            MapElements.via(
+                new InferableFunction<Integer, Integer>() {
+                  @Override
+                  public Integer apply(Integer input) throws Exception {
+                    return input;
+                  }
+                }));
+  }
+
   /**
    * Test of {@link MapElements} coder propagation with a parametric {@link SimpleFunction} where
    * the type variable occurs nested within other concrete type constructors.
@@ -166,12 +231,31 @@ public Integer apply(KV<Integer, String> input) {
   }
 
   /**
-   * Basic test of {@link MapElements} with a {@link SerializableFunction}. This style is generally
-   * discouraged in Java 7, in favor of {@link SimpleFunction}.
+   * Test of {@link MapElements} coder propagation with a parametric {@link InferableFunction} where
+   * the type variable occurs nested within other concrete type constructors.
    */
   @Test
+  public void testNestedPolymorphicInferableFunction() throws Exception {
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply("Polymorphic Identity", MapElements.via(new NestedPolymorphicInferableFunction<>()))
+        .apply(
+            "Test Consumer",
+            MapElements.via(
+                new InferableFunction<KV<Integer, String>, Integer>() {
+                  @Override
+                  public Integer apply(KV<Integer, String> input) throws Exception {
+                    return 42;
+                  }
+                }));
+  }
+
+  /** Basic test of {@link MapElements} with a {@link ProcessFunction}. */
+  @Test
   @Category(NeedsRunner.class)
-  public void testMapBasicSerializableFunction() throws Exception {
+  public void testMapBasicProcessFunction() throws Exception {
     PCollection<Integer> output =
         pipeline.apply(Create.of(1, 2, 3)).apply(MapElements.into(integers()).via(input -> -input));
 
@@ -208,6 +292,35 @@ public String apply(String input) {
     pipeline.run();
   }
 
+  /**
+   * Tests that when built with a concrete subclass of {@link InferableFunction}, the type
+   * descriptor of the output reflects its static type.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testInferableFunctionOutputTypeDescriptor() throws Exception {
+    PCollection<String> output =
+        pipeline
+            .apply(Create.of("hello"))
+            .apply(
+                MapElements.via(
+                    new InferableFunction<String, String>() {
+                      @Override
+                      public String apply(String input) throws Exception {
+                        return input;
+                      }
+                    }));
+    assertThat(
+        output.getTypeDescriptor(),
+        equalTo((TypeDescriptor<String>) new TypeDescriptor<String>() {}));
+    assertThat(
+        pipeline.getCoderRegistry().getCoder(output.getTypeDescriptor()),
+        equalTo(pipeline.getCoderRegistry().getCoder(new TypeDescriptor<String>() {})));
+
+    // Make sure the pipeline runs too
+    pipeline.run();
+  }
+
   @Test
   @Category(NeedsRunner.class)
   public void testVoidValues() throws Exception {
@@ -228,6 +341,14 @@ public void testSerializableFunctionDisplayData() {
         DisplayData.from(serializableMap), hasDisplayItem("class", serializableFn.getClass()));
   }
 
+  @Test
+  public void testProcessFunctionDisplayData() {
+    ProcessFunction<Integer, Integer> processFn = input -> input;
+
+    MapElements<?, ?> processMap = MapElements.into(integers()).via(processFn);
+    assertThat(DisplayData.from(processMap), hasDisplayItem("class", processFn.getClass()));
+  }
+
   @Test
   public void testSimpleFunctionClassDisplayData() {
     SimpleFunction<?, ?> simpleFn =
@@ -242,6 +363,20 @@ public Integer apply(Integer input) {
     assertThat(DisplayData.from(simpleMap), hasDisplayItem("class", simpleFn.getClass()));
   }
 
+  @Test
+  public void testInferableFunctionClassDisplayData() {
+    InferableFunction<?, ?> inferableFn =
+        new InferableFunction<Integer, Integer>() {
+          @Override
+          public Integer apply(Integer input) throws Exception {
+            return input;
+          }
+        };
+
+    MapElements<?, ?> inferableMap = MapElements.via(inferableFn);
+    assertThat(DisplayData.from(inferableMap), hasDisplayItem("class", inferableFn.getClass()));
+  }
+
   @Test
   public void testSimpleFunctionDisplayData() {
     SimpleFunction<Integer, ?> simpleFn =
@@ -262,6 +397,26 @@ public void populateDisplayData(DisplayData.Builder builder) {
     assertThat(DisplayData.from(simpleMap), hasDisplayItem("foo", "baz"));
   }
 
+  @Test
+  public void testInferableFunctionDisplayData() {
+    InferableFunction<Integer, ?> inferableFn =
+        new InferableFunction<Integer, Integer>() {
+          @Override
+          public Integer apply(Integer input) {
+            return input;
+          }
+
+          @Override
+          public void populateDisplayData(DisplayData.Builder builder) {
+            builder.add(DisplayData.item("foo", "baz"));
+          }
+        };
+
+    MapElements<?, ?> inferableMap = MapElements.via(inferableFn);
+    assertThat(DisplayData.from(inferableMap), hasDisplayItem("class", inferableFn.getClass()));
+    assertThat(DisplayData.from(inferableMap), hasDisplayItem("foo", "baz"));
+  }
+
   @Test
   @Category(ValidatesRunner.class)
   public void testPrimitiveDisplayData() {
diff --git a/website/src/contribute/ptransform-style-guide.md b/website/src/contribute/ptransform-style-guide.md
index 4cdcb7b98330..d07529cdb58b 100644
--- a/website/src/contribute/ptransform-style-guide.md
+++ b/website/src/contribute/ptransform-style-guide.md
@@ -395,8 +395,8 @@ If the transform has an aspect of behavior to be customized by a user's code, ma
 
 Do:
 
-* If possible, just use PTransform composition as an extensibility device - i.e. if the same effect can be achieved by the user applying the transform in their pipeline and composing it with another `PTransform`, then the transform itself should not be extensible. E.g., a transform that writes JSON objects to a third-party system should take a `PCollection<JsonObject>` (assuming it is possible to provide a `Coder` for `JsonObject`), rather than taking a generic `PCollection<T>` and a `SerializableFunction<T, JsonObject>` (anti-example that should be fixed: `TextIO`).
-* If extensibility by user code is necessary inside the transform, pass the user code as a `SerializableFunction` or define your own serializable function-like type (ideally single-method, for interoperability with Java 8 lambdas). Because Java erases the types of lambdas, you should be sure to have adequate type information even if a raw-type `SerializableFunction` is provided by the user. See `MapElements` and `FlatMapElements` for examples of how to use `SimpleFunction` and `SerializableFunction` in tandem to support Java 7 and Java 8 well.
+* If possible, just use PTransform composition as an extensibility device - i.e. if the same effect can be achieved by the user applying the transform in their pipeline and composing it with another `PTransform`, then the transform itself should not be extensible. E.g., a transform that writes JSON objects to a third-party system should take a `PCollection<JsonObject>` (assuming it is possible to provide a `Coder` for `JsonObject`), rather than taking a generic `PCollection<T>` and a `ProcessFunction<T, JsonObject>` (anti-example that should be fixed: `TextIO`).
+* If extensibility by user code is necessary inside the transform, pass the user code as a `ProcessFunction` or define your own serializable function-like type (ideally single-method, for interoperability with Java 8 lambdas). Because Java erases the types of lambdas, you should be sure to have adequate type information even if a raw-type `ProcessFunction` is provided by the user. See `MapElements` and `FlatMapElements` for examples of how to use `ProcessFunction` and `InferableFunction` in tandem to provide good support for both lambdas and concrete subclasses with type information.
 
 Do not:
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 175569)
    Time Spent: 1.5h  (was: 1h 20m)

> Provide alternatives to SerializableFunction and SimpleFunction that may 
> declare exceptions
> -------------------------------------------------------------------------------------------
>
>                 Key: BEAM-6150
>                 URL: https://issues.apache.org/jira/browse/BEAM-6150
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Jeff Klukas
>            Assignee: Jeff Klukas
>            Priority: Minor
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Contextful.Fn allows subclasses to declare checked exceptions, but neither 
> SerializableFunction nor SimpleFunction does. We want to add a new entry in 
> each of those hierarchies where checked exceptions are allowed. We can then 
> change existing method signatures to accept the new superinterfaces in 
> contexts where allowing user code to throw checked exceptions is acceptable, 
> such as in ProcessElement methods.
> Discussed on the dev mailing list:
> https://lists.apache.org/thread.html/eecd8dea8b47710098ec67d73b87cf9b4e2926c444c3fee1a6b9a743@%3Cdev.beam.apache.org%3E
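
The superinterface idea in the description above can be sketched in plain Java without any Beam dependency. This is only an illustration of the pattern, not the code from the pull request: `ThrowingFn`, `NonThrowingFn`, `mapAll` and `parseStrict` are hypothetical names standing in for a function type that may declare checked exceptions, the existing function type that narrows it, and a caller that accepts the wider type.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SuperinterfaceSketch {

  /** Hypothetical superinterface: apply may declare checked exceptions. */
  @FunctionalInterface
  interface ThrowingFn<InputT, OutputT> extends Serializable {
    OutputT apply(InputT input) throws Exception;
  }

  /** Hypothetical existing interface: re-declares apply without a throws clause. */
  @FunctionalInterface
  interface NonThrowingFn<InputT, OutputT> extends ThrowingFn<InputT, OutputT> {
    @Override
    OutputT apply(InputT input);
  }

  /** Accepts the superinterface, so both kinds of function can be passed in. */
  static <InputT, OutputT> List<OutputT> mapAll(
      List<InputT> inputs, ThrowingFn<InputT, OutputT> fn) throws Exception {
    List<OutputT> outputs = new ArrayList<>();
    for (InputT input : inputs) {
      outputs.add(fn.apply(input));
    }
    return outputs;
  }

  /** User code that throws a checked exception on bad input. */
  static int parseStrict(String s) throws Exception {
    if (s.isEmpty()) {
      throw new Exception("empty input");
    }
    return Integer.parseInt(s);
  }

  public static void main(String[] args) throws Exception {
    // Existing callers that hold the narrower type keep working unchanged.
    NonThrowingFn<Integer, Integer> negate = x -> -x;
    System.out.println(mapAll(Arrays.asList(1, 2, 3), negate)); // prints [-1, -2, -3]

    // New callers may pass code that declares checked exceptions.
    ThrowingFn<String, Integer> parse = SuperinterfaceSketch::parseStrict;
    System.out.println(mapAll(Arrays.asList("4", "5"), parse)); // prints [4, 5]
  }
}
```

Because the narrower interface extends the wider one, changing a method parameter from the narrower type to the wider type stays source compatible for callers, while code that invokes `apply` through the wider type has to handle or propagate `Exception`.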



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
