This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit df90640116c7c6123e2faa883b954732bccba55b
Author: harshal <[email protected]>
AuthorDate: Wed Aug 23 13:20:09 2023 +0530

    [HUDI-4115] Adding support for schema while loading spark dataset in S3/GCS source (#9502)

    `CloudObjectsSelectorCommon` now takes an optional `SchemaProvider`. The
    Spark datasource read uses the `schemaProvider` schema instead of the
    inferred schema when a `schemaProvider` is present.

    ---------

    Co-authored-by: Sagar Sumit <[email protected]>
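
A minimal usage sketch of the new overload (not part of the diff below; the
schema file path, data path, and local SparkSession setup are hypothetical,
while the property key and API calls mirror the tests added in this commit):

    import java.util.Collections;
    import java.util.List;

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
    import org.apache.hudi.utilities.schema.SchemaProvider;
    import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
    import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SchemaProviderUsageSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // Point the provider at an Avro schema file (hypothetical path).
        TypedProperties props = new TypedProperties();
        props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", "/path/to/source_schema.avsc");
        SchemaProvider schemaProvider = new FilebasedSchemaProvider(props, jsc);

        // One cloud object to read (hypothetical path and size).
        List<CloudObjectMetadata> metadata =
            Collections.singletonList(new CloudObjectMetadata("/path/to/data.json", 1));

        // With Option.of(schemaProvider) the files are read with the declared schema;
        // Option.empty() preserves the old behavior of letting Spark infer it.
        Option<Dataset<Row>> rows =
            CloudObjectsSelectorCommon.loadAsDataset(spark, metadata, props, "json", Option.of(schemaProvider));
        if (rows.isPresent()) {
          rows.get().printSchema();
        }
      }
    }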
---
 .../sources/GcsEventsHoodieIncrSource.java         |  5 +++-
 .../sources/S3EventsHoodieIncrSource.java          |  5 +++-
 .../sources/helpers/CloudDataFetcher.java          |  6 ++--
 .../helpers/CloudObjectsSelectorCommon.java        | 17 ++++++++++-
 .../sources/TestGcsEventsHoodieIncrSource.java     | 34 +++++++++++++++-------
 .../sources/TestS3EventsHoodieIncrSource.java      | 28 +++++++++++++-----
 .../helpers/TestCloudObjectsSelectorCommon.java    | 17 +++++++++++
 .../test/resources/schema/sample_data_schema.avsc  | 27 +++++++++++++++++
 .../src/test/resources/schema/sample_gcs_data.avsc | 31 ++++++++++++++++++++
 9 files changed, 147 insertions(+), 23 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index 6eb9a7fdbf7..891881095fd 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -113,6 +113,8 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
   private final GcsObjectMetadataFetcher gcsObjectMetadataFetcher;
   private final CloudDataFetcher gcsObjectDataFetcher;
   private final QueryRunner queryRunner;
+  private final Option<SchemaProvider> schemaProvider;
+
   public static final String GCS_OBJECT_KEY = "name";
   public static final String GCS_OBJECT_SIZE = "size";
 
@@ -142,6 +144,7 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
     this.gcsObjectMetadataFetcher = gcsObjectMetadataFetcher;
     this.gcsObjectDataFetcher = gcsObjectDataFetcher;
     this.queryRunner = queryRunner;
+    this.schemaProvider = Option.ofNullable(schemaProvider);
 
     LOG.info("srcPath: " + srcPath);
     LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
@@ -186,7 +189,7 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
   private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo, Dataset<Row> cloudObjectMetadataDF) {
     List<CloudObjectMetadata> cloudObjectMetadata = gcsObjectMetadataFetcher.getGcsObjectMetadata(sparkContext, cloudObjectMetadataDF, checkIfFileExists);
     LOG.info("Total number of files to process :" + cloudObjectMetadata.size());
-    Option<Dataset<Row>> fileDataRows = gcsObjectDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props);
+    Option<Dataset<Row>> fileDataRows = gcsObjectDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props, schemaProvider);
     return Pair.of(fileDataRows, queryInfo.getEndInstant());
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 927a8fc3ebb..4b9be847c75 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -78,6 +78,8 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
   private final QueryRunner queryRunner;
   private final CloudDataFetcher cloudDataFetcher;
+  private final Option<SchemaProvider> schemaProvider;
+
   public static class Config {
     // control whether we do existence check for files before consuming them
     @Deprecated
@@ -135,6 +137,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     this.missingCheckpointStrategy = getMissingCheckpointStrategy(props);
     this.queryRunner = queryRunner;
     this.cloudDataFetcher = cloudDataFetcher;
+    this.schemaProvider = Option.ofNullable(schemaProvider);
   }
 
   @Override
@@ -181,7 +184,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
         .collectAsList();
 
     LOG.info("Total number of files to process :" + cloudObjectMetadata.size());
-    Option<Dataset<Row>> datasetOption = cloudDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props);
+    Option<Dataset<Row>> datasetOption = cloudDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props, schemaProvider);
     return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString());
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
index dfa6c68ec6f..9595ec1a9e6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.schema.SchemaProvider;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,8 +51,9 @@ public class CloudDataFetcher implements Serializable {
     this.props = props;
   }
 
-  public Option<Dataset<Row>> getCloudObjectDataDF(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props) {
-    return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat);
+  public Option<Dataset<Row>> getCloudObjectDataDF(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata,
+                                                   TypedProperties props, Option<SchemaProvider> schemaProviderOption) {
+    return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat, schemaProviderOption);
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 6791b47b129..19da6aada9b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.utilities.sources.helpers;
 
+import org.apache.avro.Schema;
+import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
@@ -27,6 +29,8 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.config.CloudSourceConfig;
 import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.InputBatch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.conf.Configuration;
@@ -146,7 +150,8 @@ public class CloudObjectsSelectorCommon {
     }
   }
 
-  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props, String fileFormat) {
+  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata,
+                                                   TypedProperties props, String fileFormat, Option<SchemaProvider> schemaProviderOption) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Extracted distinct files " + cloudObjectMetadata.size()
           + " and some samples " + cloudObjectMetadata.stream().map(CloudObjectMetadata::getPath).limit(10).collect(Collectors.toList()));
@@ -157,6 +162,12 @@ public class CloudObjectsSelectorCommon {
     }
     DataFrameReader reader = spark.read().format(fileFormat);
     String datasourceOpts = getStringWithAltKeys(props, CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
+    if (schemaProviderOption.isPresent()) {
+      Schema sourceSchema = schemaProviderOption.get().getSourceSchema();
+      if (sourceSchema != null && !sourceSchema.equals(InputBatch.NULL_SCHEMA)) {
+        reader = reader.schema(AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema));
+      }
+    }
     if (StringUtils.isNullOrEmpty(datasourceOpts)) {
       // fall back to legacy config for BWC. TODO consolidate in HUDI-6020
       datasourceOpts = getStringWithAltKeys(props, S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
@@ -204,4 +215,8 @@ public class CloudObjectsSelectorCommon {
     }
     return Option.of(dataset);
   }
+
+  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props, String fileFormat) {
+    return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat, Option.empty());
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 9414bbec4fd..2d76c1b3d2e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -37,9 +37,10 @@ import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
-import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
@@ -104,7 +105,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
   @Mock
   QueryRunner queryRunner;
 
-  protected FilebasedSchemaProvider schemaProvider;
+  protected Option<SchemaProvider> schemaProvider;
   private HoodieTableMetaClient metaClient;
   private JavaSparkContext jsc;
@@ -114,6 +115,11 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
   public void setUp() throws IOException {
     metaClient = getHoodieMetaClient(hadoopConf(), basePath());
     jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
+    String schemaFilePath = TestGcsEventsHoodieIncrSource.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
+    TypedProperties props = new TypedProperties();
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+    props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+    this.schemaProvider = Option.of(new FilebasedSchemaProvider(props, jsc));
     MockitoAnnotations.initMocks(this);
   }
@@ -134,7 +140,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 
     verify(gcsObjectMetadataFetcher, times(0)).getGcsObjectMetadata(Mockito.any(), Mockito.any(), anyBoolean());
     verify(gcsObjectDataFetcher, times(0)).getCloudObjectDataDF(
-        Mockito.any(), Mockito.any(), Mockito.any());
+        Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider));
   }
 
   @Test
@@ -166,7 +172,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any())).thenReturn(Option.of(rows));
+    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any(),
+        eq(schemaProvider))).thenReturn(Option.of(rows));
     when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, 4, "1#path/to/file1.json");
@@ -174,7 +181,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 
     verify(gcsObjectMetadataFetcher, times(1)).getGcsObjectMetadata(Mockito.any(), Mockito.any(), anyBoolean());
     verify(gcsObjectDataFetcher, times(1)).getCloudObjectDataDF(Mockito.any(),
-        eq(cloudObjectMetadataList), Mockito.any());
+        eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider));
   }
 
   @Test
@@ -208,7 +215,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any())).thenReturn(Option.of(rows));
+    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any(),
+        eq(schemaProvider))).thenReturn(Option.of(rows));
     when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, 4, "1#path/to/file2.json");
@@ -217,7 +225,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 
     verify(gcsObjectMetadataFetcher, times(2)).getGcsObjectMetadata(Mockito.any(), Mockito.any(), anyBoolean());
     verify(gcsObjectDataFetcher, times(2)).getCloudObjectDataDF(Mockito.any(),
-        eq(cloudObjectMetadataList), Mockito.any());
+        eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider));
   }
 
   @Test
@@ -253,7 +261,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any())).thenReturn(Option.of(rows));
+    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any(),
+        eq(schemaProvider))).thenReturn(Option.of(rows));
     when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, 4, "1#path/to/file1.json");
@@ -263,7 +272,12 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 
     verify(gcsObjectMetadataFetcher, times(3)).getGcsObjectMetadata(Mockito.any(), Mockito.any(), anyBoolean());
     verify(gcsObjectDataFetcher, times(3)).getCloudObjectDataDF(Mockito.any(),
-        eq(cloudObjectMetadataList), Mockito.any());
+        eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider));
+
+    schemaProvider = Option.empty();
+    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any(),
+        eq(schemaProvider))).thenReturn(Option.of(rows));
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, 4, "1#path/to/file1.json");
   }
 
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
@@ -271,7 +285,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     TypedProperties typedProperties = setProps(missingCheckpointStrategy);
 
     GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(),
-        spark(), schemaProvider, gcsObjectMetadataFetcher, gcsObjectDataFetcher, queryRunner);
+        spark(), schemaProvider.orElse(null), gcsObjectMetadataFetcher, gcsObjectDataFetcher, queryRunner);
 
     Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 9ff90678e5f..d40d7adce52 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -36,6 +36,7 @@ import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
@@ -46,6 +47,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelectorCommon;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -69,6 +71,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
@@ -80,8 +83,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
 
   private static final String MY_BUCKET = "some-bucket";
 
-  @Mock
-  private SchemaProvider mockSchemaProvider;
+  private Option<SchemaProvider> schemaProvider;
   @Mock
   QueryRunner mockQueryRunner;
   @Mock
@@ -93,6 +95,11 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
   public void setUp() throws IOException {
     jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
     metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+    String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
+    TypedProperties props = new TypedProperties();
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+    props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+    this.schemaProvider = Option.of(new FilebasedSchemaProvider(props, jsc));
   }
 
   private List<String> getSampleS3ObjectKeys(List<Triple<String, Long, String>> filePathSizeAndCommitTime) {
@@ -241,7 +248,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any()))
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
@@ -266,7 +273,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any()))
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
        .thenReturn(Option.empty());
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file2.json");
@@ -294,7 +301,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any()))
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
        .thenReturn(Option.empty());
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, "1#path/to/file1.json");
@@ -354,7 +361,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any()))
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
        .thenReturn(Option.empty());
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
     typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
@@ -386,19 +393,24 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any()))
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
     typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties);
+
+    schemaProvider = Option.empty();
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
+        .thenReturn(Option.empty());
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties);
   }
 
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
                              Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint,
                              TypedProperties typedProperties) {
 
     S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource(typedProperties, jsc(),
-        spark(), mockSchemaProvider, mockQueryRunner, mockCloudDataFetcher);
+        spark(), schemaProvider.orElse(null), mockQueryRunner, mockCloudDataFetcher);
 
     Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index 13818d98c76..b4b6507e074 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -75,6 +76,22 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness
     Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList());
   }
 
+  @Test
+  public void loadDatasetWithSchema() {
+    TypedProperties props = new TypedProperties();
+    TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc");
+    String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc").getPath();
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+    props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+    props.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "country,state");
+    List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1));
+    Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, props, "json", Option.of(new FilebasedSchemaProvider(props, jsc)));
+    Assertions.assertTrue(result.isPresent());
+    Assertions.assertEquals(1, result.get().count());
+    Row expected = RowFactory.create("some data", "US", "CA");
+    Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList());
+  }
+
   @Test
   public void partitionKeyNotPresentInPath() {
     List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1));
diff --git a/hudi-utilities/src/test/resources/schema/sample_data_schema.avsc b/hudi-utilities/src/test/resources/schema/sample_data_schema.avsc
new file mode 100644
index 00000000000..13cbcfff4be
--- /dev/null
+++ b/hudi-utilities/src/test/resources/schema/sample_data_schema.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type": "record",
+  "name": "MySchema",
+  "fields": [
+    {
+      "name": "data",
+      "type": "string"
+    }
+  ]
+}
diff --git a/hudi-utilities/src/test/resources/schema/sample_gcs_data.avsc b/hudi-utilities/src/test/resources/schema/sample_gcs_data.avsc
new file mode 100644
index 00000000000..de8c79fee2e
--- /dev/null
+++ b/hudi-utilities/src/test/resources/schema/sample_gcs_data.avsc
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type": "record",
+  "name": "MySchema",
+  "fields": [
+    {
+      "name": "id",
+      "type": ["null", "string"]
+    },
+    {
+      "name": "text",
+      "type": ["null", "string"]
+    }
+  ]
+}
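
For reference, the reader-schema mechanism introduced above, reduced to a
standalone sketch (the inline schema literal and input path are hypothetical;
the conversion call is the one `loadAsDataset` now performs when a schema
provider is present):

    import org.apache.avro.Schema;
    import org.apache.hudi.AvroConversionUtils;
    import org.apache.spark.sql.DataFrameReader;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.StructType;

    public class ReaderSchemaSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

        // A source schema as a SchemaProvider would return it (inlined for brevity).
        Schema sourceSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"MySchema\",\"fields\":["
                + "{\"name\":\"data\",\"type\":\"string\"}]}");

        // Convert the Avro schema to a Spark StructType and fix it on the reader,
        // instead of letting Spark infer the schema from the JSON files.
        StructType sparkSchema = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
        DataFrameReader reader = spark.read().format("json").schema(sparkSchema);

        Dataset<Row> ds = reader.load("/path/to/data.json"); // hypothetical path
        ds.printSchema();
      }
    }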
