pratyakshsharma commented on a change in pull request #1165: [HUDI-76] Add CSV
Source support for Hudi Delta Streamer
URL: https://github.com/apache/incubator-hudi/pull/1165#discussion_r379922144
##########
File path:
hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
##########
@@ -692,6 +698,146 @@ public void
testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception
testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName());
}
+ private void prepareCsvDFSSource(
+ boolean hasHeader, char sep, boolean useSchemaProvider, boolean
hasTransformer) throws IOException {
+ String sourceRoot = dfsBasePath + "/csvFiles";
+ String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" :
"_c0";
+
+ // Properties used for testing delta-streamer with CSV source
+ TypedProperties csvProps = new TypedProperties();
+ csvProps.setProperty("include", "base.properties");
+ csvProps.setProperty("hoodie.datasource.write.recordkey.field",
recordKeyField);
+ csvProps.setProperty("hoodie.datasource.write.partitionpath.field",
"not_there");
+ if (useSchemaProvider) {
+
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source-flattened.avsc");
+ if (hasTransformer) {
+
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target-flattened.avsc");
+ }
+ }
+ csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot);
+
+ if (sep != ',') {
+ if (sep == '\t') {
+ csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
+ } else {
+ csvProps.setProperty("hoodie.deltastreamer.csv.sep",
Character.toString(sep));
+ }
+ }
+ if (hasHeader) {
+ csvProps.setProperty("hoodie.deltastreamer.csv.header",
Boolean.toString(hasHeader));
+ }
+
+ UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, dfs, dfsBasePath + "/"
+ PROPS_FILENAME_TEST_CSV);
+
+ String path = sourceRoot + "/1.csv";
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ UtilitiesTestBase.Helpers.saveCsvToDFS(
+ hasHeader, sep,
+ Helpers.jsonifyRecords(dataGenerator.generateInserts("000",
CSV_NUM_RECORDS, true)),
+ dfs, path);
+ }
+
+ private void testCsvDFSSource(
+ boolean hasHeader, char sep, boolean useSchemaProvider, String
transformerClassName) throws Exception {
+ prepareCsvDFSSource(hasHeader, sep, useSchemaProvider,
transformerClassName != null);
+ String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
+ String sourceOrderingField = (hasHeader || useSchemaProvider) ?
"timestamp" : "_c0";
+ HoodieDeltaStreamer deltaStreamer =
+ new HoodieDeltaStreamer(TestHelpers.makeConfig(
+ tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(),
+ transformerClassName, PROPS_FILENAME_TEST_CSV, false,
+ useSchemaProvider, 1000, false, null, null, sourceOrderingField),
jsc);
+ deltaStreamer.sync();
+ TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath +
"/*/*.parquet", sqlContext);
+ testNum++;
+ }
+
+ @Test
+ public void
testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws
Exception {
+ // The CSV files have header, the columns are separated by ',', the
default separator
+ // No schema provider is specified, no transformer is applied
+ // In this case, the source schema comes from the inferred schema of the
CSV files
+ testCsvDFSSource(true, ',', false, null);
+ }
+
+ @Test
+ public void
testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndNoTransformer() throws
Exception {
+ // The CSV files have header, the columns are separated by '\t',
+ // which is passed in through the Hudi CSV properties
+ // No schema provider is specified, no transformer is applied
+ // In this case, the source schema comes from the inferred schema of the
CSV files
+ testCsvDFSSource(true, '\t', false, null);
+ }
+
+ @Test
+ public void
testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndNoTransformer() throws
Exception {
+ // The CSV files have header, the columns are separated by '\t'
+ // File schema provider is used, no transformer is applied
+ // In this case, the source schema comes from the source Arvo schema file
Review comment:
Typo: "Arvo" should be "Avro".
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services