This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new ad45868ff [flink] Support "lookup.insert-if-not-exists" option for
Flink Lookup Join (#2601)
ad45868ff is described below
commit ad45868ff3da65f0ffa31e39cd3f70201513015f
Author: xx789 <[email protected]>
AuthorDate: Sun Feb 8 22:35:51 2026 +0800
[flink] Support "lookup.insert-if-not-exists" option for Flink Lookup Join
(#2601)
---
.../apache/fluss/flink/FlinkConnectorOptions.java | 10 +++
.../fluss/flink/catalog/FlinkTableFactory.java | 2 +
.../fluss/flink/source/FlinkTableSource.java | 10 ++-
.../source/lookup/FlinkAsyncLookupFunction.java | 7 +-
.../flink/source/lookup/FlinkLookupFunction.java | 7 +-
.../apache/fluss/flink/utils/PushdownUtils.java | 3 +-
.../fluss/flink/source/FlinkTableSourceITCase.java | 80 ++++++++++++++++++++++
.../source/lookup/FlinkLookupFunctionTest.java | 12 ++--
website/docs/engine-flink/options.md | 29 ++++----
9 files changed, 137 insertions(+), 23 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
index 6fd5d147f..e00f131f1 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
@@ -86,6 +86,16 @@ public class FlinkConnectorOptions {
.defaultValue(true)
.withDescription("Whether to set async lookup. Default is
true.");
+ public static final ConfigOption<Boolean> LOOKUP_INSERT_IF_NOT_EXISTS =
+ ConfigOptions.key("lookup.insert-if-not-exists")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to enable insert-if-not-exists behavior
for the lookup operation. "
+ + "When enabled, if a lookup does not find
a matching row, a new row will be inserted "
+ + "with the lookup key values. This
feature cannot be used with PREFIX_LOOKUP type. "
+ + "Default is false.");
+
//
--------------------------------------------------------------------------------------------
// Scan specific options
//
--------------------------------------------------------------------------------------------
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index a7a5608de..50223b520 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -153,6 +153,7 @@ public class FlinkTableFactory implements
DynamicTableSourceFactory, DynamicTabl
isStreamingMode,
startupOptions,
tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC),
+
tableOptions.get(FlinkConnectorOptions.LOOKUP_INSERT_IF_NOT_EXISTS),
cache,
partitionDiscoveryIntervalMs,
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
@@ -214,6 +215,7 @@ public class FlinkTableFactory implements
DynamicTableSourceFactory, DynamicTabl
FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP,
FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL,
FlinkConnectorOptions.LOOKUP_ASYNC,
+
FlinkConnectorOptions.LOOKUP_INSERT_IF_NOT_EXISTS,
FlinkConnectorOptions.SINK_IGNORE_DELETE,
FlinkConnectorOptions.SINK_BUCKET_SHUFFLE,
FlinkConnectorOptions.SINK_DISTRIBUTION_MODE,
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 8e234d86c..31377048c 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -123,6 +123,7 @@ public class FlinkTableSource
// options for lookup source
private final boolean lookupAsync;
+ private final boolean insertIfNotExists;
@Nullable private final LookupCache cache;
private final long scanPartitionDiscoveryIntervalMs;
@@ -161,6 +162,7 @@ public class FlinkTableSource
boolean streaming,
FlinkConnectorOptionsUtils.StartupOptions startupOptions,
boolean lookupAsync,
+ boolean insertIfNotExists,
@Nullable LookupCache cache,
long scanPartitionDiscoveryIntervalMs,
boolean isDataLakeEnabled,
@@ -177,6 +179,7 @@ public class FlinkTableSource
this.startupOptions = checkNotNull(startupOptions, "startupOptions
must not be null");
this.lookupAsync = lookupAsync;
+ this.insertIfNotExists = insertIfNotExists;
this.cache = cache;
this.scanPartitionDiscoveryIntervalMs =
scanPartitionDiscoveryIntervalMs;
@@ -385,7 +388,8 @@ public class FlinkTableSource
tablePath,
tableOutputType,
lookupNormalizer,
- projectedFields);
+ projectedFields,
+ insertIfNotExists);
if (cache != null) {
return
PartialCachingAsyncLookupProvider.of(asyncLookupFunction, cache);
} else {
@@ -398,7 +402,8 @@ public class FlinkTableSource
tablePath,
tableOutputType,
lookupNormalizer,
- projectedFields);
+ projectedFields,
+ insertIfNotExists);
if (cache != null) {
return PartialCachingLookupProvider.of(lookupFunction, cache);
} else {
@@ -420,6 +425,7 @@ public class FlinkTableSource
streaming,
startupOptions,
lookupAsync,
+ insertIfNotExists,
cache,
scanPartitionDiscoveryIntervalMs,
isDataLakeEnabled,
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
index 82fd1bbc3..78b4fb8ed 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
@@ -63,6 +63,7 @@ public class FlinkAsyncLookupFunction extends
AsyncLookupFunction {
private final RowType flinkRowType;
private final LookupNormalizer lookupNormalizer;
@Nullable private int[] projection;
+ private final boolean insertIfNotExists;
private transient FlussRowToFlinkRowConverter flussRowToFlinkRowConverter;
private transient Connection connection;
@@ -75,12 +76,14 @@ public class FlinkAsyncLookupFunction extends
AsyncLookupFunction {
TablePath tablePath,
RowType flinkRowType,
LookupNormalizer lookupNormalizer,
- @Nullable int[] projection) {
+ @Nullable int[] projection,
+ boolean insertIfNotExists) {
this.flussConfig = flussConfig;
this.tablePath = tablePath;
this.flinkRowType = flinkRowType;
this.lookupNormalizer = lookupNormalizer;
this.projection = projection;
+ this.insertIfNotExists = insertIfNotExists;
}
@Override
@@ -110,6 +113,8 @@ public class FlinkAsyncLookupFunction extends
AsyncLookupFunction {
int[] lookupKeyIndexes = lookupNormalizer.getLookupKeyIndexes();
RowType lookupKeyRowType = FlinkUtils.projectRowType(flinkRowType,
lookupKeyIndexes);
lookup = lookup.lookupBy(lookupKeyRowType.getFieldNames());
+ } else if (insertIfNotExists) {
+ lookup = lookup.enableInsertIfNotExists();
}
lookuper = lookup.createLookuper();
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
index 477666737..a2bda2fec 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
@@ -58,6 +58,7 @@ public class FlinkLookupFunction extends LookupFunction {
private final RowType flinkRowType;
private final LookupNormalizer lookupNormalizer;
@Nullable private final int[] projection;
+ private final boolean insertIfNotExists;
private transient FlussRowToFlinkRowConverter flussRowToFlinkRowConverter;
private transient Connection connection;
@@ -71,12 +72,14 @@ public class FlinkLookupFunction extends LookupFunction {
TablePath tablePath,
RowType flinkRowType,
LookupNormalizer lookupNormalizer,
- @Nullable int[] projection) {
+ @Nullable int[] projection,
+ boolean insertIfNotExists) {
this.flussConfig = flussConfig;
this.tablePath = tablePath;
this.flinkRowType = flinkRowType;
this.lookupNormalizer = lookupNormalizer;
this.projection = projection;
+ this.insertIfNotExists = insertIfNotExists;
}
@Override
@@ -109,6 +112,8 @@ public class FlinkLookupFunction extends LookupFunction {
int[] lookupKeyIndexes = lookupNormalizer.getLookupKeyIndexes();
RowType lookupKeyRowType = FlinkUtils.projectRowType(flinkRowType,
lookupKeyIndexes);
lookup = lookup.lookupBy(lookupKeyRowType.getFieldNames());
+ } else if (insertIfNotExists) {
+ lookup = lookup.enableInsertIfNotExists();
}
lookuper = lookup.createLookuper();
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
index 1d1f39853..16cf55a41 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
@@ -271,7 +271,8 @@ public class PushdownUtils {
tablePath,
sourceOutputType,
lookupNormalizer,
- projectedFields);
+ projectedFields,
+ false);
try {
// it's fine to pass null here, as we don't use it in it
lookupFunction.open(null);
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index 5d903cb8d..2de475774 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -966,6 +966,86 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
.hasMessage("Full lookup caching is not supported yet.");
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testLookupInsertIfNotExists(boolean async) throws Exception {
+ // use single parallelism to make result ordering stable
+
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
1);
+
+ String uidMappingTable = String.format("uid_mapping_%s", async ?
"async" : "sync");
+ // Create uid_mapping table: uid (STRING, PK), uid_int32 (INT,
auto-increment)
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " uid varchar not null,"
+ + " uid_int32 int,"
+ + " primary key (uid) NOT ENFORCED"
+ + ") with ('auto-increment.fields' =
'uid_int32', 'bucket.num' = '1')",
+ uidMappingTable));
+
+ // Create ods_table as a Flink temporary source table
+ List<Row> odsData =
+ Arrays.asList(
+ Row.of("CN", "Beijing", "Haidian", "2025-01-01",
"user_a"),
+ Row.of("CN", "Shanghai", "Pudong", "2025-01-02",
"user_b"),
+ Row.of("US", "California", "LA", "2025-01-03",
"user_a"),
+ Row.of("JP", "Tokyo", "Shibuya", "2025-01-04",
"user_c"));
+
+ Schema odsSchema =
+ Schema.newBuilder()
+ .column("country", DataTypes.STRING())
+ .column("prov", DataTypes.STRING())
+ .column("city", DataTypes.STRING())
+ .column("ymd", DataTypes.STRING())
+ .column("uid", DataTypes.STRING())
+ .columnByExpression("proctime", "PROCTIME()")
+ .build();
+ RowTypeInfo odsTypeInfo =
+ new RowTypeInfo(
+ new TypeInformation[] {
+ Types.STRING, Types.STRING, Types.STRING,
Types.STRING, Types.STRING
+ },
+ new String[] {"country", "prov", "city", "ymd",
"uid"});
+ DataStream<Row> odsDs =
execEnv.fromCollection(odsData).returns(odsTypeInfo);
+ tEnv.dropTemporaryView("ods_src");
+ tEnv.createTemporaryView("ods_src", tEnv.fromDataStream(odsDs,
odsSchema));
+
+ // Perform lookup join with insertIfNotExists hint
+ String lookupAsyncOption = async ? "'lookup.async' = 'true'" :
"'lookup.async' = 'false'";
+ String joinQuery =
+ String.format(
+ "SELECT ods.country, ods.prov, ods.city, ods.ymd,
ods.uid, dim.uid_int32 "
+ + "FROM ods_src AS ods "
+ + "JOIN %s /*+
OPTIONS('lookup.insert-if-not-exists' = 'true', %s) */ "
+ + "FOR SYSTEM_TIME AS OF ods.proctime AS dim "
+ + "ON dim.uid = ods.uid",
+ uidMappingTable, lookupAsyncOption);
+
+ CloseableIterator<Row> joinResult =
tEnv.executeSql(joinQuery).collect();
+
+ // user_a appears twice, user_b and user_c appear once each
+ // With insertIfNotExists, uid_mapping should auto-create entries:
+ // user_a -> uid_int32=1, user_b -> uid_int32=2, user_a (2nd) reuses
uid_int32=1,
+ // user_c -> uid_int32=3
+ List<String> expectedJoinResults =
+ Arrays.asList(
+ "+I[CN, Beijing, Haidian, 2025-01-01, user_a, 1]",
+ "+I[CN, Shanghai, Pudong, 2025-01-02, user_b, 2]",
+ "+I[US, California, LA, 2025-01-03, user_a, 1]",
+ "+I[JP, Tokyo, Shibuya, 2025-01-04, user_c, 3]");
+ assertResultsIgnoreOrder(joinResult, expectedJoinResults, true);
+
+ // Verify uid_mapping table has the correct data
+ List<String> expectedUidMappingResults =
+ Arrays.asList("+I[user_a, 1]", "+I[user_b, 2]", "+I[user_c,
3]");
+ assertQueryResultExactOrder(
+ tEnv,
+ String.format(
+ "SELECT uid, uid_int32 FROM %s /*+
OPTIONS('scan.startup.mode' = 'earliest') */",
+ uidMappingTable),
+ expectedUidMappingResults);
+ }
+
@Test
void testStreamingReadSinglePartitionPushDown() throws Exception {
tEnv.executeSql(
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
index 4d136c865..760849dfb 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
@@ -65,7 +65,8 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
flinkRowType,
createPrimaryKeyLookupNormalizer(new int[] {0},
flinkRowType),
// no projection when job compiling, new column added
after that.
- null);
+ null,
+ false);
ListOutputCollector collector = new ListOutputCollector();
lookupFunction.setCollector(collector);
@@ -104,7 +105,8 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
flinkRowType,
createPrimaryKeyLookupNormalizer(new int[] {0},
flinkRowType),
// no projection when job compiling, new column added
after that.
- null);
+ null,
+ false);
asyncLookupFunction.open(null);
int[] rowKeys = new int[] {0, 1, 2, 3, 4, 3, 0};
@@ -156,7 +158,8 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
tablePath,
flinkRowType,
createPrimaryKeyLookupNormalizer(new int[] {0},
flinkRowType),
- new int[] {1, 0});
+ new int[] {1, 0},
+ false);
ListOutputCollector collector = new ListOutputCollector();
lookupFunction.setCollector(collector);
@@ -202,7 +205,8 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
tablePath,
flinkRowType,
createPrimaryKeyLookupNormalizer(new int[] {0},
flinkRowType),
- null);
+ null,
+ false);
collector = new ListOutputCollector();
lookupFunction.setCollector(collector);
lookupFunction.open(null);
diff --git a/website/docs/engine-flink/options.md
b/website/docs/engine-flink/options.md
index 5f18a4b79..7f9463a73 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -110,20 +110,21 @@ See more details about [ALTER TABLE ...
SET](engine-flink/ddl.md#set-properties)
## Lookup Options
-| Option | Type | Default |
Description
|
-|------------------------------------------|------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------|
-| lookup.async | Boolean | true |
Whether to use asynchronous lookup. Asynchronous lookup has better throughput
performance than synchronous lookup. |
-| lookup.cache | Enum | NONE |
The caching strategy for this lookup table, including NONE, PARTIAL.
|
-| lookup.max-retries | Integer | 3 |
The maximum allowed retries if a lookup operation fails. Setting this value
will override option 'client.lookup.max-retries'. |
-| lookup.partial-cache.expire-after-access | Duration | (None) |
Duration to expire an entry in the cache after accessing.
|
-| lookup.partial-cache.expire-after-write | Duration | (None) |
Duration to expire an entry in the cache after writing.
|
-| lookup.partial-cache.cache-missing-key | Boolean | true |
Whether to store an empty value into the cache if the lookup key doesn't match
any rows in the table. |
-| lookup.partial-cache.max-rows | Long | (None) |
The maximum number of rows to store in the cache.
|
-| client.lookup.queue-size | Integer | 25600 |
The maximum number of pending lookup operations.
|
-| client.lookup.max-batch-size | Integer | 128 |
The maximum batch size of merging lookup operations to one lookup request.
|
-| client.lookup.max-inflight-requests | Integer | 128 |
The maximum number of unacknowledged lookup requests for lookup operations.
|
-| client.lookup.batch-timeout | Duration | 100ms |
The maximum time to wait for the lookup batch to full, if this timeout is
reached, the lookup batch will be closed to send. |
-| client.lookup.max-retries | Integer | Integer.MAX_VALUE |
Setting a value greater than zero will cause the client to resend any lookup
request that fails with a potentially transient error. |
+| Option | Type | Default |
Description
|
+|------------------------------------------|------------|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| lookup.async | Boolean | true |
Whether to use asynchronous lookup. Asynchronous lookup has better throughput
performance than synchronous lookup.
|
+| lookup.insert-if-not-exists | Boolean | false |
Whether to enable insert-if-not-exists behavior for the lookup operation. When
enabled, if a lookup does not find a matching row, a new row will be inserted
with the lookup key values. |
+| lookup.cache | Enum | NONE |
The caching strategy for this lookup table, including NONE, PARTIAL.
|
+| lookup.max-retries | Integer | 3 |
The maximum allowed retries if a lookup operation fails. Setting this value
will override option 'client.lookup.max-retries'.
|
+| lookup.partial-cache.expire-after-access | Duration | (None) |
Duration to expire an entry in the cache after accessing.
|
+| lookup.partial-cache.expire-after-write | Duration | (None) |
Duration to expire an entry in the cache after writing.
|
+| lookup.partial-cache.cache-missing-key | Boolean | true |
Whether to store an empty value into the cache if the lookup key doesn't match
any rows in the table.
|
+| lookup.partial-cache.max-rows | Long | (None) |
The maximum number of rows to store in the cache.
|
+| client.lookup.queue-size | Integer | 25600 |
The maximum number of pending lookup operations.
|
+| client.lookup.max-batch-size | Integer | 128 |
The maximum batch size of merging lookup operations to one lookup request.
|
+| client.lookup.max-inflight-requests | Integer | 128 |
The maximum number of unacknowledged lookup requests for lookup operations.
|
+| client.lookup.batch-timeout | Duration | 100ms |
The maximum time to wait for the lookup batch to be full; if this timeout is
reached, the lookup batch will be closed and sent.
|
+| client.lookup.max-retries | Integer | Integer.MAX_VALUE |
Setting a value greater than zero will cause the client to resend any lookup
request that fails with a potentially transient error.
|
## Write Options