This is an automated email from the ASF dual-hosted git repository.

jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a97b01c026 [HUDI-7147] Fix npe stream sync first batch, empty schema, upsert (#10689)
3a97b01c026 is described below

commit 3a97b01c0263c4790ffa958b865c682f40b4ada4
Author: Jon Vexler <[email protected]>
AuthorDate: Sat Feb 17 01:14:38 2024 -0500

    [HUDI-7147] Fix npe stream sync first batch, empty schema, upsert (#10689)
    
    * fix npe
    
    * add empty table support as well
    
    * use empty relation
    
    * fix failing tests
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../hudi/common/HoodieSchemaNotFoundException.java | 28 ++++++++++++++++++++
 .../hudi/common/table/TableSchemaResolver.java     |  3 ++-
 .../convert/AvroInternalSchemaConverter.java       |  4 ++-
 .../main/scala/org/apache/hudi/DefaultSource.scala | 15 ++++++++---
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  4 +--
 .../hudi/HoodieHadoopFsRelationFactory.scala       |  3 +--
 .../apache/hudi/functional/TestCOWDataSource.scala | 30 +++++++++++++++++++++-
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 30 ++++++++++++++++++++++
 8 files changed, 105 insertions(+), 12 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieSchemaNotFoundException.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieSchemaNotFoundException.java
new file mode 100644
index 00000000000..12d1498b974
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieSchemaNotFoundException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common;
+
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+
+public class HoodieSchemaNotFoundException extends HoodieSchemaException {
+  public HoodieSchemaNotFoundException(String message) {
+    super(message);
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index b71073abf0f..a8f46c416f9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.table;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.HoodieSchemaNotFoundException;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -588,6 +589,6 @@ public class TableSchemaResolver {
   }
 
   private Supplier<Exception> schemaNotFoundError() {
-    return () -> new IllegalArgumentException("No schema found for table at " + metaClient.getBasePathV2().toString());
+    return () -> new HoodieSchemaNotFoundException("No schema found for table at " + metaClient.getBasePathV2().toString());
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
index 601718e6702..69977563e85 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
@@ -83,7 +83,9 @@ public class AvroInternalSchemaConverter {
    * @return an avro Schema where null is the first.
    */
   public static Schema fixNullOrdering(Schema schema) {
-    if (schema.getType() == Schema.Type.NULL) {
+    if (schema == null) {
+      return Schema.create(Schema.Type.NULL);
+    } else if (schema.getType() == Schema.Type.NULL) {
       return schema;
     }
     return convert(convert(schema), schema.getFullName());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 4f4ae20e310..c346f7665df 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
 import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.common.HoodieSchemaNotFoundException
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieReaderConfig}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
@@ -33,14 +34,13 @@ import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.util.PathUtils
-
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
 import org.apache.spark.sql.hudi.streaming.{HoodieEarliestOffsetRangeLimit, HoodieLatestOffsetRangeLimit, HoodieSpecifiedOffsetRangeLimit, HoodieStreamSource}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
 import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConversions.mapAsJavaMap
@@ -73,7 +73,12 @@ class DefaultSource extends RelationProvider
 
   override def createRelation(sqlContext: SQLContext,
                               parameters: Map[String, String]): BaseRelation = {
-    createRelation(sqlContext, parameters, null)
+    try {
+      createRelation(sqlContext, parameters, null)
+    } catch {
+      case _: HoodieSchemaNotFoundException => new EmptyRelation(sqlContext, new StructType())
+      case e => throw e
+    }
   }
 
   override def createRelation(sqlContext: SQLContext,
@@ -373,7 +378,9 @@ object DefaultSource {
         AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
       } catch {
         case _: Exception =>
-          require(schema.isDefined, "Fail to resolve source schema")
+          if (schema.isEmpty || schema.get == null) {
+            throw new HoodieSchemaNotFoundException("Failed to resolve source schema")
+          }
           schema.get
       }
     }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index c69ca4815aa..9a870e6f9c5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -176,9 +176,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     } getOrElse {
       Try(schemaResolver.getTableAvroSchema) match {
         case Success(schema) => schema
-        case Failure(e) =>
-          logError("Failed to fetch schema from the table", e)
-          throw new HoodieSchemaException("Failed to fetch schema from the table")
+        case Failure(e) => throw e
       }
     }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index 34088f2e699..e8ca19e2421 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -101,8 +101,7 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
     } getOrElse {
       Try(schemaResolver.getTableAvroSchema) match {
         case Success(schema) => schema
-        case Failure(e) =>
-          throw new HoodieSchemaException("Failed to fetch schema from the table")
+        case Failure(e) => throw e
       }
     }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 39d093b7ffc..cb0209de979 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -18,7 +18,7 @@
 package org.apache.hudi.functional
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hudi.DataSourceWriteOptions.{INLINE_CLUSTERING_ENABLE, KEYGENERATOR_CLASS_NAME}
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
@@ -1855,6 +1855,34 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     })
     assertEquals(3, clusterInstants.size)
   }
+
+
+  @Test
+  def testReadOfAnEmptyTable(): Unit = {
+    val (writeOpts, _) = getWriterReaderOpts(HoodieRecordType.AVRO)
+
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val fileStatuses = fs.listStatus(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME), new PathFilter {
+      override def accept(path: Path): Boolean = {
+        path.getName.endsWith(HoodieTimeline.COMMIT_ACTION)
+      }
+    })
+
+    // delete completed instant
+    fs.delete(fileStatuses.toList.get(0).getPath)
+    // try reading the empty table
+    val count = spark.read.format("hudi").load(basePath).count()
+    assertEquals(count, 0)
+  }
+
 }
 
 object TestCOWDataSource {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 4bfbee6e0e0..5294ae1b4c4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -2111,6 +2111,36 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     deltaStreamer2.shutdownGracefully();
   }
 
+  @Test
+  public void testEmptyBatchWithNullSchemaFirstBatch() throws Exception {
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
+    int parquetRecordsCount = 10;
+    prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
+    prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT, false, "partition_path", "0");
+
+    String tableBasePath = basePath + "/test_parquet_table" + testNum;
+    HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, ParquetDFSSource.class.getName(),
+        null, PROPS_FILENAME_TEST_PARQUET, false,
+        false, 100000, false, null, null, "timestamp", null);
+
+    config.schemaProviderClassName = NullValueSchemaProvider.class.getName();
+    config.sourceClassName = TestParquetDFSSourceEmptyBatch.class.getName();
+    HoodieDeltaStreamer deltaStreamer1 = new HoodieDeltaStreamer(config, jsc);
+    deltaStreamer1.sync();
+    deltaStreamer1.shutdownGracefully();
+    assertRecordCount(0, tableBasePath, sqlContext);
+
+    config.schemaProviderClassName = null;
+    config.sourceClassName = ParquetDFSSource.class.getName();
+    prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
+    HoodieDeltaStreamer deltaStreamer2 = new HoodieDeltaStreamer(config, jsc);
+    deltaStreamer2.sync();
+    deltaStreamer2.shutdownGracefully();
+    //since first batch has empty schema, only records from the second batch should be written
+    assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+  }
+
   @Test
   public void testDeltaStreamerRestartAfterMissingHoodieProps() throws Exception {
     testDeltaStreamerRestartAfterMissingHoodieProps(true);

Reply via email to