yihua commented on issue #1165: [HUDI-76] Add CSV Source support for Hudi Delta Streamer URL: https://github.com/apache/incubator-hudi/pull/1165#issuecomment-597457998 > One question about using nested schema. Can you remind me what happens if someone passes in a nested schema for CsvDeltaStreamer? I used the code below to test a nested schema with the CSV reader in Spark. It throws the following exception, which means that the Spark CSV source does not currently support nested schemas. In most cases, CSV schemas should be flattened. Whether nested schemas are supported for the CSV source depends on Spark's behavior (nested schemas may be supported for CSV in the future), so we don't enforce this check in our Hudi code. ``` org.apache.spark.sql.AnalysisException: CSV data source does not support struct<amount:double,currency:string> data type.; at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:69) at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:67) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99) at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifySchema(DataSourceUtils.scala:67) at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifyReadSchema(DataSourceUtils.scala:41) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:400) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:188) at org.apache.hudi.utilities.sources.CsvDFSSource.fromFiles(CsvDFSSource.java:120) at 
org.apache.hudi.utilities.sources.CsvDFSSource.fetchNextBatch(CsvDFSSource.java:93) at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43) at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:73) at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:66) at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:317) at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:226) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:121) at org.apache.hudi.utilities.TestHoodieDeltaStreamer.testCsvDFSSourceWithNestedSchema(TestHoodieDeltaStreamer.java:812) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at org.junit.runner.JUnitCore.run(JUnitCore.java:160) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) ``` Appendix: a simple diff for testing nested CSV schema: ``` diff --git a/hudi-utilities/null/parquetFiles/.1.parquet.crc b/hudi-utilities/null/parquetFiles/.1.parquet.crc new file mode 100644 index 00000000..f48941c4 Binary files /dev/null and b/hudi-utilities/null/parquetFiles/.1.parquet.crc differ diff --git a/hudi-utilities/null/parquetFiles/1.parquet b/hudi-utilities/null/parquetFiles/1.parquet new file mode 100644 index 00000000..7780cb89 Binary files /dev/null and b/hudi-utilities/null/parquetFiles/1.parquet differ diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 4b69d223..e2921a5f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -21,7 +21,6 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.client.HoodieWriteClient; -import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.client.WriteStatus; import 
org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; @@ -40,6 +39,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncTool; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation; import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; @@ -332,6 +332,7 @@ public class DeltaSync implements Serializable { } JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get(); + List<GenericRecord> r = avroRDD.collect(); JavaRDD<HoodieRecord> records = avroRDD.map(gr -> { HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr, (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false)); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java index 9e289f10..6f0cc8f8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java @@ -41,6 +41,7 @@ public abstract class RowSource extends Source<Dataset<Row>> { @Override protected final InputBatch<Dataset<Row>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) { Pair<Option<Dataset<Row>>, String> res = fetchNextBatch(lastCkptStr, sourceLimit); + Row[] x = (Row[]) res.getKey().get().collect(); return res.getKey().map(dsr -> { SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(dsr.schema()); return new InputBatch<>(res.getKey(), res.getValue(), rowSchemaProvider); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java 
index 43f76904..46761703 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java @@ -19,7 +19,6 @@ package org.apache.hudi.utilities; import org.apache.hudi.DataSourceWriteOptions; -import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.common.HoodieTestDataGenerator; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; @@ -33,11 +32,12 @@ import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.config.HoodieCompactionConfig; -import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveClient; import org.apache.hudi.hive.MultiPartKeysValueExtractor; +import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; @@ -101,6 +101,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties"; private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties"; private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties"; + private static final String PROPS_FILENAME_TEST_CSV_NESTED = "test-csv-dfs-source-nested.properties"; private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties"; private static final String PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles"; private static final int PARQUET_NUM_RECORDS = 5; @@ -728,7 
+729,51 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { csvProps.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(hasHeader)); } - UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_CSV); + UtilitiesTestBase.Helpers + .savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_CSV); + + String path = sourceRoot + "/1.csv"; + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + UtilitiesTestBase.Helpers.saveCsvToDFS( + hasHeader, sep, + Helpers.jsonifyRecords(dataGenerator.generateInserts("000", CSV_NUM_RECORDS, true)), + dfs, path); + } + + private void prepareCsvDFSSourceNested( + boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) + throws IOException { + String sourceRoot = dfsBasePath + "/csvFiles"; + String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c0"; + + // Properties used for testing delta-streamer with CSV source + TypedProperties csvProps = new TypedProperties(); + csvProps.setProperty("include", "base.properties"); + csvProps.setProperty("hoodie.datasource.write.recordkey.field", recordKeyField); + csvProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + if (useSchemaProvider) { + csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", + dfsBasePath + "/source.avsc"); + if (hasTransformer) { + csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", + dfsBasePath + "/target.avsc"); + } + } + csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot); + + if (sep != ',') { + if (sep == '\t') { + csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t"); + } else { + csvProps.setProperty("hoodie.deltastreamer.csv.sep", Character.toString(sep)); + } + } + if (hasHeader) { + csvProps.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(hasHeader)); + } + + UtilitiesTestBase.Helpers + 
.savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_CSV_NESTED); String path = sourceRoot + "/1.csv"; HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); @@ -739,7 +784,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { } private void testCsvDFSSource( - boolean hasHeader, char sep, boolean useSchemaProvider, String transformerClassName) throws Exception { + boolean hasHeader, char sep, boolean useSchemaProvider, String transformerClassName) + throws Exception { prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassName != null); String tableBasePath = dfsBasePath + "/test_csv_table" + testNum; String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0"; @@ -753,6 +799,25 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { testNum++; } + @Test + public void testCsvDFSSourceWithNestedSchema() throws Exception { + prepareCsvDFSSourceNested(true, ',', true, false); + String tableBasePath = dfsBasePath + "/test_csv_table" + testNum; + String sourceOrderingField = "timestamp"; + HoodieDeltaStreamer deltaStreamer = + new HoodieDeltaStreamer(TestHelpers.makeConfig( + tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(), + null, PROPS_FILENAME_TEST_CSV_NESTED, false, + true, 1000, false, null, null, sourceOrderingField), jsc); + deltaStreamer.sync(); + + Row[] x = (Row[]) sqlContext.read().format("org.apache.hudi") + .load(tableBasePath + "/*/*.parquet") + .collect(); + TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext); + testNum++; + } + @Test public void testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws Exception { // The CSV files have header, the columns are separated by ',', the default separator ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
