garyli1019 commented on a change in pull request #949: HUDI-40 Support Parquet in delta streamer
URL: https://github.com/apache/incubator-hudi/pull/949#discussion_r335665089
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
##########
@@ -113,5 +114,66 @@ public void testJsonDFSSource() throws IOException {
InputBatch<JavaRDD<GenericRecord>> fetch4 =
jsonSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4.getBatch());
+
+ // 5. Extract from the beginning
+ InputBatch<JavaRDD<GenericRecord>> fetch5 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+ assertEquals(10100, fetch5.getBatch().get().count());
+ }
+
+ @Test
+ public void testParquetDFSSource() throws IOException {
+ dfs.mkdirs(new Path(dfsBasePath + "/parquetFiles"));
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+
+ TypedProperties props = new TypedProperties();
+ props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/parquetFiles");
+ ParquetSource parquetDFSSource = new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
+ SourceFormatAdapter parquetSource = new SourceFormatAdapter(parquetDFSSource);
+
+ // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
+ assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
+ List<GenericRecord> batch1 = Helpers.toGenericRecord(dataGenerator.generateInserts("000", 100), dataGenerator);
+ Path file1 = new Path(dfsBasePath + "/parquetFiles", "1.parquet");
+ Helpers.saveParquetToDFS(batch1, file1);
+ assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), 10).getBatch());
Review comment:
Not sure if I understand correctly. We assert that `1.parquet` contains 100 records at line 141. I think that achieves the same goal as checking that `1.parquet` must be larger than the `sourceLimit` of 10. What do you think?
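To make the sourceLimit behaviour behind these two assertions concrete, here is a simplified, hypothetical Java model of a DFS source picking files under a limit. `SourceLimitSketch`, `FileInfo` and `selectFiles` are illustrative names, not Hudi APIs, and the rule encoded here (treat sourceLimit as a byte budget, walk files newer than the checkpoint in modification-time order, stop before the running total would exceed the limit) is an assumption about how the limit is applied, not a copy of Hudi's path-selection code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified model of sourceLimit-based file selection.
// Not Hudi's actual path-selection code; all names are illustrative.
public class SourceLimitSketch {

  // A file under the source root: name, size in bytes, modification time.
  public static final class FileInfo {
    public final String name;
    public final long sizeBytes;
    public final long modTime;

    public FileInfo(String name, long sizeBytes, long modTime) {
      this.name = name;
      this.sizeBytes = sizeBytes;
      this.modTime = modTime;
    }
  }

  // Returns files newer than the checkpoint, oldest first, stopping before the
  // running byte total would exceed sourceLimit (assumed to be a byte budget).
  public static List<FileInfo> selectFiles(List<FileInfo> files, long checkpointModTime, long sourceLimit) {
    List<FileInfo> sorted = new ArrayList<>(files);
    sorted.sort(Comparator.comparingLong((FileInfo f) -> f.modTime));
    List<FileInfo> selected = new ArrayList<>();
    long currentBytes = 0;
    for (FileInfo f : sorted) {
      if (f.modTime <= checkpointModTime) {
        continue; // already consumed by an earlier batch
      }
      if (currentBytes + f.sizeBytes > sourceLimit) {
        break; // taking this file would exceed the limit, so stop here
      }
      currentBytes += f.sizeBytes;
      selected.add(f);
    }
    return selected;
  }

  public static void main(String[] args) {
    // Hypothetical size: any value larger than 10 bytes behaves the same way here.
    List<FileInfo> files = List.of(new FileInfo("1.parquet", 5_000L, 1L));
    System.out.println(selectFiles(files, Long.MIN_VALUE, 10L).size());            // 0
    System.out.println(selectFiles(files, Long.MIN_VALUE, Long.MAX_VALUE).size()); // 1
  }
}
```

Under this model, the empty result at a limit of 10 depends only on the file's size in bytes exceeding the limit, while a count of 100 depends on how many records the file holds.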
----------------------------------------------------------------
This is an automated message from the Apache Git Service.