This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e07fdea9af560cde7602c64f197b797deb59b76e
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Jun 23 17:16:17 2021 +0200

    [hotfix][datastream] Remove raw casts in ContinuousFileReaderOperator.
---
 .../source/ContinuousFileReaderOperator.java       | 29 ++++++++++++----------
 1 file changed, 16 insertions(+), 13 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 1fd7054b..f6149e9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -106,20 +106,20 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
     private enum ReaderState {
         IDLE {
             @Override
-            public boolean 
prepareToProcessRecord(ContinuousFileReaderOperator<?, ?> op) {
+            public <T extends TimestampedInputSplit> boolean 
prepareToProcessRecord(
+                    ContinuousFileReaderOperator<?, T> op) throws IOException {
                 throw new IllegalStateException("not processing any records in 
IDLE state");
             }
         },
         /** A message is enqueued to process split, but no split is opened. */
         OPENING { // the split was added and message to itself was enqueued to 
process it
-            @Override
-            public boolean 
prepareToProcessRecord(ContinuousFileReaderOperator<?, ?> op)
-                    throws IOException {
+            public <T extends TimestampedInputSplit> boolean 
prepareToProcessRecord(
+                    ContinuousFileReaderOperator<?, T> op) throws IOException {
                 if (op.splits.isEmpty()) {
                     op.switchState(ReaderState.IDLE);
                     return false;
                 } else {
-                    ((ContinuousFileReaderOperator) 
op).loadSplit(op.splits.poll());
+                    op.loadSplit(op.splits.poll());
                     op.switchState(ReaderState.READING);
                     return true;
                 }
@@ -128,7 +128,8 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
         /** A message is enqueued to process split and its processing was 
started. */
         READING {
             @Override
-            public boolean 
prepareToProcessRecord(ContinuousFileReaderOperator<?, ?> op) {
+            public <T extends TimestampedInputSplit> boolean 
prepareToProcessRecord(
+                    ContinuousFileReaderOperator<?, T> op) throws IOException {
                 return true;
             }
 
@@ -143,7 +144,8 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
          */
         FAILED {
             @Override
-            public boolean 
prepareToProcessRecord(ContinuousFileReaderOperator<?, ?> op) {
+            public <T extends TimestampedInputSplit> boolean 
prepareToProcessRecord(
+                    ContinuousFileReaderOperator<?, T> op) throws IOException {
                 throw new IllegalStateException("not processing any records in 
ERRORED state");
             }
         },
@@ -153,10 +155,10 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
          */
         CLOSING {
             @Override
-            public boolean 
prepareToProcessRecord(ContinuousFileReaderOperator<?, ?> op)
-                    throws IOException {
+            public <T extends TimestampedInputSplit> boolean 
prepareToProcessRecord(
+                    ContinuousFileReaderOperator<?, T> op) throws IOException {
                 if (op.currentSplit == null && !op.splits.isEmpty()) {
-                    ((ContinuousFileReaderOperator) 
op).loadSplit(op.splits.poll());
+                    op.loadSplit(op.splits.poll());
                 }
                 return true;
             }
@@ -171,7 +173,8 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
         },
         CLOSED {
             @Override
-            public boolean 
prepareToProcessRecord(ContinuousFileReaderOperator<?, ?> op) {
+            public <T extends TimestampedInputSplit> boolean 
prepareToProcessRecord(
+                    ContinuousFileReaderOperator<?, T> op) {
                 LOG.warn("not processing any records while closed");
                 return false;
             }
@@ -211,8 +214,8 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
          *
          * @return true if should read the record
          */
-        public abstract boolean 
prepareToProcessRecord(ContinuousFileReaderOperator<?, ?> op)
-                throws IOException;
+        public abstract <T extends TimestampedInputSplit> boolean 
prepareToProcessRecord(
+                ContinuousFileReaderOperator<?, T> op) throws IOException;
 
         public void onNoMoreData(ContinuousFileReaderOperator<?, ?> op) {}
     }

Reply via email to