rkhachatryan commented on a change in pull request #10542: [FLINK-12484][runtime] move checkpoint lock to source task
URL: https://github.com/apache/flink/pull/10542#discussion_r377010511
 
 

 ##########
 File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ##########
 @@ -706,47 +680,6 @@ private void testAsyncTimeout(
                }
        }
 
-       /**
-        * Test case for FLINK-5638: Tests that the async wait operator can be closed even if the
-        * emitter is currently waiting on the checkpoint lock (e.g. in the case of two chained async
-        * wait operators where the latter operator's queue is currently full).
-        *
-        * <p>Note that this test does not enforce the exact strict ordering because with the fix it is no
-        * longer possible. However, it provokes the described situation without the fix.
-        */
-       @Test
-       public void testClosingWithBlockedEmitter() throws Exception {
-
-               JobVertex chainedVertex = createChainedVertex(new MyAsyncFunction(), new EmitterBlockingFunction());
-
-               final OneInputStreamTaskTestHarness<Integer, Integer> testHarness = new OneInputStreamTaskTestHarness<>(
-                               OneInputStreamTask::new,
-                               1, 1,
-                               BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
-               testHarness.setupOutputForSingletonOperatorChain();
-
-               testHarness.taskConfig = chainedVertex.getConfiguration();
-
-               final StreamConfig streamConfig = testHarness.getStreamConfig();
-               final StreamConfig operatorChainStreamConfig = new StreamConfig(chainedVertex.getConfiguration());
-               streamConfig.setStreamOperatorFactory(
-                               operatorChainStreamConfig.getStreamOperatorFactory(AsyncWaitOperatorTest.class.getClassLoader()));
-
-               testHarness.invoke();
-               testHarness.waitForTaskRunning();
-               Object checkpointLock = testHarness.getTask().getCheckpointLock();
-               EmitterBlockingFunction.setLock(checkpointLock);
-
-               testHarness.processElement(new StreamRecord<>(42, 1L));
-
-               EmitterBlockingFunction.outputLatch.await();
-               testHarness.endInput();
-               EmitterBlockingFunction.closingLatch.trigger();
-               testHarness.waitForTaskCompletion();
-
-               assertEquals(emptyList(), new ArrayList<>(testHarness.getOutput()));
-       }
 
 Review comment:
   During the review of another PR it turned out that this test only worked because the default timeout in `AsyncWaitOperator` was shorter than the test timeout.
   
   I don't think we have a separate test for closing `AsyncWaitOperator`, but it could (and should) be tested more simply. I'm not sure, though, whether that is in the scope of this PR.
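   
   To make the timeout interaction concrete, here is a minimal sketch in plain Java. It is not Flink code: `MaskedHangDemo`, `closeWithin`, and the timeout values are hypothetical, and `CompletableFuture.orTimeout` requires Java 9 or later. It only illustrates, by analogy, how a close that is in fact blocked can still appear to succeed when an internal timeout fires before the test's own timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class MaskedHangDemo {

    /**
     * Simulates a close that never finishes on its own and is released only by an
     * internal timeout (hypothetical stand-in, not the Flink implementation).
     *
     * @return true if the close returned before the test timeout expired
     */
    static boolean closeWithin(long internalTimeoutMs, long testTimeoutMs) {
        // This future is never completed normally: it models a blocked close.
        CompletableFuture<Void> blockedClose = new CompletableFuture<>();
        // The internal timeout fails the future, which "unblocks" the close.
        blockedClose.orTimeout(internalTimeoutMs, TimeUnit.MILLISECONDS);
        try {
            blockedClose.get(testTimeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException e) {
            // Released by the internal timeout: the test would see a finished close.
            return true;
        } catch (TimeoutException e) {
            // The test timeout expired first: the hang becomes visible.
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Internal timeout shorter than the test timeout: the hang is masked.
        System.out.println("masked:  " + closeWithin(50, 2000));
        // Internal timeout longer than the test timeout: the hang is exposed.
        System.out.println("exposed: " + !closeWithin(2000, 50));
    }
}
```

   Under these assumptions, the outcome depends only on which timeout is shorter, which is why a simpler, dedicated close test would be more robust than one that relies on timeout ordering.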

