[ 
https://issues.apache.org/jira/browse/BEAM-3800?focusedWorklogId=84307&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-84307
 ]

ASF GitHub Bot logged work on BEAM-3800:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Mar/18 11:16
            Start Date: 26/Mar/18 11:16
    Worklog Time Spent: 10m 
      Work Description: aljoscha closed pull request #4903: [BEAM-3800] Set 
uids on Flink operators
URL: https://github.com/apache/beam/pull/4903
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once a pull request from a fork is merged, the diff is
reproduced below for the sake of provenance:

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 8f954a5cf0e..970ece1a4a5 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.flink;
 
+import static java.lang.String.format;
 import static 
org.apache.beam.runners.core.construction.SplittableParDo.SPLITTABLE_PROCESS_URN;
 
 import com.google.auto.service.AutoService;
@@ -198,13 +199,14 @@ public void translateNode(
                 context.getExecutionEnvironment().getParallelism());
         nonDedupSource = context
             .getExecutionEnvironment()
-            .addSource(sourceWrapper).name(fullName).returns(withIdTypeInfo);
+            .addSource(sourceWrapper)
+            .name(fullName).uid(fullName)
+            .returns(withIdTypeInfo);
 
         if (rawSource.requiresDeduping()) {
-          source =
-              nonDedupSource
-                  .keyBy(new ValueWithRecordIdKeySelector<>())
-                  .transform("deduping", outputTypeInfo, new 
DedupingOperator<>());
+          source = nonDedupSource.keyBy(new ValueWithRecordIdKeySelector<>())
+              .transform("deduping", outputTypeInfo, new DedupingOperator<>())
+              .uid(format("%s/__deduplicated__", fullName));
         } else {
           source = nonDedupSource.flatMap(new 
StripIdsMap<>()).returns(outputTypeInfo);
         }
@@ -297,7 +299,9 @@ public void translateNode(
                 context.getExecutionEnvironment().getParallelism());
         source = context
             .getExecutionEnvironment()
-            .addSource(sourceWrapper).name(fullName).returns(outputTypeInfo);
+            .addSource(sourceWrapper)
+            .name(fullName).uid(fullName)
+            .returns(outputTypeInfo);
       } catch (Exception e) {
         throw new RuntimeException(
             "Error while translating BoundedSource: " + rawSource, e);
@@ -511,7 +515,7 @@ public RawUnionValue map(T o) throws Exception {
                 transformedSideInputs.f0);
 
         if (stateful) {
-          // we have to manually contruct the two-input transform because 
we're not
+          // we have to manually construct the two-input transform because 
we're not
           // allowed to have only one input keyed, normally.
           KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream;
           TwoInputTransformation<
@@ -542,6 +546,7 @@ public RawUnionValue map(T o) throws Exception {
         }
       }
 
+      outputStream.uid(transformName);
       context.setOutputDataStream(outputs.get(mainOutputTag), outputStream);
 
       for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
@@ -722,9 +727,10 @@ public void translateNode(
       FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
           new FlinkAssignWindows<>(windowFn);
 
+      String fullName = context.getOutput(transform).getName();
       SingleOutputStreamOperator<WindowedValue<T>> outputDataStream = 
inputDataStream
           .flatMap(assignWindowsFunction)
-          .name(context.getOutput(transform).getName())
+          .name(fullName).uid(fullName)
           .returns(typeInfo);
 
       context.setOutputDataStream(context.getOutput(transform), 
outputDataStream);
@@ -822,7 +828,8 @@ public void translateNode(
       @SuppressWarnings("unchecked")
       SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> 
outDataStream =
           keyedWorkItemStream
-              .transform(fullName, outputTypeInfo, (OneInputStreamOperator) 
doFnOperator);
+              .transform(fullName, outputTypeInfo, (OneInputStreamOperator) 
doFnOperator)
+              .uid(fullName);
 
       context.setOutputDataStream(context.getOutput(transform), outDataStream);
     }
@@ -860,7 +867,7 @@ boolean canTranslate(
     public void translateNode(
         PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> 
transform,
         FlinkStreamingTranslationContext context) {
-
+      String fullName = getCurrentTransformName(context);
       PCollection<KV<K, InputT>> input = context.getInput(transform);
 
       @SuppressWarnings("unchecked")
@@ -919,7 +926,6 @@ public void translateNode(
         throw new RuntimeException(e);
       }
 
-      String fullName = getCurrentTransformName(context);
       if (sideInputs.isEmpty()) {
         TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
         WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
@@ -940,9 +946,9 @@ public void translateNode(
         // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java 
doesn't like it ...
         @SuppressWarnings("unchecked")
         SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> 
outDataStream =
-            keyedWorkItemStream.transform(
-                fullName, outputTypeInfo, (OneInputStreamOperator) 
doFnOperator);
-
+            keyedWorkItemStream
+                .transform(fullName, outputTypeInfo, (OneInputStreamOperator) 
doFnOperator)
+                .uid(fullName);
         context.setOutputDataStream(context.getOutput(transform), 
outDataStream);
       } else {
         Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> 
transformSideInputs =
@@ -963,7 +969,7 @@ public void translateNode(
                 context.getPipelineOptions(),
                 inputKvCoder.getKeyCoder());
 
-        // we have to manually contruct the two-input transform because we're 
not
+        // we have to manually construct the two-input transform because we're 
not
         // allowed to have only one input keyed, normally.
 
         TwoInputTransformation<
@@ -985,6 +991,7 @@ public void translateNode(
             new SingleOutputStreamOperator(
                 keyedWorkItemStream.getExecutionEnvironment(),
                 rawFlinkTransform) {}; // we have to cheat around the ctor 
being protected
+        outDataStream.uid(fullName);
 
         
keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 84307)
    Time Spent: 0.5h  (was: 20m)

> Set uids on Flink operators
> ---------------------------
>
>                 Key: BEAM-3800
>                 URL: https://issues.apache.org/jira/browse/BEAM-3800
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Grzegorz Kołakowski
>            Assignee: Grzegorz Kołakowski
>            Priority: Major
>             Fix For: 2.5.0
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Flink operators should have unique ids assigned, which are, in turn, used for 
> checkpointing stateful operators. Assigning operator ids is highly 
> recommended according to Flink documentation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to