Repository: incubator-beam Updated Branches: refs/heads/master 93d2e374c -> 5bfeb958d
Add a basic implementation of StaticValueProvider and DynamicValueProvider Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/66686e63 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/66686e63 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/66686e63 Branch: refs/heads/master Commit: 66686e63f1d55fab05ceb70d71b5f43c5b78e077 Parents: 93d2e37 Author: sammcveety <sam.mcve...@gmail.com> Authored: Mon Sep 26 18:27:15 2016 -0400 Committer: Dan Halperin <dhalp...@google.com> Committed: Thu Oct 13 22:38:07 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/options/PipelineOptions.java | 23 ++ .../sdk/options/PipelineOptionsFactory.java | 15 +- .../sdk/options/ProxyInvocationHandler.java | 94 +++++--- .../apache/beam/sdk/options/ValueProvider.java | 228 +++++++++++++++++++ .../sdk/options/ProxyInvocationHandlerTest.java | 12 +- .../beam/sdk/options/ValueProviderTest.java | 213 +++++++++++++++++ .../apache/beam/sdk/util/ApiSurfaceTest.java | 3 + 7 files changed, 552 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index deb1cf4..3d6cad6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -26,6 +26,7 @@ import com.google.common.base.MoreObjects; import java.lang.reflect.Proxy; import java.util.ServiceLoader; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer; @@ -327,4 +328,26 @@ public interface PipelineOptions extends HasDisplayData { normalizedAppName, normalizedUserName, datePart, randomPart); } } + + /** + * Provides a unique ID for this {@link PipelineOptions} object, assigned at graph + * construction time. + */ + @Hidden + @Default.InstanceFactory(AtomicLongFactory.class) + Long getOptionsId(); + void setOptionsId(Long id); + + /** + * {@link DefaultValueFactory} which supplies an ID that is guaranteed to be unique + * within the given process. + */ + class AtomicLongFactory implements DefaultValueFactory<Long> { + private static final AtomicLong NEXT_ID = new AtomicLong(0); + + @Override + public Long create(PipelineOptions options) { + return NEXT_ID.getAndIncrement(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index 9fc6c2c..cd0c6b2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -54,6 +54,7 @@ import java.io.PrintStream; import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.lang.reflect.ParameterizedType; import java.lang.reflect.Proxy; import java.lang.reflect.Type; import java.util.ArrayList; @@ -1440,8 +1441,12 @@ public class PipelineOptionsFactory { } } } else if ((returnType.isArray() && (SIMPLE_TYPES.contains(returnType.getComponentType()) - || returnType.getComponentType().isEnum())) - || Collection.class.isAssignableFrom(returnType)) { + || returnType.getComponentType().isEnum())) + || Collection.class.isAssignableFrom(returnType) + || (returnType.equals(ValueProvider.class) + && MAPPER.getTypeFactory().constructType( + ((ParameterizedType) method.getGenericReturnType()) + .getActualTypeArguments()[0]).isCollectionLikeType())) { // Split any strings with "," List<String> values = FluentIterable.from(entry.getValue()) .transformAndConcat(new Function<String, Iterable<String>>() { @@ -1452,7 +1457,8 @@ public class PipelineOptionsFactory { }).toList(); if (returnType.isArray() && !returnType.getComponentType().equals(String.class) - || Collection.class.isAssignableFrom(returnType)) { + || Collection.class.isAssignableFrom(returnType) + || returnType.equals(ValueProvider.class)) { for (String value : values) { checkArgument(!value.isEmpty(), "Empty argument value is only allowed for String, String Array, " @@ -1461,7 +1467,8 @@ public class PipelineOptionsFactory { } } convertedOptions.put(entry.getKey(), MAPPER.convertValue(values, type)); - } else if (SIMPLE_TYPES.contains(returnType) || returnType.isEnum()) { + } else if (SIMPLE_TYPES.contains(returnType) || returnType.isEnum() + || returnType.equals(ValueProvider.class)) { String value = Iterables.getOnlyElement(entry.getValue()); checkArgument(returnType.equals(String.class) || !value.isEmpty(), "Empty argument value is only allowed for String, String Array, " http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java index c438a43..47d7cee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java @@ -65,6 +65,8 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.sdk.options.PipelineOptionsFactory.JsonIgnorePredicate; import org.apache.beam.sdk.options.PipelineOptionsFactory.Registration; +import org.apache.beam.sdk.options.ValueProvider.RuntimeValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.util.InstanceBuilder; @@ -202,7 +204,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { */ synchronized <T extends PipelineOptions> T as(Class<T> iface) { checkNotNull(iface); - checkArgument(iface.isInterface()); + checkArgument(iface.isInterface(), "Not an interface: %s", iface); if (!interfaceToProxyCache.containsKey(iface)) { Registration<T> registration = PipelineOptionsFactory.validateWellFormed(iface, knownInterfaces); @@ -468,36 +470,34 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { */ @SuppressWarnings({"unchecked", "rawtypes"}) private Object getDefault(PipelineOptions proxy, Method method) { + if (method.getReturnType().equals(RuntimeValueProvider.class)) { + throw new RuntimeException(String.format( + "Method %s should not have return type " + + "RuntimeValueProvider, use ValueProvider instead.", method.getName())); + } + if (method.getReturnType().equals(StaticValueProvider.class)) { + throw new RuntimeException(String.format( + "Method %s should not have return type " + + "StaticValueProvider, use ValueProvider instead.", method.getName())); + } + @Nullable Object defaultObject = null; for (Annotation annotation : method.getAnnotations()) { - if (annotation instanceof Default.Class) { - return ((Default.Class) annotation).value(); - } else if (annotation instanceof Default.String) { - return ((Default.String) annotation).value(); - } else if (annotation instanceof Default.Boolean) { - return ((Default.Boolean) annotation).value(); - } else if (annotation instanceof Default.Character) { - return ((Default.Character) annotation).value(); - } else if (annotation instanceof Default.Byte) { - return ((Default.Byte) annotation).value(); - } else if (annotation instanceof Default.Short) { - return ((Default.Short) annotation).value(); - } else if (annotation instanceof Default.Integer) { - return ((Default.Integer) annotation).value(); - } else if (annotation instanceof Default.Long) { - return ((Default.Long) annotation).value(); - } else if (annotation instanceof Default.Float) { - return ((Default.Float) annotation).value(); - } else if (annotation instanceof Default.Double) { - return ((Default.Double) annotation).value(); - } else if (annotation instanceof Default.Enum) { - return Enum.valueOf((Class<Enum>) method.getReturnType(), - ((Default.Enum) annotation).value()); - } else if (annotation instanceof Default.InstanceFactory) { - return InstanceBuilder.ofType(((Default.InstanceFactory) annotation).value()) - .build() - .create(proxy); + defaultObject = returnDefaultHelper(annotation, proxy, method); + if (defaultObject != null) { + break; } } + if (method.getReturnType().equals(ValueProvider.class)) { + return defaultObject == null + ? new RuntimeValueProvider( + method.getName(), (Class<? extends PipelineOptions>) method.getDeclaringClass(), + proxy.getOptionsId()) + : new RuntimeValueProvider( + method.getName(), (Class<? extends PipelineOptions>) method.getDeclaringClass(), + defaultObject, proxy.getOptionsId()); + } else if (defaultObject != null) { + return defaultObject; + } /* * We need to make sure that we return something appropriate for the return type. Thus we return @@ -507,6 +507,43 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { } /** + * Helper method to return standard Default cases. + */ + @Nullable + private Object returnDefaultHelper( + Annotation annotation, PipelineOptions proxy, Method method) { + if (annotation instanceof Default.Class) { + return ((Default.Class) annotation).value(); + } else if (annotation instanceof Default.String) { + return ((Default.String) annotation).value(); + } else if (annotation instanceof Default.Boolean) { + return ((Default.Boolean) annotation).value(); + } else if (annotation instanceof Default.Character) { + return ((Default.Character) annotation).value(); + } else if (annotation instanceof Default.Byte) { + return ((Default.Byte) annotation).value(); + } else if (annotation instanceof Default.Short) { + return ((Default.Short) annotation).value(); + } else if (annotation instanceof Default.Integer) { + return ((Default.Integer) annotation).value(); + } else if (annotation instanceof Default.Long) { + return ((Default.Long) annotation).value(); + } else if (annotation instanceof Default.Float) { + return ((Default.Float) annotation).value(); + } else if (annotation instanceof Default.Double) { + return ((Default.Double) annotation).value(); + } else if (annotation instanceof Default.Enum) { + return Enum.valueOf((Class<Enum>) method.getReturnType(), + ((Default.Enum) annotation).value()); + } else if (annotation instanceof Default.InstanceFactory) { + return InstanceBuilder.ofType(((Default.InstanceFactory) annotation).value()) + .build() + .create(proxy); + } + return null; + } + + /** * Returns a map from the getters method name to the name of the property based upon the passed in * {@link PropertyDescriptor}s property descriptors. * @@ -657,6 +694,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { PipelineOptions options = new ProxyInvocationHandler(Maps.<String, BoundValue>newHashMap(), fields) .as(PipelineOptions.class); + ValueProvider.RuntimeValueProvider.setRuntimeOptions(options); return options; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java new file mode 100644 index 0000000..e4502fc --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.options; + +import static com.google.common.base.MoreObjects.firstNonNull; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.BeanProperty; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.deser.ContextualDeserializer; + +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.Nullable; + +/** + * {@link ValueProvider} is an interface which abstracts the notion of + * fetching a value that may or may not be currently available. This can be + * used to parameterize transforms that only read values in at runtime, for + * example. + */ +@JsonSerialize(using = ValueProvider.Serializer.class) +@JsonDeserialize(using = ValueProvider.Deserializer.class) +public interface ValueProvider<T> { + /** + * Return the value wrapped by this {@link ValueProvider}. + */ + T get(); + + /** + * Whether the contents of this {@link ValueProvider} is available to + * routines that run at graph construction time. + */ + boolean isAccessible(); + + /** + * {@link StaticValueProvider} is an implementation of {@link ValueProvider} that + * allows for a static value to be provided. + */ + public static class StaticValueProvider<T> implements ValueProvider<T>, Serializable { + @Nullable + private final T value; + + StaticValueProvider(@Nullable T value) { + this.value = value; + } + + /** + * Creates a {@link StaticValueProvider} that wraps the provided value. + */ + public static <T> StaticValueProvider<T> of(T value) { + StaticValueProvider<T> factory = new StaticValueProvider<>(value); + return factory; + } + + @Override + public T get() { + return value; + } + + @Override + public boolean isAccessible() { + return true; + } + } + + /** + * {@link RuntimeValueProvider} is an implementation of {@link ValueProvider} that + * allows for a value to be provided at execution time rather than at graph + * construction time. + * + * <p>To enforce this contract, if there is no default, users must only call + * {@link #get()} at execution time (after a call to {@link Pipeline#run}), + * which will provide the value of {@code optionsMap}. + */ + public static class RuntimeValueProvider<T> implements ValueProvider<T>, Serializable { + private static ConcurrentHashMap<Long, PipelineOptions> optionsMap = + new ConcurrentHashMap<>(); + + private final Class<? extends PipelineOptions> klass; + private final String methodName; + @Nullable + private final T defaultValue; + private final Long optionsId; + + /** + * Creates a {@link RuntimeValueProvider} that will query the provided + * {@code optionsId} for a value. + */ + RuntimeValueProvider(String methodName, Class<? extends PipelineOptions> klass, + Long optionsId) { + this.methodName = methodName; + this.klass = klass; + this.defaultValue = null; + this.optionsId = optionsId; + } + + /** + * Creates a {@link RuntimeValueProvider} that will query the provided + * {@code optionsId} for a value, or use the default if no value is available. + */ + RuntimeValueProvider(String methodName, Class<? extends PipelineOptions> klass, + T defaultValue, Long optionsId) { + this.methodName = methodName; + this.klass = klass; + this.defaultValue = defaultValue; + this.optionsId = optionsId; + } + + /** + * Once set, all {@code RuntimeValueProviders} will return {@code true} + * from {@code isAccessible()}. By default, the value is set when + * deserializing {@link PipelineOptions}. + */ + static void setRuntimeOptions(PipelineOptions runtimeOptions) { + optionsMap.put(runtimeOptions.getOptionsId(), runtimeOptions); + } + + @Override + public T get() { + PipelineOptions options = optionsMap.get(optionsId); + if (options == null) { + throw new RuntimeException("Not called from a runtime context."); + } + try { + Method method = klass.getMethod(methodName); + PipelineOptions methodOptions = options.as(klass); + InvocationHandler handler = Proxy.getInvocationHandler(methodOptions); + T value = ((ValueProvider<T>) handler.invoke(methodOptions, method, null)).get(); + return firstNonNull(value, defaultValue); + } catch (Throwable e) { + throw new RuntimeException("Unable to load runtime value.", e); + } + } + + @Override + public boolean isAccessible() { + PipelineOptions options = optionsMap.get(optionsId); + return options != null; + } + } + + /** + * Serializer for {@link ValueProvider}. + */ + static class Serializer extends JsonSerializer<ValueProvider<?>> { + @Override + public void serialize(ValueProvider<?> value, JsonGenerator jgen, + SerializerProvider provider) throws IOException { + if (value.isAccessible()) { + jgen.writeObject(value.get()); + } else { + jgen.writeNull(); + } + } + } + + /** + * Deserializer for {@link ValueProvider}, which handles type marshalling. + */ + static class Deserializer extends JsonDeserializer<ValueProvider<?>> + implements ContextualDeserializer { + + private final JavaType innerType; + + // A 0-arg constructor is required by the compiler. + Deserializer() { + this.innerType = null; + } + + Deserializer(JavaType innerType) { + this.innerType = innerType; + } + + @Override + public JsonDeserializer<?> createContextual(DeserializationContext ctxt, + BeanProperty property) + throws JsonMappingException { + checkNotNull(ctxt, "Null DeserializationContext."); + JavaType type = checkNotNull(ctxt.getContextualType(), "Invalid type: %s", getClass()); + JavaType[] params = type.findTypeParameters(ValueProvider.class); + if (params.length != 1) { + throw new RuntimeException( + "Unable to derive type for ValueProvider: " + type.toString()); + } + JavaType param = params[0]; + return new Deserializer(param); + } + + @Override + public ValueProvider<?> deserialize(JsonParser jp, DeserializationContext ctxt) + throws IOException, JsonProcessingException { + JsonDeserializer dser = ctxt.findRootValueDeserializer( + checkNotNull(innerType, "Invalid %s: innerType is null. Serialization error?", getClass())); + Object o = dser.deserialize(jp, ctxt); + return StaticValueProvider.of(o); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java index 5d8ef43..eecfff8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java @@ -244,12 +244,14 @@ public class ProxyInvocationHandlerTest { public void testToStringAfterDeserializationContainsJsonEntries() throws Exception { ProxyInvocationHandler handler = new ProxyInvocationHandler(Maps.<String, Object>newHashMap()); Simple proxy = handler.as(Simple.class); + Long optionsId = proxy.getOptionsId(); proxy.setString("stringValue"); DefaultAnnotations proxy2 = proxy.as(DefaultAnnotations.class); proxy2.setLong(57L); - assertEquals("Current Settings:\n" + assertEquals(String.format("Current Settings:\n" + " long: 57\n" - + " string: \"stringValue\"\n", + + " optionsId: %d\n" + + " string: \"stringValue\"\n", optionsId), serializeDeserialize(PipelineOptions.class, proxy2).toString()); } @@ -257,14 +259,16 @@ public class ProxyInvocationHandlerTest { public void testToStringAfterDeserializationContainsOverriddenEntries() throws Exception { ProxyInvocationHandler handler = new ProxyInvocationHandler(Maps.<String, Object>newHashMap()); Simple proxy = handler.as(Simple.class); + Long optionsId = proxy.getOptionsId(); proxy.setString("stringValue"); DefaultAnnotations proxy2 = proxy.as(DefaultAnnotations.class); proxy2.setLong(57L); Simple deserializedOptions = serializeDeserialize(Simple.class, proxy2); deserializedOptions.setString("overriddenValue"); - assertEquals("Current Settings:\n" + assertEquals(String.format("Current Settings:\n" + " long: 57\n" - + " string: overriddenValue\n", + + " optionsId: %d\n" + + " string: overriddenValue\n", optionsId), deserializedOptions.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java new file mode 100644 index 0000000..0cde615 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.options; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.apache.beam.sdk.options.ValueProvider.RuntimeValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ValueProvider}. */ +@RunWith(JUnit4.class) +public class ValueProviderTest { + @Rule public ExpectedException expectedException = ExpectedException.none(); + + /** A test interface. */ + public static interface TestOptions extends PipelineOptions { + @Default.String("bar") + ValueProvider<String> getBar(); + void setBar(ValueProvider<String> bar); + + ValueProvider<String> getFoo(); + void setFoo(ValueProvider<String> foo); + + ValueProvider<List<Integer>> getList(); + void setList(ValueProvider<List<Integer>> list); + } + + @Test + public void testCommandLineNoDefault() { + TestOptions options = PipelineOptionsFactory.fromArgs( + new String[]{"--foo=baz"}).as(TestOptions.class); + ValueProvider<String> provider = options.getFoo(); + assertEquals("baz", provider.get()); + assertTrue(provider.isAccessible()); + } + + @Test + public void testListValueProvider() { + TestOptions options = PipelineOptionsFactory.fromArgs( + new String[]{"--list=1,2,3"}).as(TestOptions.class); + ValueProvider<List<Integer>> provider = options.getList(); + assertEquals(ImmutableList.of(1, 2, 3), provider.get()); + assertTrue(provider.isAccessible()); + } + + @Test + public void testCommandLineWithDefault() { + TestOptions options = PipelineOptionsFactory.fromArgs( + new String[]{"--bar=baz"}).as(TestOptions.class); + ValueProvider<String> provider = options.getBar(); + assertEquals("baz", provider.get()); + assertTrue(provider.isAccessible()); + } + + @Test + public void testStaticValueProvider() { + ValueProvider<String> provider = StaticValueProvider.of("foo"); + assertEquals("foo", provider.get()); + assertTrue(provider.isAccessible()); + } + + @Test + public void testNoDefaultRuntimeProvider() { + TestOptions options = PipelineOptionsFactory.as(TestOptions.class); + ValueProvider<String> provider = options.getFoo(); + assertFalse(provider.isAccessible()); + + expectedException.expect(RuntimeException.class); + expectedException.expectMessage("Not called from a runtime context"); + provider.get(); + } + + @Test + public void testDefaultRuntimeProvider() { + TestOptions options = PipelineOptionsFactory.as(TestOptions.class); + ValueProvider<String> provider = options.getBar(); + assertFalse(provider.isAccessible()); + } + + @Test + public void testNoDefaultRuntimeProviderWithOverride() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + TestOptions runtime = mapper.readValue( + "{ \"options\": { \"foo\": \"quux\" }}", PipelineOptions.class) + .as(TestOptions.class); + + TestOptions options = PipelineOptionsFactory.as(TestOptions.class); + runtime.setOptionsId(options.getOptionsId()); + RuntimeValueProvider.setRuntimeOptions(runtime); + + ValueProvider<String> provider = options.getFoo(); + assertTrue(provider.isAccessible()); + assertEquals("quux", provider.get()); + } + + @Test + public void testDefaultRuntimeProviderWithOverride() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + TestOptions runtime = mapper.readValue( + "{ \"options\": { \"bar\": \"quux\" }}", PipelineOptions.class) + .as(TestOptions.class); + + TestOptions options = PipelineOptionsFactory.as(TestOptions.class); + runtime.setOptionsId(options.getOptionsId()); + RuntimeValueProvider.setRuntimeOptions(runtime); + + ValueProvider<String> provider = options.getBar(); + assertTrue(provider.isAccessible()); + assertEquals("quux", provider.get()); + } + + /** A test interface. */ + public static interface BadOptionsRuntime extends PipelineOptions { + RuntimeValueProvider<String> getBar(); + void setBar(RuntimeValueProvider<String> bar); + } + + @Test + public void testOptionReturnTypeRuntime() { + BadOptionsRuntime options = PipelineOptionsFactory.as(BadOptionsRuntime.class); + expectedException.expect(RuntimeException.class); + expectedException.expectMessage( + "Method getBar should not have return type " + + "RuntimeValueProvider, use ValueProvider instead."); + RuntimeValueProvider<String> provider = options.getBar(); + } + + /** A test interface. */ + public static interface BadOptionsStatic extends PipelineOptions { + StaticValueProvider<String> getBar(); + void setBar(StaticValueProvider<String> bar); + } + + @Test + public void testOptionReturnTypeStatic() { + BadOptionsStatic options = PipelineOptionsFactory.as(BadOptionsStatic.class); + expectedException.expect(RuntimeException.class); + expectedException.expectMessage( + "Method getBar should not have return type " + + "StaticValueProvider, use ValueProvider instead."); + StaticValueProvider<String> provider = options.getBar(); + } + + @Test + public void testSerializeDeserializeNoArg() throws Exception { + TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class); + assertFalse(submitOptions.getFoo().isAccessible()); + ObjectMapper mapper = new ObjectMapper(); + String serializedOptions = mapper.writeValueAsString(submitOptions); + + // This is the expected behavior of the runner: deserialize and set the + // the runtime options. + String anchor = "\"appName\":\"ValueProviderTest\""; + assertThat(serializedOptions, containsString("\"foo\":null")); + String runnerString = serializedOptions.replaceAll( + "\"foo\":null", "\"foo\":\"quux\""); + TestOptions runtime = mapper.readValue(runnerString, PipelineOptions.class) + .as(TestOptions.class); + + ValueProvider<String> vp = runtime.getFoo(); + assertTrue(vp.isAccessible()); + assertEquals("quux", vp.get()); + assertEquals(vp.getClass(), StaticValueProvider.class); + } + + @Test + public void testSerializeDeserializeWithArg() throws Exception { + TestOptions submitOptions = PipelineOptionsFactory.fromArgs( + new String[]{"--foo=baz"}).as(TestOptions.class); + assertEquals("baz", submitOptions.getFoo().get()); + assertTrue(submitOptions.getFoo().isAccessible()); + ObjectMapper mapper = new ObjectMapper(); + String serializedOptions = mapper.writeValueAsString(submitOptions); + + // This is the expected behavior of the runner: deserialize and set the + // the runtime options. + assertThat(serializedOptions, containsString("baz")); + String runnerString = serializedOptions.replaceAll("baz", "quux"); + TestOptions runtime = mapper.readValue(runnerString, PipelineOptions.class) + .as(TestOptions.class); + + ValueProvider<String> vp = runtime.getFoo(); + assertTrue(vp.isAccessible()); + assertEquals("quux", vp.get()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java index ea771b4..92dcbb8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java @@ -89,6 +89,9 @@ public class ApiSurfaceTest { inPackage("com.google.rpc"), inPackage("com.google.type"), inPackage("com.fasterxml.jackson.annotation"), + inPackage("com.fasterxml.jackson.core"), + inPackage("com.fasterxml.jackson.databind"), + inPackage("com.fasterxml.jackson.deser"), inPackage("io.grpc"), inPackage("org.apache.avro"), inPackage("org.apache.commons.logging"), // via BigTable