Repository: incubator-beam
Updated Branches:
  refs/heads/master 93d2e374c -> 5bfeb958d


Add a basic implementation of StaticValueProvider and DynamicValueProvider


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/66686e63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/66686e63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/66686e63

Branch: refs/heads/master
Commit: 66686e63f1d55fab05ceb70d71b5f43c5b78e077
Parents: 93d2e37
Author: sammcveety <sam.mcve...@gmail.com>
Authored: Mon Sep 26 18:27:15 2016 -0400
Committer: Dan Halperin <dhalp...@google.com>
Committed: Thu Oct 13 22:38:07 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/options/PipelineOptions.java       |  23 ++
 .../sdk/options/PipelineOptionsFactory.java     |  15 +-
 .../sdk/options/ProxyInvocationHandler.java     |  94 +++++---
 .../apache/beam/sdk/options/ValueProvider.java  | 228 +++++++++++++++++++
 .../sdk/options/ProxyInvocationHandlerTest.java |  12 +-
 .../beam/sdk/options/ValueProviderTest.java     | 213 +++++++++++++++++
 .../apache/beam/sdk/util/ApiSurfaceTest.java    |   3 +
 7 files changed, 552 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index deb1cf4..3d6cad6 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -26,6 +26,7 @@ import com.google.common.base.MoreObjects;
 import java.lang.reflect.Proxy;
 import java.util.ServiceLoader;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
@@ -327,4 +328,26 @@ public interface PipelineOptions extends HasDisplayData {
           normalizedAppName, normalizedUserName, datePart, randomPart);
     }
   }
+
+  /**
+   * Provides a unique ID for this {@link PipelineOptions} object, assigned at 
graph
+   * construction time.
+   */
+  @Hidden
+  @Default.InstanceFactory(AtomicLongFactory.class)
+  Long getOptionsId();
+  void setOptionsId(Long id);
+
+  /**
+   * {@link DefaultValueFactory} which supplies an ID that is guaranteed to be 
unique
+   * within the given process.
+   */
+  class AtomicLongFactory implements DefaultValueFactory<Long> {
+    private static final AtomicLong NEXT_ID = new AtomicLong(0);
+
+    @Override
+    public Long create(PipelineOptions options) {
+      return NEXT_ID.getAndIncrement();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index 9fc6c2c..cd0c6b2 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -54,6 +54,7 @@ import java.io.PrintStream;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
@@ -1440,8 +1441,12 @@ public class PipelineOptionsFactory {
             }
           }
         } else if ((returnType.isArray() && 
(SIMPLE_TYPES.contains(returnType.getComponentType())
-                || returnType.getComponentType().isEnum()))
-            || Collection.class.isAssignableFrom(returnType)) {
+                   || returnType.getComponentType().isEnum()))
+                   || Collection.class.isAssignableFrom(returnType)
+                   || (returnType.equals(ValueProvider.class)
+                       && MAPPER.getTypeFactory().constructType(
+                         ((ParameterizedType) method.getGenericReturnType())
+                         
.getActualTypeArguments()[0]).isCollectionLikeType())) {
           // Split any strings with ","
           List<String> values = FluentIterable.from(entry.getValue())
               .transformAndConcat(new Function<String, Iterable<String>>() {
@@ -1452,7 +1457,8 @@ public class PipelineOptionsFactory {
           }).toList();
 
           if (returnType.isArray() && 
!returnType.getComponentType().equals(String.class)
-              || Collection.class.isAssignableFrom(returnType)) {
+              || Collection.class.isAssignableFrom(returnType)
+              || returnType.equals(ValueProvider.class)) {
             for (String value : values) {
               checkArgument(!value.isEmpty(),
                   "Empty argument value is only allowed for String, String 
Array, "
@@ -1461,7 +1467,8 @@ public class PipelineOptionsFactory {
             }
           }
           convertedOptions.put(entry.getKey(), MAPPER.convertValue(values, 
type));
-        } else if (SIMPLE_TYPES.contains(returnType) || returnType.isEnum()) {
+        } else if (SIMPLE_TYPES.contains(returnType) || returnType.isEnum()
+                   || returnType.equals(ValueProvider.class)) {
           String value = Iterables.getOnlyElement(entry.getValue());
           checkArgument(returnType.equals(String.class) || !value.isEmpty(),
                "Empty argument value is only allowed for String, String Array, 
"

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index c438a43..47d7cee 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -65,6 +65,8 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.sdk.options.PipelineOptionsFactory.JsonIgnorePredicate;
 import org.apache.beam.sdk.options.PipelineOptionsFactory.Registration;
+import org.apache.beam.sdk.options.ValueProvider.RuntimeValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.util.InstanceBuilder;
@@ -202,7 +204,7 @@ class ProxyInvocationHandler implements InvocationHandler, 
HasDisplayData {
    */
   synchronized <T extends PipelineOptions> T as(Class<T> iface) {
     checkNotNull(iface);
-    checkArgument(iface.isInterface());
+    checkArgument(iface.isInterface(), "Not an interface: %s", iface);
     if (!interfaceToProxyCache.containsKey(iface)) {
       Registration<T> registration =
           PipelineOptionsFactory.validateWellFormed(iface, knownInterfaces);
@@ -468,36 +470,34 @@ class ProxyInvocationHandler implements 
InvocationHandler, HasDisplayData {
    */
   @SuppressWarnings({"unchecked", "rawtypes"})
   private Object getDefault(PipelineOptions proxy, Method method) {
+    if (method.getReturnType().equals(RuntimeValueProvider.class)) {
+      throw new RuntimeException(String.format(
+        "Method %s should not have return type "
+        + "RuntimeValueProvider, use ValueProvider instead.", 
method.getName()));
+    }
+    if (method.getReturnType().equals(StaticValueProvider.class)) {
+      throw new RuntimeException(String.format(
+        "Method %s should not have return type "
+        + "StaticValueProvider, use ValueProvider instead.", 
method.getName()));
+    }
+    @Nullable Object defaultObject = null;
     for (Annotation annotation : method.getAnnotations()) {
-      if (annotation instanceof Default.Class) {
-        return ((Default.Class) annotation).value();
-      } else if (annotation instanceof Default.String) {
-        return ((Default.String) annotation).value();
-      } else if (annotation instanceof Default.Boolean) {
-        return ((Default.Boolean) annotation).value();
-      } else if (annotation instanceof Default.Character) {
-        return ((Default.Character) annotation).value();
-      } else if (annotation instanceof Default.Byte) {
-        return ((Default.Byte) annotation).value();
-      } else if (annotation instanceof Default.Short) {
-        return ((Default.Short) annotation).value();
-      } else if (annotation instanceof Default.Integer) {
-        return ((Default.Integer) annotation).value();
-      } else if (annotation instanceof Default.Long) {
-        return ((Default.Long) annotation).value();
-      } else if (annotation instanceof Default.Float) {
-        return ((Default.Float) annotation).value();
-      } else if (annotation instanceof Default.Double) {
-        return ((Default.Double) annotation).value();
-      } else if (annotation instanceof Default.Enum) {
-        return Enum.valueOf((Class<Enum>) method.getReturnType(),
-            ((Default.Enum) annotation).value());
-      } else if (annotation instanceof Default.InstanceFactory) {
-        return InstanceBuilder.ofType(((Default.InstanceFactory) 
annotation).value())
-            .build()
-            .create(proxy);
+      defaultObject = returnDefaultHelper(annotation, proxy, method);
+      if (defaultObject != null) {
+        break;
       }
     }
+    if (method.getReturnType().equals(ValueProvider.class)) {
+      return defaultObject == null
+        ? new RuntimeValueProvider(
+          method.getName(), (Class<? extends PipelineOptions>) 
method.getDeclaringClass(),
+          proxy.getOptionsId())
+        : new RuntimeValueProvider(
+          method.getName(), (Class<? extends PipelineOptions>) 
method.getDeclaringClass(),
+          defaultObject, proxy.getOptionsId());
+    } else if (defaultObject != null) {
+      return defaultObject;
+    }
 
     /*
      * We need to make sure that we return something appropriate for the 
return type. Thus we return
@@ -507,6 +507,43 @@ class ProxyInvocationHandler implements InvocationHandler, 
HasDisplayData {
   }
 
   /**
+   * Helper method to return standard Default cases.
+   */
+  @Nullable
+  private Object returnDefaultHelper(
+    Annotation annotation, PipelineOptions proxy, Method method) {
+    if (annotation instanceof Default.Class) {
+      return ((Default.Class) annotation).value();
+    } else if (annotation instanceof Default.String) {
+      return ((Default.String) annotation).value();
+    } else if (annotation instanceof Default.Boolean) {
+      return ((Default.Boolean) annotation).value();
+    } else if (annotation instanceof Default.Character) {
+      return ((Default.Character) annotation).value();
+    } else if (annotation instanceof Default.Byte) {
+      return ((Default.Byte) annotation).value();
+    } else if (annotation instanceof Default.Short) {
+      return ((Default.Short) annotation).value();
+    } else if (annotation instanceof Default.Integer) {
+      return ((Default.Integer) annotation).value();
+    } else if (annotation instanceof Default.Long) {
+      return ((Default.Long) annotation).value();
+    } else if (annotation instanceof Default.Float) {
+      return ((Default.Float) annotation).value();
+    } else if (annotation instanceof Default.Double) {
+      return ((Default.Double) annotation).value();
+    } else if (annotation instanceof Default.Enum) {
+      return Enum.valueOf((Class<Enum>) method.getReturnType(),
+                          ((Default.Enum) annotation).value());
+    } else if (annotation instanceof Default.InstanceFactory) {
+      return InstanceBuilder.ofType(((Default.InstanceFactory) 
annotation).value())
+        .build()
+        .create(proxy);
+    }
+    return null;
+  }
+
+  /**
    * Returns a map from the getters method name to the name of the property 
based upon the passed in
    * {@link PropertyDescriptor}s property descriptors.
    *
@@ -657,6 +694,7 @@ class ProxyInvocationHandler implements InvocationHandler, 
HasDisplayData {
       PipelineOptions options =
           new ProxyInvocationHandler(Maps.<String, BoundValue>newHashMap(), 
fields)
               .as(PipelineOptions.class);
+      ValueProvider.RuntimeValueProvider.setRuntimeOptions(options);
       return options;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
new file mode 100644
index 0000000..e4502fc
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import static com.google.common.base.MoreObjects.firstNonNull;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.BeanProperty;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.ContextualDeserializer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+
+/**
+ * {@link ValueProvider} is an interface which abstracts the notion of
+ * fetching a value that may or may not be currently available.  This can be
+ * used to parameterize transforms that only read values in at runtime, for
+ * example.
+ */
+@JsonSerialize(using = ValueProvider.Serializer.class)
+@JsonDeserialize(using = ValueProvider.Deserializer.class)
+public interface ValueProvider<T> {
+  /**
+   * Return the value wrapped by this {@link ValueProvider}.
+   */
+  T get();
+
+  /**
+   * Whether the contents of this {@link ValueProvider} is available to
+   * routines that run at graph construction time.
+   */
+  boolean isAccessible();
+
+  /**
+   * {@link StaticValueProvider} is an implementation of {@link ValueProvider} 
that
+   * allows for a static value to be provided.
+   */
+  public static class StaticValueProvider<T> implements ValueProvider<T>, 
Serializable {
+    @Nullable
+    private final T value;
+
+    StaticValueProvider(@Nullable T value) {
+      this.value = value;
+    }
+
+    /**
+     * Creates a {@link StaticValueProvider} that wraps the provided value.
+     */
+    public static <T> StaticValueProvider<T> of(T value) {
+      StaticValueProvider<T> factory = new StaticValueProvider<>(value);
+      return factory;
+    }
+
+    @Override
+    public T get() {
+      return value;
+    }
+
+    @Override
+    public boolean isAccessible() {
+      return true;
+    }
+  }
+
+  /**
+   * {@link RuntimeValueProvider} is an implementation of {@link 
ValueProvider} that
+   * allows for a value to be provided at execution time rather than at graph
+   * construction time.
+   *
+   * <p>To enforce this contract, if there is no default, users must only call
+   * {@link #get()} at execution time (after a call to {@link Pipeline#run}),
+   * which will provide the value of {@code optionsMap}.
+   */
+  public static class RuntimeValueProvider<T> implements ValueProvider<T>, 
Serializable {
+    private static ConcurrentHashMap<Long, PipelineOptions> optionsMap =
+      new ConcurrentHashMap<>();
+
+    private final Class<? extends PipelineOptions> klass;
+    private final String methodName;
+    @Nullable
+    private final T defaultValue;
+    private final Long optionsId;
+
+    /**
+     * Creates a {@link RuntimeValueProvider} that will query the provided
+     * {@code optionsId} for a value.
+     */
+    RuntimeValueProvider(String methodName, Class<? extends PipelineOptions> 
klass,
+                         Long optionsId) {
+      this.methodName = methodName;
+      this.klass = klass;
+      this.defaultValue = null;
+      this.optionsId = optionsId;
+    }
+
+    /**
+     * Creates a {@link RuntimeValueProvider} that will query the provided
+     * {@code optionsId} for a value, or use the default if no value is 
available.
+     */
+    RuntimeValueProvider(String methodName, Class<? extends PipelineOptions> 
klass,
+      T defaultValue, Long optionsId) {
+      this.methodName = methodName;
+      this.klass = klass;
+      this.defaultValue = defaultValue;
+      this.optionsId = optionsId;
+    }
+
+    /**
+     * Once set, all {@code RuntimeValueProviders} will return {@code true}
+     * from {@code isAccessible()}. By default, the value is set when
+     * deserializing {@link PipelineOptions}.
+     */
+    static void setRuntimeOptions(PipelineOptions runtimeOptions) {
+      optionsMap.put(runtimeOptions.getOptionsId(), runtimeOptions);
+    }
+
+    @Override
+    public T get() {
+      PipelineOptions options = optionsMap.get(optionsId);
+      if (options == null) {
+        throw new RuntimeException("Not called from a runtime context.");
+      }
+      try {
+        Method method = klass.getMethod(methodName);
+        PipelineOptions methodOptions = options.as(klass);
+        InvocationHandler handler = Proxy.getInvocationHandler(methodOptions);
+        T value = ((ValueProvider<T>) handler.invoke(methodOptions, method, 
null)).get();
+        return firstNonNull(value, defaultValue);
+      } catch (Throwable e) {
+        throw new RuntimeException("Unable to load runtime value.", e);
+      }
+    }
+
+    @Override
+    public boolean isAccessible() {
+      PipelineOptions options = optionsMap.get(optionsId);
+      return options != null;
+    }
+  }
+
+  /**
+   * Serializer for {@link ValueProvider}.
+   */
+  static class Serializer extends JsonSerializer<ValueProvider<?>> {
+    @Override
+    public void serialize(ValueProvider<?> value, JsonGenerator jgen,
+                          SerializerProvider provider) throws IOException {
+      if (value.isAccessible()) {
+        jgen.writeObject(value.get());
+      } else {
+        jgen.writeNull();
+      }
+    }
+  }
+
+  /**
+   * Deserializer for {@link ValueProvider}, which handles type marshalling.
+   */
+  static class Deserializer extends JsonDeserializer<ValueProvider<?>>
+    implements ContextualDeserializer {
+
+    private final JavaType innerType;
+
+    // A 0-arg constructor is required by the compiler.
+    Deserializer() {
+      this.innerType = null;
+    }
+
+    Deserializer(JavaType innerType) {
+      this.innerType = innerType;
+    }
+
+    @Override
+    public JsonDeserializer<?> createContextual(DeserializationContext ctxt,
+                                                BeanProperty property)
+        throws JsonMappingException {
+      checkNotNull(ctxt, "Null DeserializationContext.");
+      JavaType type = checkNotNull(ctxt.getContextualType(), "Invalid type: 
%s", getClass());
+      JavaType[] params = type.findTypeParameters(ValueProvider.class);
+      if (params.length != 1) {
+        throw new RuntimeException(
+          "Unable to derive type for ValueProvider: " + type.toString());
+      }
+      JavaType param = params[0];
+      return new Deserializer(param);
+    }
+
+    @Override
+    public ValueProvider<?> deserialize(JsonParser jp, DeserializationContext 
ctxt)
+        throws IOException, JsonProcessingException {
+      JsonDeserializer dser = ctxt.findRootValueDeserializer(
+        checkNotNull(innerType, "Invalid %s: innerType is null. Serialization 
error?", getClass()));
+      Object o = dser.deserialize(jp, ctxt);
+      return StaticValueProvider.of(o);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index 5d8ef43..eecfff8 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -244,12 +244,14 @@ public class ProxyInvocationHandlerTest {
   public void testToStringAfterDeserializationContainsJsonEntries() throws 
Exception {
     ProxyInvocationHandler handler = new ProxyInvocationHandler(Maps.<String, 
Object>newHashMap());
     Simple proxy = handler.as(Simple.class);
+    Long optionsId = proxy.getOptionsId();
     proxy.setString("stringValue");
     DefaultAnnotations proxy2 = proxy.as(DefaultAnnotations.class);
     proxy2.setLong(57L);
-    assertEquals("Current Settings:\n"
+    assertEquals(String.format("Current Settings:\n"
         + "  long: 57\n"
-        + "  string: \"stringValue\"\n",
+        + "  optionsId: %d\n"
+        + "  string: \"stringValue\"\n", optionsId),
         serializeDeserialize(PipelineOptions.class, proxy2).toString());
   }
 
@@ -257,14 +259,16 @@ public class ProxyInvocationHandlerTest {
   public void testToStringAfterDeserializationContainsOverriddenEntries() 
throws Exception {
     ProxyInvocationHandler handler = new ProxyInvocationHandler(Maps.<String, 
Object>newHashMap());
     Simple proxy = handler.as(Simple.class);
+    Long optionsId = proxy.getOptionsId();
     proxy.setString("stringValue");
     DefaultAnnotations proxy2 = proxy.as(DefaultAnnotations.class);
     proxy2.setLong(57L);
     Simple deserializedOptions = serializeDeserialize(Simple.class, proxy2);
     deserializedOptions.setString("overriddenValue");
-    assertEquals("Current Settings:\n"
+    assertEquals(String.format("Current Settings:\n"
         + "  long: 57\n"
-        + "  string: overriddenValue\n",
+        + "  optionsId: %d\n"
+        + "  string: overriddenValue\n", optionsId),
         deserializedOptions.toString());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
new file mode 100644
index 0000000..0cde615
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.options.ValueProvider.RuntimeValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ValueProvider}. */
+@RunWith(JUnit4.class)
+public class ValueProviderTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  /** A test interface. */
+  public static interface TestOptions extends PipelineOptions {
+    @Default.String("bar")
+    ValueProvider<String> getBar();
+    void setBar(ValueProvider<String> bar);
+
+    ValueProvider<String> getFoo();
+    void setFoo(ValueProvider<String> foo);
+
+    ValueProvider<List<Integer>> getList();
+    void setList(ValueProvider<List<Integer>> list);
+  }
+
+  @Test
+  public void testCommandLineNoDefault() {
+    TestOptions options = PipelineOptionsFactory.fromArgs(
+      new String[]{"--foo=baz"}).as(TestOptions.class);
+    ValueProvider<String> provider = options.getFoo();
+    assertEquals("baz", provider.get());
+    assertTrue(provider.isAccessible());
+  }
+
+  @Test
+  public void testListValueProvider() {
+    TestOptions options = PipelineOptionsFactory.fromArgs(
+      new String[]{"--list=1,2,3"}).as(TestOptions.class);
+    ValueProvider<List<Integer>> provider = options.getList();
+    assertEquals(ImmutableList.of(1, 2, 3), provider.get());
+    assertTrue(provider.isAccessible());
+  }
+
+  @Test
+  public void testCommandLineWithDefault() {
+    TestOptions options = PipelineOptionsFactory.fromArgs(
+      new String[]{"--bar=baz"}).as(TestOptions.class);
+    ValueProvider<String> provider = options.getBar();
+    assertEquals("baz", provider.get());
+    assertTrue(provider.isAccessible());
+  }
+
+  @Test
+  public void testStaticValueProvider() {
+    ValueProvider<String> provider = StaticValueProvider.of("foo");
+    assertEquals("foo", provider.get());
+    assertTrue(provider.isAccessible());
+  }
+
+  @Test
+  public void testNoDefaultRuntimeProvider() {
+    TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
+    ValueProvider<String> provider = options.getFoo();
+    assertFalse(provider.isAccessible());
+
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectMessage("Not called from a runtime context");
+    provider.get();
+  }
+
+  @Test
+  public void testDefaultRuntimeProvider() {
+    TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
+    ValueProvider<String> provider = options.getBar();
+    assertFalse(provider.isAccessible());
+  }
+
+  @Test
+  public void testNoDefaultRuntimeProviderWithOverride() throws Exception {
+    ObjectMapper mapper = new ObjectMapper();
+    TestOptions runtime = mapper.readValue(
+      "{ \"options\": { \"foo\": \"quux\" }}", PipelineOptions.class)
+      .as(TestOptions.class);
+
+    TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
+    runtime.setOptionsId(options.getOptionsId());
+    RuntimeValueProvider.setRuntimeOptions(runtime);
+
+    ValueProvider<String> provider = options.getFoo();
+    assertTrue(provider.isAccessible());
+    assertEquals("quux", provider.get());
+  }
+
+  @Test
+  public void testDefaultRuntimeProviderWithOverride() throws Exception {
+    ObjectMapper mapper = new ObjectMapper();
+    TestOptions runtime = mapper.readValue(
+      "{ \"options\": { \"bar\": \"quux\" }}", PipelineOptions.class)
+      .as(TestOptions.class);
+
+    TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
+    runtime.setOptionsId(options.getOptionsId());
+    RuntimeValueProvider.setRuntimeOptions(runtime);
+
+    ValueProvider<String> provider = options.getBar();
+    assertTrue(provider.isAccessible());
+    assertEquals("quux", provider.get());
+  }
+
+  /** A test interface. */
+  public static interface BadOptionsRuntime extends PipelineOptions {
+    RuntimeValueProvider<String> getBar();
+    void setBar(RuntimeValueProvider<String> bar);
+  }
+
+  @Test
+  public void testOptionReturnTypeRuntime() {
+    BadOptionsRuntime options = 
PipelineOptionsFactory.as(BadOptionsRuntime.class);
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectMessage(
+      "Method getBar should not have return type "
+      + "RuntimeValueProvider, use ValueProvider instead.");
+    RuntimeValueProvider<String> provider = options.getBar();
+  }
+
+  /** A test interface. */
+  public static interface BadOptionsStatic extends PipelineOptions {
+    StaticValueProvider<String> getBar();
+    void setBar(StaticValueProvider<String> bar);
+  }
+
+  @Test
+  public void testOptionReturnTypeStatic() {
+    BadOptionsStatic options = 
PipelineOptionsFactory.as(BadOptionsStatic.class);
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectMessage(
+      "Method getBar should not have return type "
+      + "StaticValueProvider, use ValueProvider instead.");
+    StaticValueProvider<String> provider = options.getBar();
+  }
+
+  @Test
+  public void testSerializeDeserializeNoArg() throws Exception {
+    TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class);
+    assertFalse(submitOptions.getFoo().isAccessible());
+    ObjectMapper mapper = new ObjectMapper();
+    String serializedOptions = mapper.writeValueAsString(submitOptions);
+
+    // This is the expected behavior of the runner: deserialize and set the
+    // the runtime options.
+    String anchor = "\"appName\":\"ValueProviderTest\"";
+    assertThat(serializedOptions, containsString("\"foo\":null"));
+    String runnerString = serializedOptions.replaceAll(
+      "\"foo\":null", "\"foo\":\"quux\"");
+    TestOptions runtime = mapper.readValue(runnerString, PipelineOptions.class)
+      .as(TestOptions.class);
+
+    ValueProvider<String> vp = runtime.getFoo();
+    assertTrue(vp.isAccessible());
+    assertEquals("quux", vp.get());
+    assertEquals(vp.getClass(), StaticValueProvider.class);
+  }
+
+  @Test
+  public void testSerializeDeserializeWithArg() throws Exception {
+    TestOptions submitOptions = PipelineOptionsFactory.fromArgs(
+      new String[]{"--foo=baz"}).as(TestOptions.class);
+    assertEquals("baz", submitOptions.getFoo().get());
+    assertTrue(submitOptions.getFoo().isAccessible());
+    ObjectMapper mapper = new ObjectMapper();
+    String serializedOptions = mapper.writeValueAsString(submitOptions);
+
+    // This is the expected behavior of the runner: deserialize and set the
+    // the runtime options.
+    assertThat(serializedOptions, containsString("baz"));
+    String runnerString = serializedOptions.replaceAll("baz", "quux");
+    TestOptions runtime = mapper.readValue(runnerString, PipelineOptions.class)
+      .as(TestOptions.class);
+
+    ValueProvider<String> vp = runtime.getFoo();
+    assertTrue(vp.isAccessible());
+    assertEquals("quux", vp.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66686e63/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
index ea771b4..92dcbb8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
@@ -89,6 +89,9 @@ public class ApiSurfaceTest {
           inPackage("com.google.rpc"),
           inPackage("com.google.type"),
           inPackage("com.fasterxml.jackson.annotation"),
+          inPackage("com.fasterxml.jackson.core"),
+          inPackage("com.fasterxml.jackson.databind"),
+          inPackage("com.fasterxml.jackson.deser"),
           inPackage("io.grpc"),
           inPackage("org.apache.avro"),
           inPackage("org.apache.commons.logging"), // via BigTable

Reply via email to