AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime]
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328136330
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
##########
@@ -150,43 +139,26 @@ public void setup(StreamTask<?, ?> containingTask,
StreamConfig config, Output<S
this.inStreamElementSerializer = new StreamElementSerializer<>(
getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
- // create the operators executor for the complete operations of
the queue entries
- this.executor = Executors.newSingleThreadExecutor();
-
switch (outputMode) {
case ORDERED:
- queue = new OrderedStreamElementQueue(
- capacity,
- executor,
- this);
+ queue = new
OrderedStreamElementQueue<>(capacity);
break;
case UNORDERED:
- queue = new UnorderedStreamElementQueue(
- capacity,
- executor,
- this);
+ queue = new
UnorderedStreamElementQueue<>(capacity);
break;
default:
throw new IllegalStateException("Unknown async
mode: " + outputMode + '.');
}
+
+ this.timestampedCollector = new TimestampedCollector<>(output);
}
@Override
public void open() throws Exception {
super.open();
- // create the emitter
- this.emitter = new Emitter<>(checkpointingLock,
mailboxExecutor, output, queue, this);
-
- // start the emitter thread
- this.emitterThread = new Thread(emitter,
"AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
- emitterThread.setDaemon(true);
- emitterThread.start();
-
- // process stream elements from state, since the Emit thread
will start as soon as all
- // elements from previous state are in the StreamElementQueue,
we have to make sure that the
- // order to open all operators in the operator chain proceeds
from the tail operator to the
- // head operator.
+ // Process stream elements from state. We have to make sure
that the order to open all operators in the
+ // operator chain proceeds from the tail operator to the head
operator.
Review comment:
Yes, sounds like a good hotfix.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services