This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 5ddc6a89f90 [HUDI-6923] Fixing bug with sanitization for rowSource (#9834)
5ddc6a89f90 is described below
commit 5ddc6a89f90c2d43d6dc13d17c81dd8620232626
Author: harshal <[email protected]>
AuthorDate: Fri Oct 27 08:55:53 2023 +0530
[HUDI-6923] Fixing bug with sanitization for rowSource (#9834)
---
.../utilities/schema/FilebasedSchemaProvider.java | 2 +-
.../apache/hudi/utilities/sources/RowSource.java | 6 ++--
.../sources/helpers/SanitizationUtils.java | 7 ++++-
.../utilities/streamer/SourceFormatAdapter.java | 20 ++------------
.../deltastreamer/TestSourceFormatAdapter.java | 32 ++++++++++++++--------
5 files changed, 34 insertions(+), 33 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index 4149535ed3b..3ca97b01f95 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -53,7 +53,7 @@ public class FilebasedSchemaProvider extends SchemaProvider {
     super(props, jssc);
     checkRequiredConfigProperties(props, Collections.singletonList(FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE));
     String sourceFile = getStringWithAltKeys(props, FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE);
-    boolean shouldSanitize = SanitizationUtils.getShouldSanitize(props);
+    boolean shouldSanitize = SanitizationUtils.shouldSanitize(props);
     String invalidCharMask = SanitizationUtils.getInvalidCharMask(props);
     this.fs = FSUtils.getFs(sourceFile, jssc.hadoopConfiguration(), true);
     this.sourceSchema = readAvroSchemaFromFile(sourceFile, this.fs, shouldSanitize, invalidCharMask);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
index bd29ccae699..f2cc48f280c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -42,9 +43,10 @@ public abstract class RowSource extends Source<Dataset<Row>> {
   protected final InputBatch<Dataset<Row>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
     Pair<Option<Dataset<Row>>, String> res = fetchNextBatch(lastCkptStr, sourceLimit);
     return res.getKey().map(dsr -> {
+      Dataset<Row> sanitizedRows = SanitizationUtils.sanitizeColumnNamesForAvro(dsr, props);
       SchemaProvider rowSchemaProvider =
-          UtilHelpers.createRowBasedSchemaProvider(dsr.schema(), props, sparkContext);
-      return new InputBatch<>(res.getKey(), res.getValue(), rowSchemaProvider);
+          UtilHelpers.createRowBasedSchemaProvider(sanitizedRows.schema(), props, sparkContext);
+      return new InputBatch<>(Option.of(sanitizedRows), res.getValue(), rowSchemaProvider);
     }).orElseGet(() -> new InputBatch<>(res.getKey(), res.getValue()));
   }
 }
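
The hunk above is the core of the fix: before this change, SourceFormatAdapter sanitized the fetched dataset but kept the schema provider that RowSource had already built from the unsanitized schema, so batch data and source schema could disagree. Sanitizing inside fetchNewData, before the schema provider is created, keeps the two in sync. A minimal sketch of that ordering constraint in plain Spark (not Hudi code; the "bad-name" column and class name are hypothetical):

    import java.util.Collections;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SanitizeBeforeSchemaSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate();
        // "bad-name" is illegal as an Avro field name (names must match [A-Za-z_][A-Za-z0-9_]*).
        Dataset<Row> raw = spark.read().json(
            spark.createDataset(Collections.singletonList("{\"bad-name\": 1}"), Encoders.STRING()));
        Dataset<Row> sanitized = raw.withColumnRenamed("bad-name", "bad__name");
        // Any schema captured for downstream use must come from `sanitized`, the
        // same dataset placed in the batch; capturing raw.schema() instead would
        // recreate the mismatch this commit fixes.
        sanitized.printSchema();
        spark.stop();
      }
    }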
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java
index d09b88d54b7..ac1d33f6b53 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java
@@ -65,7 +65,7 @@ public class SanitizationUtils {
   private static final String AVRO_FIELD_NAME_KEY = "name";
-  public static boolean getShouldSanitize(TypedProperties props) {
+  public static boolean shouldSanitize(TypedProperties props) {
     return getBooleanWithAltKeys(props, HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES);
   }
@@ -120,6 +120,11 @@ public class SanitizationUtils {
     return targetDataset;
   }
+  public static Dataset<Row> sanitizeColumnNamesForAvro(Dataset<Row> inputDataset, TypedProperties props) {
+    return shouldSanitize(props) ? sanitizeColumnNamesForAvro(inputDataset, getInvalidCharMask(props)) : inputDataset;
+  }
+
   /*
    * We first rely on Avro to parse and then try to rename only for those failed.
    * This way we can improve our parsing capabilities without breaking existing functionality.
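
The new overload bundles the enable check and the mask lookup so callers such as RowSource stay one-liners. A usage sketch (assumes the Hudi utilities shown in this diff are on the classpath; `input` and the class name are placeholders):

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.config.HoodieStreamerConfig;
    import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    class SanitizationUsageSketch {
      static Dataset<Row> sanitize(Dataset<Row> input) {
        TypedProperties props = new TypedProperties();
        props.put(HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
        props.put(HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
        // With sanitization disabled the input comes back unchanged; with it
        // enabled, invalid characters in column names are replaced by "__".
        return SanitizationUtils.sanitizeColumnNamesForAvro(input, props);
      }
    }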
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
index 4b2dff803a9..9f1b087900d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
@@ -78,7 +78,7 @@ public final class SourceFormatAdapter implements Closeable {
     this.source = source;
     this.errorTableWriter = errorTableWriter;
     if (props.isPresent()) {
-      this.shouldSanitize = SanitizationUtils.getShouldSanitize(props.get());
+      this.shouldSanitize = SanitizationUtils.shouldSanitize(props.get());
       this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props.get());
     }
     if (this.shouldSanitize && source.getSourceType() == Source.SourceType.PROTO) {
@@ -102,20 +102,6 @@ public final class SourceFormatAdapter implements Closeable {
     return invalidCharMask;
   }
-  /**
-   * Sanitize all columns including nested ones as per Avro conventions.
-   * @param srcBatch
-   * @return sanitized batch.
-   */
-  private InputBatch<Dataset<Row>> maybeSanitizeFieldNames(InputBatch<Dataset<Row>> srcBatch) {
-    if (!isFieldNameSanitizingEnabled() || !srcBatch.getBatch().isPresent()) {
-      return srcBatch;
-    }
-    Dataset<Row> srcDs = srcBatch.getBatch().get();
-    Dataset<Row> targetDs = SanitizationUtils.sanitizeColumnNamesForAvro(srcDs, getInvalidCharMask());
-    return new InputBatch<>(Option.ofNullable(targetDs), srcBatch.getCheckpointForNextBatch(), srcBatch.getSchemaProvider());
-  }
-
   /**
    * transform input rdd of json string to generic records with support for adding error events to error table
    * @param inputBatch
@@ -172,7 +158,7 @@ public final class SourceFormatAdapter implements Closeable {
       }
       case ROW: {
         //we do the sanitizing here if enabled
-        InputBatch<Dataset<Row>> r = maybeSanitizeFieldNames(((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit));
+        InputBatch<Dataset<Row>> r = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
         return new InputBatch<>(Option.ofNullable(r.getBatch().map(
             rdd -> {
               SchemaProvider originalProvider = UtilHelpers.getOriginalSchemaProvider(r.getSchemaProvider());
@@ -219,7 +205,7 @@ public final class SourceFormatAdapter implements Closeable {
     switch (source.getSourceType()) {
       case ROW:
         //we do the sanitizing here if enabled
-        InputBatch<Dataset<Row>> datasetInputBatch = maybeSanitizeFieldNames(((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit));
+        InputBatch<Dataset<Row>> datasetInputBatch = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
         return new InputBatch<>(processErrorEvents(datasetInputBatch.getBatch(),
             ErrorEvent.ErrorReason.JSON_ROW_DESERIALIZATION_FAILURE),
             datasetInputBatch.getCheckpointForNextBatch(), datasetInputBatch.getSchemaProvider());
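
With maybeSanitizeFieldNames removed, both ROW branches reduce to a plain fetch: the batch returned by RowSource already holds sanitized column names, and its schema provider was derived from the sanitized schema. The resulting call path, annotated (a sketch of the line added above, not new API):

    // After this commit, ROW-source sanitization happens inside RowSource.fetchNewData,
    // so the adapter simply forwards the fetch.
    InputBatch<Dataset<Row>> r = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
    // When HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES is enabled, r.getBatch()
    // and r.getSchemaProvider() are both based on the sanitized dataset, so they
    // can no longer diverge.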
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
index 8b74ab7bc20..30b997e856a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
@@ -19,11 +19,15 @@
 package org.apache.hudi.utilities.deltastreamer;
+import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.config.HoodieStreamerConfig;
+import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.RowSource;
 import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.testutils.SanitizationTestUtils;
@@ -80,10 +84,9 @@ public class TestSourceFormatAdapter {
     testJsonDataSource = null;
   }
-  private void setupRowSource(Dataset<Row> ds) {
-    SchemaProvider nullSchemaProvider = new InputBatch.NullSchemaProvider();
-    InputBatch<Dataset<Row>> batch = new InputBatch<>(Option.of(ds), DUMMY_CHECKPOINT, nullSchemaProvider);
-    testRowDataSource = new TestRowDataSource(new TypedProperties(), jsc, spark, nullSchemaProvider, batch);
+  private void setupRowSource(Dataset<Row> ds, TypedProperties properties, SchemaProvider schemaProvider) {
+    InputBatch<Dataset<Row>> batch = new InputBatch<>(Option.of(ds), DUMMY_CHECKPOINT, schemaProvider);
+    testRowDataSource = new TestRowDataSource(properties, jsc, spark, schemaProvider, batch);
   }
   private void setupJsonSource(JavaRDD<String> ds, Schema schema) {
@@ -92,11 +95,11 @@ public class TestSourceFormatAdapter {
     testJsonDataSource = new TestJsonDataSource(new TypedProperties(), jsc, spark, basicSchemaProvider, batch);
   }
-  private InputBatch<Dataset<Row>> fetchRowData(JavaRDD<String> rdd, StructType unsanitizedSchema) {
+  private InputBatch<Dataset<Row>> fetchRowData(JavaRDD<String> rdd, StructType unsanitizedSchema, SchemaProvider schemaProvider) {
     TypedProperties typedProperties = new TypedProperties();
     typedProperties.put(HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
     typedProperties.put(HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
-    setupRowSource(spark.read().schema(unsanitizedSchema).json(rdd));
+    setupRowSource(spark.read().schema(unsanitizedSchema).json(rdd), typedProperties, schemaProvider);
     SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(testRowDataSource, Option.empty(), Option.of(typedProperties));
     return sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(DUMMY_CHECKPOINT), 10L);
   }
@@ -116,6 +119,10 @@ public class TestSourceFormatAdapter {
     Dataset<Row> ds = inputBatch.getBatch().get();
     assertEquals(2, ds.collectAsList().size());
     assertEquals(sanitizedSchema, ds.schema());
+    if (inputBatch.getSchemaProvider() instanceof RowBasedSchemaProvider) {
+      assertEquals(AvroConversionUtils.convertStructTypeToAvroSchema(sanitizedSchema,
+          "hoodie_source", "hoodie.source"), inputBatch.getSchemaProvider().getSourceSchema());
+    }
     assertEquals(expectedRDD.collect(), ds.toJSON().collectAsList());
   }
@@ -123,7 +130,9 @@ public class TestSourceFormatAdapter {
   @MethodSource("provideDataFiles")
   public void testRowSanitization(String unsanitizedDataFile, String sanitizedDataFile, StructType unsanitizedSchema, StructType sanitizedSchema) {
     JavaRDD<String> unsanitizedRDD = jsc.textFile(unsanitizedDataFile);
-    verifySanitization(fetchRowData(unsanitizedRDD, unsanitizedSchema), sanitizedDataFile, sanitizedSchema);
+    SchemaProvider schemaProvider = new InputBatch.NullSchemaProvider();
+    verifySanitization(fetchRowData(unsanitizedRDD, unsanitizedSchema, schemaProvider), sanitizedDataFile, sanitizedSchema);
+    verifySanitization(fetchRowData(unsanitizedRDD, unsanitizedSchema, null), sanitizedDataFile, sanitizedSchema);
   }
@@ -134,18 +143,17 @@ public class TestSourceFormatAdapter {
     verifySanitization(fetchJsonData(unsanitizedRDD, sanitizedSchema), sanitizedDataFile, sanitizedSchema);
   }
-  public static class TestRowDataSource extends Source<Dataset<Row>> {
+  public static class TestRowDataSource extends RowSource {
     private final InputBatch<Dataset<Row>> batch;
-
     public TestRowDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                              SchemaProvider schemaProvider, InputBatch<Dataset<Row>> batch) {
-      super(props, sparkContext, sparkSession, schemaProvider, SourceType.ROW);
+      super(props, sparkContext, sparkSession, schemaProvider);
       this.batch = batch;
     }
     @Override
-    protected InputBatch<Dataset<Row>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
-      return batch;
+    protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+      return Pair.of(batch.getBatch(), batch.getCheckpointForNextBatch());
     }
   }