1u0 commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328063741
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##########
 @@ -150,43 +139,26 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S
                this.inStreamElementSerializer = new StreamElementSerializer<>(
                        getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
 
-               // create the operators executor for the complete operations of the queue entries
-               this.executor = Executors.newSingleThreadExecutor();
-
                switch (outputMode) {
                        case ORDERED:
-                               queue = new OrderedStreamElementQueue(
-                                       capacity,
-                                       executor,
-                                       this);
+                               queue = new OrderedStreamElementQueue<>(capacity);
                                break;
                        case UNORDERED:
-                               queue = new UnorderedStreamElementQueue(
-                                       capacity,
-                                       executor,
-                                       this);
+                               queue = new UnorderedStreamElementQueue<>(capacity);
                                break;
                        default:
                                throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
                }
+
+               this.timestampedCollector = new TimestampedCollector<>(output);
        }
 
        @Override
        public void open() throws Exception {
                super.open();
 
-               // create the emitter
-               this.emitter = new Emitter<>(checkpointingLock, mailboxExecutor, output, queue, this);
-
-               // start the emitter thread
-               this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
-               emitterThread.setDaemon(true);
-               emitterThread.start();
-
-               // process stream elements from state, since the Emit thread will start as soon as all
-               // elements from previous state are in the StreamElementQueue, we have to make sure that the
-               // order to open all operators in the operator chain proceeds from the tail operator to the
-               // head operator.
+               // Process stream elements from state. We have to make sure that the order to open all operators in the
+               // operator chain proceeds from the tail operator to the head operator.
 
 Review comment:
   I think the comment now sounds more general (not specific to
`AsyncWaitOperator`) and is a good explanation of why `open()` is handled from
tail to head.
   Maybe just remove this whole comment and instead add more description to the
`StreamTask::openAllOperators()` method, explaining the `open()` call order:
`StreamOperator::open()` can already emit records, which basically requires
that downstream operators are already prepared (this addition can be a
separate commit).
   WDYT?
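
   To make the ordering argument concrete, here is a minimal, self-contained
sketch. It does not use Flink's classes: `Op`, `openChain`, and the
"restored-element" record are hypothetical stand-ins for a chained operator,
the open loop, and an element replayed from state. It only illustrates why an
operator that emits inside `open()` needs its downstream operator opened
first.

```java
import java.util.ArrayList;
import java.util.List;

public class OpenOrderSketch {

    /** Hypothetical stand-in for a chained operator; not Flink's StreamOperator. */
    static class Op {
        final String name;
        final Op downstream;       // null for the tail of the chain
        final boolean emitsOnOpen; // e.g. an operator replaying elements from state
        final List<String> log;
        boolean opened = false;

        Op(String name, Op downstream, boolean emitsOnOpen, List<String> log) {
            this.name = name;
            this.downstream = downstream;
            this.emitsOnOpen = emitsOnOpen;
            this.log = log;
        }

        void open() {
            opened = true;
            log.add("open " + name);
            // Emitting during open() only works if downstream is already open.
            if (emitsOnOpen && downstream != null) {
                downstream.process("restored-element");
            }
        }

        void process(String element) {
            if (!opened) {
                throw new IllegalStateException(name + " received a record before open()");
            }
            log.add(name + " got " + element);
        }
    }

    /** Opens a two-operator chain (head -> tail) in the requested order. */
    public static List<String> openChain(boolean tailToHead) {
        List<String> log = new ArrayList<>();
        Op tail = new Op("tail", null, false, log);
        Op head = new Op("head", tail, true, log);
        Op[] chain = {head, tail};
        if (tailToHead) {
            for (int i = chain.length - 1; i >= 0; i--) {
                chain[i].open();
            }
        } else {
            for (Op op : chain) {
                op.open();
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(openChain(true));
        try {
            openChain(false);
        } catch (IllegalStateException e) {
            System.out.println("head-to-tail failed: " + e.getMessage());
        }
    }
}
```

   In this sketch, opening from tail to head lets the head emit safely during
`open()`, while opening from head to tail fails because the tail receives a
record before it has been opened.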
