This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 95d0fb5d327 [HUDI-6627] Fix NPE when Spark client writer schema is
null (#9335)
95d0fb5d327 is described below
commit 95d0fb5d3276936a3638baed31edc4d9fe0d1f34
Author: Vinish Reddy <[email protected]>
AuthorDate: Thu Aug 3 06:39:13 2023 +0530
[HUDI-6627] Fix NPE when Spark client writer schema is null (#9335)
---
.../java/org/apache/hudi/table/HoodieTable.java | 5 +-
.../hudi/testutils/HoodieClientTestBase.java | 6 +-
.../apache/hudi/functional/TestWriteClient.java | 87 ++++++++++++++++++++++
3 files changed, 96 insertions(+), 2 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 71295098f03..12584be55a4 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -62,6 +62,7 @@ import
org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -825,7 +826,9 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
boolean shouldValidate = config.shouldValidateAvroSchema();
boolean allowProjection = config.shouldAllowAutoEvolutionColumnDrop();
if ((!shouldValidate && allowProjection)
- ||
getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
+ ||
getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()
+ || StringUtils.isNullOrEmpty(config.getSchema())
+ ) {
// Check not required
return;
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 454236b4278..569e8d36d89 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -158,7 +158,7 @@ public class HoodieClientTestBase extends
HoodieClientTestHarness {
*/
public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr,
IndexType indexType,
HoodieFailedWritesCleaningPolicy cleaningPolicy) {
- return
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
+ HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().withPath(basePath)
.withParallelism(2,
2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
.withWriteStatusClass(MetadataMergeWriteStatus.class)
@@ -172,6 +172,10 @@ public class HoodieClientTestBase extends
HoodieClientTestHarness {
.withEnableBackupForRemoteFileSystemView(false) // Fail test if
problem connecting to timeline-server
.withRemoteServerPort(timelineServicePort)
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+ if (StringUtils.nonEmpty(schemaStr)) {
+ builder.withSchema(schemaStr);
+ }
+ return builder;
}
public HoodieSparkTable getHoodieTable(HoodieTableMetaClient metaClient,
HoodieWriteConfig config) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestWriteClient.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestWriteClient.java
new file mode 100644
index 00000000000..7acf6b2b6b0
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestWriteClient.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests write client functionality.
+ */
+@Tag("functional")
+public class TestWriteClient extends HoodieSparkClientTestBase {
+
+ @Test
+ public void testInertsWithEmptyCommitsHavingWriterSchemaAsNull() throws
Exception {
+ HoodieWriteConfig.Builder cfgBuilder =
getConfigBuilder().withAutoCommit(false);
+ addConfigsForPopulateMetaFields(cfgBuilder, false);
+ // Re-init meta client with write config props.
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ,
cfgBuilder.build().getProps());
+ SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build());
+ try {
+ String firstCommit = "001";
+ int numRecords = 200;
+ JavaRDD<WriteStatus> result = insertFirstBatch(cfgBuilder.build(),
client, firstCommit, "000", numRecords, SparkRDDWriteClient::insert,
+ false, false, numRecords);
+ assertTrue(client.commit(firstCommit, result), "Commit should succeed");
+
+ // Re-init client with null writer schema.
+ cfgBuilder = getConfigBuilder((String) null).withAutoCommit(false);
+ addConfigsForPopulateMetaFields(cfgBuilder, false);
+ client = getHoodieWriteClient(cfgBuilder.build());
+ String secondCommit = "002";
+ client.startCommitWithTime(secondCommit);
+ JavaRDD<HoodieRecord> emptyRdd = context.emptyRDD();
+ result = client.insert(emptyRdd, secondCommit);
+ assertTrue(client.commit(secondCommit, result), "Commit should succeed");
+ // Schema Validations.
+ HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ HoodieCommitMetadata metadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(),
HoodieCommitMetadata.class);
+ assertTrue(metadata.getExtraMetadata().get("schema").isEmpty());
+ TableSchemaResolver tableSchemaResolver = new
TableSchemaResolver(metaClient);
+ assertEquals(Schema.parse(TRIP_EXAMPLE_SCHEMA),
tableSchemaResolver.getTableAvroSchema(false));
+ // Data Validations.
+ Dataset<Row> df = sparkSession.read().format("hudi").load(basePath);
+ assertEquals(numRecords, df.collectAsList().size());
+ } finally {
+ client.close();
+ }
+ }
+}