This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit df90640116c7c6123e2faa883b954732bccba55b
Author: harshal <[email protected]>
AuthorDate: Wed Aug 23 13:20:09 2023 +0530

    [HUDI-4115] Adding support for schema while loading spark dataset in S3/GCS source (#9502)
    
    `CloudObjectsSelectorCommon` now takes an optional `SchemaProvider`.
    When a `SchemaProvider` is supplied, the Spark datasource read uses its
    source schema instead of the inferred schema.
    
    ---------
    
    Co-authored-by: Sagar Sumit <[email protected]>
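    
    For illustration, a minimal sketch of how a caller might wire this up.
    The property keys and FilebasedSchemaProvider usage mirror the tests in
    this commit; the schema file path and the spark, jsc, and
    cloudObjectMetadata variables are assumptions, not part of the change:
    
        // Hypothetical setup: point a FilebasedSchemaProvider at an Avro schema file.
        TypedProperties props = new TypedProperties();
        props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", "/path/to/source_schema.avsc");
        props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
        SchemaProvider schemaProvider = new FilebasedSchemaProvider(props, jsc);
        // With a provider present, loadAsDataset converts the Avro schema to a
        // StructType and applies it to the Spark read instead of inferring one.
        Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(
            spark, cloudObjectMetadata, props, "json", Option.of(schemaProvider));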
---
 .../sources/GcsEventsHoodieIncrSource.java         |  5 +++-
 .../sources/S3EventsHoodieIncrSource.java          |  5 +++-
 .../sources/helpers/CloudDataFetcher.java          |  6 ++--
 .../helpers/CloudObjectsSelectorCommon.java        | 17 ++++++++++-
 .../sources/TestGcsEventsHoodieIncrSource.java     | 34 +++++++++++++++-------
 .../sources/TestS3EventsHoodieIncrSource.java      | 28 +++++++++++++-----
 .../helpers/TestCloudObjectsSelectorCommon.java    | 17 +++++++++++
 .../test/resources/schema/sample_data_schema.avsc  | 27 +++++++++++++++++
 .../src/test/resources/schema/sample_gcs_data.avsc | 31 ++++++++++++++++++++
 9 files changed, 147 insertions(+), 23 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index 6eb9a7fdbf7..891881095fd 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -113,6 +113,8 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
   private final GcsObjectMetadataFetcher gcsObjectMetadataFetcher;
   private final CloudDataFetcher gcsObjectDataFetcher;
   private final QueryRunner queryRunner;
+  private final Option<SchemaProvider> schemaProvider;
+
 
   public static final String GCS_OBJECT_KEY = "name";
   public static final String GCS_OBJECT_SIZE = "size";
@@ -142,6 +144,7 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
     this.gcsObjectMetadataFetcher = gcsObjectMetadataFetcher;
     this.gcsObjectDataFetcher = gcsObjectDataFetcher;
     this.queryRunner = queryRunner;
+    this.schemaProvider = Option.ofNullable(schemaProvider);
 
     LOG.info("srcPath: " + srcPath);
     LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
@@ -186,7 +189,7 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
   private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo, Dataset<Row> cloudObjectMetadataDF) {
     List<CloudObjectMetadata> cloudObjectMetadata = gcsObjectMetadataFetcher.getGcsObjectMetadata(sparkContext, cloudObjectMetadataDF, checkIfFileExists);
     LOG.info("Total number of files to process :" + cloudObjectMetadata.size());
-    Option<Dataset<Row>> fileDataRows = gcsObjectDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props);
+    Option<Dataset<Row>> fileDataRows = gcsObjectDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props, schemaProvider);
     return Pair.of(fileDataRows, queryInfo.getEndInstant());
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 927a8fc3ebb..4b9be847c75 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -78,6 +78,8 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
   private final QueryRunner queryRunner;
   private final CloudDataFetcher cloudDataFetcher;
 
+  private final Option<SchemaProvider> schemaProvider;
+
   public static class Config {
     // control whether we do existence check for files before consuming them
     @Deprecated
@@ -135,6 +137,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     this.missingCheckpointStrategy = getMissingCheckpointStrategy(props);
     this.queryRunner = queryRunner;
     this.cloudDataFetcher = cloudDataFetcher;
+    this.schemaProvider = Option.ofNullable(schemaProvider);
   }
 
   @Override
@@ -181,7 +184,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
         .collectAsList();
     LOG.info("Total number of files to process :" + cloudObjectMetadata.size());
 
-    Option<Dataset<Row>> datasetOption = cloudDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props);
+    Option<Dataset<Row>> datasetOption = cloudDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props, schemaProvider);
     return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString());
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
index dfa6c68ec6f..9595ec1a9e6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.schema.SchemaProvider;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,8 +51,9 @@ public class CloudDataFetcher implements Serializable {
     this.props = props;
   }
 
-  public Option<Dataset<Row>> getCloudObjectDataDF(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props) {
-    return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat);
+  public Option<Dataset<Row>> getCloudObjectDataDF(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata,
+                                                   TypedProperties props, Option<SchemaProvider> schemaProviderOption) {
+    return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat, schemaProviderOption);
   }
 
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 6791b47b129..19da6aada9b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.utilities.sources.helpers;
 
+import org.apache.avro.Schema;
+import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
@@ -27,6 +29,8 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.config.CloudSourceConfig;
 import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.InputBatch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.conf.Configuration;
@@ -146,7 +150,8 @@ public class CloudObjectsSelectorCommon {
     }
   }
 
-  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props, String fileFormat) {
+  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata,
+                                                   TypedProperties props, String fileFormat, Option<SchemaProvider> schemaProviderOption) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Extracted distinct files " + cloudObjectMetadata.size()
           + " and some samples " + cloudObjectMetadata.stream().map(CloudObjectMetadata::getPath).limit(10).collect(Collectors.toList()));
@@ -157,6 +162,12 @@ public class CloudObjectsSelectorCommon {
     }
     DataFrameReader reader = spark.read().format(fileFormat);
     String datasourceOpts = getStringWithAltKeys(props, CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
+    if (schemaProviderOption.isPresent()) {
+      Schema sourceSchema = schemaProviderOption.get().getSourceSchema();
+      if (sourceSchema != null && !sourceSchema.equals(InputBatch.NULL_SCHEMA)) {
+        reader = reader.schema(AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema));
+      }
+    }
     if (StringUtils.isNullOrEmpty(datasourceOpts)) {
       // fall back to legacy config for BWC. TODO consolidate in HUDI-6020
       datasourceOpts = getStringWithAltKeys(props, S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
@@ -204,4 +215,8 @@ public class CloudObjectsSelectorCommon {
     }
     return Option.of(dataset);
   }
+
+  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props, String fileFormat) {
+    return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat, Option.empty());
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 9414bbec4fd..2d76c1b3d2e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -37,9 +37,10 @@ import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
-import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
 
@@ -104,7 +105,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
   @Mock
   QueryRunner queryRunner;
 
-  protected FilebasedSchemaProvider schemaProvider;
+  protected Option<SchemaProvider> schemaProvider;
   private HoodieTableMetaClient metaClient;
   private JavaSparkContext jsc;
 
@@ -114,6 +115,11 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
   public void setUp() throws IOException {
     metaClient = getHoodieMetaClient(hadoopConf(), basePath());
     jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
+    String schemaFilePath = TestGcsEventsHoodieIncrSource.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
+    TypedProperties props = new TypedProperties();
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+    props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+    this.schemaProvider = Option.of(new FilebasedSchemaProvider(props, jsc));
     MockitoAnnotations.initMocks(this);
   }
 
@@ -134,7 +140,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     verify(gcsObjectMetadataFetcher, times(0)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
             anyBoolean());
     verify(gcsObjectDataFetcher, times(0)).getCloudObjectDataDF(
-            Mockito.any(), Mockito.any(), Mockito.any());
+        Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider));
   }
 
   @Test
@@ -166,7 +172,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any())).thenReturn(Option.of(rows));
+    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any(),
+        eq(schemaProvider))).thenReturn(Option.of(rows));
     when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, 4, "1#path/to/file1.json");
@@ -174,7 +181,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     verify(gcsObjectMetadataFetcher, times(1)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
             anyBoolean());
     verify(gcsObjectDataFetcher, times(1)).getCloudObjectDataDF(Mockito.any(),
-            eq(cloudObjectMetadataList), Mockito.any());
+        eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider));
   }
 
   @Test
@@ -208,7 +215,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any())).thenReturn(Option.of(rows));
+    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any(),
+        eq(schemaProvider))).thenReturn(Option.of(rows));
     when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, 4, "1#path/to/file2.json");
@@ -217,7 +225,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     verify(gcsObjectMetadataFetcher, times(2)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
         anyBoolean());
     verify(gcsObjectDataFetcher, times(2)).getCloudObjectDataDF(Mockito.any(),
-        eq(cloudObjectMetadataList), Mockito.any());
+        eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider));
   }
 
   @Test
@@ -253,7 +261,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any())).thenReturn(Option.of(rows));
+    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any(),
+        eq(schemaProvider))).thenReturn(Option.of(rows));
     when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, 4, "1#path/to/file1.json");
@@ -263,7 +272,12 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     verify(gcsObjectMetadataFetcher, times(3)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
         anyBoolean());
     verify(gcsObjectDataFetcher, times(3)).getCloudObjectDataDF(Mockito.any(),
-        eq(cloudObjectMetadataList), Mockito.any());
+        eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider));
+
+    schemaProvider = Option.empty();
+    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), eq(cloudObjectMetadataList), Mockito.any(),
+        eq(schemaProvider))).thenReturn(Option.of(rows));
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, 4, "1#path/to/file1.json");
   }
 
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
@@ -271,7 +285,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     TypedProperties typedProperties = setProps(missingCheckpointStrategy);
 
     GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(),
-            spark(), schemaProvider, gcsObjectMetadataFetcher, gcsObjectDataFetcher, queryRunner);
+        spark(), schemaProvider.orElse(null), gcsObjectMetadataFetcher, gcsObjectDataFetcher, queryRunner);
 
     Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 9ff90678e5f..d40d7adce52 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -36,6 +36,7 @@ import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
@@ -46,6 +47,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelectorCommon;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -69,6 +71,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
@@ -80,8 +83,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
 
   private static final String MY_BUCKET = "some-bucket";
 
-  @Mock
-  private SchemaProvider mockSchemaProvider;
+  private Option<SchemaProvider> schemaProvider;
   @Mock
   QueryRunner mockQueryRunner;
   @Mock
@@ -93,6 +95,11 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
   public void setUp() throws IOException {
     jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
     metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+    String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
+    TypedProperties props = new TypedProperties();
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+    props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+    this.schemaProvider = Option.of(new FilebasedSchemaProvider(props, jsc));
   }
 
   private List<String> getSampleS3ObjectKeys(List<Triple<String, Long, String>> filePathSizeAndCommitTime) {
@@ -241,7 +248,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any()))
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
@@ -266,7 +273,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any()))
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file2.json");
@@ -294,7 +301,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any()))
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
        .thenReturn(Option.empty());
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, "1#path/to/file1.json");
@@ -354,7 +361,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any()))
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
        .thenReturn(Option.empty());
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
     typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
@@ -386,19 +393,24 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any()))
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
        .thenReturn(Option.empty());
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
     typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties);
+
+    schemaProvider = Option.empty();
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
+        .thenReturn(Option.empty());
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties);
   }
 
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
                              Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint,
                              TypedProperties typedProperties) {
     S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource(typedProperties, jsc(),
-        spark(), mockSchemaProvider, mockQueryRunner, mockCloudDataFetcher);
+        spark(), schemaProvider.orElse(null), mockQueryRunner, mockCloudDataFetcher);
 
     Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index 13818d98c76..b4b6507e074 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -75,6 +76,22 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness
     Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList());
   }
 
+  @Test
+  public void loadDatasetWithSchema() {
+    TypedProperties props = new TypedProperties();
+    TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc");
+    String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc").getPath();
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+    props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+    props.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "country,state");
+    List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1));
+    Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, props, "json", Option.of(new FilebasedSchemaProvider(props, jsc)));
+    Assertions.assertTrue(result.isPresent());
+    Assertions.assertEquals(1, result.get().count());
+    Row expected = RowFactory.create("some data", "US", "CA");
+    Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList());
+  }
+
   @Test
   public void partitionKeyNotPresentInPath() {
     List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1));
diff --git a/hudi-utilities/src/test/resources/schema/sample_data_schema.avsc b/hudi-utilities/src/test/resources/schema/sample_data_schema.avsc
new file mode 100644
index 00000000000..13cbcfff4be
--- /dev/null
+++ b/hudi-utilities/src/test/resources/schema/sample_data_schema.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type": "record",
+  "name": "MySchema",
+  "fields": [
+    {
+      "name": "data",
+      "type": "string"
+    }
+  ]
+}
diff --git a/hudi-utilities/src/test/resources/schema/sample_gcs_data.avsc b/hudi-utilities/src/test/resources/schema/sample_gcs_data.avsc
new file mode 100644
index 00000000000..de8c79fee2e
--- /dev/null
+++ b/hudi-utilities/src/test/resources/schema/sample_gcs_data.avsc
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type": "record",
+  "name": "MySchema",
+  "fields": [
+    {
+      "name": "id",
+      "type": ["null", "string"]
+    },
+    {
+      "name": "text",
+      "type": ["null", "string"]
+    }
+  ]
+}
