AHeise commented on a change in pull request #10435: [FLINK-13955][runtime] 
migrate ContinuousFileReaderOperator to the mailbox execution model
URL: https://github.com/apache/flink/pull/10435#discussion_r369995184
 
 

 ##########
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
 ##########
 @@ -299,34 +198,76 @@ public void open(FileInputSplit fileSplit) throws 
IOException {
                }
 
                @Override
-               public void reopen(FileInputSplit split, Integer state) throws 
IOException {
+               public void reopen(FileInputSplit split, Integer state) {
                        this.split = split;
                        this.state = state;
                }
 
                @Override
-               public Integer getCurrentState() throws IOException {
+               public Integer getCurrentState() {
                        return state;
                }
 
                @Override
-               public boolean reachedEnd() throws IOException {
+               public boolean reachedEnd() {
                        if (state == elementsBeforeCheckpoint) {
-                               triggerLatch.trigger();
-                               if (!waitingLatch.isTriggered()) {
-                                       try {
-                                               waitingLatch.await();
-                                       } catch (InterruptedException e) {
-                                               e.printStackTrace();
-                                       }
+                               firstChunkTrigger.trigger();
+                               try {
+                                       continueLatch.await();
+                               } catch (InterruptedException e) {
+                                       e.printStackTrace();
+                                       Thread.currentThread().interrupt();
                                }
                        }
-                       return state == linesPerSplit;
+                       boolean ended = state == linesPerSplit;
+                       if (ended) {
+                               endTrigger.trigger();
+                       }
+                       return ended;
                }
 
                @Override
-               public String nextRecord(String reuse) throws IOException {
+               public String nextRecord(String reuse) {
                        return reachedEnd() ? null : split.getSplitNumber() + 
": test line " + state++;
                }
+
+               public void awaitFirstChunkProcessed() throws 
InterruptedException {
+                       firstChunkTrigger.await();
+               }
+
+               public void awaitLastProcessed() throws InterruptedException {
+                       endTrigger.await();
+               }
+
+               public void resume() {
+                       continueLatch.trigger();
+               }
+       }
+
+       private static final class HarnessWithFormat extends 
Tuple2<OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String>, 
BlockingFileInputFormat> {
 
 Review comment:
   Is there a reason to extend Tuple2 instead of using an immutable class with 
two named fields?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to