This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new da244e12fc07 feat(flink): Support create table DDL without primary key 
(#18086)
da244e12fc07 is described below

commit da244e12fc078667b73561cfa295f8dbae8540c6
Author: Prashant Wason <[email protected]>
AuthorDate: Tue Mar 10 15:41:55 2026 -0700

    feat(flink): Support create table DDL without primary key (#18086)
    
    Modified HoodiePipeline.getCreateHoodieTableDDL() to conditionally include
    the PRIMARY KEY clause only when pkField is not null, enabling keyless
    table creation for append-only/INSERT workloads.
    
    Added a parameterized test for keyless writes in testHoodiePipelineBuilderSink
    that verifies data can be written without a primary key using the INSERT 
operation.
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../java/org/apache/hudi/util/HoodiePipeline.java  | 20 +++++++++-----
 .../apache/hudi/sink/ITTestDataStreamWrite.java    | 31 +++++++++++++++++++---
 2 files changed, 40 insertions(+), 11 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
index cb0094f6e473..74b923dc99ab 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
@@ -200,15 +200,21 @@ public class HoodiePipeline {
     builder.append("create table ")
         .append(tableName)
         .append("(\n");
-    for (String field : fields) {
+    for (int i = 0; i < fields.size(); i++) {
       builder.append("  ")
-          .append(field)
-          .append(",\n");
+              .append(fields.get(i));
+      if (i == fields.size() - 1 && pkField == null) {
+        builder.append(")\n");
+      } else {
+        builder.append(",\n");
+      }
+    }
+    if (pkField != null) {
+      builder.append("  PRIMARY KEY(")
+              .append(pkField)
+              .append(") NOT ENFORCED\n")
+              .append(")\n");
     }
-    builder.append("  PRIMARY KEY(")
-        .append(pkField)
-        .append(") NOT ENFORCED\n")
-        .append(")\n");
     if (!partitionField.isEmpty()) {
       String partitions = partitionField
           .stream()
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 595cea5f7b94..ca78aa704e83 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsInference;
@@ -93,6 +94,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class ITTestDataStreamWrite extends TestLogger {
 
   private static final Map<String, List<String>> EXPECTED = new HashMap<>();
+  private static final Map<String, List<String>> EXPECTED_KEYLESS = new 
HashMap<>();
   private static final Map<String, List<String>> EXPECTED_TRANSFORMER = new 
HashMap<>();
   private static final Map<String, List<String>> EXPECTED_CHAINED_TRANSFORMER 
= new HashMap<>();
 
@@ -102,6 +104,13 @@ public class ITTestDataStreamWrite extends TestLogger {
     EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", 
"id6,par3,id6,Emma,20,6000,par3"));
     EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", 
"id8,par4,id8,Han,56,8000,par4"));
 
+    // Expected data for keyless writes (without _hoodie_record_key and 
_hoodie_partition_path)
+    // Format: uuid,name,age,ts(as timestamp string from 
Timestamp.toString()),partition
+    EXPECTED_KEYLESS.put("par1", Arrays.asList("id1,Danny,23,1970-01-01 
00:00:01.0,par1", "id2,Stephen,33,1970-01-01 00:00:02.0,par1"));
+    EXPECTED_KEYLESS.put("par2", Arrays.asList("id3,Julian,53,1970-01-01 
00:00:03.0,par2", "id4,Fabian,31,1970-01-01 00:00:04.0,par2"));
+    EXPECTED_KEYLESS.put("par3", Arrays.asList("id5,Sophia,18,1970-01-01 
00:00:05.0,par3", "id6,Emma,20,1970-01-01 00:00:06.0,par3"));
+    EXPECTED_KEYLESS.put("par4", Arrays.asList("id7,Bob,44,1970-01-01 
00:00:07.0,par4", "id8,Han,56,1970-01-01 00:00:08.0,par4"));
+
     EXPECTED_TRANSFORMER.put("par1", 
Arrays.asList("id1,par1,id1,Danny,24,1000,par1", 
"id2,par1,id2,Stephen,34,2000,par1"));
     EXPECTED_TRANSFORMER.put("par2", 
Arrays.asList("id3,par2,id3,Julian,54,3000,par2", 
"id4,par2,id4,Fabian,32,4000,par2"));
     EXPECTED_TRANSFORMER.put("par3", 
Arrays.asList("id5,par3,id5,Sophia,19,5000,par3", 
"id6,par3,id6,Emma,21,6000,par3"));
@@ -403,8 +412,9 @@ public class ITTestDataStreamWrite extends TestLogger {
     TestData.assertRowDataEquals(result, TestData.dataSetInsert(5, 6));
   }
 
-  @Test
-  public void testHoodiePipelineBuilderSink() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testHoodiePipelineBuilderSink(boolean usePrimaryKey) throws 
Exception {
     StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
     Map<String, String> options = new HashMap<>();
     execEnv.getConfig().disableObjectReuse();
@@ -435,14 +445,27 @@ public class ITTestDataStreamWrite extends TestLogger {
         .column("age int")
         .column("`ts` timestamp(3)")
         .column("`partition` string")
-        .pk("uuid")
         .partition("partition")
         .options(options);
 
+    if (usePrimaryKey) {
+      builder = builder.pk("uuid");
+    } else {
+      // For keyless writes, use insert operation and set RECORD_KEY_FIELD to 
empty string
+      builder = builder
+          .option(FlinkOptions.RECORD_KEY_FIELD.key(), "")
+          .option(FlinkOptions.OPERATION.key(), 
WriteOperationType.INSERT.value());
+    }
+
     builder.sink(dataStream, false);
 
     execute(execEnv, false, "Api_Sink_Test");
-    TestData.checkWrittenDataCOW(tempFile, EXPECTED);
+    if (usePrimaryKey) {
+      TestData.checkWrittenDataCOW(tempFile, EXPECTED);
+    } else {
+      // For keyless writes, verify data was written (record keys are 
auto-generated)
+      TestData.checkWrittenDataCOW(tempFile, EXPECTED_KEYLESS, 
TestData::filterOutVariablesWithoutHudiMetadata);
+    }
   }
 
   @Test

Reply via email to