This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 76d7fb1  [FLINK-10863][tests] Assign UIDs to all operators in DataStreamAllroundTestProgram
76d7fb1 is described below

commit 76d7fb10488feb7cf83c20c00e331a12a8896492
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Tue Nov 13 12:32:56 2018 +0100

    [FLINK-10863][tests] Assign UIDs to all operators in DataStreamAllroundTestProgram
    
    This closes #7085.
---
 .../streaming/tests/DataStreamAllroundTestProgram.java  | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index 4372175..685ba87 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -77,7 +77,7 @@ public class DataStreamAllroundTestProgram {
                setupEnvironment(env, pt);
 
                // add a keyed stateful map operator, which uses Kryo for state serialization
-               DataStream<Event> eventStream = env.addSource(createEventSource(pt))
+               DataStream<Event> eventStream = env.addSource(createEventSource(pt)).uid("0001")
                        .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
                        .keyBy(Event::getKey)
                        .map(createArtificialKeyedStateMapper(
@@ -95,7 +95,7 @@ public class DataStreamAllroundTestProgram {
                                                new KryoSerializer<>(ComplexPayload.class, env.getConfig())), // custom KryoSerializer
                                        Collections.singletonList(ComplexPayload.class) // KryoSerializer via type extraction
                                )
-                       ).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Kryo");
+                       ).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Kryo").uid("0002");
 
                // add a keyed stateful map operator, which uses Avro for state serialization
                eventStream = eventStream
@@ -122,12 +122,12 @@ public class DataStreamAllroundTestProgram {
                                                new AvroSerializer<>(ComplexPayloadAvro.class)), // custom AvroSerializer
                                        Collections.singletonList(ComplexPayloadAvro.class) // AvroSerializer via type extraction
                                )
-                       ).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Avro");
+                       ).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Avro").uid("0003");
 
                DataStream<Event> eventStream2 = eventStream
                        .map(createArtificialOperatorStateMapper((MapFunction<Event, Event>) in -> in))
-                       .name(OPERATOR_STATE_OPER_NAME)
-                       .returns(Event.class);
+                       .returns(Event.class)
+                       .name(OPERATOR_STATE_OPER_NAME).uid("0004");
 
                // apply a tumbling window that simply passes forward window elements;
                // this allows the job to cover timers state
@@ -139,19 +139,20 @@ public class DataStreamAllroundTestProgram {
                                                out.collect(e);
                                        }
                                }
-                       }).name(TIME_WINDOW_OPER_NAME);
+                       }).name(TIME_WINDOW_OPER_NAME).uid("0005");
 
                if (isSimulateFailures(pt)) {
                        eventStream3 = eventStream3
                                .map(createFailureMapper(pt))
                                .setParallelism(1)
-                               .name(FAILURE_MAPPER_NAME);
+                               .name(FAILURE_MAPPER_NAME).uid("0006");
                }
 
                eventStream3.keyBy(Event::getKey)
                        .flatMap(createSemanticsCheckMapper(pt))
                        .name(SEMANTICS_CHECK_MAPPER_NAME)
-                       .addSink(new PrintSinkFunction<>());
+                       .uid("0007")
+                       .addSink(new PrintSinkFunction<>()).uid("0008");
 
                env.execute("General purpose test job");
        }

Reply via email to