This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new f847f63c8 [flink] Replace CLIENT_SCANNER_IO_TMP_DIR with flink's 
TMP_DIRS in runtime rather then compile phase. (#2259)
f847f63c8 is described below

commit f847f63c8df5178854cc09b8f2dedfc51cc9dc33
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Dec 26 11:19:22 2025 +0800

    [flink] Replace CLIENT_SCANNER_IO_TMP_DIR with flink's TMP_DIRS in runtime 
rather then compile phase. (#2259)
---
 .../fluss/flink/catalog/FlinkTableFactory.java     |  7 -----
 .../org/apache/fluss/flink/source/FlinkSource.java |  6 +++++
 .../flink/utils/FlinkConnectorOptionsUtils.java    | 16 +++++++++++
 .../flink/utils/FlinkConnectorOptionsUtilTest.java | 31 ++++++++++++++++++++++
 4 files changed, 53 insertions(+), 7 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index 608138d86..aba27735c 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -30,7 +30,6 @@ import org.apache.fluss.metadata.TablePath;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.config.TableConfigOptions;
@@ -47,7 +46,6 @@ import 
org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.logical.RowType;
 
-import java.io.File;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -247,11 +245,6 @@ public class FlinkTableFactory implements 
DynamicTableSourceFactory, DynamicTabl
         // retry. The option 'client.lookup.max-retries' is only for dealing 
with the
         // RetriableException return by server not all exceptions. Trace by:
         // https://github.com/apache/fluss/issues/2099
-
-        // pass flink io tmp dir to fluss client.
-        flussConfig.setString(
-                ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR,
-                new File(flinkConfig.get(CoreOptions.TMP_DIRS), 
"/fluss").getAbsolutePath());
         return flussConfig;
     }
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
index bc15390fe..ba880ff91 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
@@ -50,6 +50,9 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import javax.annotation.Nullable;
 
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR;
+import static 
org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getClientScannerIoTmpDir;
+
 /** Flink source for Fluss. */
 public class FlinkSource<OUT>
         implements Source<OUT, SourceSplitBase, SourceEnumeratorState>, 
ResultTypeQueryable {
@@ -181,6 +184,9 @@ public class FlinkSource<OUT>
         FlinkSourceReaderMetrics flinkSourceReaderMetrics =
                 new FlinkSourceReaderMetrics(context.metricGroup());
 
+        flussConf.set(
+                CLIENT_SCANNER_IO_TMP_DIR,
+                getClientScannerIoTmpDir(flussConf, 
context.getConfiguration()));
         deserializationSchema.open(
                 new DeserializerInitContextImpl(
                         context.metricGroup().addGroup("deserializer"),
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java
index d0de2fb1d..bad6700ee 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java
@@ -17,14 +17,17 @@
 
 package org.apache.fluss.flink.utils;
 
+import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.FlinkConnectorOptions;
 import org.apache.fluss.flink.FlinkConnectorOptions.ScanStartupMode;
 
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.io.File;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
@@ -34,6 +37,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.configuration.CoreOptions.TMP_DIRS;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR;
 import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE;
 import static 
org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP;
 import static 
org.apache.fluss.flink.FlinkConnectorOptions.ScanStartupMode.TIMESTAMP;
@@ -148,6 +153,17 @@ public class FlinkConnectorOptionsUtils {
         }
     }
 
+    public static String getClientScannerIoTmpDir(
+            Configuration flussConf, 
org.apache.flink.configuration.Configuration flinkConfig) {
+        if (!flussConf.contains(CLIENT_SCANNER_IO_TMP_DIR)) {
+            if (flinkConfig.contains(TMP_DIRS)) {
+                // pass flink io tmp dir to fluss client.
+                return new File(flinkConfig.get(CoreOptions.TMP_DIRS), 
"/fluss").getAbsolutePath();
+            }
+        }
+        return flussConf.getString(CLIENT_SCANNER_IO_TMP_DIR);
+    }
+
     /** Fluss startup options. * */
     public static class StartupOptions {
         public ScanStartupMode startupMode;
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java
index dfccf54fd..259a8ce6e 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java
@@ -17,12 +17,16 @@
 
 package org.apache.fluss.flink.utils;
 
+import org.apache.fluss.config.Configuration;
+
 import org.apache.flink.table.api.ValidationException;
 import org.junit.jupiter.api.Test;
 
 import java.time.ZoneId;
 import java.util.TimeZone;
 
+import static org.apache.flink.configuration.CoreOptions.TMP_DIRS;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR;
 import static 
org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP;
 import static 
org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.parseTimestamp;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -58,4 +62,31 @@ class FlinkConnectorOptionsUtilTest {
                                 + "or 'timestamp', but is 
'2023-12-09T23:09:12'. "
                                 + "You can config like: '2023-12-09 23:09:12' 
or '1678883047356'.");
     }
+
+    @Test
+    void testGetClientScannerIoTmpDir() {
+        Configuration flussConfig =
+                new Configuration().set(CLIENT_SCANNER_IO_TMP_DIR, 
"/fluss_tmp_dir");
+        org.apache.flink.configuration.Configuration flinkConfig =
+                new 
org.apache.flink.configuration.Configuration().set(TMP_DIRS, "/flink_tmp_dir");
+        assertThat(
+                        FlinkConnectorOptionsUtils.getClientScannerIoTmpDir(
+                                flussConfig, new 
org.apache.flink.configuration.Configuration()))
+                .isEqualTo("/fluss_tmp_dir");
+        
assertThat(FlinkConnectorOptionsUtils.getClientScannerIoTmpDir(flussConfig, 
flinkConfig))
+                .isEqualTo("/fluss_tmp_dir");
+        String property = System.getProperty("java.io.tmpdir");
+        assertThat(
+                        FlinkConnectorOptionsUtils.getClientScannerIoTmpDir(
+                                new Configuration(),
+                                new 
org.apache.flink.configuration.Configuration()))
+                .isEqualTo(property + "/fluss");
+
+        // only replace when flussConfig not contains 
CLIENT_SCANNER_IO_TMP_DIR while flinkConfig
+        // contains TMP_DIRS.
+        assertThat(
+                        FlinkConnectorOptionsUtils.getClientScannerIoTmpDir(
+                                new Configuration(), flinkConfig))
+                .isEqualTo("/flink_tmp_dir/fluss");
+    }
 }

Reply via email to