Repository: beam
Updated Branches:
  refs/heads/master d7a4e4943 -> 6a68e656a


Add TransformPayloadTranslatorRegistrar


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4a639702
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4a639702
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4a639702

Branch: refs/heads/master
Commit: 4a63970261be22660f5fef8bf37b5d18301315ef
Parents: 64cea06
Author: Thomas Groh <[email protected]>
Authored: Fri May 19 15:24:19 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Fri May 19 21:07:48 2017 -0700

----------------------------------------------------------------------
 runners/core-construction-java/pom.xml          |  7 ++++-
 .../runners/core/construction/PTransforms.java  | 21 +++++++++-----
 .../beam/runners/core/construction/ParDos.java  | 15 ++++++++++
 .../TransformPayloadTranslatorRegistrar.java    | 29 ++++++++++++++++++++
 4 files changed, 64 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4a639702/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index abf0b65..7eaa6f3 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -90,6 +90,12 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.auto.value</groupId>
       <artifactId>auto-value</artifactId>
       <scope>provided</scope>
@@ -114,6 +120,5 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
-
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/4a639702/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
index 16276b9..9826b77 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
@@ -24,12 +24,11 @@ import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.construction.ParDos.ParDoPayloadTranslator;
+import java.util.ServiceLoader;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -40,11 +39,19 @@ import org.apache.beam.sdk.values.TupleTag;
  */
 public class PTransforms {
   private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
-      KNOWN_PAYLOAD_TRANSLATORS =
-          ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder()
-              .put(ParDo.MultiOutput.class, ParDoPayloadTranslator.create())
-              .build();
-  // TODO: Load via service loader.
+      KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
+
+  private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
+      loadTransformPayloadTranslators() {
+    ImmutableMap.Builder<Class<? extends PTransform>, TransformPayloadTranslator> builder =
+        ImmutableMap.builder();
+    for (TransformPayloadTranslatorRegistrar registrar :
+        ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+      builder.putAll(registrar.getTransformPayloadTranslators());
+    }
+    return builder.build();
+  }
+
   private PTransforms() {}
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/4a639702/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
index b2b29df..4752bd1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import com.google.common.base.Optional;
 import com.google.protobuf.Any;
@@ -28,6 +29,7 @@ import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator;
@@ -46,6 +48,7 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.TimerSpec;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.transforms.ViewFn;
@@ -107,6 +110,18 @@ public class ParDos {
           .setParameter(Any.pack(payload))
           .build();
     }
+
+    /**
+     * Registers {@link ParDoPayloadTranslator}.
+     */
+    @AutoService(TransformPayloadTranslatorRegistrar.class)
+    public static class Registrar implements TransformPayloadTranslatorRegistrar {
+      @Override
+      public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+          getTransformPayloadTranslators() {
+        return Collections.singletonMap(ParDo.MultiOutput.class, new ParDoPayloadTranslator());
+      }
+    }
   }
 
   public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components) {

http://git-wip-us.apache.org/repos/asf/beam/blob/4a639702/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
new file mode 100644
index 0000000..bc568a6
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/** A registrar of TransformPayloadTranslator. */
+public interface TransformPayloadTranslatorRegistrar {
+  Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+      getTransformPayloadTranslators();
+}

Reply via email to