AHeise commented on a change in pull request #10435: [FLINK-13955][runtime] 
migrate ContinuousFileReaderOperator to the mailbox execution model
URL: https://github.com/apache/flink/pull/10435#discussion_r370037602
 
 

 ##########
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
 ##########
 @@ -299,34 +198,76 @@ public void open(FileInputSplit fileSplit) throws 
IOException {
                }
 
                @Override
-               public void reopen(FileInputSplit split, Integer state) throws 
IOException {
+               public void reopen(FileInputSplit split, Integer state) {
                        this.split = split;
                        this.state = state;
                }
 
                @Override
-               public Integer getCurrentState() throws IOException {
+               public Integer getCurrentState() {
                        return state;
                }
 
                @Override
-               public boolean reachedEnd() throws IOException {
+               public boolean reachedEnd() {
                        if (state == elementsBeforeCheckpoint) {
-                               triggerLatch.trigger();
-                               if (!waitingLatch.isTriggered()) {
-                                       try {
-                                               waitingLatch.await();
-                                       } catch (InterruptedException e) {
-                                               e.printStackTrace();
-                                       }
+                               firstChunkTrigger.trigger();
+                               try {
+                                       continueLatch.await();
+                               } catch (InterruptedException e) {
+                                       e.printStackTrace();
+                                       Thread.currentThread().interrupt();
                                }
                        }
-                       return state == linesPerSplit;
+                       boolean ended = state == linesPerSplit;
+                       if (ended) {
+                               endTrigger.trigger();
+                       }
+                       return ended;
                }
 
                @Override
-               public String nextRecord(String reuse) throws IOException {
+               public String nextRecord(String reuse) {
                        return reachedEnd() ? null : split.getSplitNumber() + 
": test line " + state++;
                }
+
+               public void awaitFirstChunkProcessed() throws 
InterruptedException {
+                       firstChunkTrigger.await();
+               }
+
+               public void awaitLastProcessed() throws InterruptedException {
+                       endTrigger.await();
+               }
+
+               public void resume() {
+                       continueLatch.trigger();
+               }
+       }
+
+       private static final class HarnessWithFormat extends 
Tuple2<OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String>, 
BlockingFileInputFormat> {
+               public 
HarnessWithFormat(OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, 
String> harness, BlockingFileInputFormat format) {
+                       super(harness, format);
+               }
+
+               public 
OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> 
getHarness() {
+                       return f0;
+               }
+
+               public BlockingFileInputFormat getFormat() {
+                       return f1;
+               }
+
+               public void awaitEverythingProcessed() throws Exception {
+                       getFormat().awaitFirstChunkProcessed();
+                       getFormat().resume();
+                       getFormat().awaitLastProcessed();
+               }
+       }
+
+       private List<Object> collectOutput(HarnessWithFormat... in) {
+               return Stream.of(in)
+                               .flatMap(i -> 
i.getHarness().getOutput().stream())
+                               .filter(o -> !(o instanceof Watermark))
 
 Review comment:
   nit: rename the lambda parameter `o` to something more descriptive, e.g. `output` or `element`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to