This is an automated email from the ASF dual-hosted git repository.
pwason pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new da244e12fc07 feat(flink): Support create table DDL without primary key
(#18086)
da244e12fc07 is described below
commit da244e12fc078667b73561cfa295f8dbae8540c6
Author: Prashant Wason <[email protected]>
AuthorDate: Tue Mar 10 15:41:55 2026 -0700
feat(flink): Support create table DDL without primary key (#18086)
Modified HoodiePipeline.getCreateHoodieTableDDL() to include the PRIMARY KEY
clause only when pkField is non-null, enabling keyless table creation for
append-only/INSERT workloads.
Parameterized testHoodiePipelineBuilderSink with a keyless-write case that
verifies data can be written without a primary key using the INSERT
operation.
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../java/org/apache/hudi/util/HoodiePipeline.java | 20 +++++++++-----
.../apache/hudi/sink/ITTestDataStreamWrite.java | 31 +++++++++++++++++++---
2 files changed, 40 insertions(+), 11 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
index cb0094f6e473..74b923dc99ab 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
@@ -200,15 +200,21 @@ public class HoodiePipeline {
builder.append("create table ")
.append(tableName)
.append("(\n");
- for (String field : fields) {
+ for (int i = 0; i < fields.size(); i++) {
builder.append(" ")
- .append(field)
- .append(",\n");
+ .append(fields.get(i));
+ if (i == fields.size() - 1 && pkField == null) {
+ builder.append(")\n");
+ } else {
+ builder.append(",\n");
+ }
+ }
+ if (pkField != null) {
+ builder.append(" PRIMARY KEY(")
+ .append(pkField)
+ .append(") NOT ENFORCED\n")
+ .append(")\n");
}
- builder.append(" PRIMARY KEY(")
- .append(pkField)
- .append(") NOT ENFORCED\n")
- .append(")\n");
if (!partitionField.isEmpty()) {
String partitions = partitionField
.stream()
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 595cea5f7b94..ca78aa704e83 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsInference;
@@ -93,6 +94,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ITTestDataStreamWrite extends TestLogger {
private static final Map<String, List<String>> EXPECTED = new HashMap<>();
+ private static final Map<String, List<String>> EXPECTED_KEYLESS = new
HashMap<>();
private static final Map<String, List<String>> EXPECTED_TRANSFORMER = new
HashMap<>();
private static final Map<String, List<String>> EXPECTED_CHAINED_TRANSFORMER
= new HashMap<>();
@@ -102,6 +104,13 @@ public class ITTestDataStreamWrite extends TestLogger {
EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3",
"id6,par3,id6,Emma,20,6000,par3"));
EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4",
"id8,par4,id8,Han,56,8000,par4"));
+ // Expected data for keyless writes (without _hoodie_record_key and
_hoodie_partition_path)
+ // Format: uuid,name,age,ts(as timestamp string from
Timestamp.toString()),partition
+ EXPECTED_KEYLESS.put("par1", Arrays.asList("id1,Danny,23,1970-01-01
00:00:01.0,par1", "id2,Stephen,33,1970-01-01 00:00:02.0,par1"));
+ EXPECTED_KEYLESS.put("par2", Arrays.asList("id3,Julian,53,1970-01-01
00:00:03.0,par2", "id4,Fabian,31,1970-01-01 00:00:04.0,par2"));
+ EXPECTED_KEYLESS.put("par3", Arrays.asList("id5,Sophia,18,1970-01-01
00:00:05.0,par3", "id6,Emma,20,1970-01-01 00:00:06.0,par3"));
+ EXPECTED_KEYLESS.put("par4", Arrays.asList("id7,Bob,44,1970-01-01
00:00:07.0,par4", "id8,Han,56,1970-01-01 00:00:08.0,par4"));
+
EXPECTED_TRANSFORMER.put("par1",
Arrays.asList("id1,par1,id1,Danny,24,1000,par1",
"id2,par1,id2,Stephen,34,2000,par1"));
EXPECTED_TRANSFORMER.put("par2",
Arrays.asList("id3,par2,id3,Julian,54,3000,par2",
"id4,par2,id4,Fabian,32,4000,par2"));
EXPECTED_TRANSFORMER.put("par3",
Arrays.asList("id5,par3,id5,Sophia,19,5000,par3",
"id6,par3,id6,Emma,21,6000,par3"));
@@ -403,8 +412,9 @@ public class ITTestDataStreamWrite extends TestLogger {
TestData.assertRowDataEquals(result, TestData.dataSetInsert(5, 6));
}
- @Test
- public void testHoodiePipelineBuilderSink() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testHoodiePipelineBuilderSink(boolean usePrimaryKey) throws
Exception {
StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
Map<String, String> options = new HashMap<>();
execEnv.getConfig().disableObjectReuse();
@@ -435,14 +445,27 @@ public class ITTestDataStreamWrite extends TestLogger {
.column("age int")
.column("`ts` timestamp(3)")
.column("`partition` string")
- .pk("uuid")
.partition("partition")
.options(options);
+ if (usePrimaryKey) {
+ builder = builder.pk("uuid");
+ } else {
+ // For keyless writes, use insert operation and set RECORD_KEY_FIELD to
empty string
+ builder = builder
+ .option(FlinkOptions.RECORD_KEY_FIELD.key(), "")
+ .option(FlinkOptions.OPERATION.key(),
WriteOperationType.INSERT.value());
+ }
+
builder.sink(dataStream, false);
execute(execEnv, false, "Api_Sink_Test");
- TestData.checkWrittenDataCOW(tempFile, EXPECTED);
+ if (usePrimaryKey) {
+ TestData.checkWrittenDataCOW(tempFile, EXPECTED);
+ } else {
+ // For keyless writes, verify data was written (record keys are
auto-generated)
+ TestData.checkWrittenDataCOW(tempFile, EXPECTED_KEYLESS,
TestData::filterOutVariablesWithoutHudiMetadata);
+ }
}
@Test