Jiabao-Sun commented on code in PR #23770:
URL: https://github.com/apache/flink/pull/23770#discussion_r1402565436


##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java:
##########
@@ -406,46 +328,28 @@ void testCancelCoGroupTaskWhileCoGrouping() {
         getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
         getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
 
-        final CoGroupDriver<Record, Record, Record> testTask =
-                new CoGroupDriver<Record, Record, Record>();
+        final CoGroupDriver<Record, Record, Record> testTask = new CoGroupDriver<>();
 
-        try {
-            addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-            addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("The test caused an exception.");
-        }
+        addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+        addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
 
-        final AtomicBoolean success = new AtomicBoolean(false);
+        final OneShotLatch closeLatch = new OneShotLatch();
 
-        Thread taskRunner =
-                new Thread() {
+        CheckedThread taskRunner =
+                new CheckedThread() {
                     @Override
-                    public void run() {
-                        try {
-                            testDriver(testTask, MockDelayingCoGroupStub.class);
-                            success.set(true);
-                        } catch (Exception ie) {
-                            ie.printStackTrace();
-                        }
+                    public void go() throws Exception {
+                        testDriver(testTask, new MockDelayingCoGroupStub(closeLatch));

Review Comment:
   Thanks @XComp, it makes sense to me.
   
   In the old implementation, `Thread.sleep` kept the `coGroup` operation from
   finishing too early, so `cancel` was very likely to be called while `coGroup`
   was still executing. To stay consistent with that original scenario, I have
   adjusted the code so that the `OneShotLatch` is used inside the `coGroup`
   method.
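
To illustrate the idea, here is a minimal, self-contained sketch of the pattern. It is not the code from this PR: it uses `java.util.concurrent.CountDownLatch` as a stand-in for `OneShotLatch`, and `LatchCancelSketch`, `DelayingWorker`, `work`, and `runScenario` are hypothetical names. The worker signals a latch once it is inside its long-running method, so the test can cancel it at a known point instead of relying on a sleep.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these names come from Flink or this PR.
class LatchCancelSketch {

    /** Plays the role of a delaying stub: signals entry, then blocks until cancelled. */
    static final class DelayingWorker {
        private final CountDownLatch entered;
        private final CountDownLatch cancelled = new CountDownLatch(1);

        DelayingWorker(CountDownLatch entered) {
            this.entered = entered;
        }

        void work() throws InterruptedException {
            entered.countDown();   // tell the test that work is in progress
            cancelled.await();     // stay "in progress" until cancel() is called
        }

        void cancel() {
            cancelled.countDown();
        }
    }

    /** Returns true if the worker was cancelled while it was inside work(). */
    static boolean runScenario() {
        CountDownLatch entered = new CountDownLatch(1);
        DelayingWorker worker = new DelayingWorker(entered);

        Thread runner = new Thread(() -> {
            try {
                worker.work();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        runner.setDaemon(true);
        runner.start();

        try {
            // Wait until the worker is inside work(); no sleep-based guessing.
            if (!entered.await(5, TimeUnit.SECONDS)) {
                return false;
            }
            worker.cancel();
            runner.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !runner.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("cancelled while working: " + runScenario());
    }
}
```

Compared with a fixed `Thread.sleep`, waiting on the latch means the cancel call cannot arrive before the work has started, which is the timing the original test was trying to approximate.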


