yihua commented on issue #1165: [HUDI-76] Add CSV Source support for Hudi Delta 
Streamer
URL: https://github.com/apache/incubator-hudi/pull/1165#issuecomment-597457998
 
 
   > One question about using nested schema. Can you remind me what happens if 
someone passes in a nested schema for CsvDeltaStreamer?
   
   I used some code below to test the nested schema for CSV reader in Spark.  
It throws the following exception, which means that Spark CSV source does not 
support nested schema currently.
   
   In most cases, the CSV schemas should be flattened.  It depends on Spark's 
behavior whether nested schema is supported for CSV source (in the future 
nested schema may be supported for CSV).  So we don't enforce the check in our 
Hudi code. 
   
   ```
   org.apache.spark.sql.AnalysisException: CSV data source does not support 
struct<amount:double,currency:string> data type.;
   
        at 
org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:69)
        at 
org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:67)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
        at 
org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifySchema(DataSourceUtils.scala:67)
        at 
org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifyReadSchema(DataSourceUtils.scala:41)
        at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:400)
        at 
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:188)
        at 
org.apache.hudi.utilities.sources.CsvDFSSource.fromFiles(CsvDFSSource.java:120)
        at 
org.apache.hudi.utilities.sources.CsvDFSSource.fetchNextBatch(CsvDFSSource.java:93)
        at 
org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
        at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:73)
        at 
org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:66)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:317)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:226)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:121)
        at 
org.apache.hudi.utilities.TestHoodieDeltaStreamer.testCsvDFSSourceWithNestedSchema(TestHoodieDeltaStreamer.java:812)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
        at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
        at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
        at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
   ```
   
   Appendix: a simple diff for testing nested CSV schema:
   ```
   diff --git a/hudi-utilities/null/parquetFiles/.1.parquet.crc 
b/hudi-utilities/null/parquetFiles/.1.parquet.crc
   new file mode 100644
   index 00000000..f48941c4
   Binary files /dev/null and b/hudi-utilities/null/parquetFiles/.1.parquet.crc 
differ
   diff --git a/hudi-utilities/null/parquetFiles/1.parquet 
b/hudi-utilities/null/parquetFiles/1.parquet
   new file mode 100644
   index 00000000..7780cb89
   Binary files /dev/null and b/hudi-utilities/null/parquetFiles/1.parquet 
differ
   diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
   index 4b69d223..e2921a5f 100644
   --- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
   +++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
   @@ -21,7 +21,6 @@ package org.apache.hudi.utilities.deltastreamer;
    import org.apache.hudi.AvroConversionUtils;
    import org.apache.hudi.DataSourceUtils;
    import org.apache.hudi.client.HoodieWriteClient;
   -import org.apache.hudi.keygen.KeyGenerator;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.model.HoodieRecord;
   @@ -40,6 +39,7 @@ import org.apache.hudi.exception.HoodieException;
    import org.apache.hudi.hive.HiveSyncConfig;
    import org.apache.hudi.hive.HiveSyncTool;
    import org.apache.hudi.index.HoodieIndex;
   +import org.apache.hudi.keygen.KeyGenerator;
    import org.apache.hudi.utilities.UtilHelpers;
    import 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
    import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
   @@ -332,6 +332,7 @@ public class DeltaSync implements Serializable {
        }
    
        JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
   +    List<GenericRecord> r = avroRDD.collect();
        JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
          HoodieRecordPayload payload = 
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
              (Comparable) DataSourceUtils.getNestedFieldVal(gr, 
cfg.sourceOrderingField, false));
   diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
   index 9e289f10..6f0cc8f8 100644
   --- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
   +++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
   @@ -41,6 +41,7 @@ public abstract class RowSource extends 
Source<Dataset<Row>> {
      @Override
      protected final InputBatch<Dataset<Row>> fetchNewData(Option<String> 
lastCkptStr, long sourceLimit) {
        Pair<Option<Dataset<Row>>, String> res = fetchNextBatch(lastCkptStr, 
sourceLimit);
   +    Row[] x = (Row[]) res.getKey().get().collect();
        return res.getKey().map(dsr -> {
          SchemaProvider rowSchemaProvider = new 
RowBasedSchemaProvider(dsr.schema());
          return new InputBatch<>(res.getKey(), res.getValue(), 
rowSchemaProvider);
   diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
   index 43f76904..46761703 100644
   --- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
   +++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
   @@ -19,7 +19,6 @@
    package org.apache.hudi.utilities;
    
    import org.apache.hudi.DataSourceWriteOptions;
   -import org.apache.hudi.keygen.SimpleKeyGenerator;
    import org.apache.hudi.common.HoodieTestDataGenerator;
    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.model.HoodieTableType;
   @@ -33,11 +32,12 @@ import org.apache.hudi.common.util.FSUtils;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.common.util.TypedProperties;
    import org.apache.hudi.config.HoodieCompactionConfig;
   -import org.apache.hudi.exception.TableNotFoundException;
    import org.apache.hudi.exception.HoodieException;
   +import org.apache.hudi.exception.TableNotFoundException;
    import org.apache.hudi.hive.HiveSyncConfig;
    import org.apache.hudi.hive.HoodieHiveClient;
    import org.apache.hudi.hive.MultiPartKeysValueExtractor;
   +import org.apache.hudi.keygen.SimpleKeyGenerator;
    import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
    import 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
    import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
   @@ -101,6 +101,7 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
      private static final String PROPS_FILENAME_TEST_SOURCE = 
"test-source.properties";
      private static final String PROPS_FILENAME_TEST_INVALID = 
"test-invalid.properties";
      private static final String PROPS_FILENAME_TEST_CSV = 
"test-csv-dfs-source.properties";
   +  private static final String PROPS_FILENAME_TEST_CSV_NESTED = 
"test-csv-dfs-source-nested.properties";
      private static final String PROPS_FILENAME_TEST_PARQUET = 
"test-parquet-dfs-source.properties";
      private static final String PARQUET_SOURCE_ROOT = dfsBasePath + 
"/parquetFiles";
      private static final int PARQUET_NUM_RECORDS = 5;
   @@ -728,7 +729,51 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
          csvProps.setProperty("hoodie.deltastreamer.csv.header", 
Boolean.toString(hasHeader));
        }
    
   -    UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, dfs, dfsBasePath + 
"/" + PROPS_FILENAME_TEST_CSV);
   +    UtilitiesTestBase.Helpers
   +        .savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + 
PROPS_FILENAME_TEST_CSV);
   +
   +    String path = sourceRoot + "/1.csv";
   +    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
   +    UtilitiesTestBase.Helpers.saveCsvToDFS(
   +        hasHeader, sep,
   +        Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 
CSV_NUM_RECORDS, true)),
   +        dfs, path);
   +  }
   +
   +  private void prepareCsvDFSSourceNested(
   +      boolean hasHeader, char sep, boolean useSchemaProvider, boolean 
hasTransformer)
   +      throws IOException {
   +    String sourceRoot = dfsBasePath + "/csvFiles";
   +    String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : 
"_c0";
   +
   +    // Properties used for testing delta-streamer with CSV source
   +    TypedProperties csvProps = new TypedProperties();
   +    csvProps.setProperty("include", "base.properties");
   +    csvProps.setProperty("hoodie.datasource.write.recordkey.field", 
recordKeyField);
   +    csvProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"not_there");
   +    if (useSchemaProvider) {
   +      
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
   +          dfsBasePath + "/source.avsc");
   +      if (hasTransformer) {
   +        
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
   +            dfsBasePath + "/target.avsc");
   +      }
   +    }
   +    csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", 
sourceRoot);
   +
   +    if (sep != ',') {
   +      if (sep == '\t') {
   +        csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
   +      } else {
   +        csvProps.setProperty("hoodie.deltastreamer.csv.sep", 
Character.toString(sep));
   +      }
   +    }
   +    if (hasHeader) {
   +      csvProps.setProperty("hoodie.deltastreamer.csv.header", 
Boolean.toString(hasHeader));
   +    }
   +
   +    UtilitiesTestBase.Helpers
   +        .savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + 
PROPS_FILENAME_TEST_CSV_NESTED);
    
        String path = sourceRoot + "/1.csv";
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
   @@ -739,7 +784,8 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
      }
    
      private void testCsvDFSSource(
   -      boolean hasHeader, char sep, boolean useSchemaProvider, String 
transformerClassName) throws Exception {
   +      boolean hasHeader, char sep, boolean useSchemaProvider, String 
transformerClassName)
   +      throws Exception {
        prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, 
transformerClassName != null);
        String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
        String sourceOrderingField = (hasHeader || useSchemaProvider) ? 
"timestamp" : "_c0";
   @@ -753,6 +799,25 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
        testNum++;
      }
    
   +  @Test
   +  public void testCsvDFSSourceWithNestedSchema() throws Exception {
   +    prepareCsvDFSSourceNested(true, ',', true, false);
   +    String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
   +    String sourceOrderingField = "timestamp";
   +    HoodieDeltaStreamer deltaStreamer =
   +        new HoodieDeltaStreamer(TestHelpers.makeConfig(
   +            tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(),
   +            null, PROPS_FILENAME_TEST_CSV_NESTED, false,
   +            true, 1000, false, null, null, sourceOrderingField), jsc);
   +    deltaStreamer.sync();
   +
   +    Row[] x = (Row[]) sqlContext.read().format("org.apache.hudi")
   +        .load(tableBasePath + "/*/*.parquet")
   +        .collect();
   +    TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + 
"/*/*.parquet", sqlContext);
   +    testNum++;
   +  }
   +
      @Test
      public void 
testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws 
Exception {
        // The CSV files have header, the columns are separated by ',', the 
default separator
   
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to