This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-0.9 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 7bd28226cfa224d502d61dca83f3623b9e7b5db1 Author: xx789 <[email protected]> AuthorDate: Sun Feb 8 22:35:51 2026 +0800 [flink] Support "lookup.insert-if-not-exists" option for Flink Lookup Join (#2601) --- .../apache/fluss/flink/FlinkConnectorOptions.java | 10 +++ .../fluss/flink/catalog/FlinkTableFactory.java | 2 + .../fluss/flink/source/FlinkTableSource.java | 10 ++- .../source/lookup/FlinkAsyncLookupFunction.java | 7 +- .../flink/source/lookup/FlinkLookupFunction.java | 7 +- .../apache/fluss/flink/utils/PushdownUtils.java | 3 +- .../fluss/flink/source/FlinkTableSourceITCase.java | 80 ++++++++++++++++++++++ .../source/lookup/FlinkLookupFunctionTest.java | 12 ++-- website/docs/engine-flink/options.md | 29 ++++---- 9 files changed, 137 insertions(+), 23 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java index 6fd5d147f..e00f131f1 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java @@ -86,6 +86,16 @@ public class FlinkConnectorOptions { .defaultValue(true) .withDescription("Whether to set async lookup. Default is true."); + public static final ConfigOption<Boolean> LOOKUP_INSERT_IF_NOT_EXISTS = + ConfigOptions.key("lookup.insert-if-not-exists") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to enable insert-if-not-exists behavior for the lookup operation. " + + "When enabled, if a lookup does not find a matching row, a new row will be inserted " + + "with the lookup key values. This feature cannot be used with PREFIX_LOOKUP type. 
" + + "Default is false."); + // -------------------------------------------------------------------------------------------- // Scan specific options // -------------------------------------------------------------------------------------------- diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index a7a5608de..50223b520 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -153,6 +153,7 @@ public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTabl isStreamingMode, startupOptions, tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC), + tableOptions.get(FlinkConnectorOptions.LOOKUP_INSERT_IF_NOT_EXISTS), cache, partitionDiscoveryIntervalMs, tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)), @@ -214,6 +215,7 @@ public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTabl FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP, FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL, FlinkConnectorOptions.LOOKUP_ASYNC, + FlinkConnectorOptions.LOOKUP_INSERT_IF_NOT_EXISTS, FlinkConnectorOptions.SINK_IGNORE_DELETE, FlinkConnectorOptions.SINK_BUCKET_SHUFFLE, FlinkConnectorOptions.SINK_DISTRIBUTION_MODE, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index 8e234d86c..31377048c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -123,6 +123,7 @@ public class FlinkTableSource // 
options for lookup source private final boolean lookupAsync; + private final boolean insertIfNotExists; @Nullable private final LookupCache cache; private final long scanPartitionDiscoveryIntervalMs; @@ -161,6 +162,7 @@ public class FlinkTableSource boolean streaming, FlinkConnectorOptionsUtils.StartupOptions startupOptions, boolean lookupAsync, + boolean insertIfNotExists, @Nullable LookupCache cache, long scanPartitionDiscoveryIntervalMs, boolean isDataLakeEnabled, @@ -177,6 +179,7 @@ public class FlinkTableSource this.startupOptions = checkNotNull(startupOptions, "startupOptions must not be null"); this.lookupAsync = lookupAsync; + this.insertIfNotExists = insertIfNotExists; this.cache = cache; this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; @@ -385,7 +388,8 @@ public class FlinkTableSource tablePath, tableOutputType, lookupNormalizer, - projectedFields); + projectedFields, + insertIfNotExists); if (cache != null) { return PartialCachingAsyncLookupProvider.of(asyncLookupFunction, cache); } else { @@ -398,7 +402,8 @@ public class FlinkTableSource tablePath, tableOutputType, lookupNormalizer, - projectedFields); + projectedFields, + insertIfNotExists); if (cache != null) { return PartialCachingLookupProvider.of(lookupFunction, cache); } else { @@ -420,6 +425,7 @@ public class FlinkTableSource streaming, startupOptions, lookupAsync, + insertIfNotExists, cache, scanPartitionDiscoveryIntervalMs, isDataLakeEnabled, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java index 82fd1bbc3..78b4fb8ed 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java @@ -63,6 +63,7 @@ public 
class FlinkAsyncLookupFunction extends AsyncLookupFunction { private final RowType flinkRowType; private final LookupNormalizer lookupNormalizer; @Nullable private int[] projection; + private final boolean insertIfNotExists; private transient FlussRowToFlinkRowConverter flussRowToFlinkRowConverter; private transient Connection connection; @@ -75,12 +76,14 @@ public class FlinkAsyncLookupFunction extends AsyncLookupFunction { TablePath tablePath, RowType flinkRowType, LookupNormalizer lookupNormalizer, - @Nullable int[] projection) { + @Nullable int[] projection, + boolean insertIfNotExists) { this.flussConfig = flussConfig; this.tablePath = tablePath; this.flinkRowType = flinkRowType; this.lookupNormalizer = lookupNormalizer; this.projection = projection; + this.insertIfNotExists = insertIfNotExists; } @Override @@ -110,6 +113,8 @@ public class FlinkAsyncLookupFunction extends AsyncLookupFunction { int[] lookupKeyIndexes = lookupNormalizer.getLookupKeyIndexes(); RowType lookupKeyRowType = FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes); lookup = lookup.lookupBy(lookupKeyRowType.getFieldNames()); + } else if (insertIfNotExists) { + lookup = lookup.enableInsertIfNotExists(); } lookuper = lookup.createLookuper(); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java index 477666737..a2bda2fec 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java @@ -58,6 +58,7 @@ public class FlinkLookupFunction extends LookupFunction { private final RowType flinkRowType; private final LookupNormalizer lookupNormalizer; @Nullable private final int[] projection; + private final boolean insertIfNotExists; private transient 
FlussRowToFlinkRowConverter flussRowToFlinkRowConverter; private transient Connection connection; @@ -71,12 +72,14 @@ public class FlinkLookupFunction extends LookupFunction { TablePath tablePath, RowType flinkRowType, LookupNormalizer lookupNormalizer, - @Nullable int[] projection) { + @Nullable int[] projection, + boolean insertIfNotExists) { this.flussConfig = flussConfig; this.tablePath = tablePath; this.flinkRowType = flinkRowType; this.lookupNormalizer = lookupNormalizer; this.projection = projection; + this.insertIfNotExists = insertIfNotExists; } @Override @@ -109,6 +112,8 @@ public class FlinkLookupFunction extends LookupFunction { int[] lookupKeyIndexes = lookupNormalizer.getLookupKeyIndexes(); RowType lookupKeyRowType = FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes); lookup = lookup.lookupBy(lookupKeyRowType.getFieldNames()); + } else if (insertIfNotExists) { + lookup = lookup.enableInsertIfNotExists(); } lookuper = lookup.createLookuper(); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java index 1d1f39853..16cf55a41 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java @@ -271,7 +271,8 @@ public class PushdownUtils { tablePath, sourceOutputType, lookupNormalizer, - projectedFields); + projectedFields, + false); try { // it's fine to pass null here, as we don't use it in it lookupFunction.open(null); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java index 5d903cb8d..2de475774 100644 --- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java @@ -966,6 +966,86 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { .hasMessage("Full lookup caching is not supported yet."); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testLookupInsertIfNotExists(boolean async) throws Exception { + // use single parallelism to make result ordering stable + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + + String uidMappingTable = String.format("uid_mapping_%s", async ? "async" : "sync"); + // Create uid_mapping table: uid (STRING, PK), uid_int32 (INT, auto-increment) + tEnv.executeSql( + String.format( + "create table %s (" + + " uid varchar not null," + + " uid_int32 int," + + " primary key (uid) NOT ENFORCED" + + ") with ('auto-increment.fields' = 'uid_int32', 'bucket.num' = '1')", + uidMappingTable)); + + // Create ods_table as a Flink temporary source table + List<Row> odsData = + Arrays.asList( + Row.of("CN", "Beijing", "Haidian", "2025-01-01", "user_a"), + Row.of("CN", "Shanghai", "Pudong", "2025-01-02", "user_b"), + Row.of("US", "California", "LA", "2025-01-03", "user_a"), + Row.of("JP", "Tokyo", "Shibuya", "2025-01-04", "user_c")); + + Schema odsSchema = + Schema.newBuilder() + .column("country", DataTypes.STRING()) + .column("prov", DataTypes.STRING()) + .column("city", DataTypes.STRING()) + .column("ymd", DataTypes.STRING()) + .column("uid", DataTypes.STRING()) + .columnByExpression("proctime", "PROCTIME()") + .build(); + RowTypeInfo odsTypeInfo = + new RowTypeInfo( + new TypeInformation[] { + Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING + }, + new String[] {"country", "prov", "city", "ymd", "uid"}); + DataStream<Row> odsDs = execEnv.fromCollection(odsData).returns(odsTypeInfo); + 
tEnv.dropTemporaryView("ods_src"); + tEnv.createTemporaryView("ods_src", tEnv.fromDataStream(odsDs, odsSchema)); + + // Perform lookup join with insertIfNotExists hint + String lookupAsyncOption = async ? "'lookup.async' = 'true'" : "'lookup.async' = 'false'"; + String joinQuery = + String.format( + "SELECT ods.country, ods.prov, ods.city, ods.ymd, ods.uid, dim.uid_int32 " + + "FROM ods_src AS ods " + + "JOIN %s /*+ OPTIONS('lookup.insert-if-not-exists' = 'true', %s) */ " + + "FOR SYSTEM_TIME AS OF ods.proctime AS dim " + + "ON dim.uid = ods.uid", + uidMappingTable, lookupAsyncOption); + + CloseableIterator<Row> joinResult = tEnv.executeSql(joinQuery).collect(); + + // user_a appears twice, user_b and user_c appear once each + // With insertIfNotExists, uid_mapping should auto-create entries: + // user_a -> uid_int32=1, user_b -> uid_int32=2, user_a (2nd) reuses uid_int32=1, + // user_c -> uid_int32=3 + List<String> expectedJoinResults = + Arrays.asList( + "+I[CN, Beijing, Haidian, 2025-01-01, user_a, 1]", + "+I[CN, Shanghai, Pudong, 2025-01-02, user_b, 2]", + "+I[US, California, LA, 2025-01-03, user_a, 1]", + "+I[JP, Tokyo, Shibuya, 2025-01-04, user_c, 3]"); + assertResultsIgnoreOrder(joinResult, expectedJoinResults, true); + + // Verify uid_mapping table has the correct data + List<String> expectedUidMappingResults = + Arrays.asList("+I[user_a, 1]", "+I[user_b, 2]", "+I[user_c, 3]"); + assertQueryResultExactOrder( + tEnv, + String.format( + "SELECT uid, uid_int32 FROM %s /*+ OPTIONS('scan.startup.mode' = 'earliest') */", + uidMappingTable), + expectedUidMappingResults); + } + @Test void testStreamingReadSinglePartitionPushDown() throws Exception { tEnv.executeSql( diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java index 4d136c865..760849dfb 100644 --- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java @@ -65,7 +65,8 @@ class FlinkLookupFunctionTest extends FlinkTestBase { flinkRowType, createPrimaryKeyLookupNormalizer(new int[] {0}, flinkRowType), // no projection when job compiling, new column added after that. - null); + null, + false); ListOutputCollector collector = new ListOutputCollector(); lookupFunction.setCollector(collector); @@ -104,7 +105,8 @@ class FlinkLookupFunctionTest extends FlinkTestBase { flinkRowType, createPrimaryKeyLookupNormalizer(new int[] {0}, flinkRowType), // no projection when job compiling, new column added after that. - null); + null, + false); asyncLookupFunction.open(null); int[] rowKeys = new int[] {0, 1, 2, 3, 4, 3, 0}; @@ -156,7 +158,8 @@ class FlinkLookupFunctionTest extends FlinkTestBase { tablePath, flinkRowType, createPrimaryKeyLookupNormalizer(new int[] {0}, flinkRowType), - new int[] {1, 0}); + new int[] {1, 0}, + false); ListOutputCollector collector = new ListOutputCollector(); lookupFunction.setCollector(collector); @@ -202,7 +205,8 @@ class FlinkLookupFunctionTest extends FlinkTestBase { tablePath, flinkRowType, createPrimaryKeyLookupNormalizer(new int[] {0}, flinkRowType), - null); + null, + false); collector = new ListOutputCollector(); lookupFunction.setCollector(collector); lookupFunction.open(null); diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md index 5f18a4b79..7f9463a73 100644 --- a/website/docs/engine-flink/options.md +++ b/website/docs/engine-flink/options.md @@ -110,20 +110,21 @@ See more details about [ALTER TABLE ... 
SET](engine-flink/ddl.md#set-properties) ## Lookup Options -| Option | Type | Default | Description | -|------------------------------------------|------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------| -| lookup.async | Boolean | true | Whether to use asynchronous lookup. Asynchronous lookup has better throughput performance than synchronous lookup. | -| lookup.cache | Enum | NONE | The caching strategy for this lookup table, including NONE, PARTIAL. | -| lookup.max-retries | Integer | 3 | The maximum allowed retries if a lookup operation fails. Setting this value will override option 'client.lookup.max-retries'. | -| lookup.partial-cache.expire-after-access | Duration | (None) | Duration to expire an entry in the cache after accessing. | -| lookup.partial-cache.expire-after-write | Duration | (None) | Duration to expire an entry in the cache after writing. | -| lookup.partial-cache.cache-missing-key | Boolean | true | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. | -| lookup.partial-cache.max-rows | Long | (None) | The maximum number of rows to store in the cache. | -| client.lookup.queue-size | Integer | 25600 | The maximum number of pending lookup operations. | -| client.lookup.max-batch-size | Integer | 128 | The maximum batch size of merging lookup operations to one lookup request. | -| client.lookup.max-inflight-requests | Integer | 128 | The maximum number of unacknowledged lookup requests for lookup operations. | -| client.lookup.batch-timeout | Duration | 100ms | The maximum time to wait for the lookup batch to full, if this timeout is reached, the lookup batch will be closed to send. | -| client.lookup.max-retries | Integer | Integer.MAX_VALUE | Setting a value greater than zero will cause the client to resend any lookup request that fails with a potentially transient error. 
| +| Option | Type | Default | Description | +|------------------------------------------|------------|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| lookup.async | Boolean | true | Whether to use asynchronous lookup. Asynchronous lookup has better throughput performance than synchronous lookup. | +| lookup.insert-if-not-exists | Boolean | false | Whether to enable insert-if-not-exists behavior for the lookup operation. When enabled, if a lookup does not find a matching row, a new row will be inserted with the lookup key values. | +| lookup.cache | Enum | NONE | The caching strategy for this lookup table, including NONE, PARTIAL. | +| lookup.max-retries | Integer | 3 | The maximum allowed retries if a lookup operation fails. Setting this value will override option 'client.lookup.max-retries'. | +| lookup.partial-cache.expire-after-access | Duration | (None) | Duration to expire an entry in the cache after accessing. | +| lookup.partial-cache.expire-after-write | Duration | (None) | Duration to expire an entry in the cache after writing. | +| lookup.partial-cache.cache-missing-key | Boolean | true | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. | +| lookup.partial-cache.max-rows | Long | (None) | The maximum number of rows to store in the cache. | +| client.lookup.queue-size | Integer | 25600 | The maximum number of pending lookup operations. | +| client.lookup.max-batch-size | Integer | 128 | The maximum batch size of merging lookup operations to one lookup request. | +| client.lookup.max-inflight-requests | Integer | 128 | The maximum number of unacknowledged lookup requests for lookup operations. 
| +| client.lookup.batch-timeout              | Duration   | 100ms             | The maximum time to wait for the lookup batch to be full; if this timeout is reached, the lookup batch will be closed and sent.                                                            | +| client.lookup.max-retries                | Integer    | Integer.MAX_VALUE | Setting a value greater than zero will cause the client to resend any lookup request that fails with a potentially transient error.                                                        | ## Write Options
