1u0 commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328063741
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ########## @@ -150,43 +139,26 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S this.inStreamElementSerializer = new StreamElementSerializer<>( getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader())); - // create the operators executor for the complete operations of the queue entries - this.executor = Executors.newSingleThreadExecutor(); - switch (outputMode) { case ORDERED: - queue = new OrderedStreamElementQueue( - capacity, - executor, - this); + queue = new OrderedStreamElementQueue<>(capacity); break; case UNORDERED: - queue = new UnorderedStreamElementQueue( - capacity, - executor, - this); + queue = new UnorderedStreamElementQueue<>(capacity); break; default: throw new IllegalStateException("Unknown async mode: " + outputMode + '.'); } + + this.timestampedCollector = new TimestampedCollector<>(output); } @Override public void open() throws Exception { super.open(); - // create the emitter - this.emitter = new Emitter<>(checkpointingLock, mailboxExecutor, output, queue, this); - - // start the emitter thread - this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')'); - emitterThread.setDaemon(true); - emitterThread.start(); - - // process stream elements from state, since the Emit thread will start as soon as all - // elements from previous state are in the StreamElementQueue, we have to make sure that the - // order to open all operators in the operator chain proceeds from the tail operator to the - // head operator. + // Process stream elements from state. We have to make sure that the order to open all operators in the + // operator chain proceeds from the tail operator to the head operator. 
Review comment: I think the comment now sounds more general (not specific to `AsyncWaitOperator`) and is a good explanation of why `open()` is handled from tail to head. Maybe just remove this whole comment? Then add more description to the `StreamTask::openAllOperators()` method that explains the `open()` call order, saying that `StreamOperator::open()` can already emit records and so basically requires that downstream operators are already prepared (this addition can be a separate commit). WDYT? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services