Repository: beam Updated Branches: refs/heads/master 5c2cab017 -> f6c840533
[BEAM-2644] Introduces TestPipeline.newProvider() Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1b19b71 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1b19b71 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1b19b71 Branch: refs/heads/master Commit: f1b19b71d2905079a4640d9fb89e02985ca6e873 Parents: 5c2cab0 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Wed Aug 23 19:13:46 2017 -0700 Committer: Eugene Kirpichov <ekirpic...@gmail.com> Committed: Wed Aug 30 12:14:34 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/options/ValueProvider.java | 10 ++-- .../apache/beam/sdk/options/ValueProviders.java | 15 +++--- .../apache/beam/sdk/testing/TestPipeline.java | 49 +++++++++++++++++++- .../sdk/transforms/display/DisplayData.java | 5 +- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 14 ++++-- .../sdk/options/ProxyInvocationHandlerTest.java | 4 +- .../beam/sdk/options/ValueProviderTest.java | 23 +++++---- .../beam/sdk/testing/TestPipelineTest.java | 37 ++++++++++++++- .../apache/beam/sdk/transforms/CreateTest.java | 22 ++------- 9 files changed, 127 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java index 15413e8..3e6a24b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java @@ -41,6 +41,7 @@ import java.lang.reflect.Proxy; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; @@ -54,18 +55,21 @@ import org.apache.beam.sdk.values.PCollection; * <p>A common task is to create a {@link PCollection} containing the value of this * {@link ValueProvider} regardless of whether it's accessible at construction time or not. * For that, use {@link Create#ofProvider}. + * + * <p>For unit-testing a transform against a {@link ValueProvider} that only becomes available + * at runtime, use {@link TestPipeline#newProvider}. */ @JsonSerialize(using = ValueProvider.Serializer.class) @JsonDeserialize(using = ValueProvider.Deserializer.class) public interface ValueProvider<T> extends Serializable { /** - * Return the value wrapped by this {@link ValueProvider}. + * Returns the runtime value wrapped by this {@link ValueProvider} in case it is {@link + * #isAccessible}, otherwise fails. */ T get(); /** - * Whether the contents of this {@link ValueProvider} is available to - * routines that run at graph construction time. + * Whether the contents of this {@link ValueProvider} is currently available via {@link #get}. */ boolean isAccessible(); http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java index 2fffffa..9345462 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java @@ -22,17 +22,19 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.IOException; import java.util.Map; +import org.apache.beam.sdk.testing.TestPipeline; -/** - * Utilities for working with the {@link ValueProvider} interface. - */ +/** Utilities for working with the {@link ValueProvider} interface. */ public class ValueProviders { private ValueProviders() {} /** - * Given {@code serializedOptions} as a JSON-serialized {@link PipelineOptions}, updates - * the values according to the provided values in {@code runtimeValues}. + * Given {@code serializedOptions} as a JSON-serialized {@link PipelineOptions}, updates the + * values according to the provided values in {@code runtimeValues}. + * + * @deprecated Use {@link TestPipeline#newProvider} for testing {@link ValueProvider} code. */ + @Deprecated public static String updateSerializedOptions( String serializedOptions, Map<String, String> runtimeValues) { ObjectNode root, options; @@ -41,8 +43,7 @@ public class ValueProviders { options = (ObjectNode) root.get("options"); checkNotNull(options, "Unable to locate 'options' in %s", serializedOptions); } catch (IOException e) { - throw new RuntimeException( - String.format("Unable to parse %s", serializedOptions), e); + throw new RuntimeException(String.format("Unable to parse %s", serializedOptions), e); } for (Map.Entry<String, String> entry : runtimeValues.entrySet()) { http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index b67b14f..be2f193 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -31,15 +31,19 @@ import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.base.Strings; import com.google.common.collect.FluentIterable; +import com.google.common.collect.Maps; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.UUID; import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricResult; @@ -49,7 +53,10 @@ import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; @@ -341,7 +348,12 @@ public class TestPipeline extends Pipeline implements TestRule { final PipelineResult pipelineResult; try { enforcement.get().beforePipelineExecution(); - pipelineResult = super.run(options); + PipelineOptions updatedOptions = + MAPPER.convertValue(MAPPER.valueToTree(options), PipelineOptions.class); + updatedOptions + .as(TestValueProviderOptions.class) + .setProviderRuntimeValues(StaticValueProvider.of(providerRuntimeValues)); + pipelineResult = super.run(updatedOptions); verifyPAssertsSucceeded(this, pipelineResult); } catch (RuntimeException exc) { Throwable cause = exc.getCause(); @@ -358,6 +370,41 @@ public class TestPipeline extends Pipeline implements TestRule { return pipelineResult; } + /** Implementation detail of {@link #newProvider}, do not use. */ + @Internal + public interface TestValueProviderOptions extends PipelineOptions { + ValueProvider<Map<String, Object>> getProviderRuntimeValues(); + void setProviderRuntimeValues(ValueProvider<Map<String, Object>> runtimeValues); + } + + /** + * Returns a new {@link ValueProvider} that is inaccessible before {@link #run}, but will be + * accessible while the pipeline runs. + */ + public <T> ValueProvider<T> newProvider(T runtimeValue) { + String uuid = UUID.randomUUID().toString(); + providerRuntimeValues.put(uuid, runtimeValue); + return ValueProvider.NestedValueProvider.of( + options.as(TestValueProviderOptions.class).getProviderRuntimeValues(), + new GetFromRuntimeValues<T>(uuid)); + } + + private final Map<String, Object> providerRuntimeValues = Maps.newHashMap(); + + private static class GetFromRuntimeValues<T> + implements SerializableFunction<Map<String, Object>, T> { + private final String key; + + private GetFromRuntimeValues(String key) { + this.key = key; + } + + @Override + public T apply(Map<String, Object> input) { + return (T) input.get(key); + } + } + /** * Enables the abandoned node detection. Abandoned nodes are <code>PTransforms</code>, <code> * PAsserts</code> included, that were not executed by the pipeline runner. Abandoned nodes are http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java index 10ef428..917c070 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java @@ -796,8 +796,9 @@ public class DisplayData implements Serializable { // Don't re-wrap exceptions recursively. throw e; } catch (Throwable e) { - String msg = String.format("Error while populating display data for component: %s", - namespace.getName()); + String msg = String.format( + "Error while populating display data for component '%s': %s", + namespace.getName(), e.getMessage()); throw new PopulateDisplayDataException(msg, e); } http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index f49443d..8870dd8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -65,7 +65,6 @@ import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -226,14 +225,19 @@ public class AvroIOTest { ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); - ValueProvider<String> pathProvider = StaticValueProvider.of(outputFile.getAbsolutePath()); - writePipeline .apply(Create.of(values)) - .apply(AvroIO.write(GenericClass.class).to(pathProvider).withoutSharding()); + .apply( + AvroIO.write(GenericClass.class) + .to(writePipeline.newProvider(outputFile.getAbsolutePath())) + .withoutSharding()); writePipeline.run().waitUntilFinish(); - PAssert.that(readPipeline.apply("Read", AvroIO.read(GenericClass.class).from(pathProvider))) + PAssert.that( + readPipeline.apply( + "Read", + AvroIO.read(GenericClass.class) + .from(readPipeline.newProvider(outputFile.getAbsolutePath())))) .containsInAnyOrder(values); readPipeline.run(); http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java index fb0a0d7..fe8a0f9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java @@ -24,7 +24,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -62,7 +61,6 @@ import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.rules.ExpectedException; import org.junit.rules.ExternalResource; import org.junit.rules.TestRule; @@ -797,7 +795,7 @@ public class ProxyInvocationHandlerTest { expectedException.expectMessage( ProxyInvocationHandler.PipelineOptionsDisplayData.class.getName()); - expectedException.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!!"))); + expectedException.expectMessage("oh noes!!"); p.run(); } http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java index 7bbbf7e..51a92e3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java @@ -23,8 +23,8 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; @@ -194,16 +194,16 @@ public class ValueProviderTest { StaticValueProvider<String> provider = options.getBar(); } + @Test public void testSerializeDeserializeNoArg() throws Exception { TestOptions submitOptions = PipelineOptionsFactory.as(TestOptions.class); assertFalse(submitOptions.getFoo().isAccessible()); - String serializedOptions = MAPPER.writeValueAsString(submitOptions); - String runnerString = ValueProviders.updateSerializedOptions( - serializedOptions, ImmutableMap.of("foo", "quux")); - TestOptions runtime = MAPPER.readValue(runnerString, PipelineOptions.class) - .as(TestOptions.class); + ObjectNode root = MAPPER.valueToTree(submitOptions); + ((ObjectNode) root.get("options")).put("foo", "quux"); + TestOptions runtime = + MAPPER.convertValue(root, PipelineOptions.class).as(TestOptions.class); ValueProvider<String> vp = runtime.getFoo(); assertTrue(vp.isAccessible()); @@ -214,14 +214,13 @@ public class ValueProviderTest { @Test public void testSerializeDeserializeWithArg() throws Exception { TestOptions submitOptions = PipelineOptionsFactory.fromArgs("--foo=baz").as(TestOptions.class); - assertEquals("baz", submitOptions.getFoo().get()); assertTrue(submitOptions.getFoo().isAccessible()); - String serializedOptions = MAPPER.writeValueAsString(submitOptions); + assertEquals("baz", submitOptions.getFoo().get()); - String runnerString = ValueProviders.updateSerializedOptions( - serializedOptions, ImmutableMap.of("foo", "quux")); - TestOptions runtime = MAPPER.readValue(runnerString, PipelineOptions.class) - .as(TestOptions.class); + ObjectNode root = MAPPER.valueToTree(submitOptions); + ((ObjectNode) root.get("options")).put("foo", "quux"); + TestOptions runtime = + MAPPER.convertValue(root, PipelineOptions.class).as(TestOptions.class); ValueProvider<String> vp = runtime.getFoo(); assertTrue(vp.isAccessible()); http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java index 664f2f4..ec681ea 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.testing; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; @@ -37,8 +38,10 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.sdk.values.PCollection; @@ -59,7 +62,8 @@ import org.junit.runners.Suite; @Suite.SuiteClasses({ TestPipelineTest.TestPipelineCreationTest.class, TestPipelineTest.TestPipelineEnforcementsTest.WithRealPipelineRunner.class, - TestPipelineTest.TestPipelineEnforcementsTest.WithCrashingPipelineRunner.class + TestPipelineTest.TestPipelineEnforcementsTest.WithCrashingPipelineRunner.class, + TestPipelineTest.NewProviderTest.class }) public class TestPipelineTest implements Serializable { private static final ObjectMapper MAPPER = new ObjectMapper().registerModules( @@ -337,4 +341,35 @@ public class TestPipelineTest implements Serializable { } } } + + /** Tests for {@link TestPipeline#newProvider}. */ + @RunWith(JUnit4.class) + public static class NewProviderTest implements Serializable { + @Rule public transient TestPipeline pipeline = TestPipeline.create(); + + @Test + @Category(ValidatesRunner.class) + public void testNewProvider() { + ValueProvider<String> foo = pipeline.newProvider("foo"); + ValueProvider<String> foobar = + ValueProvider.NestedValueProvider.of( + foo, + new SerializableFunction<String, String>() { + @Override + public String apply(String input) { + return input + "bar"; + } + }); + + assertFalse(foo.isAccessible()); + assertFalse(foobar.isAccessible()); + + PAssert.that(pipeline.apply("create foo", Create.ofProvider(foo, StringUtf8Coder.of()))) + .containsInAnyOrder("foo"); + PAssert.that(pipeline.apply("create foobar", Create.ofProvider(foobar, StringUtf8Coder.of()))) + .containsInAnyOrder("foobar"); + + pipeline.run(); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/f1b19b71/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java index 81ad947..1c7e1af 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java @@ -25,9 +25,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import java.io.IOException; import java.io.InputStream; @@ -52,7 +50,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.options.ValueProviders; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SourceTestUtils; @@ -60,7 +57,6 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create.Values.CreateSource; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; @@ -355,9 +351,6 @@ public class CreateTest { p.run(); } - private static final ObjectMapper MAPPER = new ObjectMapper().registerModules( - ObjectMapper.findModules(ReflectHelpers.findClassLoader())); - /** Testing options for {@link #testCreateOfProvider()}. */ public interface CreateOfProviderOptions extends PipelineOptions { ValueProvider<String> getFoo(); @@ -385,19 +378,12 @@ public class CreateTest { }), StringUtf8Coder.of()))) .containsInAnyOrder("foobar"); - CreateOfProviderOptions submitOptions = - p.getOptions().as(CreateOfProviderOptions.class); PAssert.that( - p.apply("Runtime", Create.ofProvider(submitOptions.getFoo(), StringUtf8Coder.of()))) - .containsInAnyOrder("runtime foo"); - - String serializedOptions = MAPPER.writeValueAsString(p.getOptions()); - String runnerString = ValueProviders.updateSerializedOptions( - serializedOptions, ImmutableMap.of("foo", "runtime foo")); - CreateOfProviderOptions runtimeOptions = - MAPPER.readValue(runnerString, PipelineOptions.class).as(CreateOfProviderOptions.class); + p.apply( + "Runtime", Create.ofProvider(p.newProvider("runtimeFoo"), StringUtf8Coder.of()))) + .containsInAnyOrder("runtimeFoo"); - p.run(runtimeOptions); + p.run(); }