[ https://issues.apache.org/jira/browse/BEAM-12704?focusedWorklogId=641773&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-641773 ]

ASF GitHub Bot logged work on BEAM-12704:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Aug/21 16:02
            Start Date: 25/Aug/21 16:02
    Worklog Time Spent: 10m 
      Work Description: dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695897495



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -474,20 +479,64 @@ public void flatMap(T t, Collector<T> collector) {
       throw new RuntimeException("Failed to parse ReadPayload from transform", e);
     }
 
-    Preconditions.checkState(
-        payload.getIsBounded() != RunnerApi.IsBounded.Enum.BOUNDED,
-        "Bounded reads should run inside an environment instead of being translated by the Runner.");
+    final DataStream<WindowedValue<T>> source;
+    if (payload.getIsBounded() == RunnerApi.IsBounded.Enum.BOUNDED) {
+      source =
+          translateBoundedSource(
+              transform.getUniqueName(),
+              outputCollectionId,
+              payload,
+              pipeline,
+              context.getPipelineOptions(),
+              context.getExecutionEnvironment());
+    } else {
+      source =
+          translateUnboundedSource(
+              transform.getUniqueName(),
+              outputCollectionId,
+              payload,
+              pipeline,
+              context.getPipelineOptions(),
+              context.getExecutionEnvironment());
+    }
+    context.addDataStream(outputCollectionId, source);
+  }
 
-    DataStream<WindowedValue<T>> source =
-        translateUnboundedSource(
-            transform.getUniqueName(),
-            outputCollectionId,
-            payload,
-            pipeline,
-            context.getPipelineOptions(),
-            context.getExecutionEnvironment());
+  private <T> DataStream<WindowedValue<T>> translateBoundedSource(
+      String transformName,
+      String outputCollectionId,
+      RunnerApi.ReadPayload payload,
+      RunnerApi.Pipeline pipeline,
+      FlinkPipelineOptions pipelineOptions,
+      StreamExecutionEnvironment env) {
 
-    context.addDataStream(outputCollectionId, source);
+    try {
+      @SuppressWarnings("unchecked")
+      BoundedSource<T> boundedSource =
+          (BoundedSource<T>) ReadTranslation.boundedSourceFromProto(payload);
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder)
+              instantiateCoder(outputCollectionId, pipeline.getComponents());
+
+      WindowedValue.FullWindowedValueCoder<T> sdkCoder =
+          getSdkCoder(outputCollectionId, pipeline.getComponents());
+
+      CoderTypeInformation<WindowedValue<T>> outputTypeInfo =
+          new CoderTypeInformation<>(wireCoder, pipelineOptions);
+
+      CoderTypeInformation<WindowedValue<T>> sdkTypeInfo =
+          new CoderTypeInformation<>(sdkCoder, pipelineOptions);
+
+      return env.createInput(new SourceInputFormat<>(transformName, boundedSource, pipelineOptions))
+          .name(transformName)
+          .uid(transformName)
+          .returns(sdkTypeInfo)
+          .map(value -> intoWireTypes(sdkCoder, wireCoder, value))

Review comment:
       It would be nice to give this transform a name (and uid).
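
A minimal sketch of one way to follow this suggestion, not the change actually made in the pull request. Flink's `name(...)` and `uid(...)` can be called on the operator returned by `map(...)`, and a uid has to be unique within the job, so the map step cannot reuse `transformName`, which the source operator already takes. The `OperatorLabels` helper and the `IntoWireTypes` suffix below are hypothetical names chosen for illustration only.

```java
/** Hypothetical helper (not part of Beam or Flink) that derives a label for a follow-up operator. */
final class OperatorLabels {
  private OperatorLabels() {}

  /**
   * Appends a step suffix to the transform name so that the follow-up operator
   * gets a label distinct from the one already assigned to the source operator.
   */
  static String forStep(String transformName, String step) {
    if (transformName == null || transformName.isEmpty()) {
      throw new IllegalArgumentException("transformName must be non-empty");
    }
    if (step == null || step.isEmpty()) {
      throw new IllegalArgumentException("step must be non-empty");
    }
    return transformName + "/" + step;
  }

  // Assumed use in the chain from the diff (not compiled here, it needs Flink and Beam):
  //
  //   .map(value -> intoWireTypes(sdkCoder, wireCoder, value))
  //   .name(OperatorLabels.forStep(transformName, "IntoWireTypes"))
  //   .uid(OperatorLabels.forStep(transformName, "IntoWireTypes"))
}
```

Deriving both labels from `transformName` keeps them stable across job submissions, which is what lets Flink match operator state when restoring from a savepoint.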





Issue Time Tracking
-------------------

    Worklog Id:     (was: 641773)
    Time Spent: 4h 10m  (was: 4h)

> Fix primitive Read on portable Flink Runner
> -------------------------------------------
>
>                 Key: BEAM-12704
>                 URL: https://issues.apache.org/jira/browse/BEAM-12704
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>    Affects Versions: 2.32.0
>            Reporter: Jan Lukavský
>            Assignee: Jan Lukavský
>            Priority: P2
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> {{ReadSourcePortableTest}} does not correctly test the expansion into 
> primitive Read. As a result, the primitive Read operation is broken.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
