This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ddc6a89f90 [HUDI-6923] Fixing bug with sanitization for rowSource (#9834)
5ddc6a89f90 is described below

commit 5ddc6a89f90c2d43d6dc13d17c81dd8620232626
Author: harshal <[email protected]>
AuthorDate: Fri Oct 27 08:55:53 2023 +0530

    [HUDI-6923] Fixing bug with sanitization for rowSource (#9834)
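
    Previously, the ROW path sanitized column names in
    SourceFormatAdapter.maybeSanitizeFieldNames only after RowSource had
    already built its row-based schema provider from the unsanitized
    schema, so a sanitized batch could carry a schema provider that still
    described the original field names. This change moves sanitization into
    RowSource.fetchNewData, ahead of schema provider creation, and removes
    the now-redundant pass in SourceFormatAdapter.

    A minimal usage sketch of the new SanitizationUtils overload added here
    (the Dataset `input` is illustrative; the config keys are the ones the
    updated test wires up):

        import org.apache.hudi.common.config.TypedProperties;
        import org.apache.hudi.utilities.config.HoodieStreamerConfig;
        import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;

        TypedProperties props = new TypedProperties();
        props.put(HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
        props.put(HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
        // No-op unless sanitization is enabled in props; otherwise renames all
        // columns, including nested ones, to Avro-safe names using the mask.
        Dataset<Row> sanitized = SanitizationUtils.sanitizeColumnNamesForAvro(input, props);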
---
 .../utilities/schema/FilebasedSchemaProvider.java  |  2 +-
 .../apache/hudi/utilities/sources/RowSource.java   |  6 ++--
 .../sources/helpers/SanitizationUtils.java         |  7 ++++-
 .../utilities/streamer/SourceFormatAdapter.java    | 20 ++------------
 .../deltastreamer/TestSourceFormatAdapter.java     | 32 ++++++++++++++--------
 5 files changed, 34 insertions(+), 33 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index 4149535ed3b..3ca97b01f95 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -53,7 +53,7 @@ public class FilebasedSchemaProvider extends SchemaProvider {
     super(props, jssc);
     checkRequiredConfigProperties(props, Collections.singletonList(FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE));
     String sourceFile = getStringWithAltKeys(props, FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE);
-    boolean shouldSanitize = SanitizationUtils.getShouldSanitize(props);
+    boolean shouldSanitize = SanitizationUtils.shouldSanitize(props);
     String invalidCharMask = SanitizationUtils.getInvalidCharMask(props);
     this.fs = FSUtils.getFs(sourceFile, jssc.hadoopConfiguration(), true);
     this.sourceSchema = readAvroSchemaFromFile(sourceFile, this.fs, shouldSanitize, invalidCharMask);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
index bd29ccae699..f2cc48f280c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
+import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -42,9 +43,10 @@ public abstract class RowSource extends Source<Dataset<Row>> {
   protected final InputBatch<Dataset<Row>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
     Pair<Option<Dataset<Row>>, String> res = fetchNextBatch(lastCkptStr, sourceLimit);
     return res.getKey().map(dsr -> {
+      Dataset<Row> sanitizedRows = SanitizationUtils.sanitizeColumnNamesForAvro(dsr, props);
       SchemaProvider rowSchemaProvider =
-          UtilHelpers.createRowBasedSchemaProvider(dsr.schema(), props, sparkContext);
-      return new InputBatch<>(res.getKey(), res.getValue(), rowSchemaProvider);
+          UtilHelpers.createRowBasedSchemaProvider(sanitizedRows.schema(), props, sparkContext);
+      return new InputBatch<>(Option.of(sanitizedRows), res.getValue(), rowSchemaProvider);
     }).orElseGet(() -> new InputBatch<>(res.getKey(), res.getValue()));
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java
index d09b88d54b7..ac1d33f6b53 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java
@@ -65,7 +65,7 @@ public class SanitizationUtils {
 
   private static final String AVRO_FIELD_NAME_KEY = "name";
 
-  public static boolean getShouldSanitize(TypedProperties props) {
+  public static boolean shouldSanitize(TypedProperties props) {
     return getBooleanWithAltKeys(props, HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES);
   }
 
@@ -120,6 +120,11 @@ public class SanitizationUtils {
     return targetDataset;
   }
 
+  public static Dataset<Row> sanitizeColumnNamesForAvro(Dataset<Row> inputDataset, TypedProperties props) {
+    return shouldSanitize(props) ? sanitizeColumnNamesForAvro(inputDataset, getInvalidCharMask(props))
+        : inputDataset;
+  }
+
   /*
    * We first rely on Avro to parse and then try to rename only for those failed.
    * This way we can improve our parsing capabilities without breaking existing functionality.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
index 4b2dff803a9..9f1b087900d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
@@ -78,7 +78,7 @@ public final class SourceFormatAdapter implements Closeable {
     this.source = source;
     this.errorTableWriter = errorTableWriter;
     if (props.isPresent()) {
-      this.shouldSanitize = SanitizationUtils.getShouldSanitize(props.get());
+      this.shouldSanitize = SanitizationUtils.shouldSanitize(props.get());
       this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props.get());
     }
     if (this.shouldSanitize && source.getSourceType() == Source.SourceType.PROTO) {
@@ -102,20 +102,6 @@ public final class SourceFormatAdapter implements Closeable {
     return invalidCharMask;
   }
 
-  /**
-   * Sanitize all columns including nested ones as per Avro conventions.
-   * @param srcBatch
-   * @return sanitized batch.
-   */
-  private InputBatch<Dataset<Row>> maybeSanitizeFieldNames(InputBatch<Dataset<Row>> srcBatch) {
-    if (!isFieldNameSanitizingEnabled() || !srcBatch.getBatch().isPresent()) {
-      return srcBatch;
-    }
-    Dataset<Row> srcDs = srcBatch.getBatch().get();
-    Dataset<Row> targetDs = SanitizationUtils.sanitizeColumnNamesForAvro(srcDs, getInvalidCharMask());
-    return new InputBatch<>(Option.ofNullable(targetDs), srcBatch.getCheckpointForNextBatch(), srcBatch.getSchemaProvider());
-  }
-
   /**
   * transform input rdd of json string to generic records with support for adding error events to error table
    * @param inputBatch
@@ -172,7 +158,7 @@ public final class SourceFormatAdapter implements Closeable {
       }
       case ROW: {
         //we do the sanitizing here if enabled
-        InputBatch<Dataset<Row>> r = maybeSanitizeFieldNames(((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit));
+        InputBatch<Dataset<Row>> r = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
         return new InputBatch<>(Option.ofNullable(r.getBatch().map(
             rdd -> {
                 SchemaProvider originalProvider = UtilHelpers.getOriginalSchemaProvider(r.getSchemaProvider());
@@ -219,7 +205,7 @@ public final class SourceFormatAdapter implements Closeable {
     switch (source.getSourceType()) {
       case ROW:
         //we do the sanitizing here if enabled
-        InputBatch<Dataset<Row>> datasetInputBatch = maybeSanitizeFieldNames(((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit));
+        InputBatch<Dataset<Row>> datasetInputBatch = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
         return new InputBatch<>(processErrorEvents(datasetInputBatch.getBatch(),
             ErrorEvent.ErrorReason.JSON_ROW_DESERIALIZATION_FAILURE),
             datasetInputBatch.getCheckpointForNextBatch(), datasetInputBatch.getSchemaProvider());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
index 8b74ab7bc20..30b997e856a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
@@ -19,11 +19,15 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
+import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.config.HoodieStreamerConfig;
+import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.RowSource;
 import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.testutils.SanitizationTestUtils;
@@ -80,10 +84,9 @@ public class TestSourceFormatAdapter {
     testJsonDataSource = null;
   }
 
-  private void setupRowSource(Dataset<Row> ds) {
-    SchemaProvider nullSchemaProvider = new InputBatch.NullSchemaProvider();
-    InputBatch<Dataset<Row>> batch = new InputBatch<>(Option.of(ds), DUMMY_CHECKPOINT, nullSchemaProvider);
-    testRowDataSource = new TestRowDataSource(new TypedProperties(), jsc, spark, nullSchemaProvider, batch);
+  private void setupRowSource(Dataset<Row> ds, TypedProperties properties, SchemaProvider schemaProvider) {
+    InputBatch<Dataset<Row>> batch = new InputBatch<>(Option.of(ds), DUMMY_CHECKPOINT, schemaProvider);
+    testRowDataSource = new TestRowDataSource(properties, jsc, spark, schemaProvider, batch);
   }
 
   private void setupJsonSource(JavaRDD<String> ds, Schema schema) {
@@ -92,11 +95,11 @@ public class TestSourceFormatAdapter {
     testJsonDataSource = new TestJsonDataSource(new TypedProperties(), jsc, spark, basicSchemaProvider, batch);
   }
 
-  private InputBatch<Dataset<Row>> fetchRowData(JavaRDD<String> rdd, StructType unsanitizedSchema) {
+  private InputBatch<Dataset<Row>> fetchRowData(JavaRDD<String> rdd, StructType unsanitizedSchema, SchemaProvider schemaProvider) {
     TypedProperties typedProperties = new TypedProperties();
     typedProperties.put(HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
     typedProperties.put(HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
-    setupRowSource(spark.read().schema(unsanitizedSchema).json(rdd));
+    setupRowSource(spark.read().schema(unsanitizedSchema).json(rdd), typedProperties, schemaProvider);
     SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(testRowDataSource, Option.empty(), Option.of(typedProperties));
     return sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(DUMMY_CHECKPOINT), 10L);
   }
@@ -116,6 +119,10 @@ public class TestSourceFormatAdapter {
     Dataset<Row> ds = inputBatch.getBatch().get();
     assertEquals(2, ds.collectAsList().size());
     assertEquals(sanitizedSchema, ds.schema());
+    if (inputBatch.getSchemaProvider() instanceof RowBasedSchemaProvider) {
+      assertEquals(AvroConversionUtils.convertStructTypeToAvroSchema(sanitizedSchema,
+          "hoodie_source", "hoodie.source"), inputBatch.getSchemaProvider().getSourceSchema());
+    }
     assertEquals(expectedRDD.collect(), ds.toJSON().collectAsList());
   }
 
@@ -123,7 +130,9 @@ public class TestSourceFormatAdapter {
   @MethodSource("provideDataFiles")
   public void testRowSanitization(String unsanitizedDataFile, String sanitizedDataFile, StructType unsanitizedSchema, StructType sanitizedSchema) {
     JavaRDD<String> unsanitizedRDD = jsc.textFile(unsanitizedDataFile);
-    verifySanitization(fetchRowData(unsanitizedRDD, unsanitizedSchema), sanitizedDataFile, sanitizedSchema);
+    SchemaProvider schemaProvider = new InputBatch.NullSchemaProvider();
+    verifySanitization(fetchRowData(unsanitizedRDD, unsanitizedSchema, schemaProvider), sanitizedDataFile, sanitizedSchema);
+    verifySanitization(fetchRowData(unsanitizedRDD, unsanitizedSchema, null), sanitizedDataFile, sanitizedSchema);
 
   }
 
@@ -134,18 +143,17 @@ public class TestSourceFormatAdapter {
     verifySanitization(fetchJsonData(unsanitizedRDD, sanitizedSchema), sanitizedDataFile, sanitizedSchema);
   }
 
-  public static class TestRowDataSource extends Source<Dataset<Row>> {
+  public static class TestRowDataSource extends RowSource {
     private final InputBatch<Dataset<Row>> batch;
-
     public TestRowDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                              SchemaProvider schemaProvider, InputBatch<Dataset<Row>> batch) {
-      super(props, sparkContext, sparkSession, schemaProvider, SourceType.ROW);
+      super(props, sparkContext, sparkSession, schemaProvider);
       this.batch = batch;
     }
 
     @Override
-    protected InputBatch<Dataset<Row>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
-      return batch;
+    protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+      return Pair.of(batch.getBatch(), batch.getCheckpointForNextBatch());
     }
   }
 

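For context, a hedged sketch of the invariant the updated test asserts: with
sanitization enabled, fetchNewDataInRowFormat now returns a batch whose
row-based schema provider was derived from the already-sanitized Dataset, so
the provider's source schema matches the Avro schema converted from the
batch's StructType. The names `adapter` and `DUMMY_CHECKPOINT` and the record
name/namespace strings mirror the test above; this is an illustration, not
code from the commit.

    InputBatch<Dataset<Row>> batch =
        adapter.fetchNewDataInRowFormat(Option.of(DUMMY_CHECKPOINT), 10L);
    // Both sides agree even when input column names were invalid in Avro,
    // because the schema provider is now built after sanitization.
    assertEquals(
        AvroConversionUtils.convertStructTypeToAvroSchema(
            batch.getBatch().get().schema(), "hoodie_source", "hoodie.source"),
        batch.getSchemaProvider().getSourceSchema());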