guoweiM commented on a change in pull request #6613:
URL: https://github.com/apache/flink/pull/6613#discussion_r460665955



##########
File path: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
##########
@@ -1054,11 +1095,22 @@ private static int getLineNo(String line) {
        }
 
        /**
-        * Create continuous monitoring function with 1 reader-parallelism and 
interval: {@link #INTERVAL}.
+        * Create continuous monitoring function with 1 reader-parallelism, 
interval {@link #INTERVAL}
+        * and read_consistency_offset_interval {@link 
#READ_CONSISTENCY_OFFSET_INTERVAL}.
         */
        private <OUT> ContinuousFileMonitoringFunction<OUT> 
createTestContinuousFileMonitoringFunction(FileInputFormat<OUT> format, 
FileProcessingMode fileProcessingMode) {
                ContinuousFileMonitoringFunction<OUT> monitoringFunction =
-                       new ContinuousFileMonitoringFunction<>(format, 
fileProcessingMode, 1, INTERVAL);
+                       new ContinuousFileMonitoringFunction<>(format, 
fileProcessingMode, 1, INTERVAL, READ_CONSISTENCY_OFFSET_INTERVAL);
+               
monitoringFunction.setRuntimeContext(Mockito.mock(RuntimeContext.class));

Review comment:
       It is best not to rely on Mockito when writing tests. Please follow 
the coding guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
        /** Which new data to process (see {@link FileProcessingMode}. */
        private final FileProcessingMode watchType;
 
-       /** The maximum file modification time seen so far. */
+       /** The offset interval back from the latest file modification 
timestamp to scan for our-of-order files.
+        *  Valid value for this is from 0 to Long.MAX_VALUE.
+        *
+        *  <p><b>NOTE: </b>: Files with (modTime > Long.MIN_VALUE + 
readConsistencyOffset) will NOT be read.

Review comment:
       Do you mean `modTime < maxProcessedTime - readConsistencyOffset`?
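
       For illustration, a minimal sketch of the condition this question points at. 
The class and method names are hypothetical and not part of the PR. It assumes 
`readConsistencyOffset` lies in `[0, Long.MAX_VALUE]`, as the Javadoc under review 
states, and it saturates the subtraction because `maxProcessedTime` starts at 
`Long.MIN_VALUE`, where a plain subtraction would overflow.

```java
/** Hypothetical helper, not part of the PR: illustrates the skip condition discussed above. */
final class ReadConsistencySketch {

    private ReadConsistencySketch() {}

    /**
     * Lower bound of the look-back window, i.e. maxProcessedTime - readConsistencyOffset,
     * clamped to Long.MIN_VALUE so the subtraction cannot overflow.
     * Assumes 0 <= readConsistencyOffset.
     */
    static long lowerBound(long maxProcessedTime, long readConsistencyOffset) {
        // Long.MIN_VALUE + offset cannot overflow for a non-negative offset.
        if (maxProcessedTime < Long.MIN_VALUE + readConsistencyOffset) {
            return Long.MIN_VALUE;
        }
        return maxProcessedTime - readConsistencyOffset;
    }

    /** True when a file is older than the look-back window and would not be read. */
    static boolean isOutsideWindow(long modTime, long maxProcessedTime, long readConsistencyOffset) {
        return modTime < lowerBound(maxProcessedTime, readConsistencyOffset);
    }
}
```

       With `maxProcessedTime = 1000` and an offset of `100`, a file with `modTime = 50` 
falls outside the window, while one with `modTime = 950` is still considered.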

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
        /** Which new data to process (see {@link FileProcessingMode}. */
        private final FileProcessingMode watchType;
 
-       /** The maximum file modification time seen so far. */
+       /** The offset interval back from the latest file modification 
timestamp to scan for our-of-order files.

Review comment:
       Typo: this should be "out-of-order".

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
        /** Which new data to process (see {@link FileProcessingMode}. */
        private final FileProcessingMode watchType;
 
-       /** The maximum file modification time seen so far. */
+       /** The offset interval back from the latest file modification 
timestamp to scan for our-of-order files.
+        *  Valid value for this is from 0 to Long.MAX_VALUE.
+        *
+        *  <p><b>NOTE: </b>: Files with (modTime > Long.MIN_VALUE + 
readConsistencyOffset) will NOT be read.
+        */
+       private final long readConsistencyOffset;
+
+       /** The current modification time watermark. */
        private volatile long globalModificationTime = Long.MIN_VALUE;
 
+       /** The maximum file modification time seen so far. */
+       private volatile long maxProcessedTime = Long.MIN_VALUE;

Review comment:
       Construction already initializes the value.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -172,6 +236,26 @@ public void initializeState(FunctionInitializationContext 
context) throws Except
                                        LOG.debug("{} retrieved a global mod 
time of {}.",
                                                getClass().getSimpleName(), 
globalModificationTime);
                                }
+                               if (retrievedStates2.size() == 1 && 
processedFiles.size() != 0) {

Review comment:
       Maybe I am missing something, but why do we need this check? 
   As far as I can see, `processedFiles.size()` is always 0 when 
`initializeState` is called.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -248,7 +332,19 @@ private void monitorDirAndForwardSplits(FileSystem fs,
                                context.collect(split);
                        }
                        // update the global modification time
-                       globalModificationTime = 
Math.max(globalModificationTime, modificationTime);
+                       maxProcessedTime = Math.max(maxProcessedTime, 
modificationTime);

Review comment:
       I think this line might not be consistent with the comment above it.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -248,7 +332,19 @@ private void monitorDirAndForwardSplits(FileSystem fs,
                                context.collect(split);
                        }
                        // update the global modification time
-                       globalModificationTime = 
Math.max(globalModificationTime, modificationTime);
+                       maxProcessedTime = Math.max(maxProcessedTime, 
modificationTime);
+               }
+               // Populate processed files.
+               // This check is to ensure that globalModificationTime will not 
go backward
+               // even if readConsistencyOffset is changed to a large value 
after a restore from checkpoint,
+               // so  files would be processed twice
+               globalModificationTime = Math.max(maxProcessedTime - 
readConsistencyOffset, globalModificationTime);

Review comment:
       Maybe we could remove this logic if we do not use the 
globalModificationTime.
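
       For context, a minimal sketch of what this line appears to guard against: the 
watermark moving backward when a job is restored with a larger offset. The class is 
hypothetical and only mirrors the field names from the diff. It assumes the offset is 
non-negative and saturates the subtraction, since `maxProcessedTime` starts at 
`Long.MIN_VALUE`.

```java
/** Hypothetical sketch, not the PR's code: keeps the modification-time watermark monotonic. */
final class WatermarkSketch {

    private final long readConsistencyOffset;
    private long globalModificationTime;
    private long maxProcessedTime;

    /** The two "restored" arguments stand in for values read back from a checkpoint. */
    WatermarkSketch(long readConsistencyOffset, long restoredGlobalModificationTime, long restoredMaxProcessedTime) {
        if (readConsistencyOffset < 0) {
            throw new IllegalArgumentException("readConsistencyOffset must be >= 0");
        }
        this.readConsistencyOffset = readConsistencyOffset;
        this.globalModificationTime = restoredGlobalModificationTime;
        this.maxProcessedTime = restoredMaxProcessedTime;
    }

    /** Records a processed file and advances the watermark without ever moving it backward. */
    void onFileProcessed(long modificationTime) {
        maxProcessedTime = Math.max(maxProcessedTime, modificationTime);
        // Saturating subtraction: avoids overflow while maxProcessedTime is near Long.MIN_VALUE.
        long candidate = maxProcessedTime < Long.MIN_VALUE + readConsistencyOffset
                ? Long.MIN_VALUE
                : maxProcessedTime - readConsistencyOffset;
        globalModificationTime = Math.max(candidate, globalModificationTime);
    }

    long getGlobalModificationTime() {
        return globalModificationTime;
    }

    long getMaxProcessedTime() {
        return maxProcessedTime;
    }
}
```

       If a job that ran with an offset of `100` and reached a watermark of `900` is 
restored with an offset of `500`, processing a file with `modTime = 1100` leaves the 
watermark at `900` instead of dropping it to `600`.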

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
        /** Which new data to process (see {@link FileProcessingMode}. */
        private final FileProcessingMode watchType;
 
-       /** The maximum file modification time seen so far. */
+       /** The offset interval back from the latest file modification 
timestamp to scan for our-of-order files.
+        *  Valid value for this is from 0 to Long.MAX_VALUE.
+        *
+        *  <p><b>NOTE: </b>: Files with (modTime > Long.MIN_VALUE + 
readConsistencyOffset) will NOT be read.
+        */
+       private final long readConsistencyOffset;
+
+       /** The current modification time watermark. */
        private volatile long globalModificationTime = Long.MIN_VALUE;
 
+       /** The maximum file modification time seen so far. */
+       private volatile long maxProcessedTime = Long.MIN_VALUE;
+
+       /** The list of processed files having modification time within the 
period from globalModificationTime
+        *  to maxProcessedTime in the form of a Map&lt;filePath, 
lastModificationTime&gt;. */
+       private volatile Map<String, Long> processedFiles;
+
        private transient Object checkpointLock;
 
        private volatile boolean isRunning = true;
 
        private transient ListState<Long> checkpointedState;

Review comment:
       Do we still need this state in the current implementation?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -93,20 +102,45 @@
        /** Which new data to process (see {@link FileProcessingMode}. */
        private final FileProcessingMode watchType;
 
-       /** The maximum file modification time seen so far. */
+       /** The offset interval back from the latest file modification 
timestamp to scan for our-of-order files.
+        *  Valid value for this is from 0 to Long.MAX_VALUE.
+        *
+        *  <p><b>NOTE: </b>: Files with (modTime > Long.MIN_VALUE + 
readConsistencyOffset) will NOT be read.
+        */
+       private final long readConsistencyOffset;
+
+       /** The current modification time watermark. */
        private volatile long globalModificationTime = Long.MIN_VALUE;

Review comment:
        Is it possible to avoid depending on this variable? I think 
that would reduce the complexity of the restore logic.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
##########
@@ -376,9 +479,12 @@ public void snapshotState(FunctionSnapshotContext context) 
throws Exception {
 
                this.checkpointedState.clear();
                this.checkpointedState.add(this.globalModificationTime);

Review comment:
       see above





