[
https://issues.apache.org/jira/browse/FLINK-10863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685395#comment-16685395
]
ASF GitHub Bot commented on FLINK-10863:
----------------------------------------
asfgit closed pull request #7085: [FLINK-10863][tests] Assign UIDs to all
operators in DataStreamAllrou…
URL: https://github.com/apache/flink/pull/7085
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index b14e2af1b52..3c406c7598d 100644
---
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -78,7 +78,7 @@ public static void main(String[] args) throws Exception {
setupEnvironment(env, pt);
// add a keyed stateful map operator, which uses Kryo for state
serialization
- DataStream<Event> eventStream =
env.addSource(createEventSource(pt))
+ DataStream<Event> eventStream =
env.addSource(createEventSource(pt)).uid("0001")
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
.keyBy(Event::getKey)
.map(createArtificialKeyedStateMapper(
@@ -97,7 +97,7 @@ public static void main(String[] args) throws Exception {
new
StatefulComplexPayloadSerializer()), // custom stateful serializer
Collections.singletonList(ComplexPayload.class) // KryoSerializer via type
extraction
)
- ).returns(Event.class).name(KEYED_STATE_OPER_NAME +
"_Kryo_and_Custom_Stateful");
+ ).returns(Event.class).name(KEYED_STATE_OPER_NAME +
"_Kryo_and_Custom_Stateful").uid("0002");
// add a keyed stateful map operator, which uses Avro for state
serialization
eventStream = eventStream
@@ -124,12 +124,12 @@ public static void main(String[] args) throws Exception {
new
AvroSerializer<>(ComplexPayloadAvro.class)), // custom AvroSerializer
Collections.singletonList(ComplexPayloadAvro.class) // AvroSerializer via type
extraction
)
- ).returns(Event.class).name(KEYED_STATE_OPER_NAME +
"_Avro");
+ ).returns(Event.class).name(KEYED_STATE_OPER_NAME +
"_Avro").uid("0003");
DataStream<Event> eventStream2 = eventStream
.map(createArtificialOperatorStateMapper((MapFunction<Event, Event>) in -> in))
- .name(OPERATOR_STATE_OPER_NAME)
- .returns(Event.class);
+ .returns(Event.class)
+ .name(OPERATOR_STATE_OPER_NAME).uid("0004");
// apply a tumbling window that simply passes forward window
elements;
// this allows the job to cover timers state
@@ -141,19 +141,20 @@ public void apply(Integer integer, TimeWindow window,
Iterable<Event> input, Col
out.collect(e);
}
}
- }).name(TIME_WINDOW_OPER_NAME);
+ }).name(TIME_WINDOW_OPER_NAME).uid("0005");
if (isSimulateFailures(pt)) {
eventStream3 = eventStream3
.map(createFailureMapper(pt))
.setParallelism(1)
- .name(FAILURE_MAPPER_NAME);
+ .name(FAILURE_MAPPER_NAME).uid("0006");
}
eventStream3.keyBy(Event::getKey)
.flatMap(createSemanticsCheckMapper(pt))
.name(SEMANTICS_CHECK_MAPPER_NAME)
- .addSink(new PrintSinkFunction<>());
+ .uid("0007")
+ .addSink(new PrintSinkFunction<>()).uid("0008");
env.execute("General purpose test job");
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Assign uids to all operators
> ----------------------------
>
> Key: FLINK-10863
> URL: https://issues.apache.org/jira/browse/FLINK-10863
> Project: Flink
> Issue Type: Sub-task
> Components: E2E Tests
> Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> We should assign uids to operators in the test so that we can also properly
> test removing operators.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)