This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new eea01d1d76f [HUDI-8841] Fix schema validating exception during flink 
async clustering (#12598)
eea01d1d76f is described below

commit eea01d1d76f475bbda9f9b1ffb12025d1a020ee8
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Jan 16 10:06:21 2025 +0800

    [HUDI-8841] Fix schema validating exception during flink async clustering 
(#12598)
---
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java |  24 +++++
 .../hudi/sink/clustering/ClusteringOperator.java   |   6 +-
 .../sink/cluster/ITTestHoodieFlinkClustering.java  | 100 ++++++++++++++++-----
 3 files changed, 107 insertions(+), 23 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 18fdedca50f..e4a5fe8ecf1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -28,6 +28,9 @@ import org.apache.hudi.exception.SchemaCompatibilityException;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaCompatibility;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.utils.SchemaChangeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +45,9 @@ import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.util.CollectionUtils.reduce;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter.convert;
 
 /**
  * Utils for Avro Schema.
@@ -591,4 +596,23 @@ public class AvroSchemaUtils {
   public static String createSchemaErrorString(String errorMessage, Schema 
writerSchema, Schema tableSchema) {
     return String.format("%s\nwriterSchema: %s\ntableSchema: %s", 
errorMessage, writerSchema, tableSchema);
   }
+
+  /**
+   * Creates a new schema by forcing all of its fields to be nullable.
+   *
+   * @param schema original schema
+   * @return a new schema with all fields made nullable, or the given schema itself if every field is already nullable.
+   */
+  public static Schema asNullable(Schema schema) {
+    List<String> filterCols = schema.getFields().stream()
+            .filter(f -> 
!f.schema().isNullable()).map(Schema.Field::name).collect(Collectors.toList());
+    if (filterCols.isEmpty()) {
+      return schema;
+    }
+    InternalSchema internalSchema = convert(schema);
+    TableChanges.ColumnUpdateChange schemaChange = 
TableChanges.ColumnUpdateChange.get(internalSchema);
+    schemaChange = reduce(filterCols, schemaChange,
+            (change, field) -> change.updateColumnNullability(field, true));
+    return convert(SchemaChangeUtils.applyTableChanges2Schema(internalSchema, 
schemaChange), schema.getFullName());
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 89f203c6eda..5ba2a5c3922 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.sink.clustering;
 
 import org.apache.hudi.adapter.MaskingOutputAdapter;
 import org.apache.hudi.adapter.Utils;
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -167,7 +168,10 @@ public class ClusteringOperator extends 
TableStreamOperator<ClusteringCommitEven
     this.table = writeClient.getHoodieTable();
 
     this.schema = AvroSchemaConverter.convertToSchema(rowType);
-    this.readerSchema = this.schema;
+    // Since there are discrepancies between how Flink and Spark handle the 
nullability of the primary key field,
+    // and some files may have been written by Spark, force the reader schema to be 
nullable so that the clustering
+    // scan succeeds without a schema validation exception.
+    this.readerSchema = AvroSchemaUtils.asNullable(schema);
     this.requiredPos = getRequiredPositions();
 
     this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(rowType);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index f1e72a9ad01..5d0c8dc61e2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -73,6 +73,8 @@ import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.util.HashMap;
@@ -533,6 +535,82 @@ public class ITTestHoodieFlinkClustering {
     // wait for the asynchronous commit to finish
     TimeUnit.SECONDS.sleep(3);
 
+    runCluster(rowType);
+
+    // test output
+    final Map<String, String> expected = new HashMap<>();
+    expected.put("par1", "[id1,par1,id1,Danny,23,1100001,par1, 
id2,par1,id2,Stephen,33,2100001,par1]");
+    expected.put("par2", "[id3,par2,id3,Julian,53,3100001,par2, 
id4,par2,id4,Fabian,31,4100001,par2]");
+    expected.put("par3", "[id5,par3,id5,Sophia,18,5100001,par3, 
id6,par3,id6,Emma,20,6100001,par3]");
+    expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4, 
id8,par4,id8,Han,56,8100001,par4]");
+    TestData.checkWrittenData(tempFile, expected, 4);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testInsertWithDifferentRecordKeyNullabilityAndClustering(boolean 
withPk) throws Exception {
+    EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+
+    // If the table is created without a primary key, the record 
key field is nullable;
+    // otherwise, the record key field is not nullable.
+    String pkConstraint = withPk ? ",  primary key (uuid) not enforced\n" : "";
+    String tblWithoutPkDDL = "create table t1(\n"
+        + "  `uuid` VARCHAR(20)\n"
+        + ",  `name` VARCHAR(10)\n"
+        + ",  `age` INT\n"
+        + ",  `ts` TIMESTAMP(3)\n"
+        + ",  `partition` VARCHAR(10)\n"
+        + pkConstraint
+        + ")\n"
+        + "PARTITIONED BY (`partition`)\n"
+        + "with (\n"
+        + "  'connector' = 'hudi',\n"
+        + "  'hoodie.datasource.write.recordkey.field' = 'uuid',\n"
+        + "  'path' = '" + tempFile.getAbsolutePath() + "'\n"
+        + ")";
+    tableEnv.executeSql(tblWithoutPkDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+    final RowType rowType = (RowType) DataTypes.ROW(
+            DataTypes.FIELD("uuid", DataTypes.VARCHAR(20).notNull()), // 
primary key set as not null
+            DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
+            DataTypes.FIELD("age", DataTypes.INT()),
+            DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
+            DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+        .notNull().getLogicalType();
+
+    // run cluster with row type
+    runCluster(rowType);
+
+    final Map<String, String> expected = new HashMap<>();
+    expected.put("par1", "[id1,par1,id1,Danny,23,1000,par1, 
id2,par1,id2,Stephen,33,2000,par1]");
+    expected.put("par2", "[id3,par2,id3,Julian,53,3000,par2, 
id4,par2,id4,Fabian,31,4000,par2]");
+    expected.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, 
id6,par3,id6,Emma,20,6000,par3]");
+    expected.put("par4", "[id7,par4,id7,Bob,44,7000,par4, 
id8,par4,id8,Han,56,8000,par4]");
+    TestData.checkWrittenData(tempFile, expected, 4);
+  }
+
+  @Test
+  public void testOfflineClusterFailoverAfterCommit() throws Exception {
+    StreamTableEnvironment tableEnv = prepareEnvAndTable();
+
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    cfg.targetPartitions = 4;
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+    assertDoesNotThrow(() -> runOfflineCluster(tableEnv, conf));
+
+    Table result = tableEnv.sqlQuery("select count(*) from t1");
+    assertEquals(16L, tableEnv.toDataStream(result, 
Row.class).executeAndCollect(1).get(0).getField(0));
+  }
+
+  /**
+   * Schedules clustering and then runs it.
+   */
+  private void runCluster(RowType rowType) throws Exception {
     // make configuration and setAvroSchema.
     StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
     FlinkClusteringConfig cfg = new FlinkClusteringConfig();
@@ -604,31 +682,9 @@ public class ITTestHoodieFlinkClustering {
           .setParallelism(1);
 
       env.execute("flink_hudi_clustering");
-
-      // test output
-      final Map<String, String> expected = new HashMap<>();
-      expected.put("par1", "[id1,par1,id1,Danny,23,1100001,par1, 
id2,par1,id2,Stephen,33,2100001,par1]");
-      expected.put("par2", "[id3,par2,id3,Julian,53,3100001,par2, 
id4,par2,id4,Fabian,31,4100001,par2]");
-      expected.put("par3", "[id5,par3,id5,Sophia,18,5100001,par3, 
id6,par3,id6,Emma,20,6100001,par3]");
-      expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4, 
id8,par4,id8,Han,56,8100001,par4]");
-      TestData.checkWrittenData(tempFile, expected, 4);
     }
   }
 
-  @Test
-  public void testOfflineClusterFailoverAfterCommit() throws Exception {
-    StreamTableEnvironment tableEnv = prepareEnvAndTable();
-
-    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
-    cfg.path = tempFile.getAbsolutePath();
-    cfg.targetPartitions = 4;
-    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
-    assertDoesNotThrow(() -> runOfflineCluster(tableEnv, conf));
-
-    Table result = tableEnv.sqlQuery("select count(*) from t1");
-    assertEquals(16L, tableEnv.toDataStream(result, 
Row.class).executeAndCollect(1).get(0).getField(0));
-  }
-
   private StreamTableEnvironment prepareEnvAndTable() {
     // Create hoodie table and insert into data.
     Configuration conf = new org.apache.flink.configuration.Configuration();

Reply via email to