This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-0.9
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 7bd28226cfa224d502d61dca83f3623b9e7b5db1
Author: xx789 <[email protected]>
AuthorDate: Sun Feb 8 22:35:51 2026 +0800

    [flink] Support "lookup.insert-if-not-exists" option for Flink Lookup Join 
(#2601)
---
 .../apache/fluss/flink/FlinkConnectorOptions.java  | 10 +++
 .../fluss/flink/catalog/FlinkTableFactory.java     |  2 +
 .../fluss/flink/source/FlinkTableSource.java       | 10 ++-
 .../source/lookup/FlinkAsyncLookupFunction.java    |  7 +-
 .../flink/source/lookup/FlinkLookupFunction.java   |  7 +-
 .../apache/fluss/flink/utils/PushdownUtils.java    |  3 +-
 .../fluss/flink/source/FlinkTableSourceITCase.java | 80 ++++++++++++++++++++++
 .../source/lookup/FlinkLookupFunctionTest.java     | 12 ++--
 website/docs/engine-flink/options.md               | 29 ++++----
 9 files changed, 137 insertions(+), 23 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
index 6fd5d147f..e00f131f1 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
@@ -86,6 +86,16 @@ public class FlinkConnectorOptions {
                     .defaultValue(true)
                     .withDescription("Whether to set async lookup. Default is 
true.");
 
+    public static final ConfigOption<Boolean> LOOKUP_INSERT_IF_NOT_EXISTS =
+            ConfigOptions.key("lookup.insert-if-not-exists")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to enable insert-if-not-exists behavior 
for the lookup operation. "
+                                    + "When enabled, if a lookup does not find 
a matching row, a new row will be inserted "
+                                    + "with the lookup key values. This 
feature cannot be used with PREFIX_LOOKUP type. "
+                                    + "Default is false.");
+
     // 
--------------------------------------------------------------------------------------------
     // Scan specific options
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index a7a5608de..50223b520 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -153,6 +153,7 @@ public class FlinkTableFactory implements 
DynamicTableSourceFactory, DynamicTabl
                 isStreamingMode,
                 startupOptions,
                 tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC),
+                
tableOptions.get(FlinkConnectorOptions.LOOKUP_INSERT_IF_NOT_EXISTS),
                 cache,
                 partitionDiscoveryIntervalMs,
                 
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
@@ -214,6 +215,7 @@ public class FlinkTableFactory implements 
DynamicTableSourceFactory, DynamicTabl
                                 FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP,
                                 
FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL,
                                 FlinkConnectorOptions.LOOKUP_ASYNC,
+                                
FlinkConnectorOptions.LOOKUP_INSERT_IF_NOT_EXISTS,
                                 FlinkConnectorOptions.SINK_IGNORE_DELETE,
                                 FlinkConnectorOptions.SINK_BUCKET_SHUFFLE,
                                 FlinkConnectorOptions.SINK_DISTRIBUTION_MODE,
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 8e234d86c..31377048c 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -123,6 +123,7 @@ public class FlinkTableSource
 
     // options for lookup source
     private final boolean lookupAsync;
+    private final boolean insertIfNotExists;
     @Nullable private final LookupCache cache;
 
     private final long scanPartitionDiscoveryIntervalMs;
@@ -161,6 +162,7 @@ public class FlinkTableSource
             boolean streaming,
             FlinkConnectorOptionsUtils.StartupOptions startupOptions,
             boolean lookupAsync,
+            boolean insertIfNotExists,
             @Nullable LookupCache cache,
             long scanPartitionDiscoveryIntervalMs,
             boolean isDataLakeEnabled,
@@ -177,6 +179,7 @@ public class FlinkTableSource
         this.startupOptions = checkNotNull(startupOptions, "startupOptions 
must not be null");
 
         this.lookupAsync = lookupAsync;
+        this.insertIfNotExists = insertIfNotExists;
         this.cache = cache;
 
         this.scanPartitionDiscoveryIntervalMs = 
scanPartitionDiscoveryIntervalMs;
@@ -385,7 +388,8 @@ public class FlinkTableSource
                             tablePath,
                             tableOutputType,
                             lookupNormalizer,
-                            projectedFields);
+                            projectedFields,
+                            insertIfNotExists);
             if (cache != null) {
                 return 
PartialCachingAsyncLookupProvider.of(asyncLookupFunction, cache);
             } else {
@@ -398,7 +402,8 @@ public class FlinkTableSource
                             tablePath,
                             tableOutputType,
                             lookupNormalizer,
-                            projectedFields);
+                            projectedFields,
+                            insertIfNotExists);
             if (cache != null) {
                 return PartialCachingLookupProvider.of(lookupFunction, cache);
             } else {
@@ -420,6 +425,7 @@ public class FlinkTableSource
                         streaming,
                         startupOptions,
                         lookupAsync,
+                        insertIfNotExists,
                         cache,
                         scanPartitionDiscoveryIntervalMs,
                         isDataLakeEnabled,
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
index 82fd1bbc3..78b4fb8ed 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
@@ -63,6 +63,7 @@ public class FlinkAsyncLookupFunction extends 
AsyncLookupFunction {
     private final RowType flinkRowType;
     private final LookupNormalizer lookupNormalizer;
     @Nullable private int[] projection;
+    private final boolean insertIfNotExists;
 
     private transient FlussRowToFlinkRowConverter flussRowToFlinkRowConverter;
     private transient Connection connection;
@@ -75,12 +76,14 @@ public class FlinkAsyncLookupFunction extends 
AsyncLookupFunction {
             TablePath tablePath,
             RowType flinkRowType,
             LookupNormalizer lookupNormalizer,
-            @Nullable int[] projection) {
+            @Nullable int[] projection,
+            boolean insertIfNotExists) {
         this.flussConfig = flussConfig;
         this.tablePath = tablePath;
         this.flinkRowType = flinkRowType;
         this.lookupNormalizer = lookupNormalizer;
         this.projection = projection;
+        this.insertIfNotExists = insertIfNotExists;
     }
 
     @Override
@@ -110,6 +113,8 @@ public class FlinkAsyncLookupFunction extends 
AsyncLookupFunction {
             int[] lookupKeyIndexes = lookupNormalizer.getLookupKeyIndexes();
             RowType lookupKeyRowType = FlinkUtils.projectRowType(flinkRowType, 
lookupKeyIndexes);
             lookup = lookup.lookupBy(lookupKeyRowType.getFieldNames());
+        } else if (insertIfNotExists) {
+            lookup = lookup.enableInsertIfNotExists();
         }
         lookuper = lookup.createLookuper();
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
index 477666737..a2bda2fec 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
@@ -58,6 +58,7 @@ public class FlinkLookupFunction extends LookupFunction {
     private final RowType flinkRowType;
     private final LookupNormalizer lookupNormalizer;
     @Nullable private final int[] projection;
+    private final boolean insertIfNotExists;
 
     private transient FlussRowToFlinkRowConverter flussRowToFlinkRowConverter;
     private transient Connection connection;
@@ -71,12 +72,14 @@ public class FlinkLookupFunction extends LookupFunction {
             TablePath tablePath,
             RowType flinkRowType,
             LookupNormalizer lookupNormalizer,
-            @Nullable int[] projection) {
+            @Nullable int[] projection,
+            boolean insertIfNotExists) {
         this.flussConfig = flussConfig;
         this.tablePath = tablePath;
         this.flinkRowType = flinkRowType;
         this.lookupNormalizer = lookupNormalizer;
         this.projection = projection;
+        this.insertIfNotExists = insertIfNotExists;
     }
 
     @Override
@@ -109,6 +112,8 @@ public class FlinkLookupFunction extends LookupFunction {
             int[] lookupKeyIndexes = lookupNormalizer.getLookupKeyIndexes();
             RowType lookupKeyRowType = FlinkUtils.projectRowType(flinkRowType, 
lookupKeyIndexes);
             lookup = lookup.lookupBy(lookupKeyRowType.getFieldNames());
+        } else if (insertIfNotExists) {
+            lookup = lookup.enableInsertIfNotExists();
         }
         lookuper = lookup.createLookuper();
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
index 1d1f39853..16cf55a41 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
@@ -271,7 +271,8 @@ public class PushdownUtils {
                         tablePath,
                         sourceOutputType,
                         lookupNormalizer,
-                        projectedFields);
+                        projectedFields,
+                        false);
         try {
             // it's fine to pass null here, as we don't use it in it
             lookupFunction.open(null);
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index 5d903cb8d..2de475774 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -966,6 +966,86 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                 .hasMessage("Full lookup caching is not supported yet.");
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testLookupInsertIfNotExists(boolean async) throws Exception {
+        // use single parallelism to make result ordering stable
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 1);
+
+        String uidMappingTable = String.format("uid_mapping_%s", async ? 
"async" : "sync");
+        // Create uid_mapping table: uid (STRING, PK), uid_int32 (INT, 
auto-increment)
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " uid varchar not null,"
+                                + " uid_int32 int,"
+                                + " primary key (uid) NOT ENFORCED"
+                                + ") with ('auto-increment.fields' = 
'uid_int32', 'bucket.num' = '1')",
+                        uidMappingTable));
+
+        // Create ods_table as a Flink temporary source table
+        List<Row> odsData =
+                Arrays.asList(
+                        Row.of("CN", "Beijing", "Haidian", "2025-01-01", 
"user_a"),
+                        Row.of("CN", "Shanghai", "Pudong", "2025-01-02", 
"user_b"),
+                        Row.of("US", "California", "LA", "2025-01-03", 
"user_a"),
+                        Row.of("JP", "Tokyo", "Shibuya", "2025-01-04", 
"user_c"));
+
+        Schema odsSchema =
+                Schema.newBuilder()
+                        .column("country", DataTypes.STRING())
+                        .column("prov", DataTypes.STRING())
+                        .column("city", DataTypes.STRING())
+                        .column("ymd", DataTypes.STRING())
+                        .column("uid", DataTypes.STRING())
+                        .columnByExpression("proctime", "PROCTIME()")
+                        .build();
+        RowTypeInfo odsTypeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {
+                            Types.STRING, Types.STRING, Types.STRING, 
Types.STRING, Types.STRING
+                        },
+                        new String[] {"country", "prov", "city", "ymd", 
"uid"});
+        DataStream<Row> odsDs = 
execEnv.fromCollection(odsData).returns(odsTypeInfo);
+        tEnv.dropTemporaryView("ods_src");
+        tEnv.createTemporaryView("ods_src", tEnv.fromDataStream(odsDs, 
odsSchema));
+
+        // Perform lookup join with insertIfNotExists hint
+        String lookupAsyncOption = async ? "'lookup.async' = 'true'" : 
"'lookup.async' = 'false'";
+        String joinQuery =
+                String.format(
+                        "SELECT ods.country, ods.prov, ods.city, ods.ymd, 
ods.uid, dim.uid_int32 "
+                                + "FROM ods_src AS ods "
+                                + "JOIN %s /*+ 
OPTIONS('lookup.insert-if-not-exists' = 'true', %s) */ "
+                                + "FOR SYSTEM_TIME AS OF ods.proctime AS dim "
+                                + "ON dim.uid = ods.uid",
+                        uidMappingTable, lookupAsyncOption);
+
+        CloseableIterator<Row> joinResult = 
tEnv.executeSql(joinQuery).collect();
+
+        // user_a appears twice, user_b and user_c appear once each
+        // With insertIfNotExists, uid_mapping should auto-create entries:
+        // user_a -> uid_int32=1, user_b -> uid_int32=2, user_a (2nd) reuses 
uid_int32=1,
+        // user_c -> uid_int32=3
+        List<String> expectedJoinResults =
+                Arrays.asList(
+                        "+I[CN, Beijing, Haidian, 2025-01-01, user_a, 1]",
+                        "+I[CN, Shanghai, Pudong, 2025-01-02, user_b, 2]",
+                        "+I[US, California, LA, 2025-01-03, user_a, 1]",
+                        "+I[JP, Tokyo, Shibuya, 2025-01-04, user_c, 3]");
+        assertResultsIgnoreOrder(joinResult, expectedJoinResults, true);
+
+        // Verify uid_mapping table has the correct data
+        List<String> expectedUidMappingResults =
+                Arrays.asList("+I[user_a, 1]", "+I[user_b, 2]", "+I[user_c, 
3]");
+        assertQueryResultExactOrder(
+                tEnv,
+                String.format(
+                        "SELECT uid, uid_int32 FROM %s /*+ 
OPTIONS('scan.startup.mode' = 'earliest') */",
+                        uidMappingTable),
+                expectedUidMappingResults);
+    }
+
     @Test
     void testStreamingReadSinglePartitionPushDown() throws Exception {
         tEnv.executeSql(
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
index 4d136c865..760849dfb 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
@@ -65,7 +65,8 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
                         flinkRowType,
                         createPrimaryKeyLookupNormalizer(new int[] {0}, 
flinkRowType),
                         // no projection when job compiling, new column added 
after that.
-                        null);
+                        null,
+                        false);
 
         ListOutputCollector collector = new ListOutputCollector();
         lookupFunction.setCollector(collector);
@@ -104,7 +105,8 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
                         flinkRowType,
                         createPrimaryKeyLookupNormalizer(new int[] {0}, 
flinkRowType),
                         // no projection when job compiling, new column added 
after that.
-                        null);
+                        null,
+                        false);
         asyncLookupFunction.open(null);
 
         int[] rowKeys = new int[] {0, 1, 2, 3, 4, 3, 0};
@@ -156,7 +158,8 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
                         tablePath,
                         flinkRowType,
                         createPrimaryKeyLookupNormalizer(new int[] {0}, 
flinkRowType),
-                        new int[] {1, 0});
+                        new int[] {1, 0},
+                        false);
 
         ListOutputCollector collector = new ListOutputCollector();
         lookupFunction.setCollector(collector);
@@ -202,7 +205,8 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
                         tablePath,
                         flinkRowType,
                         createPrimaryKeyLookupNormalizer(new int[] {0}, 
flinkRowType),
-                        null);
+                        null,
+                        false);
         collector = new ListOutputCollector();
         lookupFunction.setCollector(collector);
         lookupFunction.open(null);
diff --git a/website/docs/engine-flink/options.md 
b/website/docs/engine-flink/options.md
index 5f18a4b79..7f9463a73 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -110,20 +110,21 @@ See more details about [ALTER TABLE ... 
SET](engine-flink/ddl.md#set-properties)
 
 ## Lookup Options
 
-| Option                                   | Type       | Default           | 
Description                                                                     
                                                    |
-|------------------------------------------|------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------|
-| lookup.async                             | Boolean    | true              | 
Whether to use asynchronous lookup. Asynchronous lookup has better throughput 
performance than synchronous lookup.                  |
-| lookup.cache                             | Enum       | NONE              | 
The caching strategy for this lookup table, including NONE, PARTIAL.            
                                                    |
-| lookup.max-retries                       | Integer    | 3                 | 
The maximum allowed retries if a lookup operation fails. Setting this value 
will override option 'client.lookup.max-retries'.       |
-| lookup.partial-cache.expire-after-access | Duration   | (None)            | 
Duration to expire an entry in the cache after accessing.                       
                                                    |
-| lookup.partial-cache.expire-after-write  | Duration   | (None)            | 
Duration to expire an entry in the cache after writing.                         
                                                    |
-| lookup.partial-cache.cache-missing-key   | Boolean    | true              | 
Whether to store an empty value into the cache if the lookup key doesn't match 
any rows in the table.                               |
-| lookup.partial-cache.max-rows            | Long       | (None)            | 
The maximum number of rows to store in the cache.                               
                                                    |
-| client.lookup.queue-size                 | Integer    | 25600             | 
The maximum number of pending lookup operations.                                
                                                    |
-| client.lookup.max-batch-size             | Integer    | 128               | 
The maximum batch size of merging lookup operations to one lookup request.      
                                                    |
-| client.lookup.max-inflight-requests      | Integer    | 128               | 
The maximum number of unacknowledged lookup requests for lookup operations.     
                                                    |
-| client.lookup.batch-timeout              | Duration   | 100ms             | 
The maximum time to wait for the lookup batch to full, if this timeout is 
reached, the lookup batch will be closed to send.         | 
-| client.lookup.max-retries                | Integer    | Integer.MAX_VALUE | 
Setting a value greater than zero will cause the client to resend any lookup 
request that fails with a potentially transient error. |
+| Option                                   | Type       | Default           | 
Description                                                                     
                                                                                
                         |
+|------------------------------------------|------------|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| lookup.async                             | Boolean    | true              | 
Whether to use asynchronous lookup. Asynchronous lookup has better throughput 
performance than synchronous lookup.                                            
                           |
| lookup.insert-if-not-exists              | Boolean    | false             | 
Whether to enable insert-if-not-exists behavior for the lookup operation. When 
enabled, if a lookup does not find a matching row, a new row will be inserted 
with the lookup key values. This feature cannot be used with the PREFIX_LOOKUP type. |
+| lookup.cache                             | Enum       | NONE              | 
The caching strategy for this lookup table, including NONE, PARTIAL.            
                                                                                
                         |
+| lookup.max-retries                       | Integer    | 3                 | 
The maximum allowed retries if a lookup operation fails. Setting this value 
will override option 'client.lookup.max-retries'.                               
                             |
+| lookup.partial-cache.expire-after-access | Duration   | (None)            | 
Duration to expire an entry in the cache after accessing.                       
                                                                                
                         |
+| lookup.partial-cache.expire-after-write  | Duration   | (None)            | 
Duration to expire an entry in the cache after writing.                         
                                                                                
                         |
+| lookup.partial-cache.cache-missing-key   | Boolean    | true              | 
Whether to store an empty value into the cache if the lookup key doesn't match 
any rows in the table.                                                          
                          |
+| lookup.partial-cache.max-rows            | Long       | (None)            | 
The maximum number of rows to store in the cache.                               
                                                                                
                         |
+| client.lookup.queue-size                 | Integer    | 25600             | 
The maximum number of pending lookup operations.                                
                                                                                
                         |
+| client.lookup.max-batch-size             | Integer    | 128               | 
The maximum batch size of merging lookup operations to one lookup request.      
                                                                                
                         |
+| client.lookup.max-inflight-requests      | Integer    | 128               | 
The maximum number of unacknowledged lookup requests for lookup operations.     
                                                                                
                         |
| client.lookup.batch-timeout              | Duration   | 100ms             | 
The maximum time to wait for the lookup batch to be full; if this timeout is 
reached, the lookup batch will be closed and sent.                              
                                |
+| client.lookup.max-retries                | Integer    | Integer.MAX_VALUE | 
Setting a value greater than zero will cause the client to resend any lookup 
request that fails with a potentially transient error.                          
                            |
 
 
 ## Write Options

Reply via email to