[ 
https://issues.apache.org/jira/browse/BEAM-3971?focusedWorklogId=118280&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118280
 ]

ASF GitHub Bot logged work on BEAM-3971:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Jul/18 17:42
            Start Date: 02/Jul/18 17:42
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on a change in pull request #5833: 
[BEAM-3971, BEAM-4284] Remove fromProto for Pipeline and PTransform translation.
URL: https://github.com/apache/beam/pull/5833#discussion_r199571699
 
 

 ##########
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 ##########
 @@ -912,30 +922,81 @@ public void translateNode(
       TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
           context.getTypeInfo(context.getOutput(transform));
 
-      TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
-      WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
-          new WindowDoFnOperator<>(
-              reduceFn,
-              fullName,
-              (Coder) windowedWorkItemCoder,
-              mainTag,
-              Collections.emptyList(),
-              new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, 
outputCoder),
-              windowingStrategy,
-              new HashMap<>(), /* side-input mapping */
-              Collections.emptyList(), /* side inputs */
-              context.getPipelineOptions(),
-              inputKvCoder.getKeyCoder(),
-              keySelector);
+      List<PCollectionView<?>> sideInputs = ((Combine.PerKey) 
transform).getSideInputs();
 
-      // our operator excepts WindowedValue<KeyedWorkItem> while our input 
stream
-      // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java 
doesn't like it ...
-      @SuppressWarnings("unchecked")
-      SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
-          keyedWorkItemStream
-              .transform(fullName, outputTypeInfo, (OneInputStreamOperator) 
doFnOperator)
-              .uid(fullName);
-      context.setOutputDataStream(context.getOutput(transform), outDataStream);
+      if (sideInputs.isEmpty()) {
+        TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
+        WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+            new WindowDoFnOperator<>(
+                reduceFn,
+                fullName,
+                (Coder) windowedWorkItemCoder,
+                mainTag,
+                Collections.emptyList(),
+                new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, 
outputCoder),
+                windowingStrategy,
+                new HashMap<>(), /* side-input mapping */
+                Collections.emptyList(), /* side inputs */
+                context.getPipelineOptions(),
+                inputKvCoder.getKeyCoder(),
+                keySelector);
+
+        // our operator excepts WindowedValue<KeyedWorkItem> while our input 
stream
 
 Review comment:
   fixed here and elsewhere.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 118280)
    Time Spent: 6h  (was: 5h 50m)

> Pipeline translation utilities should not use SDK construction classes
> ----------------------------------------------------------------------
>
>                 Key: BEAM-3971
>                 URL: https://issues.apache.org/jira/browse/BEAM-3971
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Ben Sidhom
>            Assignee: Ben Sidhom
>            Priority: Major
>          Time Spent: 6h
>  Remaining Estimate: 0h
>
> In general, portable runners will require access to pipeline information not 
> available in rehydrated pipelines while constructing physical plans. 
> Translation utilities should operate directly on protos or on thin, 
> information-preserving wrappers.
> The pipeline fusion utilities already operate on protos directly and can be 
> used as an example of how this could be done.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to