Repository: beam Updated Branches: refs/heads/master 92615ca4c -> dc3e2f756
PipelineOptionsFactory should prevent non PipelineOptions interfaces from being constructed Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7165e55d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7165e55d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7165e55d Branch: refs/heads/master Commit: 7165e55d624d41f29536286d7975fa78ab819f0d Parents: 92615ca Author: Kai Jiang <[email protected]> Authored: Tue Oct 3 23:41:32 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Wed Oct 11 16:09:43 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/apex/ApexPipelineOptions.java | 2 +- .../dataflow/options/CloudDebuggerOptions.java | 3 +- .../options/DataflowProfilingOptions.java | 3 +- .../sdk/options/PipelineOptionsFactory.java | 45 ++++++++++++++++++++ .../sdk/options/PipelineOptionsFactoryTest.java | 39 +++++++++++++++++ 5 files changed, 89 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7165e55d/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java index 92f6e8f..8db7c7a 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java @@ -25,7 +25,7 @@ import org.apache.beam.sdk.options.PipelineOptions; /** * Options that configure the Apex pipeline. */ -public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializable { +public interface ApexPipelineOptions extends PipelineOptions { @Description("set unique application name for Apex runner") void setApplicationName(String name); http://git-wip-us.apache.org/repos/asf/beam/blob/7165e55d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/CloudDebuggerOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/CloudDebuggerOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/CloudDebuggerOptions.java index d1c8e7a..317a30b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/CloudDebuggerOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/CloudDebuggerOptions.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.Hidden; +import org.apache.beam.sdk.options.PipelineOptions; /** * Options for controlling Cloud Debugger. @@ -30,7 +31,7 @@ import org.apache.beam.sdk.options.Hidden; @Description("[Experimental] Used to configure the Cloud Debugger") @Experimental @Hidden -public interface CloudDebuggerOptions { +public interface CloudDebuggerOptions extends PipelineOptions { /** Whether to enable the Cloud Debugger snapshot agent for the current job. */ @Description("Whether to enable the Cloud Debugger snapshot agent for the current job.") http://git-wip-us.apache.org/repos/asf/beam/blob/7165e55d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java index a87d688..ef9b6e6 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java @@ -21,6 +21,7 @@ import java.util.HashMap; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.Hidden; +import org.apache.beam.sdk.options.PipelineOptions; /** * Options for controlling profiling of pipeline execution. @@ -28,7 +29,7 @@ import org.apache.beam.sdk.options.Hidden; @Description("[Experimental] Used to configure profiling of the Dataflow pipeline") @Experimental @Hidden -public interface DataflowProfilingOptions { +public interface DataflowProfilingOptions extends PipelineOptions { @Description("When set to a non-empty value, enables recording profiles and saving them to GCS.\n" + "Profiles will continue until the pipeline is stopped or updated without this option.\n") http://git-wip-us.apache.org/repos/asf/beam/blob/7165e55d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index d7e6cc8..ad6979e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -63,6 +63,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -581,6 +582,8 @@ public class PipelineOptionsFactory { /** * Validates that the interface conforms to the following: * <ul> + * <li>Every inherited interface of {@code iface} must extend PipelineOptions except for + * PipelineOptions itself. * <li>Any property with the same name must have the same return type for all derived * interfaces of {@link PipelineOptions}. * <li>Every bean property of any interface derived from {@link PipelineOptions} must have a @@ -602,6 +605,10 @@ public class PipelineOptionsFactory { Class<T> iface, Set<Class<? extends PipelineOptions>> validatedPipelineOptionsInterfaces) { checkArgument(iface.isInterface(), "Only interface types are supported."); + // Validate that every inherited interface must extend PipelineOptions except for + // PipelineOptions itself. + validateInheritedInterfacesExtendPipelineOptions(iface); + @SuppressWarnings("unchecked") Set<Class<? extends PipelineOptions>> combinedPipelineOptionsInterfaces = FluentIterable.from(validatedPipelineOptionsInterfaces).append(iface).toSet(); @@ -1258,6 +1265,44 @@ public class PipelineOptionsFactory { iface.getName()); } + private static void checkInheritedFrom(Class<?> checkClass, Class fromClass, + Set<Class<?>> nonPipelineOptions) { + if (checkClass.equals(fromClass)) { + return; + } + + if (checkClass.getInterfaces().length == 0) { + nonPipelineOptions.add(checkClass); + return; + } + + for (Class<?> klass : checkClass.getInterfaces()) { + checkInheritedFrom(klass, fromClass, nonPipelineOptions); + } + } + + private static void throwNonPipelineOptions(Class<?> klass, + Set<Class<?>> nonPipelineOptionsClasses) { + StringBuilder errorBuilder = new StringBuilder(String.format( + "All inherited interfaces of [%s] should inherit from the PipelineOptions interface. " + + "The following inherited interfaces do not:", + klass.getName())); + + for (Class<?> invalidKlass : nonPipelineOptionsClasses) { + errorBuilder.append(String.format("%n - %s", invalidKlass.getName())); + } + throw new IllegalArgumentException(errorBuilder.toString()); + } + + private static void validateInheritedInterfacesExtendPipelineOptions(Class<?> klass) { + Set<Class<?>> nonPipelineOptionsClasses = new LinkedHashSet<>(); + checkInheritedFrom(klass, PipelineOptions.class, nonPipelineOptionsClasses); + + if (!nonPipelineOptionsClasses.isEmpty()) { + throwNonPipelineOptions(klass, nonPipelineOptionsClasses); + } + } + private static class MultipleDefinitions { private Method method; private SortedSet<Method> collidingMethods; http://git-wip-us.apache.org/repos/asf/beam/blob/7165e55d/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index d40b5fc..f8de74a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -1629,9 +1629,47 @@ public class PipelineOptionsFactoryTest { containsString("The pipeline runner that will be used to execute the pipeline.")); } + interface PipelineOptionsInheritedInvalid extends Invalid1, + InvalidPipelineOptions2, PipelineOptions { + String getFoo(); + void setFoo(String value); + } + + interface InvalidPipelineOptions1 { + String getBar(); + void setBar(String value); + } + + interface Invalid1 extends InvalidPipelineOptions1 { + String getBar(); + void setBar(String value); + } + + interface InvalidPipelineOptions2 { + String getBar(); + void setBar(String value); + } + + @Test + public void testAllFromPipelineOptions() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "All inherited interfaces of [org.apache.beam.sdk.options.PipelineOptionsFactoryTest" + + "$PipelineOptionsInheritedInvalid] should inherit from the PipelineOptions interface. " + + "The following inherited interfaces do not:\n" + + " - org.apache.beam.sdk.options.PipelineOptionsFactoryTest" + + "$InvalidPipelineOptions1\n" + + " - org.apache.beam.sdk.options.PipelineOptionsFactoryTest" + + "$InvalidPipelineOptions2"); + + PipelineOptionsInheritedInvalid options = PipelineOptionsFactory.as( + PipelineOptionsInheritedInvalid.class); + } + private String emptyStringErrorMessage() { return emptyStringErrorMessage(null); } + private String emptyStringErrorMessage(String type) { String msg = "Empty argument value is only allowed for String, String Array, " + "Collections of Strings or any of these types in a parameterized ValueProvider"; @@ -1736,4 +1774,5 @@ public class PipelineOptionsFactoryTest { jsonGenerator.writeString(jacksonIncompatible.value); } } + }
