yihua commented on a change in pull request #1165: [HUDI-76] Add CSV Source support for Hudi Delta Streamer
URL: https://github.com/apache/incubator-hudi/pull/1165#discussion_r390761706
 
 

 ##########
 File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
 ##########
 @@ -693,6 +699,146 @@ public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception
     testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName());
   }
 
+  private void prepareCsvDFSSource(
+      boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+    String sourceRoot = dfsBasePath + "/csvFiles";
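+    // Without a header or a schema provider, Spark's CSV reader auto-generates
+    // column names (_c0, _c1, ...), so the first generated column serves as the record key.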
+    String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c0";
+
+    // Properties used for testing delta-streamer with CSV source
+    TypedProperties csvProps = new TypedProperties();
+    csvProps.setProperty("include", "base.properties");
+    csvProps.setProperty("hoodie.datasource.write.recordkey.field", 
recordKeyField);
+    csvProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"not_there");
+    if (useSchemaProvider) {
+      
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
dfsBasePath + "/source-flattened.avsc");
+      if (hasTransformer) {
+        
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
dfsBasePath + "/target-flattened.avsc");
+      }
+    }
+    csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot);
+
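+    // The tab separator is passed below in its escaped form ("\\t") rather than as a
+    // literal tab, presumably so that it survives the round trip through the
+    // properties file; any other non-default separator is passed through as-is.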
+    if (sep != ',') {
+      if (sep == '\t') {
+        csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
+      } else {
+        csvProps.setProperty("hoodie.deltastreamer.csv.sep", 
Character.toString(sep));
+      }
+    }
+    if (hasHeader) {
+      csvProps.setProperty("hoodie.deltastreamer.csv.header", 
Boolean.toString(hasHeader));
+    }
+
+    UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_CSV);
+
+    String path = sourceRoot + "/1.csv";
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    UtilitiesTestBase.Helpers.saveCsvToDFS(
+        hasHeader, sep,
+        Helpers.jsonifyRecords(dataGenerator.generateInserts("000", CSV_NUM_RECORDS, true)),
+        dfs, path);
+  }
+
+  private void testCsvDFSSource(
+      boolean hasHeader, char sep, boolean useSchemaProvider, String transformerClassName) throws Exception {
+    prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassName != null);
+    String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
+    String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0";
+    HoodieDeltaStreamer deltaStreamer =
+        new HoodieDeltaStreamer(TestHelpers.makeConfig(
+            tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(),
+            transformerClassName, PROPS_FILENAME_TEST_CSV, false,
+            useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+    testNum++;
+  }
+
+  @Test
+  public void testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
+    // The CSV files have a header; the columns are separated by ',', the default separator.
+    // No schema provider is specified and no transformer is applied.
+    // In this case, the source schema comes from the inferred schema of the CSV files.
+    testCsvDFSSource(true, ',', false, null);
+  }
+
+  @Test
+  public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndNoTransformer() throws Exception {
+    // The CSV files have a header; the columns are separated by '\t',
+    // which is passed in through the Hudi CSV properties.
+    // No schema provider is specified and no transformer is applied.
+    // In this case, the source schema comes from the inferred schema of the CSV files.
+    testCsvDFSSource(true, '\t', false, null);
+  }
+
+  @Test
+  public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndNoTransformer() throws Exception {
+    // The CSV files have a header; the columns are separated by '\t'.
+    // A file-based schema provider is used and no transformer is applied.
+    // In this case, the source schema comes from the source Avro schema file.
+    testCsvDFSSource(true, '\t', true, null);
+  }
+
+  @Test
+  public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndWithTransformer() throws Exception {
+    // The CSV files have a header; the columns are separated by '\t'.
+    // No schema provider is specified, but a transformer is applied.
+    // In this case, the source schema comes from the inferred schema of the CSV files,
+    // and the target schema is determined from the DataFrame after transformation.
+    testCsvDFSSource(true, '\t', false, TripsWithDistanceTransformer.class.getName());
+  }
+
+  @Test
+  public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndTransformer() throws Exception {
+    // The CSV files have a header; the columns are separated by '\t'.
+    // A file-based schema provider is used and a transformer is applied.
+    // In this case, the source and target schemas come from the Avro schema files.
+    testCsvDFSSource(true, '\t', true, TripsWithDistanceTransformer.class.getName());
+  }
+
+  @Test
+  public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
+    // The CSV files do not have a header; the columns are separated by '\t',
+    // which is passed in through the Hudi CSV properties.
+    // No schema provider is specified and no transformer is applied.
+    // In this case, the source schema comes from the inferred schema of the CSV files.
+    // Using no CSV header and no schema provider at the same time is not recommended.
 
 Review comment:
   The schema can still be inferred from the CSV source. Although this might not be a common use case, I don't want to rule it out, since the Spark CSV data source supports schema inference.
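   
   For reference, a minimal, self-contained sketch of the Spark behavior this comment relies on; the file path and session setup below are illustrative, not taken from the PR:
   
       import org.apache.spark.sql.Dataset;
       import org.apache.spark.sql.Row;
       import org.apache.spark.sql.SparkSession;
   
       public class CsvSchemaInferenceExample {
         public static void main(String[] args) {
           SparkSession spark = SparkSession.builder()
               .appName("csv-schema-inference")
               .master("local[1]")
               .getOrCreate();
   
           // With no header and no user-supplied schema, Spark names the columns
           // _c0, _c1, ... and, with "inferSchema" enabled, derives each column's
           // type by sampling the data; this is the fallback the test above relies on.
           Dataset<Row> df = spark.read()
               .option("header", "false")
               .option("inferSchema", "true")
               .option("sep", "\t")
               .csv("/tmp/csvFiles/1.csv");  // hypothetical input path
   
           df.printSchema();
           spark.stop();
         }
       }
   
   Note that enabling "inferSchema" requires an extra pass over the data, which is why a user-supplied schema is generally preferred for larger inputs.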

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
