reswqa commented on code in PR #23100:
URL: https://github.com/apache/flink/pull/23100#discussion_r1283940833


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
                         partitionId,
                         subpartitionId.getSubpartitionId(),
                         segmentId);
-        boolean isExist;
+        boolean isExist = false;
         try {
             isExist = remoteFileSystem.exists(segmentPath);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to check segment file. " + segmentPath, e);
+            currentRetryTime = 0;
+        } catch (Throwable t) {
+            currentRetryTime++;
+            throwException(t, "Failed to check the status of segment file.");

Review Comment:
   `throwException` implies that this will always throw an exception, but it does not.
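One way to address this is to give the helper a name that states its conditional behavior. Below is a minimal sketch, assuming the helper should rethrow only once a retry budget is exhausted. `RetryBudget`, `MAX_RETRY_TIMES`, and `rethrowIfRetriesExhausted` are hypothetical names, not code from the PR, and the sketch takes an `IOException` (the type the original code caught) rather than a `Throwable`:

```java
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical holder for the retry counter used by the scanner. */
final class RetryBudget {
    // Hypothetical limit; the PR's actual threshold is not visible in this diff.
    static final int MAX_RETRY_TIMES = 3;

    private int currentRetryTime = 0;

    /** Resets the counter after a successful remote call. */
    void onSuccess() {
        currentRetryTime = 0;
    }

    /**
     * Records a failure and rethrows only when the retry budget is used up.
     * The "If" in the name tells callers the method does not always throw.
     */
    void rethrowIfRetriesExhausted(IOException e, String message) {
        currentRetryTime++;
        if (currentRetryTime > MAX_RETRY_TIMES) {
            throw new UncheckedIOException(message, e);
        }
    }

    int currentRetryTime() {
        return currentRetryTime;
    }
}
```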



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
                         partitionId,
                         subpartitionId.getSubpartitionId(),
                         segmentId);
-        boolean isExist;
+        boolean isExist = false;
         try {
             isExist = remoteFileSystem.exists(segmentPath);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to check segment file. " + segmentPath, e);
+            currentRetryTime = 0;
+        } catch (Throwable t) {
+            currentRetryTime++;
+            throwException(t, "Failed to check the status of segment file.");
         }
         return isExist;
     }
 
+    private void throwException(Throwable t, String logMessage) {
+        LOG.error(logMessage);

Review Comment:
   This can easily confuse people. Some users monitor the system by scanning the logs for ERROR entries (and may send alerts based on them). IMO, it should be WARN at most.
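A minimal sketch of the suggested split: log a transient failure that will be retried at warning level, and reserve the error level for the point where the scanner actually gives up. It also passes the throwable to the logger, which the `LOG.error(logMessage)` call in the diff does not. The sketch uses `java.util.logging` only so it runs standalone (its levels are `WARNING` and `SEVERE`); in Flink the equivalent would be the SLF4J `LOG.warn(message, t)` / `LOG.error(message, t)` calls. `FailureLogger` and `willRetry` are hypothetical names.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper that picks a log level based on whether a retry is still possible. */
final class FailureLogger {
    private final Logger log;

    FailureLogger(Logger log) {
        this.log = log;
    }

    /** Logs the failure with its cause and returns the level used, so the decision is visible. */
    Level logFailure(String message, Throwable t, boolean willRetry) {
        // A failure that will be retried is not an incident, so WARNING at most.
        // Only the final failure, after which an exception surfaces, is SEVERE.
        Level level = willRetry ? Level.WARNING : Level.SEVERE;
        log.log(level, message, t);
        return level;
    }
}
```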



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -109,7 +118,11 @@ private FileSystem createFileSystem() {
 
     /** Start the executor. */
     public void start() {
-        scannerExecutor.schedule(this, lastInterval, TimeUnit.MILLISECONDS);
+        synchronized (scannerExecutor) {

Review Comment:
   Do we really need a synchronized lock? IIUC, a volatile variable would be enough?
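A minimal sketch of what the volatile alternative could look like, assuming the only state shared between `start()`/rescheduling and `close()` is whether the scanner has been closed. A volatile flag alone does not make check-then-schedule atomic: `schedule` can still race with `shutdownNow()` and be rejected, so the sketch tolerates a `RejectedExecutionException` once the flag is set. `VolatileFlagScanner`, `scheduleNext`, and `scans` are hypothetical names.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical scanner that guards scheduling with a volatile flag instead of a lock. */
final class VolatileFlagScanner implements Runnable {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final long intervalMs;
    // Written by close(), read by the scanning thread; volatile gives visibility.
    private volatile boolean closed = false;
    // Counts completed scans; stands in for the real scan work.
    final AtomicInteger scans = new AtomicInteger();

    VolatileFlagScanner(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    void start() {
        scheduleNext();
    }

    @Override
    public void run() {
        scans.incrementAndGet();
        scheduleNext();
    }

    private void scheduleNext() {
        if (closed) {
            return;
        }
        try {
            executor.schedule(this, intervalMs, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // close() may shut the executor down between the check above and this call.
            // close() sets the flag first, so a rejection is only expected once closed.
            if (!closed) {
                throw e;
            }
        }
    }

    void close() {
        closed = true; // set before shutdown so a racing scheduleNext() observes it
        executor.shutdownNow();
    }

    boolean isShutdown() {
        return executor.isShutdown();
    }
}
```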



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
                         partitionId,
                         subpartitionId.getSubpartitionId(),
                         segmentId);
-        boolean isExist;
+        boolean isExist = false;
         try {
             isExist = remoteFileSystem.exists(segmentPath);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to check segment file. " + segmentPath, e);
+            currentRetryTime = 0;
+        } catch (Throwable t) {

Review Comment:
   I'm not sure whether catching any Throwable, including unrecoverable errors, is safe enough.
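A minimal sketch of a narrower catch, assuming only I/O failures from the remote file system should be treated as retryable: an `IOException` is counted and retried, while `Error`s such as `OutOfMemoryError` and unexpected `RuntimeException`s propagate unchanged. `RemoteFs`, `SegmentChecker`, and `maxRetries` are hypothetical stand-ins for Flink's `FileSystem` and the scanner's state.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for the remote FileSystem#exists call. */
interface RemoteFs {
    boolean exists(String path) throws IOException;
}

/** Hypothetical checker that only treats IOException as retryable. */
final class SegmentChecker {
    private final RemoteFs fs;
    private final int maxRetries;
    private int currentRetryTime = 0;

    SegmentChecker(RemoteFs fs, int maxRetries) {
        this.fs = fs;
        this.maxRetries = maxRetries;
    }

    /** Returns false on a retryable failure so the caller can try again on the next scan. */
    boolean checkSegmentExist(String segmentPath) {
        try {
            boolean exists = fs.exists(segmentPath);
            currentRetryTime = 0;
            return exists;
        } catch (IOException e) {
            // Only I/O failures are caught; Errors and other RuntimeExceptions propagate.
            if (++currentRetryTime > maxRetries) {
                throw new UncheckedIOException(
                        "Failed to check the status of segment file " + segmentPath, e);
            }
            return false;
        }
    }

    int currentRetryTime() {
        return currentRetryTime;
    }
}
```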



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -144,7 +157,9 @@ public void watchSegment(
 
     /** Close the executor. */
     public void close() {
-        scannerExecutor.shutdownNow();
+        synchronized (scannerExecutor) {
+            scannerExecutor.shutdownNow();

Review Comment:
   I'd suggest waiting for all pending tasks to finish when closing this scanner.
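A minimal sketch of a graceful close, using a simplified form of the two-phase shutdown pattern shown in the `ExecutorService` Javadoc: `shutdown()` lets an in-flight scan finish, `awaitTermination` bounds the wait, and `shutdownNow()` is the fallback. Because a `ScheduledThreadPoolExecutor` still runs already-delayed tasks after `shutdown()` by default, the sketch disables that policy so a scan scheduled far in the future does not hold up closing. `GracefulScanner` and the timeout parameter are hypothetical.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical owner of the scanner executor with a bounded, graceful close. */
final class GracefulScanner {
    // Package-private only so the usage example can submit tasks directly.
    final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

    GracefulScanner() {
        // Cancel delayed (not yet started) scans on shutdown; a running scan still completes.
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    /** Returns true if all tasks finished within the timeout. */
    boolean close(long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new scans
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
        }
        executor.shutdownNow(); // stop waiting and interrupt the running scan
        return false;
    }
}
```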



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -207,20 +222,23 @@ private void scanMaxSegmentId(
         Path segmentFinishDir =
                 getSegmentFinishDirPath(
                        baseRemoteStoragePath, partitionId, subpartitionId.getSubpartitionId());
-        FileStatus[] fileStatuses;
+        FileStatus[] fileStatuses = new FileStatus[0];
         try {
             if (!remoteFileSystem.exists(segmentFinishDir)) {
                 return;
             }
             fileStatuses = remoteFileSystem.listStatus(segmentFinishDir);
-        } catch (IOException e) {
-            throw new RuntimeException(
-                    "Failed to list the segment finish file. " + segmentFinishDir, e);
+            currentRetryTime = 0;
+        } catch (Throwable t) {
+            if (t instanceof java.io.FileNotFoundException) {
+                return;
+            }
+            currentRetryTime++;
+            throwException(t, "Failed to list the segment finish file.");
         }
-        if (fileStatuses.length == 0) {
+        if (fileStatuses.length != 1) {

Review Comment:
   Will there be multiple finish files in this directory at the same time?
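If several finish files can be present at once, a defensive alternative to skipping the scan whenever `fileStatuses.length != 1` is to take the largest id among them. A minimal sketch, assuming (hypothetically) that each finish file is named after a decimal segment id; the actual naming scheme is not visible in this diff, and `FinishFiles`/`maxSegmentId` are hypothetical names operating on the file names obtained from `listStatus`:

```java
import java.util.List;
import java.util.OptionalInt;

/** Hypothetical helper that tolerates several finish files in one directory. */
final class FinishFiles {
    private FinishFiles() {}

    /**
     * Returns the largest non-negative segment id parsed from the given file names,
     * skipping names that are not integers. Empty if no usable name is found.
     */
    static OptionalInt maxSegmentId(List<String> fileNames) {
        OptionalInt max = OptionalInt.empty();
        for (String name : fileNames) {
            int id;
            try {
                id = Integer.parseInt(name);
            } catch (NumberFormatException e) {
                continue; // not a segment id under the assumed naming scheme
            }
            if (id >= 0 && (!max.isPresent() || id > max.getAsInt())) {
                max = OptionalInt.of(id);
            }
        }
        return max;
    }
}
```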



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org