This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2f6d24f53d9 [HUDI-6235] Update and Delete statements for Flink (#8749)
2f6d24f53d9 is described below
commit 2f6d24f53d9937d44bf208f26d7edcaaa4eda19b
Author: Danny Chan <[email protected]>
AuthorDate: Wed May 24 11:06:47 2023 +0800
[HUDI-6235] Update and Delete statements for Flink (#8749)
* support UPDATE statement
* support DELETE statement
* keep backward compatibility for pre 1.17.0 releases
* fix the 1.17.0 bundle validation for incorrect flink runtime jar
---
.../org/apache/hudi/sink/StreamWriteFunction.java | 1 +
.../org/apache/hudi/table/HoodieTableSink.java | 24 +++++++++-
.../apache/hudi/util/DataModificationInfos.java | 34 ++++++++++++++
.../apache/hudi/table/ITTestHoodieDataSource.java | 54 ++++++++++++++++++++++
.../test/java/org/apache/hudi/utils/TestData.java | 32 +++++++++++++
.../adapter/SupportsRowLevelDeleteAdapter.java | 33 +++++++++++++
.../adapter/SupportsRowLevelUpdateAdapter.java | 37 +++++++++++++++
.../adapter/SupportsRowLevelDeleteAdapter.java | 33 +++++++++++++
.../adapter/SupportsRowLevelUpdateAdapter.java | 37 +++++++++++++++
.../adapter/SupportsRowLevelDeleteAdapter.java | 33 +++++++++++++
.../adapter/SupportsRowLevelUpdateAdapter.java | 37 +++++++++++++++
.../adapter/SupportsRowLevelDeleteAdapter.java | 33 +++++++++++++
.../adapter/SupportsRowLevelUpdateAdapter.java | 37 +++++++++++++++
.../adapter/SupportsRowLevelDeleteAdapter.java | 42 +++++++++++++++++
.../adapter/SupportsRowLevelUpdateAdapter.java | 45 ++++++++++++++++++
.../base/build_flink1170hive313spark332.sh | 27 +++++++++++
packaging/bundle-validation/ci_run.sh | 4 +-
17 files changed, 540 insertions(+), 3 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index c37b2325ca7..7dc6ea9c0f4 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -186,6 +186,7 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
this.writeFunction = (records, instantTime) ->
this.writeClient.insert(records, instantTime);
break;
case UPSERT:
+ case DELETE: // shares the code path with UPSERT
this.writeFunction = (records, instantTime) ->
this.writeClient.upsert(records, instantTime);
break;
case INSERT_OVERWRITE:
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index af10b620e69..81b3a6eefd5 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -19,6 +19,8 @@
package org.apache.hudi.table;
import org.apache.hudi.adapter.DataStreamSinkProviderAdapter;
+import org.apache.hudi.adapter.SupportsRowLevelDeleteAdapter;
+import org.apache.hudi.adapter.SupportsRowLevelUpdateAdapter;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
@@ -26,10 +28,12 @@ import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.ChangelogModes;
+import org.apache.hudi.util.DataModificationInfos;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -37,12 +41,18 @@ import
org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.types.logical.RowType;
+import java.util.List;
import java.util.Map;
/**
* Hoodie table sink.
*/
-public class HoodieTableSink implements DynamicTableSink,
SupportsPartitioning, SupportsOverwrite {
+public class HoodieTableSink implements
+ DynamicTableSink,
+ SupportsPartitioning,
+ SupportsOverwrite,
+ SupportsRowLevelDeleteAdapter,
+ SupportsRowLevelUpdateAdapter {
private final Configuration conf;
private final ResolvedSchema schema;
@@ -148,4 +158,16 @@ public class HoodieTableSink implements DynamicTableSink,
SupportsPartitioning,
// if there are explicit partitions, #applyStaticPartition would overwrite
the option.
this.conf.setString(FlinkOptions.OPERATION,
WriteOperationType.INSERT_OVERWRITE_TABLE.value());
}
+
+ @Override
+ public RowLevelDeleteInfoAdapter applyRowLevelDelete() {
+ this.conf.setString(FlinkOptions.OPERATION,
WriteOperationType.DELETE.value());
+ return DataModificationInfos.DEFAULT_DELETE_INFO;
+ }
+
+ @Override
+ public RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> list) {
+ this.conf.setString(FlinkOptions.OPERATION,
WriteOperationType.UPSERT.value());
+ return DataModificationInfos.DEFAULT_UPDATE_INFO;
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataModificationInfos.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataModificationInfos.java
new file mode 100644
index 00000000000..cc1e77607bb
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataModificationInfos.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.adapter.SupportsRowLevelDeleteAdapter;
+import org.apache.hudi.adapter.SupportsRowLevelUpdateAdapter;
+
+/**
+ * Utilities for all kinds of data modification infos.
+ *
+ * @see SupportsRowLevelUpdateAdapter
+ * @see SupportsRowLevelDeleteAdapter
+ */
+public class DataModificationInfos {
+ public static final SupportsRowLevelDeleteAdapter.RowLevelDeleteInfoAdapter
DEFAULT_DELETE_INFO = new
SupportsRowLevelDeleteAdapter.RowLevelDeleteInfoAdapter() {};
+
+ public static final SupportsRowLevelUpdateAdapter.RowLevelUpdateInfoAdapter
DEFAULT_UPDATE_INFO = new
SupportsRowLevelUpdateAdapter.RowLevelUpdateInfoAdapter() {};
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 9f2e1e13cf8..9021e88c057 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1952,6 +1952,60 @@ public class ITTestHoodieDataSource {
assertRowsEquals(actualResult,
TestData.DATA_SET_INSERT_SEPARATE_PARTITION);
}
+ @ParameterizedTest
+ @MethodSource("indexAndTableTypeParams")
+ void testUpdateDelete(String indexType, HoodieTableType tableType) {
+ TableEnvironment tableEnv = batchTableEnv;
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.INDEX_TYPE, indexType)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+ // update EQ(IN)
+ final String update1 = "update t1 set age=18 where uuid in('id1', 'id2')";
+
+ execInsertSql(tableEnv, update1);
+
+ List<Row> result1 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ List<RowData> expected1 = TestData.update(TestData.DATA_SET_SOURCE_INSERT,
2, 18, 0, 1);
+ assertRowsEquals(result1, expected1);
+
+ // update GT(>)
+ final String update2 = "update t1 set age=19 where uuid > 'id5'";
+
+ execInsertSql(tableEnv, update2);
+
+ List<Row> result2 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ List<RowData> expected2 = TestData.update(expected1, 2, 19, 5, 6, 7);
+ assertRowsEquals(result2, expected2);
+
+ // delete EQ(=)
+ final String update3 = "delete from t1 where uuid = 'id1'";
+
+ execInsertSql(tableEnv, update3);
+
+ List<Row> result3 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ List<RowData> expected3 = TestData.delete(expected2, 0);
+ assertRowsEquals(result3, expected3);
+
+ // delete LTE(<=)
+ final String update4 = "delete from t1 where uuid <= 'id5'";
+
+ execInsertSql(tableEnv, update4);
+
+ List<Row> result4 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ List<RowData> expected4 = TestData.delete(expected3, 0, 1, 2, 3);
+ assertRowsEquals(result4, expected4);
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 263b72f489e..c2e505d9304 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -391,6 +391,38 @@ public class TestData {
return inserts;
}
+ /**
+ * Updates the rows with given value {@code val} at field index {@code idx}.
+ * All the target rows specified with range {@code targets} would be updated.
+ *
+ * <p>NOTE: only INT type is supported.
+ *
+ * @param dataset The rows to update
+ * @param idx The target field index
+ * @param val The new value
+ * @param targets The target row numbers to update, the number starts from 0
+ *
+ * @return Copied rows with new values setup.
+ */
+ public static List<RowData> update(List<RowData> dataset, int idx, int val,
int... targets) {
+ List<RowData> copied =
dataset.stream().map(TestConfigurations.SERIALIZER::copy).collect(Collectors.toList());
+ Arrays.stream(targets).forEach(target -> {
+ BinaryRowData rowData = (BinaryRowData) copied.get(target);
+ rowData.setInt(idx, val);
+ });
+ return copied;
+ }
+
+ /**
+ * Returns a copy of the given rows excluding the rows at indices {@code
targets}.
+ */
+ public static List<RowData> delete(List<RowData> dataset, int... targets) {
+ Set<Integer> exclude =
Arrays.stream(targets).boxed().collect(Collectors.toSet());
+ return IntStream.range(0, dataset.size())
+ .filter(i -> !exclude.contains(i))
+ .mapToObj(i ->
TestConfigurations.SERIALIZER.copy(dataset.get(i))).collect(Collectors.toList());
+ }
+
public static List<RowData> filterOddRows(List<RowData> rows) {
return filterRowsByIndexPredicate(rows, i -> i % 2 != 0);
}
diff --git
a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
new file mode 100644
index 00000000000..cd5c4eb891b
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter clazz for {@code
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ */
+public interface SupportsRowLevelDeleteAdapter {
+
+ RowLevelDeleteInfoAdapter applyRowLevelDelete();
+
+ /**
+ * Adapter clazz for {@code SupportsRowLevelDelete.RowLevelDeleteInfo}.
+ */
+ interface RowLevelDeleteInfoAdapter {
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
new file mode 100644
index 00000000000..6a62763ec5b
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.catalog.Column;
+
+import java.util.List;
+
+/**
+ * Adapter clazz for {@code
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
+ */
+public interface SupportsRowLevelUpdateAdapter {
+
+ RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns);
+
+ /**
+ * Adapter clazz for {@code SupportsRowLevelUpdate.RowLevelUpdateInfo}.
+ */
+ interface RowLevelUpdateInfoAdapter {
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
new file mode 100644
index 00000000000..cd5c4eb891b
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter clazz for {@code
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ */
+public interface SupportsRowLevelDeleteAdapter {
+
+ RowLevelDeleteInfoAdapter applyRowLevelDelete();
+
+ /**
+ * Adapter clazz for {@code SupportsRowLevelDelete.RowLevelDeleteInfo}.
+ */
+ interface RowLevelDeleteInfoAdapter {
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
new file mode 100644
index 00000000000..6a62763ec5b
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.catalog.Column;
+
+import java.util.List;
+
+/**
+ * Adapter clazz for {@code
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
+ */
+public interface SupportsRowLevelUpdateAdapter {
+
+ RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns);
+
+ /**
+ * Adapter clazz for {@code SupportsRowLevelUpdate.RowLevelUpdateInfo}.
+ */
+ interface RowLevelUpdateInfoAdapter {
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
new file mode 100644
index 00000000000..cd5c4eb891b
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter clazz for {@code
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ */
+public interface SupportsRowLevelDeleteAdapter {
+
+ RowLevelDeleteInfoAdapter applyRowLevelDelete();
+
+ /**
+ * Adapter clazz for {@code SupportsRowLevelDelete.RowLevelDeleteInfo}.
+ */
+ interface RowLevelDeleteInfoAdapter {
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
new file mode 100644
index 00000000000..6a62763ec5b
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.catalog.Column;
+
+import java.util.List;
+
+/**
+ * Adapter clazz for {@code
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
+ */
+public interface SupportsRowLevelUpdateAdapter {
+
+ RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns);
+
+ /**
+ * Adapter clazz for {@code SupportsRowLevelUpdate.RowLevelUpdateInfo}.
+ */
+ interface RowLevelUpdateInfoAdapter {
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
new file mode 100644
index 00000000000..cd5c4eb891b
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter clazz for {@code
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ */
+public interface SupportsRowLevelDeleteAdapter {
+
+ RowLevelDeleteInfoAdapter applyRowLevelDelete();
+
+ /**
+ * Adapter clazz for {@code SupportsRowLevelDelete.RowLevelDeleteInfo}.
+ */
+ interface RowLevelDeleteInfoAdapter {
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
new file mode 100644
index 00000000000..6a62763ec5b
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.catalog.Column;
+
+import java.util.List;
+
+/**
+ * Adapter clazz for {@code
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
+ */
+public interface SupportsRowLevelUpdateAdapter {
+
+ RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns);
+
+ /**
+ * Adapter clazz for {@code SupportsRowLevelUpdate.RowLevelUpdateInfo}.
+ */
+ interface RowLevelUpdateInfoAdapter {
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
new file mode 100644
index 00000000000..de0019d41bd
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+
+import javax.annotation.Nullable;
+
+/**
+ * Adapter clazz for {@link
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ */
+public interface SupportsRowLevelDeleteAdapter extends SupportsRowLevelDelete {
+ @Override
+ default RowLevelDeleteInfo applyRowLevelDelete(@Nullable
RowLevelModificationScanContext context) {
+ return applyRowLevelDelete();
+ }
+
+ RowLevelDeleteInfoAdapter applyRowLevelDelete();
+
+ /**
+ * Adapter clazz for {@link SupportsRowLevelDelete.RowLevelDeleteInfo}.
+ */
+ interface RowLevelDeleteInfoAdapter extends RowLevelDeleteInfo {
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
new file mode 100644
index 00000000000..17c785d4845
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Adapter clazz for {@link
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
+ */
+public interface SupportsRowLevelUpdateAdapter extends SupportsRowLevelUpdate {
+ @Override
+ default RowLevelUpdateInfo applyRowLevelUpdate(List<Column> updatedColumns,
@Nullable RowLevelModificationScanContext context) {
+ return applyRowLevelUpdate(updatedColumns);
+ }
+
+ RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns);
+
+ /**
+ * Adapter clazz for {@link SupportsRowLevelUpdate.RowLevelUpdateInfo}.
+ */
+ interface RowLevelUpdateInfoAdapter extends RowLevelUpdateInfo {
+ }
+}
diff --git a/packaging/bundle-validation/base/build_flink1170hive313spark332.sh
b/packaging/bundle-validation/base/build_flink1170hive313spark332.sh
new file mode 100755
index 00000000000..ae4858afcab
--- /dev/null
+++ b/packaging/bundle-validation/base/build_flink1170hive313spark332.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+docker build \
+ --build-arg HIVE_VERSION=3.1.3 \
+ --build-arg FLINK_VERSION=1.17.0 \
+ --build-arg SPARK_VERSION=3.3.2 \
+ --build-arg SPARK_HADOOP_VERSION=3 \
+ --build-arg HADOOP_VERSION=3.3.5 \
+ -t hudi-ci-bundle-validation-base:flink1170hive313spark332 .
+docker image tag hudi-ci-bundle-validation-base:flink1170hive313spark332
apachehudi/hudi-ci-bundle-validation-base:flink1170hive313spark332
diff --git a/packaging/bundle-validation/ci_run.sh
b/packaging/bundle-validation/ci_run.sh
index 17105d46e5b..42538f7a59f 100755
--- a/packaging/bundle-validation/ci_run.sh
+++ b/packaging/bundle-validation/ci_run.sh
@@ -88,12 +88,12 @@ elif [[ ${SPARK_RUNTIME} == 'spark3.3.2' ]]; then
HADOOP_VERSION=2.7.7
HIVE_VERSION=3.1.3
DERBY_VERSION=10.14.1.0
- FLINK_VERSION=1.15.3
+ FLINK_VERSION=1.17.0
SPARK_VERSION=3.3.2
SPARK_HADOOP_VERSION=2
CONFLUENT_VERSION=5.5.12
KAFKA_CONNECT_HDFS_VERSION=10.1.13
- IMAGE_TAG=flink1153hive313spark332
+ IMAGE_TAG=flink1170hive313spark332
fi
# Copy bundle jars to temp dir for mounting