Repository: beam Updated Branches: refs/heads/master d7a4e4943 -> 6a68e656a
Add TransformPayloadTranslatorRegistrar Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4a639702 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4a639702 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4a639702 Branch: refs/heads/master Commit: 4a63970261be22660f5fef8bf37b5d18301315ef Parents: 64cea06 Author: Thomas Groh <[email protected]> Authored: Fri May 19 15:24:19 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Fri May 19 21:07:48 2017 -0700 ---------------------------------------------------------------------- runners/core-construction-java/pom.xml | 7 ++++- .../runners/core/construction/PTransforms.java | 21 +++++++++----- .../beam/runners/core/construction/ParDos.java | 15 ++++++++++ .../TransformPayloadTranslatorRegistrar.java | 29 ++++++++++++++++++++ 4 files changed, 64 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4a639702/runners/core-construction-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml index abf0b65..7eaa6f3 100644 --- a/runners/core-construction-java/pom.xml +++ b/runners/core-construction-java/pom.xml @@ -90,6 +90,12 @@ </dependency> <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + <optional>true</optional> + </dependency> + + <dependency> <groupId>com.google.auto.value</groupId> <artifactId>auto-value</artifactId> <scope>provided</scope> @@ -114,6 +120,5 @@ <artifactId>mockito-all</artifactId> <scope>test</scope> </dependency> - </dependencies> </project> http://git-wip-us.apache.org/repos/asf/beam/blob/4a639702/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java index 16276b9..9826b77 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java @@ -24,12 +24,11 @@ import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.construction.ParDos.ParDoPayloadTranslator; +import java.util.ServiceLoader; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; @@ -40,11 +39,19 @@ import org.apache.beam.sdk.values.TupleTag; */ public class PTransforms { private static final Map<Class<? extends PTransform>, TransformPayloadTranslator> - KNOWN_PAYLOAD_TRANSLATORS = - ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder() - .put(ParDo.MultiOutput.class, ParDoPayloadTranslator.create()) - .build(); - // TODO: Load via service loader. + KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators(); + + private static Map<Class<? extends PTransform>, TransformPayloadTranslator> + loadTransformPayloadTranslators() { + ImmutableMap.Builder<Class<? extends PTransform>, TransformPayloadTranslator> builder = + ImmutableMap.builder(); + for (TransformPayloadTranslatorRegistrar registrar : + ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) { + builder.putAll(registrar.getTransformPayloadTranslators()); + } + return builder.build(); + } + private PTransforms() {} /** http://git-wip-us.apache.org/repos/asf/beam/blob/4a639702/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java index b2b29df..4752bd1 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; +import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import com.google.common.base.Optional; import com.google.protobuf.Any; @@ -28,6 +29,7 @@ import com.google.protobuf.BytesValue; import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.io.Serializable; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator; @@ -46,6 +48,7 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.TimerSpec; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.transforms.ViewFn; @@ -107,6 +110,18 @@ public class ParDos { .setParameter(Any.pack(payload)) .build(); } + + /** + * Registers {@link ParDoPayloadTranslator}. + */ + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class Registrar implements TransformPayloadTranslatorRegistrar { + @Override + public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return Collections.singletonMap(ParDo.MultiOutput.class, new ParDoPayloadTranslator()); + } + } } public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components) { http://git-wip-us.apache.org/repos/asf/beam/blob/4a639702/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java new file mode 100644 index 0000000..bc568a6 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import java.util.Map; +import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator; +import org.apache.beam.sdk.transforms.PTransform; + +/** A registrar of TransformPayloadTranslator. */ +public interface TransformPayloadTranslatorRegistrar { + Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators(); +}
