This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6436ef3ee2c [HUDI-5329] Spark reads hudi table error when flink creates the table without preCombine fields (#7378)
6436ef3ee2c is described below

commit 6436ef3ee2cb887cf25643c5c99f1fe34a64ec68
Author: wuwenchi <[email protected]>
AuthorDate: Tue Feb 7 14:15:39 2023 +0800

    [HUDI-5329] Spark reads hudi table error when flink creates the table without preCombine fields (#7378)
    
    * add default preCombine field when creating a table
    
    ---------
    
    Co-authored-by: 吴文池 <[email protected]>
---
 .../apache/hudi/table/catalog/HoodieCatalog.java   | 18 ++++++++++++
 .../hudi/table/catalog/TestHoodieCatalog.java      | 33 ++++++++++++++++++++--
 2 files changed, 49 insertions(+), 2 deletions(-)
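
For context, here is a minimal sketch (not part of this commit) of the Flink-side scenario the fix targets: a table is created through the path-based Hudi catalog without declaring a preCombine field and without a 'ts' column. The catalog path, database, and table name are hypothetical; the catalog option keys mirror the CATALOG_PATH ('catalog.path') and DEFAULT_DATABASE ('default-database') options exercised by the test below.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CreateTableWithoutPreCombine {
      public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Register a path-based Hudi catalog (hypothetical local path).
        tableEnv.executeSql(
            "CREATE CATALOG hudi_catalog WITH ("
                + "  'type' = 'hudi',"
                + "  'catalog.path' = '/tmp/hudi_catalog',"
                + "  'default-database' = 'default'"
                + ")");
        tableEnv.executeSql("USE CATALOG hudi_catalog");

        // The table declares no preCombine field and has no 'ts' column.
        tableEnv.executeSql(
            "CREATE TABLE t1 ("
                + "  id INT PRIMARY KEY NOT ENFORCED,"
                + "  name STRING"
                + ") WITH ("
                + "  'connector' = 'hudi',"
                + "  'table.type' = 'MERGE_ON_READ'"
                + ")");
      }
    }

With this change, such a table is persisted with FlinkOptions.NO_PRE_COMBINE instead of the non-existent default 'ts' field, so a later Spark read no longer fails; if the payload class is DefaultHoodieRecordPayload, or a non-existent field is set explicitly, table creation fails fast with a HoodieValidationException.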

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index 374bad86ab2..15637f575b5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -20,13 +20,16 @@ package org.apache.hudi.table.catalog;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
@@ -306,6 +309,21 @@ public class HoodieCatalog extends AbstractCatalog {
     options.put(TableOptionProperties.PK_CONSTRAINT_NAME, resolvedSchema.getPrimaryKey().get().getName());
     options.put(TableOptionProperties.PK_COLUMNS, pkColumns);
 
+    // check preCombine
+    final String preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
+    if (!resolvedSchema.getColumnNames().contains(preCombineField)) {
+      if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
+        throw new HoodieValidationException("Option '" + FlinkOptions.PRECOMBINE_FIELD.key()
+            + "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName());
+      }
+      if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())) {
+        conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE);
+      } else if (!preCombineField.equals(FlinkOptions.NO_PRE_COMBINE)) {
+        throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema."
+            + " Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
+      }
+    }
+
     if (resolvedTable.isPartitioned()) {
       final String partitions = String.join(",", resolvedTable.getPartitionKeys());
       conf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index 1246e140a5f..12b766ba18d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -18,12 +18,14 @@
 
 package org.apache.hudi.table.catalog;
 
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
@@ -150,14 +152,19 @@ public class TestHoodieCatalog {
         .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
     File testDb = new File(tempFile, TEST_DEFAULT_DATABASE);
     testDb.mkdir();
+
+    catalog = new HoodieCatalog("hudi", Configuration.fromMap(getDefaultCatalogOption()));
+    catalog.open();
+  }
+
+  Map<String, String> getDefaultCatalogOption() {
     Map<String, String> catalogOptions = new HashMap<>();
     assertThrows(ValidationException.class,
         () -> catalog = new HoodieCatalog("hudi", Configuration.fromMap(catalogOptions)));
     catalogPathStr = tempFile.getAbsolutePath();
     catalogOptions.put(CATALOG_PATH.key(), catalogPathStr);
     catalogOptions.put(DEFAULT_DATABASE.key(), TEST_DEFAULT_DATABASE);
-    catalog = new HoodieCatalog("hudi", Configuration.fromMap(catalogOptions));
-    catalog.open();
+    return catalogOptions;
   }
 
   @AfterEach
@@ -227,6 +234,28 @@ public class TestHoodieCatalog {
         () -> catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, false));
   }
 
+  @Test
+  void testCreateTableWithoutPreCombineKey() {
+    Map<String, String> options = getDefaultCatalogOption();
+    options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), DefaultHoodieRecordPayload.class.getName());
+    catalog = new HoodieCatalog("hudi", Configuration.fromMap(options));
+    catalog.open();
+    ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
+    assertThrows(HoodieValidationException.class,
+        () -> catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true),
+        "Option 'precombine.field' is required for payload class: "
+            + "org.apache.hudi.common.model.DefaultHoodieRecordPayload");
+
+    Map<String, String> options2 = getDefaultCatalogOption();
+    options2.put(FlinkOptions.PRECOMBINE_FIELD.key(), "not_exists");
+    catalog = new HoodieCatalog("hudi", Configuration.fromMap(options2));
+    catalog.open();
+    ObjectPath tablePath2 = new ObjectPath(TEST_DEFAULT_DATABASE, "tb2");
+    assertThrows(HoodieValidationException.class,
+        () -> catalog.createTable(tablePath2, EXPECTED_CATALOG_TABLE, true),
+        "Field not_exists does not exist in the table schema. Please check 'precombine.field' option.");
+  }
+
   @Test
   public void testListTable() throws Exception {
     ObjectPath tablePath1 = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
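
A companion sketch (also not part of the commit) of the Spark-side read that the issue title refers to; the table base path is hypothetical and the Hudi Spark bundle is assumed to be on the classpath:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ReadHudiTableFromSpark {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("read-hudi-table")
            .master("local[*]")
            .getOrCreate();

        // Base path of the table created through the Flink catalog (hypothetical).
        Dataset<Row> rows = spark.read().format("hudi")
            .load("/tmp/hudi_catalog/default/t1");
        rows.show();
      }
    }

Before this fix, this read could fail because the Flink catalog recorded a preCombine field ('ts' by default) that does not exist in the table schema.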
