yzeng1618 commented on code in PR #10268:
URL: https://github.com/apache/seatunnel/pull/10268#discussion_r2661542041


##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java:
##########
@@ -470,10 +520,304 @@ protected static InputStream safeSlice(InputStream in, 
long start, long length)
     @Override
     public void close() throws IOException {
         try {
+            if (targetHadoopFileSystemProxy != null && 
!shareTargetFileSystemProxy) {
+                targetHadoopFileSystemProxy.close();
+            }
             if (hadoopFileSystemProxy != null) {
                 hadoopFileSystemProxy.close();
             }
         } catch (Exception ignore) {
         }
     }
+
+    private void validateUpdateSyncConfig(Config pluginConfig) {
+        if 
(!pluginConfig.hasPath(FileBaseSourceOptions.FILE_FORMAT_TYPE.key())) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "When sync_mode=update, file_format_type must be set.");
+        }
+        FileFormat fileFormat =
+                FileFormat.valueOf(
+                        pluginConfig
+                                
.getString(FileBaseSourceOptions.FILE_FORMAT_TYPE.key())
+                                .toUpperCase());
+        if (fileFormat != FileFormat.BINARY) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "sync_mode=update currently only supports 
file_format_type=binary.");
+        }
+
+        if (!pluginConfig.hasPath(FileBaseSourceOptions.TARGET_PATH.key())
+                || StringUtils.isBlank(
+                        
pluginConfig.getString(FileBaseSourceOptions.TARGET_PATH.key()))) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "When sync_mode=update, target_path must be set.");
+        }
+        targetPath = 
pluginConfig.getString(FileBaseSourceOptions.TARGET_PATH.key()).trim();
+
+        updateStrategy = FileBaseSourceOptions.UPDATE_STRATEGY.defaultValue();
+        if (pluginConfig.hasPath(FileBaseSourceOptions.UPDATE_STRATEGY.key())) 
{
+            updateStrategy =
+                    parseEnumValue(
+                            FileUpdateStrategy.class,
+                            
pluginConfig.getString(FileBaseSourceOptions.UPDATE_STRATEGY.key()),
+                            FileBaseSourceOptions.UPDATE_STRATEGY.key());
+        }
+
+        compareMode = FileBaseSourceOptions.COMPARE_MODE.defaultValue();
+        if (pluginConfig.hasPath(FileBaseSourceOptions.COMPARE_MODE.key())) {
+            compareMode =
+                    parseEnumValue(
+                            FileCompareMode.class,
+                            
pluginConfig.getString(FileBaseSourceOptions.COMPARE_MODE.key()),
+                            FileBaseSourceOptions.COMPARE_MODE.key());
+        }
+        if (updateStrategy == FileUpdateStrategy.DISTCP
+                && compareMode != FileCompareMode.LEN_MTIME) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "compare_mode="
+                            + compareMode.name().toLowerCase(Locale.ROOT)
+                            + " is not supported when 
update_strategy=distcp.");
+        }
+
+        if 
(pluginConfig.hasPath(FileBaseSourceOptions.TARGET_HADOOP_CONF.key())) {
+            ConfigObject configObject =
+                    
pluginConfig.getObject(FileBaseSourceOptions.TARGET_HADOOP_CONF.key());
+            Map<String, Object> raw = configObject.unwrapped();
+            Map<String, String> conf = new LinkedHashMap<>(raw.size());
+            raw.forEach((k, v) -> conf.put(k, v == null ? null : 
String.valueOf(v)));
+            targetHadoopConf = conf;
+        }
+    }
+
+    private void initTargetHadoopFileSystemProxy() {
+        HadoopConf targetConf = buildTargetHadoopConf();
+        if (targetConf == this.hadoopConf) {
+            targetHadoopFileSystemProxy = this.hadoopFileSystemProxy;
+            shareTargetFileSystemProxy = true;
+        } else {
+            targetHadoopFileSystemProxy = new 
HadoopFileSystemProxy(targetConf);
+            shareTargetFileSystemProxy = false;
+        }
+    }
+
+    private HadoopConf buildTargetHadoopConf() {
+        if (!enableUpdateSync) {
+            return this.hadoopConf;
+        }
+        Map<String, String> extraOptions =
+                targetHadoopConf == null
+                        ? new LinkedHashMap<>()
+                        : new LinkedHashMap<>(targetHadoopConf);
+
+        String fsDefaultNameKey = hadoopConf.getFsDefaultNameKey();
+        String targetDefaultFs = extraOptions.remove(fsDefaultNameKey);
+
+        if (StringUtils.isBlank(targetDefaultFs)) {
+            targetDefaultFs = tryDeriveDefaultFsFromPath(targetPath);
+        }
+        if (StringUtils.isBlank(targetDefaultFs)) {
+            targetDefaultFs = hadoopConf.getHdfsNameKey();
+        }
+
+        boolean needNewConf =
+                !extraOptions.isEmpty()
+                        || !Objects.equals(targetDefaultFs, 
hadoopConf.getHdfsNameKey());
+        if (!needNewConf) {
+            return this.hadoopConf;
+        }
+
+        HadoopConf conf = new HadoopConf(targetDefaultFs);
+        conf.setHdfsSitePath(hadoopConf.getHdfsSitePath());
+        conf.setRemoteUser(hadoopConf.getRemoteUser());
+        conf.setKrb5Path(hadoopConf.getKrb5Path());
+        conf.setKerberosPrincipal(hadoopConf.getKerberosPrincipal());
+        conf.setKerberosKeytabPath(hadoopConf.getKerberosKeytabPath());
+        conf.setExtraOptions(extraOptions);
+        return conf;
+    }
+
+    private static String tryDeriveDefaultFsFromPath(String basePath) {
+        if (StringUtils.isBlank(basePath)) {
+            return null;
+        }
+        try {
+            Path path = new Path(basePath);
+            if (path.toUri().getScheme() == null) {
+                return null;
+            }
+            if (path.toUri().getAuthority() == null) {
+                return null;
+            }
+            return path.toUri().getScheme() + "://" + 
path.toUri().getAuthority();
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
    /**
     * Decides whether a source file must be (re-)copied in update-sync mode.
     *
     * <p>Decision order:
     * <ol>
     *   <li>update sync disabled -> always copy;</li>
     *   <li>target file missing -> copy;</li>
     *   <li>lengths differ -> copy;</li>
     *   <li>DISTCP strategy: copy only when the source mtime is strictly newer;</li>
     *   <li>STRICT + LEN_MTIME: copy on any mtime difference;</li>
     *   <li>STRICT + CHECKSUM: copy when checksums differ; when a checksum is
     *       unavailable, fall back to byte-level content comparison, and on any
     *       comparison failure fall back to copying (safe default).</li>
     * </ol>
     *
     * @param sourceFileStatus status of the candidate source file
     * @return {@code true} when the file should be synced to the target
     * @throws IOException on file-system access errors other than a missing target
     */
    private boolean shouldSyncFileInUpdateMode(FileStatus sourceFileStatus) throws IOException {
        if (!enableUpdateSync) {
            return true;
        }
        // Lazy init: the target proxy is only needed once update mode is active.
        if (targetHadoopFileSystemProxy == null) {
            initTargetHadoopFileSystemProxy();
        }
        String sourceFilePath = sourceFileStatus.getPath().toString();
        String relativePath = resolveRelativePath(sourceRootPath, sourceFilePath);
        String targetFilePath = buildTargetFilePath(targetPath, relativePath);

        FileStatus targetFileStatus;
        try {
            targetFileStatus = targetHadoopFileSystemProxy.getFileStatus(targetFilePath);
        } catch (FileNotFoundException e) {
            // Target does not exist yet: always copy.
            return true;
        }

        // Length mismatch is the cheapest reliable difference signal.
        long sourceLen = sourceFileStatus.getLen();
        long targetLen = targetFileStatus.getLen();
        if (sourceLen != targetLen) {
            return true;
        }

        long sourceMtime = sourceFileStatus.getModificationTime();
        long targetMtime = targetFileStatus.getModificationTime();

        if (updateStrategy == FileUpdateStrategy.DISTCP) {
            // DistCp semantics: same length and source not newer -> skip.
            return sourceMtime > targetMtime;
        }

        if (updateStrategy == FileUpdateStrategy.STRICT) {
            if (compareMode == FileCompareMode.LEN_MTIME) {
                // Any mtime difference (newer OR older) triggers a copy.
                return sourceMtime != targetMtime;
            }
            if (compareMode == FileCompareMode.CHECKSUM) {
                FileChecksum sourceChecksum = hadoopFileSystemProxy.getFileChecksum(sourceFilePath);
                FileChecksum targetChecksum =
                        targetHadoopFileSystemProxy.getFileChecksum(targetFilePath);
                if (sourceChecksum == null || targetChecksum == null) {
                    // Some file systems cannot provide checksums; warn only once
                    // per reader instance to avoid log spam.
                    if (!checksumUnavailableWarned) {
                        log.warn(
                                "File checksum is not available, fallback to content comparison. source={}, target={}",
                                sourceFilePath,
                                targetFilePath);
                        checksumUnavailableWarned = true;
                    }
                    try {
                        return !fileContentEquals(sourceFilePath, targetFilePath);
                    } catch (Exception e) {
                        // When even the content comparison fails, copying is the
                        // safe default: worst case is a redundant transfer.
                        log.warn(
                                "Fallback content comparison failed, fallback to COPY. source={}, target={}",
                                sourceFilePath,
                                targetFilePath,
                                e);
                        return true;
                    }
                }
                return !checksumEquals(sourceChecksum, targetChecksum);
            }
        }

        // Unknown strategy/mode combinations default to copying (safe).
        return true;
    }
+
+    private static boolean checksumEquals(FileChecksum source, FileChecksum 
target) {
+        if (source == null || target == null) {
+            return false;
+        }
+        return Objects.equals(source.getAlgorithmName(), 
target.getAlgorithmName())
+                && source.getLength() == target.getLength()
+                && Arrays.equals(source.getBytes(), target.getBytes());
+    }
+
+    private boolean fileContentEquals(String sourceFilePath, String 
targetFilePath)
+            throws IOException {
+        try (InputStream sourceIn = 
hadoopFileSystemProxy.getInputStream(sourceFilePath);
+                InputStream targetIn = 
targetHadoopFileSystemProxy.getInputStream(targetFilePath)) {
+            byte[] sourceBuffer = new byte[8 * 1024];
+            byte[] targetBuffer = new byte[8 * 1024];
+
+            while (true) {
+                int sourceRead = sourceIn.read(sourceBuffer);
+                int targetRead = targetIn.read(targetBuffer);
+                if (sourceRead != targetRead) {
+                    return false;
+                }
+                if (sourceRead == -1) {
+                    return true;
+                }
+                for (int i = 0; i < sourceRead; i++) {
+                    if (sourceBuffer[i] != targetBuffer[i]) {
+                        return false;
+                    }
+                }
+            }
+        }
+    }

Review Comment:
   Memory usage here is constant: we compare via streaming reads with two fixed 
8KB buffers (no full-file buffering), so large files shouldn't cause OOM — only 
more I/O time. One correctness caveat, though: `InputStream.read` may return 
fewer bytes than requested, and the comparison assumes both streams return the 
same count per call; filling each buffer completely before comparing would make 
the check robust to short reads on either stream.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to