This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 95d0fb5d327 [HUDI-6627] Fix NPE when spark client writer schema is null (#9335)
95d0fb5d327 is described below

commit 95d0fb5d3276936a3638baed31edc4d9fe0d1f34
Author: Vinish Reddy <[email protected]>
AuthorDate: Thu Aug 3 06:39:13 2023 +0530

    [HUDI-6627] Fix NPE when spark client writer schema is null (#9335)
---
 .../java/org/apache/hudi/table/HoodieTable.java    |  5 +-
 .../hudi/testutils/HoodieClientTestBase.java       |  6 +-
 .../apache/hudi/functional/TestWriteClient.java    | 87 ++++++++++++++++++++++
 3 files changed, 96 insertions(+), 2 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 71295098f03..12584be55a4 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -62,6 +62,7 @@ import 
org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -825,7 +826,9 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
     boolean shouldValidate = config.shouldValidateAvroSchema();
     boolean allowProjection = config.shouldAllowAutoEvolutionColumnDrop();
     if ((!shouldValidate && allowProjection)
-        || 
getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
+        || 
getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()
+        || StringUtils.isNullOrEmpty(config.getSchema())
+    ) {
       // Check not required
       return;
     }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 454236b4278..569e8d36d89 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -158,7 +158,7 @@ public class HoodieClientTestBase extends 
HoodieClientTestHarness {
    */
   public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, 
IndexType indexType,
                                                     
HoodieFailedWritesCleaningPolicy cleaningPolicy) {
-    return 
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
+    HoodieWriteConfig.Builder builder = 
HoodieWriteConfig.newBuilder().withPath(basePath)
         .withParallelism(2, 
2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
         .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
         .withWriteStatusClass(MetadataMergeWriteStatus.class)
@@ -172,6 +172,10 @@ public class HoodieClientTestBase extends 
HoodieClientTestHarness {
             .withEnableBackupForRemoteFileSystemView(false) // Fail test if 
problem connecting to timeline-server
             .withRemoteServerPort(timelineServicePort)
             
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+    if (StringUtils.nonEmpty(schemaStr)) {
+      builder.withSchema(schemaStr);
+    }
+    return builder;
   }
 
   public HoodieSparkTable getHoodieTable(HoodieTableMetaClient metaClient, 
HoodieWriteConfig config) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestWriteClient.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestWriteClient.java
new file mode 100644
index 00000000000..7acf6b2b6b0
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestWriteClient.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests write client functionality.
+ */
+@Tag("functional")
+public class TestWriteClient extends HoodieSparkClientTestBase {
+
+  @Test
+  public void testInertsWithEmptyCommitsHavingWriterSchemaAsNull() throws 
Exception {
+    HoodieWriteConfig.Builder cfgBuilder = 
getConfigBuilder().withAutoCommit(false);
+    addConfigsForPopulateMetaFields(cfgBuilder, false);
+    // Re-init meta client with write config props.
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, 
cfgBuilder.build().getProps());
+    SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build());
+    try {
+      String firstCommit = "001";
+      int numRecords = 200;
+      JavaRDD<WriteStatus> result = insertFirstBatch(cfgBuilder.build(), 
client, firstCommit, "000", numRecords, SparkRDDWriteClient::insert,
+          false, false, numRecords);
+      assertTrue(client.commit(firstCommit, result), "Commit should succeed");
+
+      // Re-init client with null writer schema.
+      cfgBuilder = getConfigBuilder((String) null).withAutoCommit(false);
+      addConfigsForPopulateMetaFields(cfgBuilder, false);
+      client = getHoodieWriteClient(cfgBuilder.build());
+      String secondCommit = "002";
+      client.startCommitWithTime(secondCommit);
+      JavaRDD<HoodieRecord> emptyRdd = context.emptyRDD();
+      result = client.insert(emptyRdd, secondCommit);
+      assertTrue(client.commit(secondCommit, result), "Commit should succeed");
+      // Schema Validations.
+      HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
+      HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+      HoodieCommitMetadata metadata = 
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(),
 HoodieCommitMetadata.class);
+      assertTrue(metadata.getExtraMetadata().get("schema").isEmpty());
+      TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
+      assertEquals(Schema.parse(TRIP_EXAMPLE_SCHEMA), 
tableSchemaResolver.getTableAvroSchema(false));
+      // Data Validations.
+      Dataset<Row> df = sparkSession.read().format("hudi").load(basePath);
+      assertEquals(numRecords, df.collectAsList().size());
+    } finally {
+      client.close();
+    }
+  }
+}

Reply via email to the mailing list.