This is an automated email from the ASF dual-hosted git repository.
hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new f847f63c8 [flink] Replace CLIENT_SCANNER_IO_TMP_DIR with flink's
TMP_DIRS in runtime rather than compile phase. (#2259)
f847f63c8 is described below
commit f847f63c8df5178854cc09b8f2dedfc51cc9dc33
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Dec 26 11:19:22 2025 +0800
[flink] Replace CLIENT_SCANNER_IO_TMP_DIR with flink's TMP_DIRS in runtime
rather than compile phase. (#2259)
---
.../fluss/flink/catalog/FlinkTableFactory.java | 7 -----
.../org/apache/fluss/flink/source/FlinkSource.java | 6 +++++
.../flink/utils/FlinkConnectorOptionsUtils.java | 16 +++++++++++
.../flink/utils/FlinkConnectorOptionsUtilTest.java | 31 ++++++++++++++++++++++
4 files changed, 53 insertions(+), 7 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index 608138d86..aba27735c 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -30,7 +30,6 @@ import org.apache.fluss.metadata.TablePath;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.config.TableConfigOptions;
@@ -47,7 +46,6 @@ import
org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.logical.RowType;
-import java.io.File;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
@@ -247,11 +245,6 @@ public class FlinkTableFactory implements
DynamicTableSourceFactory, DynamicTabl
// retry. The option 'client.lookup.max-retries' is only for dealing
with the
// RetriableException return by server not all exceptions. Trace by:
// https://github.com/apache/fluss/issues/2099
-
- // pass flink io tmp dir to fluss client.
- flussConfig.setString(
- ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR,
- new File(flinkConfig.get(CoreOptions.TMP_DIRS),
"/fluss").getAbsolutePath());
return flussConfig;
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
index bc15390fe..ba880ff91 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
@@ -50,6 +50,9 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import javax.annotation.Nullable;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR;
+import static
org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getClientScannerIoTmpDir;
+
/** Flink source for Fluss. */
public class FlinkSource<OUT>
implements Source<OUT, SourceSplitBase, SourceEnumeratorState>,
ResultTypeQueryable {
@@ -181,6 +184,9 @@ public class FlinkSource<OUT>
FlinkSourceReaderMetrics flinkSourceReaderMetrics =
new FlinkSourceReaderMetrics(context.metricGroup());
+ flussConf.set(
+ CLIENT_SCANNER_IO_TMP_DIR,
+ getClientScannerIoTmpDir(flussConf,
context.getConfiguration()));
deserializationSchema.open(
new DeserializerInitContextImpl(
context.metricGroup().addGroup("deserializer"),
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java
index d0de2fb1d..bad6700ee 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java
@@ -17,14 +17,17 @@
package org.apache.fluss.flink.utils;
+import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.FlinkConnectorOptions;
import org.apache.fluss.flink.FlinkConnectorOptions.ScanStartupMode;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.types.logical.RowType;
+import java.io.File;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
@@ -34,6 +37,8 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.flink.configuration.CoreOptions.TMP_DIRS;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR;
import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE;
import static
org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP;
import static
org.apache.fluss.flink.FlinkConnectorOptions.ScanStartupMode.TIMESTAMP;
@@ -148,6 +153,17 @@ public class FlinkConnectorOptionsUtils {
}
}
+ public static String getClientScannerIoTmpDir(
+ Configuration flussConf,
org.apache.flink.configuration.Configuration flinkConfig) {
+ if (!flussConf.contains(CLIENT_SCANNER_IO_TMP_DIR)) {
+ if (flinkConfig.contains(TMP_DIRS)) {
+ // pass flink io tmp dir to fluss client.
+ return new File(flinkConfig.get(CoreOptions.TMP_DIRS),
"/fluss").getAbsolutePath();
+ }
+ }
+ return flussConf.getString(CLIENT_SCANNER_IO_TMP_DIR);
+ }
+
/** Fluss startup options. * */
public static class StartupOptions {
public ScanStartupMode startupMode;
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java
index dfccf54fd..259a8ce6e 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java
@@ -17,12 +17,16 @@
package org.apache.fluss.flink.utils;
+import org.apache.fluss.config.Configuration;
+
import org.apache.flink.table.api.ValidationException;
import org.junit.jupiter.api.Test;
import java.time.ZoneId;
import java.util.TimeZone;
+import static org.apache.flink.configuration.CoreOptions.TMP_DIRS;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR;
import static
org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP;
import static
org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.parseTimestamp;
import static org.assertj.core.api.Assertions.assertThat;
@@ -58,4 +62,31 @@ class FlinkConnectorOptionsUtilTest {
+ "or 'timestamp', but is
'2023-12-09T23:09:12'. "
+ "You can config like: '2023-12-09 23:09:12'
or '1678883047356'.");
}
+
+ @Test
+ void testGetClientScannerIoTmpDir() {
+ Configuration flussConfig =
+ new Configuration().set(CLIENT_SCANNER_IO_TMP_DIR,
"/fluss_tmp_dir");
+ org.apache.flink.configuration.Configuration flinkConfig =
+ new
org.apache.flink.configuration.Configuration().set(TMP_DIRS, "/flink_tmp_dir");
+ assertThat(
+ FlinkConnectorOptionsUtils.getClientScannerIoTmpDir(
+ flussConfig, new
org.apache.flink.configuration.Configuration()))
+ .isEqualTo("/fluss_tmp_dir");
+
assertThat(FlinkConnectorOptionsUtils.getClientScannerIoTmpDir(flussConfig,
flinkConfig))
+ .isEqualTo("/fluss_tmp_dir");
+ String property = System.getProperty("java.io.tmpdir");
+ assertThat(
+ FlinkConnectorOptionsUtils.getClientScannerIoTmpDir(
+ new Configuration(),
+ new
org.apache.flink.configuration.Configuration()))
+ .isEqualTo(property + "/fluss");
+
+ // only replace when flussConfig not contains
CLIENT_SCANNER_IO_TMP_DIR while flinkConfig
+ // contains TMP_DIRS.
+ assertThat(
+ FlinkConnectorOptionsUtils.getClientScannerIoTmpDir(
+ new Configuration(), flinkConfig))
+ .isEqualTo("/flink_tmp_dir/fluss");
+ }
}