AHeise commented on a change in pull request #10435: [FLINK-13955][runtime]
migrate ContinuousFileReaderOperator to the mailbox execution model
URL: https://github.com/apache/flink/pull/10435#discussion_r370037602
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
##########
@@ -299,34 +198,76 @@ public void open(FileInputSplit fileSplit) throws
IOException {
}
@Override
- public void reopen(FileInputSplit split, Integer state) throws
IOException {
+ public void reopen(FileInputSplit split, Integer state) {
this.split = split;
this.state = state;
}
@Override
- public Integer getCurrentState() throws IOException {
+ public Integer getCurrentState() {
return state;
}
@Override
- public boolean reachedEnd() throws IOException {
+ public boolean reachedEnd() {
if (state == elementsBeforeCheckpoint) {
- triggerLatch.trigger();
- if (!waitingLatch.isTriggered()) {
- try {
- waitingLatch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ firstChunkTrigger.trigger();
+ try {
+ continueLatch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Thread.currentThread().interrupt();
}
}
- return state == linesPerSplit;
+ boolean ended = state == linesPerSplit;
+ if (ended) {
+ endTrigger.trigger();
+ }
+ return ended;
}
@Override
- public String nextRecord(String reuse) throws IOException {
+ public String nextRecord(String reuse) {
return reachedEnd() ? null : split.getSplitNumber() +
": test line " + state++;
}
+
+ public void awaitFirstChunkProcessed() throws
InterruptedException {
+ firstChunkTrigger.await();
+ }
+
+ public void awaitLastProcessed() throws InterruptedException {
+ endTrigger.await();
+ }
+
+ public void resume() {
+ continueLatch.trigger();
+ }
+ }
+
+ private static final class HarnessWithFormat extends
Tuple2<OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String>,
BlockingFileInputFormat> {
+ public
HarnessWithFormat(OneInputStreamOperatorTestHarness<TimestampedFileInputSplit,
String> harness, BlockingFileInputFormat format) {
+ super(harness, format);
+ }
+
+ public
OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String>
getHarness() {
+ return f0;
+ }
+
+ public BlockingFileInputFormat getFormat() {
+ return f1;
+ }
+
+ public void awaitEverythingProcessed() throws Exception {
+ getFormat().awaitFirstChunkProcessed();
+ getFormat().resume();
+ getFormat().awaitLastProcessed();
+ }
+ }
+
+ private List<Object> collectOutput(HarnessWithFormat... in) {
+ return Stream.of(in)
+ .flatMap(i ->
i.getHarness().getOutput().stream())
+ .filter(o -> !(o instanceof Watermark))
Review comment:
nit: rename the lambda parameter `o` to something more descriptive, e.g. `output` or `element`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services