Repository: incubator-beam Updated Branches: refs/heads/apex-runner 989e39987 -> 8827ccf6b
[BEAM-815] Fix ApexPipelineOptions conversion and add ApexRunnerRegistrar Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c8f8a80d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c8f8a80d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c8f8a80d Branch: refs/heads/apex-runner Commit: c8f8a80d4c6846fd941fbba08727b7a3ecaca7e1 Parents: c9f1406 Author: Ismaël MejÃa <[email protected]> Authored: Mon Oct 24 11:30:46 2016 +0200 Committer: Ismaël MejÃa <[email protected]> Committed: Tue Oct 25 22:36:19 2016 +0200 ---------------------------------------------------------------------- runners/apex/pom.xml | 6 +- .../apache/beam/runners/apex/ApexRunner.java | 5 +- .../beam/runners/apex/ApexRunnerRegistrar.java | 61 ++++++++++++++++++++ .../apex/translators/utils/ApexStreamTuple.java | 2 +- .../runners/apex/ApexRunnerRegistrarTest.java | 47 +++++++++++++++ 5 files changed, 116 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/pom.xml ---------------------------------------------------------------------- diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index 8b62410..de191f5 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -138,11 +138,11 @@ <scope>test</scope> </dependency--> <!-- Optional Pipeline Registration --> - <!--dependency> + <dependency> <groupId>com.google.auto.service</groupId> <artifactId>auto-service</artifactId> <optional>true</optional> - </dependency--> + </dependency> </dependencies> <build> @@ -183,7 +183,7 @@ <systemPropertyVariables> <beamTestPipelineOptions> [ - "--runner=org.apache.beam.runners.apex.TestApexRunner" + "--runner=TestApexRunner" ] </beamTestPipelineOptions> </systemPropertyVariables> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index f3c44bb..8da4ec3 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -32,6 +32,7 @@ import org.apache.beam.runners.apex.translators.TranslationContext; import org.apache.beam.runners.core.AssignWindows; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Create; @@ -75,7 +76,9 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } public static ApexRunner fromOptions(PipelineOptions options) { - return new ApexRunner((ApexPipelineOptions) options); + ApexPipelineOptions apexPipelineOptions = + PipelineOptionsValidator.validate(ApexPipelineOptions.class, options); + return new ApexRunner(apexPipelineOptions); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java new file mode 100644 index 0000000..aa6ef45 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; + +/** + * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the + * {@link ApexRunner}. + * + * {@link AutoService} will register Apex's implementations of the {@link PipelineRunner} + * and {@link PipelineOptions} as available pipeline runner services. + */ +public final class ApexRunnerRegistrar { + private ApexRunnerRegistrar() {} + + /** + * Registers the {@link ApexRunner}. + */ + @AutoService(PipelineRunnerRegistrar.class) + public static class Runner implements PipelineRunnerRegistrar { + @Override + public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { + return ImmutableList + .<Class<? extends PipelineRunner<?>>>of(ApexRunner.class, TestApexRunner.class); + } + } + + /** + * Registers the {@link ApexPipelineOptions}. + */ + @AutoService(PipelineOptionsRegistrar.class) + public static class Options implements PipelineOptionsRegistrar { + @Override + public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { + return ImmutableList.<Class<? extends PipelineOptions>>of(ApexPipelineOptions.class); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java index a260a66..7f8b0fa 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java @@ -134,7 +134,7 @@ public interface ApexStreamTuple<T> { /** * Coder for {@link ApexStreamTuple}. */ - public static class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> { + class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> { private static final long serialVersionUID = 1L; final Coder<T> valueCoder; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java new file mode 100644 index 0000000..7af5585 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; + +/** + * Tests the proper registration of the Apex runner. + */ +public class ApexRunnerRegistrarTest { + + @Test + public void testFullName() { + String[] args = + new String[] {String.format("--runner=%s", ApexRunner.class.getName())}; + PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create(); + assertEquals(opts.getRunner(), ApexRunner.class); + } + + @Test + public void testClassName() { + String[] args = + new String[] {String.format("--runner=%s", ApexRunner.class.getSimpleName())}; + PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create(); + assertEquals(opts.getRunner(), ApexRunner.class); + } + +}
