This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 76d7fb1  [FLINK-10863][tests] Assign UIDs to all operators in DataStreamAllroundTestProgram
76d7fb1 is described below

commit 76d7fb10488feb7cf83c20c00e331a12a8896492
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Tue Nov 13 12:32:56 2018 +0100

    [FLINK-10863][tests] Assign UIDs to all operators in DataStreamAllroundTestProgram

    This closes #7085.
---
 .../streaming/tests/DataStreamAllroundTestProgram.java | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index 4372175..685ba87 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -77,7 +77,7 @@ public class DataStreamAllroundTestProgram {
 		setupEnvironment(env, pt);
 
 		// add a keyed stateful map operator, which uses Kryo for state serialization
-		DataStream<Event> eventStream = env.addSource(createEventSource(pt))
+		DataStream<Event> eventStream = env.addSource(createEventSource(pt)).uid("0001")
 			.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
 			.keyBy(Event::getKey)
 			.map(createArtificialKeyedStateMapper(
@@ -95,7 +95,7 @@ public class DataStreamAllroundTestProgram {
 					new KryoSerializer<>(ComplexPayload.class, env.getConfig())), // custom KryoSerializer
 				Collections.singletonList(ComplexPayload.class) // KryoSerializer via type extraction
 				)
-			).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Kryo");
+			).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Kryo").uid("0002");
 
 		// add a keyed stateful map operator, which uses Avro for state serialization
 		eventStream = eventStream
@@ -122,12 +122,12 @@ public class DataStreamAllroundTestProgram {
 					new AvroSerializer<>(ComplexPayloadAvro.class)), // custom AvroSerializer
 				Collections.singletonList(ComplexPayloadAvro.class) // AvroSerializer via type extraction
 				)
-			).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Avro");
+			).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Avro").uid("0003");
 
 		DataStream<Event> eventStream2 = eventStream
 			.map(createArtificialOperatorStateMapper((MapFunction<Event, Event>) in -> in))
-			.name(OPERATOR_STATE_OPER_NAME)
-			.returns(Event.class);
+			.returns(Event.class)
+			.name(OPERATOR_STATE_OPER_NAME).uid("0004");
 
 		// apply a tumbling window that simply passes forward window elements;
 		// this allows the job to cover timers state
@@ -139,19 +139,20 @@ public class DataStreamAllroundTestProgram {
 						out.collect(e);
 					}
 				}
-			}).name(TIME_WINDOW_OPER_NAME);
+			}).name(TIME_WINDOW_OPER_NAME).uid("0005");
 
 		if (isSimulateFailures(pt)) {
 			eventStream3 = eventStream3
 				.map(createFailureMapper(pt))
 				.setParallelism(1)
-				.name(FAILURE_MAPPER_NAME);
+				.name(FAILURE_MAPPER_NAME).uid("0006");
 		}
 
 		eventStream3.keyBy(Event::getKey)
 			.flatMap(createSemanticsCheckMapper(pt))
 			.name(SEMANTICS_CHECK_MAPPER_NAME)
-			.addSink(new PrintSinkFunction<>());
+			.uid("0007")
+			.addSink(new PrintSinkFunction<>()).uid("0008");
 
 		env.execute("General purpose test job");
 	}