This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6436ef3ee2c [HUDI-5329] Spark reads hudi table error when flink creates the table without preCombine fields (#7378)
6436ef3ee2c is described below
commit 6436ef3ee2cb887cf25643c5c99f1fe34a64ec68
Author: wuwenchi <[email protected]>
AuthorDate: Tue Feb 7 14:15:39 2023 +0800
[HUDI-5329] Spark reads hudi table error when flink creates the table without preCombine fields (#7378)
* add default precombine fields when creating a table
---------
Co-authored-by: 吴文池 <[email protected]>
---
.../apache/hudi/table/catalog/HoodieCatalog.java | 18 ++++++++++++
.../hudi/table/catalog/TestHoodieCatalog.java | 33 ++++++++++++++++++++--
2 files changed, 49 insertions(+), 2 deletions(-)
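
For readers skimming the diff: when the configured preCombine field is absent from the table schema, the patch resolves the option one of three ways. The sketch below restates that branching outside the catalog code; the class and method names are illustrative stand-ins, not Hudi APIs, and the literal values mirror FlinkOptions' defaults ("ts" and "no_precombine") as assumptions.

    import java.util.Set;

    // Standalone sketch of the resolution this patch adds to HoodieCatalog.
    // All names below are hypothetical; only the branching mirrors the diff.
    final class PreCombineResolutionSketch {
      static final String DEFAULT_FIELD = "ts";              // assumed FlinkOptions.PRECOMBINE_FIELD default
      static final String NO_PRE_COMBINE = "no_precombine";  // assumed FlinkOptions.NO_PRE_COMBINE sentinel

      static String resolve(String configured, Set<String> columns, boolean defaultPayloadClazz) {
        if (columns.contains(configured)) {
          return configured;                                 // field exists: nothing to do
        }
        if (defaultPayloadClazz) {
          // DefaultHoodieRecordPayload needs an ordering field, so fail fast.
          throw new IllegalStateException("precombine.field is required for DefaultHoodieRecordPayload");
        }
        if (configured.equals(DEFAULT_FIELD)) {
          return NO_PRE_COMBINE;                             // option never set: disable pre-combining
        }
        if (!configured.equals(NO_PRE_COMBINE)) {
          // An explicitly configured field that is absent is a user error.
          throw new IllegalStateException("Field " + configured + " does not exist in the table schema");
        }
        return configured;                                   // pre-combining explicitly disabled
      }
    }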
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index 374bad86ab2..15637f575b5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -20,13 +20,16 @@ package org.apache.hudi.table.catalog;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
@@ -306,6 +309,21 @@ public class HoodieCatalog extends AbstractCatalog {
    options.put(TableOptionProperties.PK_CONSTRAINT_NAME, resolvedSchema.getPrimaryKey().get().getName());
    options.put(TableOptionProperties.PK_COLUMNS, pkColumns);
+    // check preCombine
+    final String preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
+    if (!resolvedSchema.getColumnNames().contains(preCombineField)) {
+      if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
+        throw new HoodieValidationException("Option '" + FlinkOptions.PRECOMBINE_FIELD.key()
+            + "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName());
+      }
+      if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())) {
+        conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE);
+      } else if (!preCombineField.equals(FlinkOptions.NO_PRE_COMBINE)) {
+        throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema."
+            + " Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
+      }
+    }
+
    if (resolvedTable.isPartitioned()) {
      final String partitions = String.join(",", resolvedTable.getPartitionKeys());
      conf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
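
In user-facing terms, the hunk above changes what the following DDL does. This walk-through is a sketch, not code from the patch: the catalog options ('type', 'catalog.path'), database, and column names are assumptions for illustration, and it presumes the Hudi Flink bundle is on the classpath.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PreCombineDdlWalkthrough {
      public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical catalog setup; 'catalog.path' points at a scratch directory.
        tEnv.executeSql(
            "CREATE CATALOG hudi_catalog WITH ('type' = 'hudi', 'catalog.path' = '/tmp/hudi_catalog')");
        tEnv.executeSql("USE CATALOG hudi_catalog");
        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS db1");
        tEnv.executeSql("USE db1");

        // No 'ts' column and no explicit 'precombine.field': with this patch the
        // catalog stores the table with pre-combining disabled, so a later Spark
        // read no longer trips over a missing ordering field.
        tEnv.executeSql(
            "CREATE TABLE t1 (id INT PRIMARY KEY NOT ENFORCED, name STRING)");

        // Naming a preCombine field that is not in the schema now fails at
        // CREATE TABLE time with a HoodieValidationException instead of leaving
        // behind a table Spark cannot read.
        tEnv.executeSql(
            "CREATE TABLE t2 (id INT PRIMARY KEY NOT ENFORCED, name STRING)"
                + " WITH ('precombine.field' = 'not_exists')");
      }
    }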
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index 1246e140a5f..12b766ba18d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -18,12 +18,14 @@
package org.apache.hudi.table.catalog;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -150,14 +152,19 @@ public class TestHoodieCatalog {
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
    File testDb = new File(tempFile, TEST_DEFAULT_DATABASE);
    testDb.mkdir();
+
+    catalog = new HoodieCatalog("hudi", Configuration.fromMap(getDefaultCatalogOption()));
+    catalog.open();
+  }
+
+  Map<String, String> getDefaultCatalogOption() {
    Map<String, String> catalogOptions = new HashMap<>();
    assertThrows(ValidationException.class,
        () -> catalog = new HoodieCatalog("hudi", Configuration.fromMap(catalogOptions)));
    catalogPathStr = tempFile.getAbsolutePath();
    catalogOptions.put(CATALOG_PATH.key(), catalogPathStr);
    catalogOptions.put(DEFAULT_DATABASE.key(), TEST_DEFAULT_DATABASE);
-    catalog = new HoodieCatalog("hudi", Configuration.fromMap(catalogOptions));
-    catalog.open();
+    return catalogOptions;
}
@AfterEach
@@ -227,6 +234,28 @@ public class TestHoodieCatalog {
() -> catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, false));
}
+  @Test
+  void testCreateTableWithoutPreCombineKey() {
+    Map<String, String> options = getDefaultCatalogOption();
+    options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), DefaultHoodieRecordPayload.class.getName());
+    catalog = new HoodieCatalog("hudi", Configuration.fromMap(options));
+    catalog.open();
+    ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
+    assertThrows(HoodieValidationException.class,
+        () -> catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true),
+        "Option 'precombine.field' is required for payload class: "
+            + "org.apache.hudi.common.model.DefaultHoodieRecordPayload");
+
+    Map<String, String> options2 = getDefaultCatalogOption();
+    options2.put(FlinkOptions.PRECOMBINE_FIELD.key(), "not_exists");
+    catalog = new HoodieCatalog("hudi", Configuration.fromMap(options2));
+    catalog.open();
+    ObjectPath tablePath2 = new ObjectPath(TEST_DEFAULT_DATABASE, "tb2");
+    assertThrows(HoodieValidationException.class,
+        () -> catalog.createTable(tablePath2, EXPECTED_CATALOG_TABLE, true),
+        "Field not_exists does not exist in the table schema. Please check 'precombine.field' option.");
+  }
+
@Test
public void testListTable() throws Exception {
ObjectPath tablePath1 = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
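
A side note on the assertions in testCreateTableWithoutPreCombineKey above: in JUnit 5, the String passed as the third argument to assertThrows is only the failure message reported when nothing (or the wrong type) is thrown; it is not matched against the thrown exception's message. A hedged sketch that would actually pin the message down, reusing tablePath2 and the test class's fixtures (and assuming assertTrue is statically imported from org.junit.jupiter.api.Assertions):

    // Not part of the patch: capture the exception and assert on its message.
    HoodieValidationException e = assertThrows(HoodieValidationException.class,
        () -> catalog.createTable(tablePath2, EXPECTED_CATALOG_TABLE, true));
    assertTrue(e.getMessage().contains("does not exist in the table schema"));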