reswqa commented on code in PR #23100:
URL: https://github.com/apache/flink/pull/23100#discussion_r1283940833
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
                         partitionId,
                         subpartitionId.getSubpartitionId(),
                         segmentId);
-        boolean isExist;
+        boolean isExist = false;
         try {
             isExist = remoteFileSystem.exists(segmentPath);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to check segment file. " + segmentPath, e);
+            currentRetryTime = 0;
+        } catch (Throwable t) {
+            currentRetryTime++;
+            throwException(t, "Failed to check the status of segment file.");

Review Comment:
   `throwException` implies that this will always throw an exception, but it does not.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
                         partitionId,
                         subpartitionId.getSubpartitionId(),
                         segmentId);
-        boolean isExist;
+        boolean isExist = false;
         try {
             isExist = remoteFileSystem.exists(segmentPath);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to check segment file. " + segmentPath, e);
+            currentRetryTime = 0;
+        } catch (Throwable t) {
+            currentRetryTime++;
+            throwException(t, "Failed to check the status of segment file.");
         }
         return isExist;
     }
+    private void throwException(Throwable t, String logMessage) {
+        LOG.error(logMessage);

Review Comment:
   This can easily confuse people. Some users monitor the system by scanning the logs for ERROR entries (and may send alerts based on them). IMO, it should be WARNING at most.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -109,7 +118,11 @@ private FileSystem createFileSystem() {
     /** Start the executor. */
     public void start() {
-        scannerExecutor.schedule(this, lastInterval, TimeUnit.MILLISECONDS);
+        synchronized (scannerExecutor) {

Review Comment:
   Do we really need a synchronized lock? IIUC, wouldn't a volatile variable be enough?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
                         partitionId,
                         subpartitionId.getSubpartitionId(),
                         segmentId);
-        boolean isExist;
+        boolean isExist = false;
         try {
             isExist = remoteFileSystem.exists(segmentPath);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to check segment file. " + segmentPath, e);
+            currentRetryTime = 0;
+        } catch (Throwable t) {

Review Comment:
   I'm not sure whether catching any Throwable, including unrecoverable errors, is safe enough.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -144,7 +157,9 @@ public void watchSegment(
     /** Close the executor. */
     public void close() {
-        scannerExecutor.shutdownNow();
+        synchronized (scannerExecutor) {
+            scannerExecutor.shutdownNow();

Review Comment:
   I'd suggest waiting for all pending tasks to finish when closing this scanner.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##########
@@ -207,20 +222,23 @@ private void scanMaxSegmentId(
         Path segmentFinishDir =
                 getSegmentFinishDirPath(
                         baseRemoteStoragePath, partitionId, subpartitionId.getSubpartitionId());
-        FileStatus[] fileStatuses;
+        FileStatus[] fileStatuses = new FileStatus[0];
         try {
             if (!remoteFileSystem.exists(segmentFinishDir)) {
                 return;
             }
             fileStatuses = remoteFileSystem.listStatus(segmentFinishDir);
-        } catch (IOException e) {
-            throw new RuntimeException(
-                    "Failed to list the segment finish file. " + segmentFinishDir, e);
+            currentRetryTime = 0;
+        } catch (Throwable t) {
+            if (t instanceof java.io.FileNotFoundException) {
+                return;
+            }
+            currentRetryTime++;
+            throwException(t, "Failed to list the segment finish file.");
         }
-        if (fileStatuses.length == 0) {
+        if (fileStatuses.length != 1) {

Review Comment:
   Will there be multiple finish files in this directory at the same time?
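A minimal sketch of how the review points on `throwException`, `LOG.error`, and `catch (Throwable t)` could be addressed together, assuming a bounded retry budget and single-threaded use: only `IOException` is caught, so `Error`s such as `OutOfMemoryError` still propagate; transient failures are logged at warning level instead of ERROR; and the helper is named for what it actually does. `RemoteCheckRetrier`, `IOCheck`, `callWithRetryBudget`, and `maxRetryTimes` are hypothetical names, not Flink APIs, and `java.util.logging` stands in for the class's SLF4J logger only to keep the example self-contained.

```java
import java.io.IOException;
import java.util.logging.Logger;

/**
 * Hypothetical helper: runs a remote-storage check, catching only IOException so that
 * Errors (e.g. OutOfMemoryError) still propagate, logging transient failures at WARNING,
 * and throwing only once the retry budget is exhausted. Not thread-safe by design.
 */
class RemoteCheckRetrier {
    private static final Logger LOG = Logger.getLogger(RemoteCheckRetrier.class.getName());

    /** A check that may fail with an IOException, e.g. a call like FileSystem#exists. */
    interface IOCheck<T> {
        T call() throws IOException;
    }

    private final int maxRetryTimes;
    private int currentRetryTime = 0;

    RemoteCheckRetrier(int maxRetryTimes) {
        this.maxRetryTimes = maxRetryTimes;
    }

    /**
     * Returns the check's result and resets the retry counter on success. On an
     * IOException, increments the counter and returns the fallback, or rethrows it
     * wrapped in a RuntimeException once more than maxRetryTimes consecutive
     * failures have occurred.
     */
    <T> T callWithRetryBudget(IOCheck<T> check, T fallback, String description) {
        try {
            T result = check.call();
            currentRetryTime = 0;
            return result;
        } catch (IOException e) {
            currentRetryTime++;
            if (currentRetryTime > maxRetryTimes) {
                throw new RuntimeException(
                        description + " failed " + currentRetryTime + " times in a row.", e);
            }
            LOG.warning(description + " failed (attempt " + currentRetryTime + "), will retry: " + e);
            return fallback;
        }
    }

    int currentRetryTime() {
        return currentRetryTime;
    }
}
```

In `checkSegmentExist`, a call might then look like `retrier.callWithRetryBudget(() -> remoteFileSystem.exists(segmentPath), false, "Checking segment file " + segmentPath)`, which keeps the "not found yet, try again later" path separate from genuine failures.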
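For the comments on `synchronized (scannerExecutor)` and `shutdownNow()`, one possible shape, assuming a single scanner thread that reschedules itself after each scan: a `volatile` flag publishes the closed state without a lock, a `RejectedExecutionException` from a reschedule that races with `close()` is treated as "already closed", and `close()` uses `shutdown()` plus `awaitTermination` so an in-flight scan finishes instead of being interrupted. `ScannerLifecycle`, `intervalMs`, and `completedScans` are hypothetical; the `ScheduledThreadPoolExecutor` calls are standard JDK APIs.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a lock-free start/close lifecycle for a self-rescheduling scanner. */
class ScannerLifecycle implements Runnable {
    private final ScheduledThreadPoolExecutor scannerExecutor = new ScheduledThreadPoolExecutor(1);
    private final long intervalMs;
    // Written by close() on the caller's thread, read by the scanner thread.
    private volatile boolean closed = false;
    // Only the single scanner thread writes this, so ++ does not lose updates.
    private volatile int completedScans = 0;

    ScannerLifecycle(long intervalMs) {
        this.intervalMs = intervalMs;
        // Drop scans that are queued but not yet started once shutdown() is called.
        scannerExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    void start() {
        scheduleNext();
    }

    @Override
    public void run() {
        if (closed) {
            return;
        }
        // ... scan the remote storage here ...
        completedScans++;
        scheduleNext();
    }

    private void scheduleNext() {
        if (closed) {
            return;
        }
        try {
            scannerExecutor.schedule(this, intervalMs, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // close() won the race and shut the executor down: nothing left to do.
        }
    }

    /** Stops scheduling, lets an in-flight scan finish, and waits up to timeoutMs. */
    boolean close(long timeoutMs) {
        closed = true;
        scannerExecutor.shutdown();
        try {
            return scannerExecutor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int completedScans() {
        return completedScans;
    }
}
```

Note that the `volatile` flag alone does not close the check-then-schedule race; catching the rejection is what makes the lock unnecessary in this sketch.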
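On the last question: if several finish files can coexist in the directory (an assumption here, not something the diff confirms), a more tolerant option than requiring `fileStatuses.length` to be exactly 1 is to take the largest segment id among them. The sketch below assumes each finish file is named after its segment id; `FinishFileSelector` and `maxSegmentId` are hypothetical, and plain file names stand in for Flink's `FileStatus` objects.

```java
import java.util.List;
import java.util.OptionalInt;

/** Hypothetical helper: choose the newest finish file instead of requiring exactly one. */
class FinishFileSelector {
    /**
     * Returns the largest segment id among the finish-file names, or empty if none parse.
     * ASSUMPTION: each finish file is named after its segment id, e.g. "3".
     */
    static OptionalInt maxSegmentId(List<String> finishFileNames) {
        return finishFileNames.stream()
                .map(FinishFileSelector::parseSegmentId)
                .filter(OptionalInt::isPresent)
                .mapToInt(OptionalInt::getAsInt)
                .max();
    }

    /** Parses a non-negative segment id; anything else (temp files, junk) is ignored. */
    private static OptionalInt parseSegmentId(String fileName) {
        try {
            int id = Integer.parseInt(fileName);
            return id >= 0 ? OptionalInt.of(id) : OptionalInt.empty();
        } catch (NumberFormatException e) {
            return OptionalInt.empty();
        }
    }
}
```

Whether "largest id wins" is correct depends on whether a finish file for segment N implies that all earlier segments are finished, which only the PR author can confirm.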