[ 
https://issues.apache.org/jira/browse/BEAM-3450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16329694#comment-16329694
 ] 

ASF GitHub Bot commented on BEAM-3450:
--------------------------------------

tgroh closed pull request #4386: [BEAM-3450] Add wire_coder_id to RemoteGrpcPort
URL: https://github.com/apache/beam/pull/4386
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign pull request (one from a fork) once it is merged,
the diff is reproduced below for the sake of provenance:

diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto 
b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 36ed4242d6c..16f7709982d 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -76,6 +76,9 @@ message RemoteGrpcPort {
   // (Required) An API descriptor which describes where to
   // connect to including any authentication that is required.
   org.apache.beam.model.pipeline.v1.ApiServiceDescriptor 
api_service_descriptor = 1;
+
+  // (Required) The ID of the Coder that will be used to encode and decode 
data sent over this port.
+  string coder_id = 2;
 }
 
 /*
diff --git a/sdks/java/fn-execution/pom.xml b/sdks/java/fn-execution/pom.xml
index b20e72b62e3..ca520c87d52 100644
--- a/sdks/java/fn-execution/pom.xml
+++ b/sdks/java/fn-execution/pom.xml
@@ -66,6 +66,11 @@
       <artifactId>beam-sdks-java-core</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
index 3e9d8a01eba..f5db80a64c8 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -34,8 +34,10 @@
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.runners.core.construction.CoderTranslation;
@@ -121,7 +123,7 @@
   private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
   private final FnDataReceiver<WindowedValue<OutputT>> receiver;
   private final Supplier<String> processBundleInstructionIdSupplier;
-  private final BeamFnDataClient beamFnDataClientFactory;
+  private final BeamFnDataClient beamFnDataClient;
   private final Coder<WindowedValue<OutputT>> coder;
   private final BeamFnApi.Target inputTarget;
 
@@ -133,28 +135,37 @@
       BeamFnApi.Target inputTarget,
       RunnerApi.Coder coderSpec,
       Map<String, RunnerApi.Coder> coders,
-      BeamFnDataClient beamFnDataClientFactory,
+      BeamFnDataClient beamFnDataClient,
       Collection<FnDataReceiver<WindowedValue<OutputT>>> consumers)
-          throws IOException {
-    this.apiServiceDescriptor =
-        
RemoteGrpcPortRead.fromPTransform(grpcReadNode).getPort().getApiServiceDescriptor();
+      throws IOException {
+    RemoteGrpcPort port = 
RemoteGrpcPortRead.fromPTransform(grpcReadNode).getPort();
+    this.apiServiceDescriptor = port.getApiServiceDescriptor();
     this.inputTarget = inputTarget;
     this.processBundleInstructionIdSupplier = 
processBundleInstructionIdSupplier;
-    this.beamFnDataClientFactory = beamFnDataClientFactory;
+    this.beamFnDataClient = beamFnDataClient;
     this.receiver = MultiplexingFnDataReceiver.forConsumers(consumers);
 
+    RehydratedComponents components =
+        
RehydratedComponents.forComponents(Components.newBuilder().putAllCoders(coders).build());
     @SuppressWarnings("unchecked")
-    Coder<WindowedValue<OutputT>> coder =
-        (Coder<WindowedValue<OutputT>>)
-            CoderTranslation.fromProto(
-                coderSpec,
-                RehydratedComponents.forComponents(
-                    
RunnerApi.Components.newBuilder().putAllCoders(coders).build()));
+    Coder<WindowedValue<OutputT>> coder;
+    if (!port.getCoderId().isEmpty()) {
+      coder =
+          (Coder<WindowedValue<OutputT>>)
+              CoderTranslation.fromProto(coders.get(port.getCoderId()), 
components);
+    } else {
+      // TODO: Remove this path once it is no longer used
+      coder =
+          (Coder<WindowedValue<OutputT>>)
+              CoderTranslation.fromProto(
+                  coderSpec,
+                  components);
+    }
     this.coder = coder;
   }
 
   public void registerInputLocation() {
-    this.readFuture = beamFnDataClientFactory.receive(
+    this.readFuture = beamFnDataClient.receive(
         apiServiceDescriptor,
         LogicalEndpoint.of(processBundleInstructionIdSupplier.get(), 
inputTarget),
         coder,
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
index 81fd136752f..09bbc6330af 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -31,8 +31,10 @@
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.runners.core.construction.CoderTranslation;
@@ -123,19 +125,24 @@
       Map<String, RunnerApi.Coder> coders,
       BeamFnDataClient beamFnDataClientFactory)
           throws IOException {
-    this.apiServiceDescriptor =
-        
RemoteGrpcPortWrite.fromPTransform(remoteWriteNode).getPort().getApiServiceDescriptor();
+    RemoteGrpcPort port = 
RemoteGrpcPortWrite.fromPTransform(remoteWriteNode).getPort();
+    this.apiServiceDescriptor = port.getApiServiceDescriptor();
     this.beamFnDataClientFactory = beamFnDataClientFactory;
     this.processBundleInstructionIdSupplier = 
processBundleInstructionIdSupplier;
     this.outputTarget = outputTarget;
 
+    RehydratedComponents components =
+        
RehydratedComponents.forComponents(Components.newBuilder().putAllCoders(coders).build());
     @SuppressWarnings("unchecked")
-    Coder<WindowedValue<InputT>> coder =
-        (Coder<WindowedValue<InputT>>)
-            CoderTranslation.fromProto(
-                coderSpec,
-                RehydratedComponents.forComponents(
-                    
RunnerApi.Components.newBuilder().putAllCoders(coders).build()));
+    Coder<WindowedValue<InputT>> coder;
+    if (!port.getCoderId().isEmpty()) {
+      coder =
+          (Coder<WindowedValue<InputT>>)
+              CoderTranslation.fromProto(coders.get(port.getCoderId()), 
components);
+    } else {
+      // TODO: remove this path once it is no longer used
+      coder = (Coder<WindowedValue<InputT>>) 
CoderTranslation.fromProto(coderSpec, components);
+    }
     this.coder = coder;
   }
 
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index b4dc1685e7c..7e4a8b52278 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -78,13 +78,18 @@
 @RunWith(JUnit4.class)
 public class BeamFnDataReadRunnerTest {
 
-  private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = 
BeamFnApi.RemoteGrpcPort.newBuilder()
-      
.setApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).build();
+  private static final Coder<String> ELEMENT_CODER = StringUtf8Coder.of();
+  private static final String ELEMENT_CODER_SPEC_ID = "string-coder-id";
   private static final Coder<WindowedValue<String>> CODER =
-      WindowedValue.getFullCoder(StringUtf8Coder.of(), 
GlobalWindow.Coder.INSTANCE);
-  private static final String CODER_SPEC_ID = "string-coder-id";
+      WindowedValue.getFullCoder(ELEMENT_CODER, GlobalWindow.Coder.INSTANCE);
+  private static final String CODER_SPEC_ID = "windowed-string-coder-id";
   private static final RunnerApi.Coder CODER_SPEC;
   private static final RunnerApi.Components COMPONENTS;
+  private static final BeamFnApi.RemoteGrpcPort PORT_SPEC =
+      BeamFnApi.RemoteGrpcPort.newBuilder()
+          
.setApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance())
+          .setCoderId(CODER_SPEC_ID)
+          .build();
 
   static {
     try {
@@ -95,6 +100,7 @@
               .getComponents()
               .toBuilder()
               .putCoders(CODER_SPEC_ID, CODER_SPEC)
+              .putCoders(ELEMENT_CODER_SPEC_ID, 
CoderTranslation.toProto(ELEMENT_CODER).getCoder())
               .build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);
@@ -138,7 +144,7 @@ public void testCreatingAndProcessingBeamFnDataReadRunner() 
throws Exception {
         pTransform,
         Suppliers.ofInstance(bundleId)::get,
         ImmutableMap.of(localOutputId,
-            
RunnerApi.PCollection.newBuilder().setCoderId(CODER_SPEC_ID).build()),
+            
RunnerApi.PCollection.newBuilder().setCoderId(ELEMENT_CODER_SPEC_ID).build()),
         COMPONENTS.getCodersMap(),
         COMPONENTS.getWindowingStrategiesMap(),
         consumers,
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
index fbe33d55516..aaf2b3fc9a3 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
@@ -73,20 +73,31 @@
 @RunWith(JUnit4.class)
 public class BeamFnDataWriteRunnerTest {
 
-  private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = 
BeamFnApi.RemoteGrpcPort.newBuilder()
-      
.setApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).build();
-  private static final String CODER_ID = "string-coder-id";
-  private static final Coder<WindowedValue<String>> CODER =
-      WindowedValue.getFullCoder(StringUtf8Coder.of(), 
GlobalWindow.Coder.INSTANCE);
-  private static final RunnerApi.Coder CODER_SPEC;
+  private static final String ELEM_CODER_ID = "string-coder-id";
+  private static final Coder<String> ELEM_CODER = StringUtf8Coder.of();
+  private static final String WIRE_CODER_ID = "windowed-string-coder-id";
+  private static final Coder<WindowedValue<String>> WIRE_CODER =
+      WindowedValue.getFullCoder(ELEM_CODER, GlobalWindow.Coder.INSTANCE);
+  private static final RunnerApi.Coder WIRE_CODER_SPEC;
   private static final RunnerApi.Components COMPONENTS;
 
+  private static final BeamFnApi.RemoteGrpcPort PORT_SPEC =
+      BeamFnApi.RemoteGrpcPort.newBuilder()
+          
.setApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance())
+          .setCoderId(WIRE_CODER_ID)
+          .build();
+
   static {
     try {
-      MessageWithComponents coderAndComponents = 
CoderTranslation.toProto(CODER);
-      CODER_SPEC = coderAndComponents.getCoder();
+      MessageWithComponents coderAndComponents = 
CoderTranslation.toProto(WIRE_CODER);
+      WIRE_CODER_SPEC = coderAndComponents.getCoder();
       COMPONENTS =
-          coderAndComponents.getComponents().toBuilder().putCoders(CODER_ID, 
CODER_SPEC).build();
+          coderAndComponents
+              .getComponents()
+              .toBuilder()
+              .putCoders(WIRE_CODER_ID, WIRE_CODER_SPEC)
+              .putCoders(ELEM_CODER_ID, 
CoderTranslation.toProto(ELEM_CODER).getCoder())
+              .build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);
     }
@@ -124,7 +135,7 @@ public void 
testCreatingAndProcessingBeamFnDataWriteRunner() throws Exception {
         pTransform,
         Suppliers.ofInstance(bundleId)::get,
         ImmutableMap.of(localInputId,
-            RunnerApi.PCollection.newBuilder().setCoderId(CODER_ID).build()),
+            
RunnerApi.PCollection.newBuilder().setCoderId(ELEM_CODER_ID).build()),
         COMPONENTS.getCodersMap(),
         COMPONENTS.getWindowingStrategiesMap(),
         consumers,
@@ -164,7 +175,7 @@ public void accept(WindowedValue<String> t) throws 
Exception {
                     // RemoteGrpcPortWrite uses
                     
.setName(Iterables.getOnlyElement(pTransform.getInputsMap().keySet()))
                     .build())),
-        eq(CODER));
+        eq(WIRE_CODER));
 
     assertThat(consumers.keySet(), containsInAnyOrder(localInputId));
     
Iterables.getOnlyElement(consumers.get(localInputId)).accept(valueInGlobalWindow("TestValue"));
@@ -190,8 +201,7 @@ public void testReuseForMultipleBundles() throws Exception {
     BeamFnDataWriteRunner<String> writeRunner = new BeamFnDataWriteRunner<>(
         RemoteGrpcPortWrite.writeToPort("myWrite", PORT_SPEC).toPTransform(),
         bundleId::get,
-        OUTPUT_TARGET,
-        CODER_SPEC,
+        OUTPUT_TARGET, WIRE_CODER_SPEC,
         COMPONENTS.getCodersMap(),
         mockBeamFnDataClient);
 
@@ -201,7 +211,7 @@ public void testReuseForMultipleBundles() throws Exception {
     verify(mockBeamFnDataClient).send(
         eq(PORT_SPEC.getApiServiceDescriptor()),
         eq(LogicalEndpoint.of(bundleId.get(), OUTPUT_TARGET)),
-        eq(CODER));
+        eq(WIRE_CODER));
 
     writeRunner.consume(valueInGlobalWindow("ABC"));
     writeRunner.consume(valueInGlobalWindow("DEF"));
@@ -219,7 +229,7 @@ public void testReuseForMultipleBundles() throws Exception {
     verify(mockBeamFnDataClient).send(
         eq(PORT_SPEC.getApiServiceDescriptor()),
         eq(LogicalEndpoint.of(bundleId.get(), OUTPUT_TARGET)),
-        eq(CODER));
+        eq(WIRE_CODER));
 
     writeRunner.consume(valueInGlobalWindow("GHI"));
     writeRunner.consume(valueInGlobalWindow("JKL"));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> RemoteGrpcPorts should contain the wire format
> ----------------------------------------------
>
>                 Key: BEAM-3450
>                 URL: https://issues.apache.org/jira/browse/BEAM-3450
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model, runner-core
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>
> Having the RemoteGrpcPort contain the wire format forces the runner to
> specify the wire format independently of the PCollection coder, which should
> be the coder for the PCollection's element type (e.g. in Java, a
> PCollection<T> has a Coder<T> rather than a Coder<WindowedValue<T>>, but the
> runner must use a Coder<WindowedValue<T>> over edges).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to