This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 136d0755ad7 [HUDI-7500] fix gaps with deduce schema and null schema (#10858)
136d0755ad7 is described below

commit 136d0755ad7f86d2f4b7b4813a59096c233055e7
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Mar 27 17:27:27 2024 -0400

    [HUDI-7500] fix gaps with deduce schema and null schema (#10858)
    
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../main/scala/org/apache/hudi/DefaultSource.scala |   7 +-
 .../utilities/streamer/SourceFormatAdapter.java    |   2 +-
 .../apache/hudi/utilities/streamer/StreamSync.java |  51 ++++--
 .../deltastreamer/TestHoodieDeltaStreamer.java     |   4 +-
 .../streamer/TestStreamSyncUnitTests.java          | 192 +++++++++++++++++++++
 5 files changed, 241 insertions(+), 15 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index c346f7665df..be3d2f4ed4b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -74,7 +74,12 @@ class DefaultSource extends RelationProvider
   override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation = {
     try {
-      createRelation(sqlContext, parameters, null)
+      val relation = createRelation(sqlContext, parameters, null)
+      if (relation.schema.isEmpty) {
+        new EmptyRelation(sqlContext, new StructType())
+      } else {
+        relation
+      }
     } catch {
      case _: HoodieSchemaNotFoundException => new EmptyRelation(sqlContext, new StructType())
       case e => throw e
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
index f29404701db..1796c96dab8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
@@ -62,7 +62,7 @@ import static org.apache.hudi.utilities.streamer.BaseErrorTableWriter.ERROR_TABL
 /**
  * Adapts data-format provided by the source to the data-format required by the client (DeltaStreamer).
  */
-public final class SourceFormatAdapter implements Closeable {
+public class SourceFormatAdapter implements Closeable {
 
   private final Source source;
   private boolean shouldSanitize = SANITIZE_SCHEMA_FIELD_NAMES.defaultValue();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 1453e9fd07c..ded5348ed8f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -55,6 +55,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -255,6 +256,31 @@ public class StreamSync implements Serializable, Closeable {
 
   private final boolean useRowWriter;
 
+  @VisibleForTesting
+  StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
+             TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
+             Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient, SchemaProvider userProvidedSchemaProvider,
+             Option<BaseErrorTableWriter> errorTableWriter, SourceFormatAdapter formatAdapter, Option<Transformer> transformer,
+             boolean useRowWriter, boolean autoGenerateRecordKeys) {
+    this.cfg = cfg;
+    this.hoodieSparkContext = hoodieSparkContext;
+    this.sparkSession = sparkSession;
+    this.fs = fs;
+    this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
+    this.props = props;
+    this.userProvidedSchemaProvider = userProvidedSchemaProvider;
+    this.processedSchema = new SchemaSet();
+    this.autoGenerateRecordKeys = autoGenerateRecordKeys;
+    this.keyGenClassName = getKeyGeneratorClassName(new TypedProperties(props));
+    this.conf = conf;
+
+    this.errorTableWriter = errorTableWriter;
+    this.formatAdapter = formatAdapter;
+    this.transformer = transformer;
+    this.useRowWriter = useRowWriter;
+
+  }
+
   @Deprecated
   public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
                     TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf,
@@ -553,7 +579,8 @@ public class StreamSync implements Serializable, Closeable {
    * @param resumeCheckpointStr checkpoint to resume from source.
   * @return {@link InputBatch} containing the new batch of data from source along with new checkpoint and schema provider instance to use.
    */
-  private InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr, HoodieTableMetaClient metaClient) {
+  @VisibleForTesting
+  InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr, HoodieTableMetaClient metaClient) {
     Option<JavaRDD<GenericRecord>> avroRDDOptional = null;
     String checkpointStr = null;
     SchemaProvider schemaProvider = null;
@@ -574,12 +601,12 @@ public class StreamSync implements Serializable, Closeable {
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null
           && this.userProvidedSchemaProvider.getTargetSchema() != InputBatch.NULL_SCHEMA) {
+        // Let's deduce the schema provider for writer side first!
+        schemaProvider = getDeducedSchemaProvider(this.userProvidedSchemaProvider.getTargetSchema(), this.userProvidedSchemaProvider, metaClient);
         if (useRowWriter) {
-          inputBatchForWriter = new InputBatch(transformed, checkpointStr, this.userProvidedSchemaProvider);
+          inputBatchForWriter = new InputBatch(transformed, checkpointStr, schemaProvider);
         } else {
           // non row writer path
-          // Let's deduce the schema provider for writer side first!
-          schemaProvider = getDeducedSchemaProvider(this.userProvidedSchemaProvider.getTargetSchema(), this.userProvidedSchemaProvider, metaClient);
           SchemaProvider finalSchemaProvider = schemaProvider;
           // If the target schema is specified through Avro schema,
           // pass in the schema for the Row-to-Avro conversion
@@ -607,11 +634,10 @@ public class StreamSync implements Serializable, Closeable {
       } else {
         // Deduce proper target (writer's) schema for the input dataset, reconciling its
         // schema w/ the table's one
-        Option<Schema> incomingSchemaOpt = transformed.map(df ->
-            AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(), getAvroRecordQualifiedName(cfg.targetTableName)));
-
-        schemaProvider = incomingSchemaOpt.map(incomingSchema -> getDeducedSchemaProvider(incomingSchema, dataAndCheckpoint.getSchemaProvider(), metaClient))
-            .orElseGet(dataAndCheckpoint::getSchemaProvider);
+        Schema incomingSchema = transformed.map(df ->
+                AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(), getAvroRecordQualifiedName(cfg.targetTableName)))
+            .orElseGet(dataAndCheckpoint.getSchemaProvider()::getTargetSchema);
+        schemaProvider = getDeducedSchemaProvider(incomingSchema, dataAndCheckpoint.getSchemaProvider(), metaClient);
 
         if (useRowWriter) {
+          inputBatchForWriter = new InputBatch(transformed, checkpointStr, schemaProvider);
@@ -623,7 +649,9 @@ public class StreamSync implements Serializable, Closeable {
       }
     } else {
       if (useRowWriter) {
-        inputBatchForWriter = formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
+        InputBatch inputBatchNeedsDeduceSchema = formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
+        inputBatchForWriter = new InputBatch<>(inputBatchNeedsDeduceSchema.getBatch(), inputBatchNeedsDeduceSchema.getCheckpointForNextBatch(),
+            getDeducedSchemaProvider(inputBatchNeedsDeduceSchema.getSchemaProvider().getTargetSchema(), inputBatchNeedsDeduceSchema.getSchemaProvider(), metaClient));
       } else {
         // Pull the data from the source & prepare the write
         InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint = formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, cfg.sourceLimit);
@@ -662,7 +690,8 @@ public class StreamSync implements Serializable, Closeable {
    * @param sourceSchemaProvider Source schema provider.
    * @return the SchemaProvider that can be used as writer schema.
    */
-  private SchemaProvider getDeducedSchemaProvider(Schema incomingSchema, SchemaProvider sourceSchemaProvider, HoodieTableMetaClient metaClient) {
+  @VisibleForTesting
+  SchemaProvider getDeducedSchemaProvider(Schema incomingSchema, SchemaProvider sourceSchemaProvider, HoodieTableMetaClient metaClient) {
     Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs, cfg.targetBasePath, metaClient);
     Option<InternalSchema> internalSchemaOpt = HoodieConversionUtils.toJavaOption(
         HoodieSchemaUtils.getLatestTableInternalSchema(
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 9f045bd606e..2a2c4dafb1e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -2134,8 +2134,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 
     String tableBasePath = basePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, ParquetDFSSource.class.getName(),
-        null, PROPS_FILENAME_TEST_PARQUET, false,
-        false, 100000, false, null, null, "timestamp", null);
+        Collections.singletonList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_PARQUET, false,
+        false, 100000, false, null, "MERGE_ON_READ", "timestamp", null);
 
     config.schemaProviderClassName = NullValueSchemaProvider.class.getName();
     config.sourceClassName = TestParquetDFSSourceEmptyBatch.class.getName();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
new file mode 100644
index 00000000000..99148eb4b07
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieErrorTableConfig;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.transform.Transformer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestStreamSyncUnitTests {
+
+  @ParameterizedTest
+  @MethodSource("testCasesFetchNextBatchFromSource")
+  void testFetchNextBatchFromSource(Boolean useRowWriter, Boolean hasTransformer, Boolean hasSchemaProvider,
+                                    Boolean isNullTargetSchema, Boolean hasErrorTable, Boolean shouldTryWriteToErrorTable) {
+    //basic deltastreamer inputs
+    HoodieSparkEngineContext hoodieSparkEngineContext = mock(HoodieSparkEngineContext.class);
+    FileSystem fs = mock(FileSystem.class);
+    SparkSession sparkSession = mock(SparkSession.class);
+    Configuration configuration = mock(Configuration.class);
+    HoodieStreamer.Config cfg = new HoodieStreamer.Config();
+    cfg.targetTableName = "testTableName";
+    cfg.targetBasePath = "/fake/table/name";
+    cfg.tableType = "MERGE_ON_READ";
+
+    //Source format adapter
+    SourceFormatAdapter sourceFormatAdapter = mock(SourceFormatAdapter.class);
+    SchemaProvider inputBatchSchemaProvider = getSchemaProvider("InputBatch", false);
+    Option<Dataset<Row>> fakeDataFrame = Option.of(mock(Dataset.class));
+    InputBatch<Dataset<Row>> fakeRowInputBatch = new InputBatch<>(fakeDataFrame, "chkpt", inputBatchSchemaProvider);
+    when(sourceFormatAdapter.fetchNewDataInRowFormat(any(), anyLong())).thenReturn(fakeRowInputBatch);
+    //batch is empty because we don't want getBatch().map() to do anything because it calls static method we can't mock
+    InputBatch<JavaRDD<GenericRecord>> fakeAvroInputBatch = new InputBatch<>(Option.empty(), "chkpt", inputBatchSchemaProvider);
+    when(sourceFormatAdapter.fetchNewDataInAvroFormat(any(),anyLong())).thenReturn(fakeAvroInputBatch);
+
+    //transformer
+    //return empty because we don't want .map() to do anything because it calls static method we can't mock
+    when(sourceFormatAdapter.processErrorEvents(any(), any())).thenReturn(Option.empty());
+    Option<Transformer> transformerOption = Option.empty();
+    if (hasTransformer) {
+      transformerOption = Option.of(mock(Transformer.class));
+    }
+
+    //user provided schema provider
+    SchemaProvider schemaProvider = null;
+    if (hasSchemaProvider) {
+      schemaProvider = getSchemaProvider("UserProvided", isNullTargetSchema);
+    }
+
+    //error table
+    TypedProperties props = new TypedProperties();
+    props.put(DataSourceWriteOptions.RECONCILE_SCHEMA().key(), false);
+    Option<BaseErrorTableWriter> errorTableWriterOption = Option.empty();
+    if (hasErrorTable) {
+      errorTableWriterOption = Option.of(mock(BaseErrorTableWriter.class));
+      props.put(ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(), true);
+    }
+    TypedProperties propsSpy = spy(props);
+
+
+    //Actually create the deltastreamer
+    StreamSync streamSync = new StreamSync(cfg, sparkSession, propsSpy, hoodieSparkEngineContext,
+        fs, configuration, client -> true, schemaProvider, errorTableWriterOption, sourceFormatAdapter, transformerOption, useRowWriter, false);
+    StreamSync spy = spy(streamSync);
+    SchemaProvider deducedSchemaProvider;
+    deducedSchemaProvider = getSchemaProvider("deduced", false);
+    doReturn(deducedSchemaProvider).when(spy).getDeducedSchemaProvider(any(), any(), any());
+
+    //run the method we are unit testing:
+    InputBatch batch = spy.fetchNextBatchFromSource(Option.empty(), mock(HoodieTableMetaClient.class));
+
+    //make sure getDeducedSchemaProvider is always called once
+    verify(spy, times(1)).getDeducedSchemaProvider(any(), any(), any());
+
+    //make sure the deduced schema is actually used
+    assertEquals(deducedSchemaProvider.getTargetSchema(), batch.getSchemaProvider().getTargetSchema());
+
+    //make sure we use error table when we should
+    verify(propsSpy, shouldTryWriteToErrorTable ? times(1) : never())
+        .getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
+            HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue());
+  }
+
+  private SchemaProvider getSchemaProvider(String name, boolean isNullTargetSchema) {
+    SchemaProvider schemaProvider = mock(SchemaProvider.class);
+    Schema sourceSchema = mock(Schema.class);
+    Schema targetSchema = isNullTargetSchema ? InputBatch.NULL_SCHEMA : mock(Schema.class);
+    when(schemaProvider.getSourceSchema()).thenReturn(sourceSchema);
+    when(schemaProvider.getTargetSchema()).thenReturn(targetSchema);
+    when(sourceSchema.toString()).thenReturn(name + "SourceSchema");
+    if (!isNullTargetSchema) {
+      when(targetSchema.toString()).thenReturn(name + "TargetSchema");
+    }
+    return schemaProvider;
+  }
+
+  static Stream<Arguments> testCasesFetchNextBatchFromSource() {
+    Stream.Builder<Arguments> b = Stream.builder();
+
+    //no transformer
+    for (Boolean useRowWriter : new Boolean[]{false, true}) {
+      for (Boolean hasErrorTable : new Boolean[]{false, true}) {
+        boolean errorTableEnabled = hasErrorTable && !useRowWriter;
+        b.add(Arguments.of(useRowWriter, false, false, false,
+            hasErrorTable, errorTableEnabled));
+      }
+    }
+
+    //with transformer
+    for (Boolean useRowWriter : new Boolean[]{false, true}) {
+      for (Boolean hasSchemaProvider : new Boolean[]{false, true}) {
+        for (Boolean isNullTargetSchema : new Boolean[]{false, true}) {
+          for (Boolean hasErrorTable : new Boolean[]{false, true}) {
+            boolean errorTableEnabled = hasErrorTable && !useRowWriter;
+            boolean schemaProviderNullOrMissing = isNullTargetSchema || !hasSchemaProvider;
+            boolean shouldTryWriteToErrorTable = errorTableEnabled && !schemaProviderNullOrMissing;
+            b.add(Arguments.of(useRowWriter, true, hasSchemaProvider, isNullTargetSchema,
+                hasErrorTable, shouldTryWriteToErrorTable));
+          }
+        }
+      }
+    }
+    return b.build();
+  }
+}
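
Editor's note on the core behavior change: after this patch, StreamSync.fetchNextBatchFromSource always routes the writer-side schema through getDeducedSchemaProvider (row-writer path, transformer path, and no-schema-provider path alike), instead of sometimes passing the user-provided or source schema straight through. The standalone Java sketch below models only the fallback order under those assumptions; the class and method names (SchemaDeductionSketch, deduceWriterSchema, resolve) are illustrative and are not part of the Hudi API.

import java.util.Optional;

// Illustrative only: a simplified, dependency-free model of the schema-deduction
// order StreamSync follows after this patch. Strings stand in for Avro schemas.
public class SchemaDeductionSketch {

  // Stand-in for getDeducedSchemaProvider(...): in Hudi this reconciles the incoming
  // schema with the latest table schema; here it simply returns the incoming schema.
  static String deduceWriterSchema(String incomingSchema) {
    return incomingSchema;
  }

  // After the patch, every branch feeds a schema into deduction:
  //  1. a non-null user-provided target schema, if present;
  //  2. otherwise the schema of the transformed batch, if a transformer produced one;
  //  3. otherwise the source schema reported with the fetched batch.
  static String resolve(Optional<String> userTargetSchema,
                        Optional<String> transformedBatchSchema,
                        String sourceBatchSchema) {
    String incoming = userTargetSchema
        .orElseGet(() -> transformedBatchSchema.orElse(sourceBatchSchema));
    return deduceWriterSchema(incoming);
  }

  public static void main(String[] args) {
    System.out.println(resolve(Optional.of("user-target"), Optional.of("transformed"), "source"));
    System.out.println(resolve(Optional.empty(), Optional.of("transformed"), "source"));
    System.out.println(resolve(Optional.empty(), Optional.empty(), "source"));
  }
}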
