[ 
https://issues.apache.org/jira/browse/BEAM-3450?focusedWorklogId=156249&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-156249
 ]

ASF GitHub Bot logged work on BEAM-3450:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Oct/18 07:09
            Start Date: 19/Oct/18 07:09
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #6741: [BEAM-3450] Log 
when the runner is not properly setting the coder.
URL: https://github.com/apache/beam/pull/6741
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
index 6ec31551288..c6813086991 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -98,9 +98,17 @@
               .setPrimitiveTransformReference(pTransformId)
               .setName(getOnlyElement(pTransform.getOutputsMap().keySet()))
               .build();
-      RunnerApi.Coder coderSpec =
-          coders.get(
-              
pCollections.get(getOnlyElement(pTransform.getOutputsMap().values())).getCoderId());
+      RunnerApi.Coder coderSpec;
+      if 
(RemoteGrpcPortRead.fromPTransform(pTransform).getPort().getCoderId().isEmpty())
 {
+        LOG.error(
+            "Missing required coder_id on grpc_port for %s; using deprecated 
fallback.",
+            pTransformId);
+        coderSpec =
+            coders.get(
+                
pCollections.get(getOnlyElement(pTransform.getOutputsMap().values())).getCoderId());
+      } else {
+        coderSpec = null;
+      }
       Collection<FnDataReceiver<WindowedValue<OutputT>>> consumers =
           (Collection)
               
pCollectionIdsToConsumers.get(getOnlyElement(pTransform.getOutputsMap().values()));
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
index 02f3babc817..0c48913bfa6 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -47,6 +47,8 @@
 import org.apache.beam.sdk.fn.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Registers as a consumer with the Beam Fn Data Api. Consumes elements and 
encodes them for
@@ -57,6 +59,8 @@
  */
 public class BeamFnDataWriteRunner<InputT> {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnDataWriteRunner.class);
+
   /** A registrar which provides a factory to handle writing to the Fn Api 
Data Plane. */
   @AutoService(PTransformRunnerFactory.Registrar.class)
   public static class Registrar implements PTransformRunnerFactory.Registrar {
@@ -91,9 +95,17 @@
               .setPrimitiveTransformReference(pTransformId)
               .setName(getOnlyElement(pTransform.getInputsMap().keySet()))
               .build();
-      RunnerApi.Coder coderSpec =
-          coders.get(
-              
pCollections.get(getOnlyElement(pTransform.getInputsMap().values())).getCoderId());
+      RunnerApi.Coder coderSpec;
+      if 
(RemoteGrpcPortWrite.fromPTransform(pTransform).getPort().getCoderId().isEmpty())
 {
+        LOG.error(
+            "Missing required coder_id on grpc_port for %s; using deprecated 
fallback.",
+            pTransformId);
+        coderSpec =
+            coders.get(
+                
pCollections.get(getOnlyElement(pTransform.getInputsMap().values())).getCoderId());
+      } else {
+        coderSpec = null;
+      }
       BeamFnDataWriteRunner<InputT> runner =
           new BeamFnDataWriteRunner<>(
               pTransform, processBundleInstructionId, target, coderSpec, 
coders, beamFnDataClient);
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py 
b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index e4737b4ad09..6b1fc414695 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -582,15 +582,21 @@ def create(factory, transform_id, transform_proto, 
grpc_port, consumers):
   target = beam_fn_api_pb2.Target(
       primitive_transform_reference=transform_id,
       name=only_element(list(transform_proto.outputs.keys())))
+  if grpc_port.coder_id:
+    output_coder = factory.get_coder(grpc_port.coder_id)
+  else:
+    logging.error(
+        'Missing required coder_id on grpc_port for %s; '
+        'using deprecated fallback.',
+        transform_id)
+    output_coder = factory.get_only_output_coder(transform_proto)
   return DataInputOperation(
       transform_proto.unique_name,
       transform_proto.unique_name,
       consumers,
       factory.counter_factory,
       factory.state_sampler,
-      factory.get_coder(grpc_port.coder_id)
-      if grpc_port.coder_id
-      else factory.get_only_output_coder(transform_proto),
+      output_coder,
       input_target=target,
       data_channel=factory.data_channel_factory.create_data_channel(grpc_port))
 
@@ -601,15 +607,21 @@ def create(factory, transform_id, transform_proto, 
grpc_port, consumers):
   target = beam_fn_api_pb2.Target(
       primitive_transform_reference=transform_id,
       name=only_element(list(transform_proto.inputs.keys())))
+  if grpc_port.coder_id:
+    output_coder = factory.get_coder(grpc_port.coder_id)
+  else:
+    logging.error(
+        'Missing required coder_id on grpc_port for %s; '
+        'using deprecated fallback.',
+        transform_id)
+    output_coder = factory.get_only_input_coder(transform_proto)
   return DataOutputOperation(
       transform_proto.unique_name,
       transform_proto.unique_name,
       consumers,
       factory.counter_factory,
       factory.state_sampler,
-      factory.get_coder(grpc_port.coder_id)
-      if grpc_port.coder_id
-      else factory.get_only_input_coder(transform_proto),
+      output_coder,
       target=target,
       data_channel=factory.data_channel_factory.create_data_channel(grpc_port))
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 156249)
    Time Spent: 40m  (was: 0.5h)

> RemoteGrpcPorts should contain the wire format
> ----------------------------------------------
>
>                 Key: BEAM-3450
>                 URL: https://issues.apache.org/jira/browse/BEAM-3450
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model, runner-core
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>             Fix For: 2.3.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> This forces the runner to specify the wire format independently of the 
> PCollection coder, which should be the coder for the PCollection's element type 
> (e.g., in Java, a PCollection<T> has a Coder<T> rather than a 
> Coder<WindowedValue<T>>, but the runner must use a Coder<WindowedValue<T>> across 
> edges).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to