This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e07fdea9af560cde7602c64f197b797deb59b76e
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Jun 23 17:16:17 2021 +0200

    [hotfix][datastream] Remove raw casts in ContinuousFileReaderOperator.
---
 .../source/ContinuousFileReaderOperator.java       | 29 ++++++++++++----------
 1 file changed, 16 insertions(+), 13 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 1fd7054b..f6149e9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -106,20 +106,20 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
     private enum ReaderState {
         IDLE {
             @Override
-            public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?, ?> op) {
+            public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(
+                    ContinuousFileReaderOperator<?, T> op) throws IOException {
                 throw new IllegalStateException("not processing any records in IDLE state");
             }
         },
         /** A message is enqueued to process split, but no split is opened. */
         OPENING { // the split was added and message to itself was enqueued to process it
-            @Override
-            public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?, ?> op)
-                    throws IOException {
+            public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(
+                    ContinuousFileReaderOperator<?, T> op) throws IOException {
                 if (op.splits.isEmpty()) {
                     op.switchState(ReaderState.IDLE);
                     return false;
                 } else {
-                    ((ContinuousFileReaderOperator) op).loadSplit(op.splits.poll());
+                    op.loadSplit(op.splits.poll());
                     op.switchState(ReaderState.READING);
                     return true;
                 }
@@ -128,7 +128,8 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
         /** A message is enqueued to process split and its processing was started. */
         READING {
             @Override
-            public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?, ?> op) {
+            public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(
+                    ContinuousFileReaderOperator<?, T> op) throws IOException {
                 return true;
             }
 
@@ -143,7 +144,8 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
          */
         FAILED {
             @Override
-            public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?, ?> op) {
+            public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(
+                    ContinuousFileReaderOperator<?, T> op) throws IOException {
                 throw new IllegalStateException("not processing any records in ERRORED state");
             }
         },
@@ -153,10 +155,10 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
          */
         CLOSING {
             @Override
-            public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?, ?> op)
-                    throws IOException {
+            public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(
+                    ContinuousFileReaderOperator<?, T> op) throws IOException {
                 if (op.currentSplit == null && !op.splits.isEmpty()) {
-                    ((ContinuousFileReaderOperator) op).loadSplit(op.splits.poll());
+                    op.loadSplit(op.splits.poll());
                 }
                 return true;
             }
@@ -171,7 +173,8 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
         },
         CLOSED {
             @Override
-            public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?, ?> op) {
+            public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(
+                    ContinuousFileReaderOperator<?, T> op) {
                 LOG.warn("not processing any records while closed");
                 return false;
             }
@@ -211,8 +214,8 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
          *
          * @return true if should read the record
          */
-        public abstract boolean prepareToProcessRecord(ContinuousFileReaderOperator<?, ?> op)
-                throws IOException;
+        public abstract <T extends TimestampedInputSplit> boolean prepareToProcessRecord(
+                ContinuousFileReaderOperator<?, T> op) throws IOException;
 
         public void onNoMoreData(ContinuousFileReaderOperator<?, ?> op) {}
     }
