[
https://issues.apache.org/jira/browse/BEAM-3800?focusedWorklogId=84307&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-84307
]
ASF GitHub Bot logged work on BEAM-3800:
----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Mar/18 11:16
Start Date: 26/Mar/18 11:16
Worklog Time Spent: 10m
Work Description: aljoscha closed pull request #4903: [BEAM-3800] Set
uids on Flink operators
URL: https://github.com/apache/beam/pull/4903
This is a PR merged from a forked repository. As GitHub hides the
original diff on merge, it is displayed below for the sake of
provenance:
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 8f954a5cf0e..970ece1a4a5 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.flink;
+import static java.lang.String.format;
import static
org.apache.beam.runners.core.construction.SplittableParDo.SPLITTABLE_PROCESS_URN;
import com.google.auto.service.AutoService;
@@ -198,13 +199,14 @@ public void translateNode(
context.getExecutionEnvironment().getParallelism());
nonDedupSource = context
.getExecutionEnvironment()
- .addSource(sourceWrapper).name(fullName).returns(withIdTypeInfo);
+ .addSource(sourceWrapper)
+ .name(fullName).uid(fullName)
+ .returns(withIdTypeInfo);
if (rawSource.requiresDeduping()) {
- source =
- nonDedupSource
- .keyBy(new ValueWithRecordIdKeySelector<>())
- .transform("deduping", outputTypeInfo, new
DedupingOperator<>());
+ source = nonDedupSource.keyBy(new ValueWithRecordIdKeySelector<>())
+ .transform("deduping", outputTypeInfo, new DedupingOperator<>())
+ .uid(format("%s/__deduplicated__", fullName));
} else {
source = nonDedupSource.flatMap(new
StripIdsMap<>()).returns(outputTypeInfo);
}
@@ -297,7 +299,9 @@ public void translateNode(
context.getExecutionEnvironment().getParallelism());
source = context
.getExecutionEnvironment()
- .addSource(sourceWrapper).name(fullName).returns(outputTypeInfo);
+ .addSource(sourceWrapper)
+ .name(fullName).uid(fullName)
+ .returns(outputTypeInfo);
} catch (Exception e) {
throw new RuntimeException(
"Error while translating BoundedSource: " + rawSource, e);
@@ -511,7 +515,7 @@ public RawUnionValue map(T o) throws Exception {
transformedSideInputs.f0);
if (stateful) {
- // we have to manually contruct the two-input transform because
we're not
+ // we have to manually construct the two-input transform because
we're not
// allowed to have only one input keyed, normally.
KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream;
TwoInputTransformation<
@@ -542,6 +546,7 @@ public RawUnionValue map(T o) throws Exception {
}
}
+ outputStream.uid(transformName);
context.setOutputDataStream(outputs.get(mainOutputTag), outputStream);
for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
@@ -722,9 +727,10 @@ public void translateNode(
FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
new FlinkAssignWindows<>(windowFn);
+ String fullName = context.getOutput(transform).getName();
SingleOutputStreamOperator<WindowedValue<T>> outputDataStream =
inputDataStream
.flatMap(assignWindowsFunction)
- .name(context.getOutput(transform).getName())
+ .name(fullName).uid(fullName)
.returns(typeInfo);
context.setOutputDataStream(context.getOutput(transform),
outputDataStream);
@@ -822,7 +828,8 @@ public void translateNode(
@SuppressWarnings("unchecked")
SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>>
outDataStream =
keyedWorkItemStream
- .transform(fullName, outputTypeInfo, (OneInputStreamOperator)
doFnOperator);
+ .transform(fullName, outputTypeInfo, (OneInputStreamOperator)
doFnOperator)
+ .uid(fullName);
context.setOutputDataStream(context.getOutput(transform), outDataStream);
}
@@ -860,7 +867,7 @@ boolean canTranslate(
public void translateNode(
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>
transform,
FlinkStreamingTranslationContext context) {
-
+ String fullName = getCurrentTransformName(context);
PCollection<KV<K, InputT>> input = context.getInput(transform);
@SuppressWarnings("unchecked")
@@ -919,7 +926,6 @@ public void translateNode(
throw new RuntimeException(e);
}
- String fullName = getCurrentTransformName(context);
if (sideInputs.isEmpty()) {
TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
@@ -940,9 +946,9 @@ public void translateNode(
// is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java
doesn't like it ...
@SuppressWarnings("unchecked")
SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>>
outDataStream =
- keyedWorkItemStream.transform(
- fullName, outputTypeInfo, (OneInputStreamOperator)
doFnOperator);
-
+ keyedWorkItemStream
+ .transform(fullName, outputTypeInfo, (OneInputStreamOperator)
doFnOperator)
+ .uid(fullName);
context.setOutputDataStream(context.getOutput(transform),
outDataStream);
} else {
Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>>
transformSideInputs =
@@ -963,7 +969,7 @@ public void translateNode(
context.getPipelineOptions(),
inputKvCoder.getKeyCoder());
- // we have to manually contruct the two-input transform because we're
not
+ // we have to manually construct the two-input transform because we're
not
// allowed to have only one input keyed, normally.
TwoInputTransformation<
@@ -985,6 +991,7 @@ public void translateNode(
new SingleOutputStreamOperator(
keyedWorkItemStream.getExecutionEnvironment(),
rawFlinkTransform) {}; // we have to cheat around the ctor
being protected
+ outDataStream.uid(fullName);
keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 84307)
Time Spent: 0.5h (was: 20m)
> Set uids on Flink operators
> ---------------------------
>
> Key: BEAM-3800
> URL: https://issues.apache.org/jira/browse/BEAM-3800
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Grzegorz Kołakowski
> Assignee: Grzegorz Kołakowski
> Priority: Major
> Fix For: 2.5.0
>
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Flink operators should have unique ids assigned, which are, in turn, used for
> checkpointing stateful operators. Assigning operator ids is highly
> recommended according to Flink documentation.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)