This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 136d0755ad7 [HUDI-7500] fix gaps with deduce schema and null schema
(#10858)
136d0755ad7 is described below
commit 136d0755ad7f86d2f4b7b4813a59096c233055e7
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Mar 27 17:27:27 2024 -0400
[HUDI-7500] fix gaps with deduce schema and null schema (#10858)
---------
Co-authored-by: Jonathan Vexler <=>
---
.../main/scala/org/apache/hudi/DefaultSource.scala | 7 +-
.../utilities/streamer/SourceFormatAdapter.java | 2 +-
.../apache/hudi/utilities/streamer/StreamSync.java | 51 ++++--
.../deltastreamer/TestHoodieDeltaStreamer.java | 4 +-
.../streamer/TestStreamSyncUnitTests.java | 192 +++++++++++++++++++++
5 files changed, 241 insertions(+), 15 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index c346f7665df..be3d2f4ed4b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -74,7 +74,12 @@ class DefaultSource extends RelationProvider
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation =
{
try {
- createRelation(sqlContext, parameters, null)
+ val relation = createRelation(sqlContext, parameters, null)
+ if (relation.schema.isEmpty) {
+ new EmptyRelation(sqlContext, new StructType())
+ } else {
+ relation
+ }
} catch {
case _: HoodieSchemaNotFoundException => new EmptyRelation(sqlContext,
new StructType())
case e => throw e
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
index f29404701db..1796c96dab8 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
@@ -62,7 +62,7 @@ import static
org.apache.hudi.utilities.streamer.BaseErrorTableWriter.ERROR_TABL
/**
* Adapts data-format provided by the source to the data-format required by
the client (DeltaStreamer).
*/
-public final class SourceFormatAdapter implements Closeable {
+public class SourceFormatAdapter implements Closeable {
private final Source source;
private boolean shouldSanitize = SANITIZE_SCHEMA_FIELD_NAMES.defaultValue();
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 1453e9fd07c..ded5348ed8f 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -55,6 +55,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -255,6 +256,31 @@ public class StreamSync implements Serializable, Closeable
{
private final boolean useRowWriter;
+ @VisibleForTesting // package-private constructor: the error table writer, format adapter and transformer arrive ready-made
+ StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
+ TypedProperties props, HoodieSparkEngineContext
hoodieSparkContext, FileSystem fs, Configuration conf,
+ Function<SparkRDDWriteClient, Boolean>
onInitializingHoodieWriteClient, SchemaProvider userProvidedSchemaProvider,
+ Option<BaseErrorTableWriter> errorTableWriter,
SourceFormatAdapter formatAdapter, Option<Transformer> transformer,
+ boolean useRowWriter, boolean autoGenerateRecordKeys) {
+ this.cfg = cfg;
+ this.hoodieSparkContext = hoodieSparkContext;
+ this.sparkSession = sparkSession;
+ this.fs = fs;
+ this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
+ this.props = props;
+ this.userProvidedSchemaProvider = userProvidedSchemaProvider;
+ this.processedSchema = new SchemaSet();
+ this.autoGenerateRecordKeys = autoGenerateRecordKeys;
+ this.keyGenClassName = getKeyGeneratorClassName(new
TypedProperties(props));
+ this.conf = conf;
+ // the collaborators below are stored exactly as passed in, which lets a caller substitute its own instances
+ this.errorTableWriter = errorTableWriter;
+ this.formatAdapter = formatAdapter;
+ this.transformer = transformer;
+ this.useRowWriter = useRowWriter;
+
+ }
+
@Deprecated
public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
SchemaProvider schemaProvider,
TypedProperties props, JavaSparkContext jssc, FileSystem
fs, Configuration conf,
@@ -553,7 +579,8 @@ public class StreamSync implements Serializable, Closeable {
* @param resumeCheckpointStr checkpoint to resume from source.
* @return {@link InputBatch} containing the new batch of data from source
along with new checkpoint and schema provider instance to use.
*/
- private InputBatch fetchNextBatchFromSource(Option<String>
resumeCheckpointStr, HoodieTableMetaClient metaClient) {
+ @VisibleForTesting
+ InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr,
HoodieTableMetaClient metaClient) {
Option<JavaRDD<GenericRecord>> avroRDDOptional = null;
String checkpointStr = null;
SchemaProvider schemaProvider = null;
@@ -574,12 +601,12 @@ public class StreamSync implements Serializable,
Closeable {
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
if (this.userProvidedSchemaProvider != null &&
this.userProvidedSchemaProvider.getTargetSchema() != null
&& this.userProvidedSchemaProvider.getTargetSchema() !=
InputBatch.NULL_SCHEMA) {
+ // Let's deduce the schema provider for writer side first!
+ schemaProvider =
getDeducedSchemaProvider(this.userProvidedSchemaProvider.getTargetSchema(),
this.userProvidedSchemaProvider, metaClient);
if (useRowWriter) {
- inputBatchForWriter = new InputBatch(transformed, checkpointStr,
this.userProvidedSchemaProvider);
+ inputBatchForWriter = new InputBatch(transformed, checkpointStr,
schemaProvider);
} else {
// non row writer path
- // Let's deduce the schema provider for writer side first!
- schemaProvider =
getDeducedSchemaProvider(this.userProvidedSchemaProvider.getTargetSchema(),
this.userProvidedSchemaProvider, metaClient);
SchemaProvider finalSchemaProvider = schemaProvider;
// If the target schema is specified through Avro schema,
// pass in the schema for the Row-to-Avro conversion
@@ -607,11 +634,10 @@ public class StreamSync implements Serializable,
Closeable {
} else {
// Deduce proper target (writer's) schema for the input dataset,
reconciling its
// schema w/ the table's one
- Option<Schema> incomingSchemaOpt = transformed.map(df ->
- AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(),
getAvroRecordQualifiedName(cfg.targetTableName)));
-
- schemaProvider = incomingSchemaOpt.map(incomingSchema ->
getDeducedSchemaProvider(incomingSchema, dataAndCheckpoint.getSchemaProvider(),
metaClient))
- .orElseGet(dataAndCheckpoint::getSchemaProvider);
+ Schema incomingSchema = transformed.map(df ->
+ AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(),
getAvroRecordQualifiedName(cfg.targetTableName)))
+ .orElseGet(dataAndCheckpoint.getSchemaProvider()::getTargetSchema);
+ schemaProvider = getDeducedSchemaProvider(incomingSchema,
dataAndCheckpoint.getSchemaProvider(), metaClient);
if (useRowWriter) {
inputBatchForWriter = new InputBatch(transformed, checkpointStr,
schemaProvider);
@@ -623,7 +649,9 @@ public class StreamSync implements Serializable, Closeable {
}
} else {
if (useRowWriter) {
- inputBatchForWriter =
formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
+ InputBatch inputBatchNeedsDeduceSchema =
formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
+ inputBatchForWriter = new
InputBatch<>(inputBatchNeedsDeduceSchema.getBatch(),
inputBatchNeedsDeduceSchema.getCheckpointForNextBatch(),
+
getDeducedSchemaProvider(inputBatchNeedsDeduceSchema.getSchemaProvider().getTargetSchema(),
inputBatchNeedsDeduceSchema.getSchemaProvider(), metaClient));
} else {
// Pull the data from the source & prepare the write
InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, cfg.sourceLimit);
@@ -662,7 +690,8 @@ public class StreamSync implements Serializable, Closeable {
* @param sourceSchemaProvider Source schema provider.
* @return the SchemaProvider that can be used as writer schema.
*/
- private SchemaProvider getDeducedSchemaProvider(Schema incomingSchema,
SchemaProvider sourceSchemaProvider, HoodieTableMetaClient metaClient) {
+ @VisibleForTesting
+ SchemaProvider getDeducedSchemaProvider(Schema incomingSchema,
SchemaProvider sourceSchemaProvider, HoodieTableMetaClient metaClient) {
Option<Schema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs,
cfg.targetBasePath, metaClient);
Option<InternalSchema> internalSchemaOpt =
HoodieConversionUtils.toJavaOption(
HoodieSchemaUtils.getLatestTableInternalSchema(
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 9f045bd606e..2a2c4dafb1e 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -2134,8 +2134,8 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
String tableBasePath = basePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.UPSERT, ParquetDFSSource.class.getName(),
- null, PROPS_FILENAME_TEST_PARQUET, false,
- false, 100000, false, null, null, "timestamp", null);
+ Collections.singletonList(TestIdentityTransformer.class.getName()),
PROPS_FILENAME_TEST_PARQUET, false,
+ false, 100000, false, null, "MERGE_ON_READ", "timestamp", null);
config.schemaProviderClassName = NullValueSchemaProvider.class.getName();
config.sourceClassName = TestParquetDFSSourceEmptyBatch.class.getName();
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
new file mode 100644
index 00000000000..99148eb4b07
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieErrorTableConfig;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.transform.Transformer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestStreamSyncUnitTests {
+ /**
+  * Exercises {@code StreamSync#fetchNextBatchFromSource} against mocked collaborators for every
+  * argument combination supplied by {@link #testCasesFetchNextBatchFromSource()}.
+  *
+  * <p>{@code getDeducedSchemaProvider} is stubbed on a spy, so the checks cover only three things:
+  * that it is invoked exactly once, that the returned batch exposes the stubbed provider's target
+  * schema, and how many times the validate-target-schema flag is read from the properties.
+  *
+  * @param useRowWriter forwarded to the StreamSync constructor as its row-writer flag
+  * @param hasTransformer whether a mocked {@code Transformer} is supplied
+  * @param hasSchemaProvider whether a mocked user-provided {@code SchemaProvider} is supplied
+  * @param isNullTargetSchema whether that provider returns {@code InputBatch.NULL_SCHEMA} as target schema
+  * @param hasErrorTable whether a mocked error table writer is supplied and the validation flag is set
+  * @param shouldTryWriteToErrorTable whether the validation flag is expected to be read once (otherwise never)
+  */
+ @ParameterizedTest
+ @MethodSource("testCasesFetchNextBatchFromSource")
+ void testFetchNextBatchFromSource(Boolean useRowWriter, Boolean
hasTransformer, Boolean hasSchemaProvider,
+ Boolean isNullTargetSchema, Boolean
hasErrorTable, Boolean shouldTryWriteToErrorTable) {
+ // basic StreamSync inputs: engine context, file system, session and Hadoop conf are mocks; only cfg is a real object
+ HoodieSparkEngineContext hoodieSparkEngineContext =
mock(HoodieSparkEngineContext.class);
+ FileSystem fs = mock(FileSystem.class);
+ SparkSession sparkSession = mock(SparkSession.class);
+ Configuration configuration = mock(Configuration.class);
+ HoodieStreamer.Config cfg = new HoodieStreamer.Config();
+ cfg.targetTableName = "testTableName";
+ cfg.targetBasePath = "/fake/table/name";
+ cfg.tableType = "MERGE_ON_READ";
+
+ // source format adapter mock: the row and avro fetch paths each return a canned batch with checkpoint "chkpt"
+ SourceFormatAdapter sourceFormatAdapter = mock(SourceFormatAdapter.class);
+ SchemaProvider inputBatchSchemaProvider = getSchemaProvider("InputBatch",
false);
+ Option<Dataset<Row>> fakeDataFrame = Option.of(mock(Dataset.class));
+ InputBatch<Dataset<Row>> fakeRowInputBatch = new
InputBatch<>(fakeDataFrame, "chkpt", inputBatchSchemaProvider);
+ when(sourceFormatAdapter.fetchNewDataInRowFormat(any(),
anyLong())).thenReturn(fakeRowInputBatch);
+ //batch is empty because we don't want getBatch().map() to do anything
because it calls static method we can't mock
+ InputBatch<JavaRDD<GenericRecord>> fakeAvroInputBatch = new
InputBatch<>(Option.empty(), "chkpt", inputBatchSchemaProvider);
+
when(sourceFormatAdapter.fetchNewDataInAvroFormat(any(),anyLong())).thenReturn(fakeAvroInputBatch);
+
+ // transformer: a mock when hasTransformer is set, otherwise an empty Option
+ //return empty because we don't want .map() to do anything because it
calls static method we can't mock
+ when(sourceFormatAdapter.processErrorEvents(any(),
any())).thenReturn(Option.empty());
+ Option<Transformer> transformerOption = Option.empty();
+ if (hasTransformer) {
+ transformerOption = Option.of(mock(Transformer.class));
+ }
+
+ // user-provided schema provider; stays null when hasSchemaProvider is false
+ SchemaProvider schemaProvider = null;
+ if (hasSchemaProvider) {
+ schemaProvider = getSchemaProvider("UserProvided", isNullTargetSchema);
+ }
+
+ // error table: a mocked writer plus the validate-target-schema flag, both only when hasErrorTable is set
+ TypedProperties props = new TypedProperties();
+ props.put(DataSourceWriteOptions.RECONCILE_SCHEMA().key(), false);
+ Option<BaseErrorTableWriter> errorTableWriterOption = Option.empty();
+ if (hasErrorTable) {
+ errorTableWriterOption = Option.of(mock(BaseErrorTableWriter.class));
+ props.put(ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(), true);
+ }
+ TypedProperties propsSpy = spy(props);
+ // the properties are spied so that the getBoolean(...) lookup can be verified at the end of the test
+
+ // build the StreamSync under test, then spy it so that getDeducedSchemaProvider can be stubbed
+ StreamSync streamSync = new StreamSync(cfg, sparkSession, propsSpy,
hoodieSparkEngineContext,
+ fs, configuration, client -> true, schemaProvider,
errorTableWriterOption, sourceFormatAdapter, transformerOption, useRowWriter,
false);
+ StreamSync spy = spy(streamSync);
+ SchemaProvider deducedSchemaProvider;
+ deducedSchemaProvider = getSchemaProvider("deduced", false);
+ doReturn(deducedSchemaProvider).when(spy).getDeducedSchemaProvider(any(),
any(), any());
+
+ //run the method we are unit testing:
+ InputBatch batch = spy.fetchNextBatchFromSource(Option.empty(),
mock(HoodieTableMetaClient.class));
+
+ //make sure getDeducedSchemaProvider is always called once
+ verify(spy, times(1)).getDeducedSchemaProvider(any(), any(), any());
+
+ //make sure the deduced schema is actually used
+ assertEquals(deducedSchemaProvider.getTargetSchema(),
batch.getSchemaProvider().getTargetSchema());
+
+ // error-table use is observed indirectly: the validate-target-schema flag is read once when expected, never otherwise
+ verify(propsSpy, shouldTryWriteToErrorTable ? times(1) : never())
+
.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
+
HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue());
+ }
+ /**
+  * Builds a mocked {@link SchemaProvider} whose source schema, and normally target schema, are mocks too.
+  *
+  * @param name prefix of the stubbed {@code toString()} values ({@code name + "SourceSchema"} and
+  *             {@code name + "TargetSchema"})
+  * @param isNullTargetSchema when true the provider returns {@code InputBatch.NULL_SCHEMA} as its target
+  *                           schema and no {@code toString()} is stubbed for it
+  * @return the mocked provider
+  */
+ private SchemaProvider getSchemaProvider(String name, boolean
isNullTargetSchema) {
+ SchemaProvider schemaProvider = mock(SchemaProvider.class);
+ Schema sourceSchema = mock(Schema.class);
+ Schema targetSchema = isNullTargetSchema ? InputBatch.NULL_SCHEMA :
mock(Schema.class);
+ when(schemaProvider.getSourceSchema()).thenReturn(sourceSchema);
+ when(schemaProvider.getTargetSchema()).thenReturn(targetSchema);
+ when(sourceSchema.toString()).thenReturn(name + "SourceSchema");
+ if (!isNullTargetSchema) {
+ when(targetSchema.toString()).thenReturn(name + "TargetSchema");
+ }
+ return schemaProvider;
+ }
+ /**
+  * Argument source for {@code testFetchNextBatchFromSource}. Each entry is ordered as
+  * (useRowWriter, hasTransformer, hasSchemaProvider, isNullTargetSchema, hasErrorTable,
+  * shouldTryWriteToErrorTable).
+  *
+  * <p>Without a transformer there are 4 entries (row writer x error table), all without a schema
+  * provider. With a transformer there are 16 entries covering every combination of row writer,
+  * schema provider, null target schema and error table.
+  *
+  * @return the stream of argument tuples
+  */
+ static Stream<Arguments> testCasesFetchNextBatchFromSource() {
+ Stream.Builder<Arguments> b = Stream.builder();
+
+ // no transformer: the flag read is expected whenever an error table exists and the row writer is off
+ for (Boolean useRowWriter : new Boolean[]{false, true}) {
+ for (Boolean hasErrorTable : new Boolean[]{false, true}) {
+ boolean errorTableEnabled = hasErrorTable && !useRowWriter;
+ b.add(Arguments.of(useRowWriter, false, false, false,
+ hasErrorTable, errorTableEnabled));
+ }
+ }
+
+ // with transformer: the flag read additionally requires a schema provider whose target schema is not the null schema
+ for (Boolean useRowWriter : new Boolean[]{false, true}) {
+ for (Boolean hasSchemaProvider : new Boolean[]{false, true}) {
+ for (Boolean isNullTargetSchema : new Boolean[]{false, true}) {
+ for (Boolean hasErrorTable : new Boolean[]{false, true}) {
+ boolean errorTableEnabled = hasErrorTable && !useRowWriter;
+ boolean schemaProviderNullOrMissing = isNullTargetSchema ||
!hasSchemaProvider;
+ boolean shouldTryWriteToErrorTable = errorTableEnabled &&
!schemaProviderNullOrMissing;
+ b.add(Arguments.of(useRowWriter, true, hasSchemaProvider,
isNullTargetSchema,
+ hasErrorTable, shouldTryWriteToErrorTable));
+ }
+ }
+ }
+ }
+ return b.build();
+ }
+}