This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cf765df [HUDI-76] Add CSV Source support for Hudi Delta Streamer
new a752b7b Merge pull request #1165 from yihua/HUDI-76-deltastreamer-csv-source
cf765df is described below
commit cf765df6062f896ad6bf0a8ddf711aad19dbf59b
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sat Jan 18 23:11:14 2020 -0800
[HUDI-76] Add CSV Source support for Hudi Delta Streamer
---
.../hudi/common/HoodieTestDataGenerator.java | 92 +++++++++---
hudi-utilities/pom.xml | 5 +
.../hudi/utilities/sources/CsvDFSSource.java | 126 ++++++++++++++++
.../hudi/utilities/TestHoodieDeltaStreamer.java | 164 +++++++++++++++++++--
.../apache/hudi/utilities/UtilitiesTestBase.java | 60 +++++++-
.../utilities/sources/AbstractBaseTestSource.java | 2 +-
.../sources/AbstractDFSSourceTestBase.java | 3 +-
.../hudi/utilities/sources/TestCsvDFSSource.java | 61 ++++++++
.../delta-streamer-config/source-flattened.avsc | 57 +++++++
.../delta-streamer-config/target-flattened.avsc | 60 ++++++++
10 files changed, 597 insertions(+), 33 deletions(-)
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index 6d86e93..3307881 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -74,20 +74,30 @@ public class HoodieTestDataGenerator {
   public static final String[] DEFAULT_PARTITION_PATHS =
       {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
   public static final int DEFAULT_PARTITION_DEPTH = 3;
-  public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+  public static final String TRIP_SCHEMA_PREFIX = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
       + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
       + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
       + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
-      + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
-      + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
-      + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
-      + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
+      + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},";
+  public static final String TRIP_SCHEMA_SUFFIX = "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
+  public static final String FARE_NESTED_SCHEMA = "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
+      + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},";
+  public static final String FARE_FLATTENED_SCHEMA = "{\"name\": \"fare\", \"type\": \"double\"},"
+      + "{\"name\": \"currency\", \"type\": \"string\"},";
+
+  public static final String TRIP_EXAMPLE_SCHEMA =
+      TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
+  public static final String TRIP_FLATTENED_SCHEMA =
+      TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX;
+
   public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
   public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,"
       + "struct<amount:double,currency:string>,boolean";
+
   public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
   public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS = HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
+  public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);

   private static final Random RAND = new Random(46474747);
@@ -115,10 +125,33 @@ public class HoodieTestDataGenerator {
}
/**
-   * Generates a new avro record of the above schema format, retaining the key if optionally provided.
+   * Generates a new avro record of the above nested schema format,
+   * retaining the key if optionally provided.
+   *
+   * @param key Hoodie key.
+   * @param commitTime Commit time to use.
+   * @return Raw payload of a test record.
+   * @throws IOException
*/
   public static TestRawTripPayload generateRandomValue(HoodieKey key, String commitTime) throws IOException {
-    GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0);
+ return generateRandomValue(key, commitTime, false);
+ }
+
+ /**
+   * Generates a new avro record with the specified schema (nested or flattened),
+   * retaining the key if optionally provided.
+   *
+   * @param key Hoodie key.
+   * @param commitTime Commit time to use.
+   * @param isFlattened whether the schema of the record should be flattened.
+   * @return Raw payload of a test record.
+ * @throws IOException
+ */
+  public static TestRawTripPayload generateRandomValue(
+      HoodieKey key, String commitTime, boolean isFlattened) throws IOException {
+    GenericRecord rec = generateGenericRecord(
+        key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0,
+        false, isFlattened);
     return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
   }
@@ -127,7 +160,7 @@ public class HoodieTestDataGenerator {
*/
public static TestRawTripPayload generateRandomDeleteValue(HoodieKey key,
String commitTime) throws IOException {
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" +
commitTime, "driver-" + commitTime, 0.0,
- true);
+ true, false);
return new TestRawTripPayload(rec.toString(), key.getRecordKey(),
key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
}
@@ -141,12 +174,13 @@ public class HoodieTestDataGenerator {
   public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
                                                     double timestamp) {
-    return generateGenericRecord(rowKey, riderName, driverName, timestamp, false);
+    return generateGenericRecord(rowKey, riderName, driverName, timestamp, false, false);
   }

   public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
-                                                    double timestamp, boolean isDeleteRecord) {
-    GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
+                                                    double timestamp, boolean isDeleteRecord,
+                                                    boolean isFlattened) {
+    GenericRecord rec = new GenericData.Record(isFlattened ? FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA);
rec.put("_row_key", rowKey);
rec.put("timestamp", timestamp);
rec.put("rider", riderName);
@@ -156,10 +190,15 @@ public class HoodieTestDataGenerator {
rec.put("end_lat", RAND.nextDouble());
rec.put("end_lon", RAND.nextDouble());
-    GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
-    fareRecord.put("amount", RAND.nextDouble() * 100);
-    fareRecord.put("currency", "USD");
-    rec.put("fare", fareRecord);
+    if (isFlattened) {
+      rec.put("fare", RAND.nextDouble() * 100);
+      rec.put("currency", "USD");
+    } else {
+      GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
+ fareRecord.put("amount", RAND.nextDouble() * 100);
+ fareRecord.put("currency", "USD");
+ rec.put("fare", fareRecord);
+ }
if (isDeleteRecord) {
rec.put("_hoodie_is_deleted", true);
@@ -230,16 +269,31 @@ public class HoodieTestDataGenerator {
}
/**
-   * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
+   * Generates new inserts with nested schema, uniformly across the partition paths above.
+   * It also updates the list of existing keys.
*/
public List<HoodieRecord> generateInserts(String commitTime, Integer n) {
- return generateInsertsStream(commitTime, n).collect(Collectors.toList());
+ return generateInserts(commitTime, n, false);
+ }
+
+ /**
+ * Generates new inserts, uniformly across the partition paths above.
+ * It also updates the list of existing keys.
+ *
+ * @param commitTime Commit time to use.
+ * @param n Number of records.
+   * @param isFlattened whether the schema of the generated record is flattened
+   * @return List of {@link HoodieRecord}s
+   */
+  public List<HoodieRecord> generateInserts(String commitTime, Integer n, boolean isFlattened) {
+    return generateInsertsStream(commitTime, n, isFlattened).collect(Collectors.toList());
}
/**
    * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
    */
-  public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer n) {
+ public Stream<HoodieRecord> generateInsertsStream(
+ String commitTime, Integer n, boolean isFlattened) {
int currSize = getNumExistingKeys();
return IntStream.range(0, n).boxed().map(i -> {
@@ -251,7 +305,7 @@ public class HoodieTestDataGenerator {
existingKeys.put(currSize + i, kp);
numExistingKeys++;
try {
- return new HoodieRecord(key, generateRandomValue(key, commitTime));
+        return new HoodieRecord(key, generateRandomValue(key, commitTime, isFlattened));
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
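
Note: the new isFlattened flag lets test code generate records against either the nested or the flattened trip schema. A minimal sketch of how the updated generator API might be used (the class name and record counts below are illustrative, not part of this patch):

  import java.util.List;
  import org.apache.hudi.common.HoodieTestDataGenerator;
  import org.apache.hudi.common.model.HoodieRecord;

  public class FlattenedInsertsSketch {
    public static void main(String[] args) {
      HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
      // Existing two-argument overload: nested schema, unchanged behavior for current callers
      List<HoodieRecord> nested = dataGenerator.generateInserts("000", 5);
      // New overload: flattened schema, e.g. for sources such as CSV that cannot express the nested "fare" record
      List<HoodieRecord> flattened = dataGenerator.generateInserts("000", 5, true);
      System.out.println(nested.size() + " nested / " + flattened.size() + " flattened records");
    }
  }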
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index f82243b..2324ae8 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -137,6 +137,11 @@
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-csv</artifactId>
+ <version>${fasterxml.version}</version>
+ </dependency>
<!-- Parquet -->
<dependency>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
new file mode 100644
index 0000000..b8ccd6e
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.avro.SchemaConverters;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Reads data from CSV files on DFS as the data source.
+ *
+ * Internally, we use Spark to read the CSV files, so any limitation of Spark CSV also applies here
+ * (e.g., limited support for nested schemas).
+ *
+ * You can set Spark-compatible, CSV-specific configs in the format of hoodie.deltastreamer.csv.*
+ * to control how the CSV files are read in Hudi. The supported options are:
+ *
+ * "sep", "encoding", "quote", "escape", "charToEscapeQuoteEscaping", "comment",
+ * "header", "enforceSchema", "inferSchema", "samplingRatio", "ignoreLeadingWhiteSpace",
+ * "ignoreTrailingWhiteSpace", "nullValue", "emptyValue", "nanValue", "positiveInf",
+ * "negativeInf", "dateFormat", "timestampFormat", "maxColumns", "maxCharsPerColumn",
+ * "mode", "columnNameOfCorruptRecord", "multiLine"
+ *
+ * Detailed information on these CSV options can be found at:
+ * https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html#csv-scala.collection.Seq-
+ *
+ * If the source Avro schema is provided through the {@link org.apache.hudi.utilities.schema.FilebasedSchemaProvider}
+ * using the "hoodie.deltastreamer.schemaprovider.source.schema.file" config, the schema is
+ * passed to the CSV reader without inferring the schema from the CSV files.
+ */
+public class CsvDFSSource extends RowSource {
+ // CsvSource config prefix
+ public static final String CSV_SRC_CONFIG_PREFIX =
"hoodie.deltastreamer.csv.";
+ // CSV-specific configurations to pass in from Hudi to Spark
+ public static final List<String> CSV_CONFIG_KEYS = Arrays.asList(
+ "sep", "encoding", "quote", "escape", "charToEscapeQuoteEscaping",
"comment",
+ "header", "enforceSchema", "inferSchema", "samplingRatio",
"ignoreLeadingWhiteSpace",
+ "ignoreTrailingWhiteSpace", "nullValue", "emptyValue", "nanValue",
"positiveInf",
+ "negativeInf", "dateFormat", "timestampFormat", "maxColumns",
"maxCharsPerColumn",
+ "mode", "columnNameOfCorruptRecord", "multiLine"
+ );
+
+ private final DFSPathSelector pathSelector;
+ private final StructType sourceSchema;
+
+ public CsvDFSSource(TypedProperties props,
+ JavaSparkContext sparkContext,
+ SparkSession sparkSession,
+ SchemaProvider schemaProvider) {
+ super(props, sparkContext, sparkSession, schemaProvider);
+ this.pathSelector = new DFSPathSelector(props,
sparkContext.hadoopConfiguration());
+ if (schemaProvider != null) {
+ sourceSchema = (StructType)
SchemaConverters.toSqlType(schemaProvider.getSourceSchema())
+ .dataType();
+ } else {
+ sourceSchema = null;
+ }
+ }
+
+ @Override
+ protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String>
lastCkptStr,
+ long sourceLimit) {
+ Pair<Option<String>, String> selPathsWithMaxModificationTime =
+ pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr,
sourceLimit);
+ return Pair.of(fromFiles(
+ selPathsWithMaxModificationTime.getLeft()),
selPathsWithMaxModificationTime.getRight());
+ }
+
+ /**
+   * Reads the CSV files and parses the lines into a {@link Dataset} of {@link Row}.
+ *
+ * @param pathStr The list of file paths, separated by ','.
+ * @return {@link Dataset} of {@link Row} containing the records.
+ */
+ private Option<Dataset<Row>> fromFiles(Option<String> pathStr) {
+ if (pathStr.isPresent()) {
+ DataFrameReader dataFrameReader = sparkSession.read().format("csv");
+ CSV_CONFIG_KEYS.forEach(optionKey -> {
+ String configPropName = CSV_SRC_CONFIG_PREFIX + optionKey;
+ String value = props.getString(configPropName, null);
+ // Pass down the Hudi CSV configs to Spark DataFrameReader
+ if (value != null) {
+ dataFrameReader.option(optionKey, value);
+ }
+ });
+ if (sourceSchema != null) {
+ // Source schema is specified, pass it to the reader
+ dataFrameReader.schema(sourceSchema);
+ }
+ dataFrameReader.option("inferSchema", Boolean.toString(sourceSchema ==
null));
+
+ return Option.of(dataFrameReader.load(pathStr.get().split(",")));
+ } else {
+ return Option.empty();
+ }
+ }
+}
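
Note: as a usage illustration for the class comment above, a CsvDFSSource can be wired up with the hoodie.deltastreamer.csv.* options. This is only a sketch based on the constructor and config prefix in this file; the dfs root, separator, and helper method name are example values, and a null schemaProvider makes the source infer the schema from the CSV files instead:

  import org.apache.hudi.common.util.TypedProperties;
  import org.apache.hudi.utilities.schema.SchemaProvider;
  import org.apache.hudi.utilities.sources.CsvDFSSource;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.sql.SparkSession;

  class CsvSourceWiringSketch {
    // Sketch only: builds a CsvDFSSource for tab-separated files with a header row.
    static CsvDFSSource buildCsvSource(JavaSparkContext jsc, SparkSession spark, SchemaProvider schemaProvider) {
      TypedProperties props = new TypedProperties();
      props.setProperty("hoodie.deltastreamer.source.dfs.root", "/path/to/csvFiles");
      props.setProperty("hoodie.deltastreamer.csv.header", "true");
      props.setProperty("hoodie.deltastreamer.csv.sep", "\t");
      return new CsvDFSSource(props, jsc, spark, schemaProvider);
    }
  }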
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 100faa2..9224be0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -19,8 +19,8 @@
package org.apache.hudi.utilities;
import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -42,6 +42,7 @@ import
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.DistributedTestDataSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
@@ -60,6 +61,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -98,12 +100,14 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
private static final Random RANDOM = new Random();
private static final String PROPS_FILENAME_TEST_SOURCE =
"test-source.properties";
private static final String PROPS_FILENAME_TEST_INVALID =
"test-invalid.properties";
+ private static final String PROPS_FILENAME_TEST_CSV =
"test-csv-dfs-source.properties";
private static final String PROPS_FILENAME_TEST_PARQUET =
"test-parquet-dfs-source.properties";
private static final String PARQUET_SOURCE_ROOT = dfsBasePath +
"/parquetFiles";
private static final int PARQUET_NUM_RECORDS = 5;
+ private static final int CSV_NUM_RECORDS = 3;
private static final Logger LOG =
LogManager.getLogger(TestHoodieDeltaStreamer.class);
- private static int parquetTestNum = 1;
+ private static int testNum = 1;
@BeforeClass
public static void initClass() throws Exception {
@@ -114,7 +118,9 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties",
dfs,
dfsBasePath + "/sql-transformer.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc",
dfs, dfsBasePath + "/source.avsc");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc",
dfs, dfsBasePath + "/source-flattened.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc",
dfs, dfsBasePath + "/target.avsc");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc",
dfs, dfsBasePath + "/target-flattened.avsc");
TypedProperties props = new TypedProperties();
props.setProperty("include", "sql-transformer.properties");
@@ -197,12 +203,12 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
String propsFilename, boolean enableHiveSync, boolean
useSchemaProviderClass, boolean updatePayloadClass,
String payloadClassName,
String tableType) {
return makeConfig(basePath, op, TestDataSource.class.getName(),
transformerClassName, propsFilename, enableHiveSync,
- useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName,
tableType);
+ useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName,
tableType, "timestamp");
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation
op, String sourceClassName,
String transformerClassName, String propsFilename, boolean
enableHiveSync, boolean useSchemaProviderClass,
- int sourceLimit, boolean updatePayloadClass, String payloadClassName,
String tableType) {
+ int sourceLimit, boolean updatePayloadClass, String payloadClassName,
String tableType, String sourceOrderingField) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips";
@@ -211,7 +217,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
cfg.transformerClassName = transformerClassName;
cfg.operation = op;
cfg.enableHiveSync = enableHiveSync;
- cfg.sourceOrderingField = "timestamp";
+ cfg.sourceOrderingField = sourceOrderingField;
cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
cfg.sourceLimit = sourceLimit;
if (updatePayloadClass) {
@@ -653,7 +659,7 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
if (useSchemaProvider) {
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source.avsc");
if (hasTransformer) {
-
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/target.avsc");
+
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target.avsc");
}
}
parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root",
PARQUET_SOURCE_ROOT);
@@ -663,14 +669,14 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
private void testParquetDFSSource(boolean useSchemaProvider, String
transformerClassName) throws Exception {
prepareParquetDFSSource(useSchemaProvider, transformerClassName != null);
- String tableBasePath = dfsBasePath + "/test_parquet_table" +
parquetTestNum;
+ String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, Operation.INSERT,
ParquetDFSSource.class.getName(),
transformerClassName, PROPS_FILENAME_TEST_PARQUET, false,
- useSchemaProvider, 100000, false, null, null), jsc);
+ useSchemaProvider, 100000, false, null, null, "timestamp"), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath +
"/*/*.parquet", sqlContext);
- parquetTestNum++;
+ testNum++;
}
@Test
@@ -693,6 +699,146 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName());
}
+ private void prepareCsvDFSSource(
+ boolean hasHeader, char sep, boolean useSchemaProvider, boolean
hasTransformer) throws IOException {
+ String sourceRoot = dfsBasePath + "/csvFiles";
+ String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" :
"_c0";
+
+ // Properties used for testing delta-streamer with CSV source
+ TypedProperties csvProps = new TypedProperties();
+ csvProps.setProperty("include", "base.properties");
+ csvProps.setProperty("hoodie.datasource.write.recordkey.field",
recordKeyField);
+ csvProps.setProperty("hoodie.datasource.write.partitionpath.field",
"not_there");
+ if (useSchemaProvider) {
+
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source-flattened.avsc");
+ if (hasTransformer) {
+
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target-flattened.avsc");
+ }
+ }
+ csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot);
+
+ if (sep != ',') {
+ if (sep == '\t') {
+ csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
+ } else {
+ csvProps.setProperty("hoodie.deltastreamer.csv.sep",
Character.toString(sep));
+ }
+ }
+ if (hasHeader) {
+ csvProps.setProperty("hoodie.deltastreamer.csv.header",
Boolean.toString(hasHeader));
+ }
+
+ UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, dfs, dfsBasePath + "/"
+ PROPS_FILENAME_TEST_CSV);
+
+ String path = sourceRoot + "/1.csv";
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ UtilitiesTestBase.Helpers.saveCsvToDFS(
+ hasHeader, sep,
+ Helpers.jsonifyRecords(dataGenerator.generateInserts("000",
CSV_NUM_RECORDS, true)),
+ dfs, path);
+ }
+
+ private void testCsvDFSSource(
+ boolean hasHeader, char sep, boolean useSchemaProvider, String
transformerClassName) throws Exception {
+ prepareCsvDFSSource(hasHeader, sep, useSchemaProvider,
transformerClassName != null);
+ String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
+ String sourceOrderingField = (hasHeader || useSchemaProvider) ?
"timestamp" : "_c0";
+ HoodieDeltaStreamer deltaStreamer =
+ new HoodieDeltaStreamer(TestHelpers.makeConfig(
+ tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(),
+ transformerClassName, PROPS_FILENAME_TEST_CSV, false,
+ useSchemaProvider, 1000, false, null, null, sourceOrderingField),
jsc);
+ deltaStreamer.sync();
+ TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath +
"/*/*.parquet", sqlContext);
+ testNum++;
+ }
+
+ @Test
+ public void
testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws
Exception {
+ // The CSV files have header, the columns are separated by ',', the
default separator
+ // No schema provider is specified, no transformer is applied
+ // In this case, the source schema comes from the inferred schema of the
CSV files
+ testCsvDFSSource(true, ',', false, null);
+ }
+
+ @Test
+ public void
testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndNoTransformer() throws
Exception {
+ // The CSV files have header, the columns are separated by '\t',
+ // which is passed in through the Hudi CSV properties
+ // No schema provider is specified, no transformer is applied
+ // In this case, the source schema comes from the inferred schema of the
CSV files
+ testCsvDFSSource(true, '\t', false, null);
+ }
+
+ @Test
+ public void
testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndNoTransformer() throws
Exception {
+ // The CSV files have header, the columns are separated by '\t'
+ // File schema provider is used, no transformer is applied
+ // In this case, the source schema comes from the source Avro schema file
+ testCsvDFSSource(true, '\t', true, null);
+ }
+
+ @Test
+ public void
testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndWithTransformer()
throws Exception {
+ // The CSV files have header, the columns are separated by '\t'
+ // No schema provider is specified, transformer is applied
+ // In this case, the source schema comes from the inferred schema of the
CSV files.
+ // Target schema is determined based on the Dataframe after transformation
+ testCsvDFSSource(true, '\t', false,
TripsWithDistanceTransformer.class.getName());
+ }
+
+ @Test
+ public void
testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndTransformer() throws
Exception {
+ // The CSV files have header, the columns are separated by '\t'
+ // File schema provider is used, transformer is applied
+ // In this case, the source and target schema come from the Avro schema
files
+ testCsvDFSSource(true, '\t', true,
TripsWithDistanceTransformer.class.getName());
+ }
+
+ @Test
+ public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndNoTransformer()
throws Exception {
+ // The CSV files do not have header, the columns are separated by '\t',
+ // which is passed in through the Hudi CSV properties
+ // No schema provider is specified, no transformer is applied
+ // In this case, the source schema comes from the inferred schema of the
CSV files
+    // Using no CSV header and no schema provider at the same time is not recommended,
+    // as the column names are not informative
+ testCsvDFSSource(false, '\t', false, null);
+ }
+
+ @Test
+ public void testCsvDFSSourceNoHeaderWithSchemaProviderAndNoTransformer()
throws Exception {
+ // The CSV files do not have header, the columns are separated by '\t'
+ // File schema provider is used, no transformer is applied
+ // In this case, the source schema comes from the source Avro schema file
+ testCsvDFSSource(false, '\t', true, null);
+ }
+
+ @Test
+ public void
testCsvDFSSourceNoHeaderWithoutSchemaProviderAndWithTransformer() throws
Exception {
+ // The CSV files do not have header, the columns are separated by '\t'
+ // No schema provider is specified, transformer is applied
+ // In this case, the source schema comes from the inferred schema of the
CSV files.
+ // Target schema is determined based on the Dataframe after transformation
+    // Using no CSV header and no schema provider at the same time is not recommended,
+    // as the transformer behavior may be unexpected
+ try {
+ testCsvDFSSource(false, '\t', false,
TripsWithDistanceTransformer.class.getName());
+ fail("Should error out when doing the transformation.");
+ } catch (AnalysisException e) {
+ LOG.error("Expected error during transformation", e);
+ assertTrue(e.getMessage().contains("cannot resolve '`begin_lat`' given
input columns:"));
+ }
+ }
+
+ @Test
+ public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer()
throws Exception {
+ // The CSV files do not have header, the columns are separated by '\t'
+ // File schema provider is used, transformer is applied
+ // In this case, the source and target schema come from the Avro schema
files
+ testCsvDFSSource(false, '\t', true,
TripsWithDistanceTransformer.class.getName());
+ }
+
/**
* UDF to calculate Haversine distance.
*/
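
Note: for context on the expected AnalysisException asserted above, when the CSV files have no header and no schema provider is configured, Spark falls back to positional column names, so a transformer that references the real field names cannot resolve them. A small illustrative sketch (the file path and session setup are assumptions, not part of this patch):

  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Row;
  import org.apache.spark.sql.SparkSession;

  class HeaderlessCsvSketch {
    // Illustrative only: with header=false and no user-provided schema, Spark names the
    // CSV columns _c0, _c1, ..., so a transformer selecting "begin_lat" fails to resolve it.
    static void show(SparkSession spark) {
      Dataset<Row> df = spark.read().format("csv")
          .option("header", "false")
          .option("inferSchema", "true")
          .load("/path/to/csvFiles/1.csv");
      df.printSchema();  // prints _c0, _c1, ... instead of timestamp, _row_key, ...
    }
  }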
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
index 1fcd99a..abf6578 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
@@ -27,11 +27,20 @@ import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.util.HiveTestService;
import org.apache.hudi.utilities.sources.TestDataSource;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
import com.google.common.collect.ImmutableList;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -42,6 +51,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.server.HiveServer2;
import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
@@ -56,6 +66,7 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/**
@@ -72,6 +83,7 @@ public class UtilitiesTestBase {
protected transient SparkSession sparkSession = null;
protected transient SQLContext sqlContext;
protected static HiveServer2 hiveServer;
+ private static ObjectMapper mapper = new ObjectMapper();
@BeforeClass
public static void initClass() throws Exception {
@@ -193,9 +205,47 @@ public class UtilitiesTestBase {
os.close();
}
+ /**
+   * Converts the JSON records into CSV format and writes them to a file.
+ *
+ * @param hasHeader whether the CSV file should have a header line.
+ * @param sep the column separator to use.
+ * @param lines the records in JSON format.
+ * @param fs {@link FileSystem} instance.
+ * @param targetPath File path.
+ * @throws IOException
+ */
+ public static void saveCsvToDFS(
+ boolean hasHeader, char sep,
+ String[] lines, FileSystem fs, String targetPath) throws IOException {
+ Builder csvSchemaBuilder = CsvSchema.builder();
+
+ ArrayNode arrayNode = mapper.createArrayNode();
+ Arrays.stream(lines).forEachOrdered(
+ line -> {
+ try {
+ arrayNode.add(mapper.readValue(line, ObjectNode.class));
+ } catch (IOException e) {
+            throw new HoodieIOException(
+                "Error converting json records into CSV format: " + e.getMessage());
+          }
+        });
+    arrayNode.get(0).fieldNames().forEachRemaining(csvSchemaBuilder::addColumn);
+    ObjectWriter csvObjWriter = new CsvMapper()
+        .writerFor(JsonNode.class)
+        .with(csvSchemaBuilder.setUseHeader(hasHeader).setColumnSeparator(sep).build());
+ PrintStream os = new PrintStream(fs.create(new Path(targetPath), true));
+ csvObjWriter.writeValue(os, arrayNode);
+ os.flush();
+ os.close();
+ }
+
   public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
     try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(targetFile)
-        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
+        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA)
+        .withConf(HoodieTestUtils.getDefaultHadoopConf())
+        .withWriteMode(Mode.OVERWRITE)
+        .build()) {
for (GenericRecord record : records) {
writer.write(record);
}
@@ -203,9 +253,13 @@ public class UtilitiesTestBase {
}
public static TypedProperties setupSchemaOnDFS() throws IOException {
-    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
+    return setupSchemaOnDFS("source.avsc");
+  }
+
+  public static TypedProperties setupSchemaOnDFS(String filename) throws IOException {
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/" + filename, dfs, dfsBasePath + "/" + filename);
     TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + filename);
return props;
}
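
Note: a minimal sketch of how the new saveCsvToDFS helper is exercised, mirroring prepareCsvDFSSource in the delta streamer tests above (the JSON lines and target path are example values; dfs and dfsBasePath come from the surrounding test base class):

  // The helper derives the CSV columns from the first record's field names and writes
  // one CSV file, optionally with a header row and a custom separator.
  String[] jsonLines = new String[] {
      "{\"_row_key\": \"key1\", \"timestamp\": 0.0, \"rider\": \"rider-000\"}",
      "{\"_row_key\": \"key2\", \"timestamp\": 0.0, \"rider\": \"rider-000\"}"
  };
  UtilitiesTestBase.Helpers.saveCsvToDFS(true, ',', jsonLines, dfs, dfsBasePath + "/csvFiles/example.csv");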
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
index c8dc5d5..175edde 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
@@ -123,7 +123,7 @@ public abstract class AbstractBaseTestSource extends
AvroSource {
updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime,
numUpdates)
.map(hr -> AbstractBaseTestSource.toGenericRecord(hr,
dataGenerator));
}
-    Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts)
+    Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts, false)
.map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
return Stream.concat(deleteStream, Stream.concat(updateStream,
insertStream));
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java
index 5815317..42cbebc 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java
@@ -56,6 +56,7 @@ public abstract class AbstractDFSSourceTestBase extends
UtilitiesTestBase {
String dfsRoot;
String fileSuffix;
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ boolean useFlattenedSchema = false;
@BeforeClass
public static void initClass() throws Exception {
@@ -105,7 +106,7 @@ public abstract class AbstractDFSSourceTestBase extends
UtilitiesTestBase {
*/
Path generateOneFile(String filename, String commitTime, int n) throws
IOException {
Path path = new Path(dfsRoot, filename + fileSuffix);
- writeNewDataToFile(dataGenerator.generateInserts(commitTime, n), path);
+    writeNewDataToFile(dataGenerator.generateInserts(commitTime, n, useFlattenedSchema), path);
return path;
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
new file mode 100644
index 0000000..fbb6d8f
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.utilities.UtilitiesTestBase;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Basic tests for {@link CsvDFSSource}.
+ */
+public class TestCsvDFSSource extends AbstractDFSSourceTestBase {
+
+ @Before
+ public void setup() throws Exception {
+ super.setup();
+    this.dfsRoot = dfsBasePath + "/csvFiles";
+    this.fileSuffix = ".csv";
+ this.useFlattenedSchema = true;
+ this.schemaProvider = new FilebasedSchemaProvider(
+ Helpers.setupSchemaOnDFS("source-flattened.avsc"), jsc);
+ }
+
+ @Override
+ Source prepareDFSSource() {
+ TypedProperties props = new TypedProperties();
+ props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+ props.setProperty("hoodie.deltastreamer.csv.header",
Boolean.toString(true));
+ props.setProperty("hoodie.deltastreamer.csv.sep", "\t");
+ return new CsvDFSSource(props, jsc, sparkSession, schemaProvider);
+ }
+
+ @Override
+ void writeNewDataToFile(List<HoodieRecord> records, Path path) throws
IOException {
+ UtilitiesTestBase.Helpers.saveCsvToDFS(
+ true, '\t', Helpers.jsonifyRecords(records), dfs, path.toString());
+ }
+}
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source-flattened.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source-flattened.avsc
new file mode 100644
index 0000000..ed3a7be
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source-flattened.avsc
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "type" : "record",
+ "name" : "triprec",
+ "fields" : [
+ {
+ "name" : "timestamp",
+ "type" : "double"
+ }, {
+ "name" : "_row_key",
+ "type" : "string"
+ }, {
+ "name" : "rider",
+ "type" : "string"
+ }, {
+ "name" : "driver",
+ "type" : "string"
+ }, {
+ "name" : "begin_lat",
+ "type" : "double"
+ }, {
+ "name" : "begin_lon",
+ "type" : "double"
+ }, {
+ "name" : "end_lat",
+ "type" : "double"
+ }, {
+ "name" : "end_lon",
+ "type" : "double"
+ }, {
+ "name" : "fare",
+ "type" : "double"
+ }, {
+ "name" : "currency",
+ "type" : "string"
+ }, {
+ "name" : "_hoodie_is_deleted",
+ "type" : "boolean",
+ "default" : false
+ } ]
+}
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target-flattened.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target-flattened.avsc
new file mode 100644
index 0000000..4e9e4af
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/target-flattened.avsc
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "type" : "record",
+ "name" : "triprec",
+ "fields" : [
+ {
+ "name" : "timestamp",
+ "type" : "double"
+ }, {
+ "name" : "_row_key",
+ "type" : "string"
+ }, {
+ "name" : "rider",
+ "type" : "string"
+ }, {
+ "name" : "driver",
+ "type" : "string"
+ }, {
+ "name" : "begin_lat",
+ "type" : "double"
+ }, {
+ "name" : "begin_lon",
+ "type" : "double"
+ }, {
+ "name" : "end_lat",
+ "type" : "double"
+ }, {
+ "name" : "end_lon",
+ "type" : "double"
+ }, {
+ "name" : "fare",
+ "type" : "double"
+ }, {
+ "name" : "currency",
+ "type" : "string"
+ }, {
+ "name" : "_hoodie_is_deleted",
+ "type" : "boolean",
+ "default" : false
+ }, {
+ "name" : "haversine_distance",
+ "type" : "double"
+ }]
+}