This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ed745df [HUDI-40] Add parquet support for the Delta Streamer (#949)
ed745df is described below
commit ed745dfdbf254bfc2ec6d9c7baed8ccbf571abab
Author: YanJia-Gary-Li <[email protected]>
AuthorDate: Wed Oct 16 21:11:59 2019 -0700
[HUDI-40] Add parquet support for the Delta Streamer (#949)
---
.../deltastreamer/SourceFormatAdapter.java | 15 ++++
.../hudi/utilities/sources/ParquetDFSSource.java | 60 ++++++++++++++++
.../hudi/utilities/sources/ParquetSource.java | 34 +++++++++
.../org/apache/hudi/utilities/sources/Source.java | 2 +-
.../apache/hudi/utilities/UtilitiesTestBase.java | 35 +++++++++
.../hudi/utilities/sources/TestDFSSource.java | 83 ++++++++++++++++++++--
6 files changed, 224 insertions(+), 5 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index 4df948e..e44ba53 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
+import org.apache.hudi.utilities.sources.ParquetSource;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
@@ -59,6 +60,8 @@ public final class SourceFormatAdapter {
switch (source.getSourceType()) {
case AVRO:
return ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
+ case PARQUET:
+ return ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
case JSON: {
InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
@@ -99,6 +102,18 @@ public final class SourceFormatAdapter {
.orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
+ case PARQUET: {
+ InputBatch<JavaRDD<GenericRecord>> r = ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
+ Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
+ return new InputBatch<>(
+ Option
+ .ofNullable(
+ r.getBatch()
+ .map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(), source.getSparkSession()))
+ .orElse(null)),
+ r.getCheckpointForNextBatch(), r.getSchemaProvider());
+ }
case JSON: {
InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
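[editor's note] The new PARQUET branch performs the same GenericRecord-to-Row hop the adapter already uses for JSON. A minimal standalone sketch of that hop; the class and method names here are illustrative, not part of the commit:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class GenericRecordsToRows {

  // Same call pattern as the PARQUET case above: serialize the Avro schema
  // to its JSON form and let AvroConversionUtils build the DataFrame.
  public static Dataset<Row> toRows(JavaRDD<GenericRecord> rdd, Schema sourceSchema, SparkSession spark) {
    return AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(), spark);
  }
}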
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
new file mode 100644
index 0000000..22ac3f9
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
+import org.apache.parquet.avro.AvroParquetInputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * DFS Source that reads Parquet data.
+ */
+public class ParquetDFSSource extends ParquetSource {
+
+ private final DFSPathSelector pathSelector;
+
+ public ParquetDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+ SchemaProvider schemaProvider) {
+ super(props, sparkContext, sparkSession, schemaProvider);
+ this.pathSelector = new DFSPathSelector(props, this.sparkContext.hadoopConfiguration());
+ }
+
+ @Override
+ protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
+ Pair<Option<String>, String> selectPathsWithMaxModificationTime =
+ pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
+ return selectPathsWithMaxModificationTime.getLeft()
+ .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
+ .orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
+ }
+
+ private JavaRDD<GenericRecord> fromFiles(String pathStr) {
+ JavaPairRDD<Void, GenericRecord> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroParquetInputFormat.class,
+ Void.class, GenericRecord.class, sparkContext.hadoopConfiguration());
+ return avroRDD.values();
+ }
+}
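[editor's note] The fromFiles() helper above leans on parquet-avro's AvroParquetInputFormat, which lets Spark's newAPIHadoopFile read Parquet splits directly as Avro GenericRecords (keys are Void). A self-contained sketch of the same read path outside the source class; the local master and the input path are placeholder assumptions:

import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ReadParquetAsAvro {

  public static void main(String[] args) {
    JavaSparkContext jsc =
        new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("parquet-as-avro"));
    // Mirrors fromFiles(): keys are Void, values are the decoded Avro records.
    JavaRDD<GenericRecord> records = jsc
        .newAPIHadoopFile("/tmp/parquetFiles/1.parquet", AvroParquetInputFormat.class,
            Void.class, GenericRecord.class, jsc.hadoopConfiguration())
        .values();
    System.out.println("record count: " + records.count());
    jsc.stop();
  }
}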
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java
new file mode 100644
index 0000000..edcc688
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class ParquetSource extends Source<JavaRDD<GenericRecord>> {
+
+ public ParquetSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+ SchemaProvider schemaProvider) {
+ super(props, sparkContext, sparkSession, schemaProvider, SourceType.PARQUET);
+ }
+}
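[editor's note] ParquetSource pins down only the source type (SourceType.PARQUET) and the batch shape (JavaRDD<GenericRecord>); where the records come from is left to subclasses such as ParquetDFSSource. As a sketch of that extension point, a hypothetical subclass that always serves one pre-built RDD with a constant checkpoint — FixedParquetSource is invented for illustration:

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.ParquetSource;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

// Hypothetical: returns the same batch on every fetch, with checkpoint "0".
public class FixedParquetSource extends ParquetSource {

  private final JavaRDD<GenericRecord> records;

  public FixedParquetSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
      SchemaProvider schemaProvider, JavaRDD<GenericRecord> records) {
    super(props, jsc, spark, schemaProvider);
    this.records = records;
  }

  @Override
  protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
    return new InputBatch<>(Option.of(records), "0");
  }
}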
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index ea57f4b..0ed1e6c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -34,7 +34,7 @@ public abstract class Source<T> implements Serializable {
protected static volatile Logger log = LogManager.getLogger(Source.class);
public enum SourceType {
- JSON, AVRO, ROW
+ JSON, AVRO, ROW, PARQUET
}
protected transient TypedProperties props;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
index 2125483..46b0dab 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
@@ -23,23 +23,31 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.server.HiveServer2;
+import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.util.HiveTestService;
import org.apache.hudi.utilities.sources.TestDataSource;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
@@ -178,6 +186,15 @@ public class UtilitiesTestBase {
os.close();
}
+ public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
+ try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(targetFile)
+ .withSchema(HoodieTestDataGenerator.avroSchema).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
+ for (GenericRecord record : records) {
+ writer.write(record);
+ }
+ }
+ }
+
public static TypedProperties setupSchemaOnDFS() throws IOException {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc",
dfs, dfsBasePath + "/source.avsc");
TypedProperties props = new TypedProperties();
@@ -185,6 +202,24 @@ public class UtilitiesTestBase {
return props;
}
+ public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {
+ try {
+ Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
+ return (GenericRecord) recordOpt.get();
+ } catch (IOException e) {
+ return null;
+ }
+ }
+
+ public static List<GenericRecord> toGenericRecords(List<HoodieRecord> hoodieRecords,
+ HoodieTestDataGenerator dataGenerator) {
+ List<GenericRecord> records = new ArrayList<GenericRecord>();
+ for (HoodieRecord hoodieRecord : hoodieRecords) {
+ records.add(toGenericRecord(hoodieRecord, dataGenerator));
+ }
+ return records;
+ }
+
public static String toJsonString(HoodieRecord hr) {
try {
return ((TestRawTripPayload) hr.getData()).getJsonData();
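[editor's note] The new saveParquetToDFS helper is a thin wrapper over parquet-avro's AvroParquetWriter builder. A minimal standalone version of the same write path, assuming an inline Avro schema with a single string field and a local target path (both illustrative):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

public class WriteParquetFixture {

  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":[{\"name\":\"key\",\"type\":\"string\"}]}");
    // Same builder chain as saveParquetToDFS above; the writer is Closeable.
    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new Path("/tmp/fixture.parquet"))
        .withSchema(schema).withConf(new Configuration()).build()) {
      GenericRecord rec = new GenericData.Record(schema);
      rec.put("key", "k1");
      writer.write(rec);
    }
  }
}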
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
index 9ee3285..4d4fafb 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
@@ -19,10 +19,15 @@
package org.apache.hudi.utilities.sources;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.List;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
@@ -41,7 +46,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
/**
- * Basic tests against all subclasses of {@link JsonDFSSource}
+ * Basic tests against all subclasses of {@link JsonDFSSource} and {@link ParquetDFSSource}
*/
public class TestDFSSource extends UtilitiesTestBase {
@@ -82,11 +87,17 @@ public class TestDFSSource extends UtilitiesTestBase {
assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
UtilitiesTestBase.Helpers.saveStringsToDFS(Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 100)), dfs, dfsBasePath + "/jsonFiles/1.json");
- assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), 10).getBatch());
- InputBatch<JavaRDD<GenericRecord>> fetch1 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), 1000000);
+ // Test respecting sourceLimit
+ int sourceLimit = 10;
+ RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(dfsBasePath + "/jsonFiles/1.json"), true);
+ FileStatus file1Status = files.next();
+ assertTrue(file1Status.getLen() > sourceLimit);
+ assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch());
+ // Test json -> Avro
+ InputBatch<JavaRDD<GenericRecord>> fetch1 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(100, fetch1.getBatch().get().count());
// Test json -> Row format
- InputBatch<Dataset<Row>> fetch1AsRows = jsonSource.fetchNewDataInRowFormat(Option.empty(), 1000000);
+ InputBatch<Dataset<Row>> fetch1AsRows = jsonSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(100, fetch1AsRows.getBatch().get().count());
// Test Avro -> Row format
Dataset<Row> fetch1Rows = AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
@@ -113,5 +124,69 @@ public class TestDFSSource extends UtilitiesTestBase {
InputBatch<JavaRDD<GenericRecord>> fetch4 =
jsonSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4.getBatch());
+
+ // 5. Extract from the beginning
+ InputBatch<JavaRDD<GenericRecord>> fetch5 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+ assertEquals(10100, fetch5.getBatch().get().count());
+ }
+
+ @Test
+ public void testParquetDFSSource() throws IOException {
+ dfs.mkdirs(new Path(dfsBasePath + "/parquetFiles"));
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+
+ TypedProperties props = new TypedProperties();
+ props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath +
"/parquetFiles");
+ ParquetSource parquetDFSSource = new ParquetDFSSource(props, jsc,
sparkSession, schemaProvider);
+ SourceFormatAdapter parquetSource = new
SourceFormatAdapter(parquetDFSSource);
+
+ // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
+ assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
+ List<GenericRecord> batch1 = Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100), dataGenerator);
+ Path file1 = new Path(dfsBasePath + "/parquetFiles", "1.parquet");
+ Helpers.saveParquetToDFS(batch1, file1);
+ // Test respecting sourceLimit
+ int sourceLimit = 10;
+ RemoteIterator<LocatedFileStatus> files = dfs.listFiles(file1, true);
+ FileStatus file1Status = files.next();
+ assertTrue(file1Status.getLen() > sourceLimit);
+ assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch());
+ // Test parquet -> Avro
+ InputBatch<JavaRDD<GenericRecord>> fetch1 = parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+ assertEquals(100, fetch1.getBatch().get().count());
+ // Test parquet -> Row
+ InputBatch<Dataset<Row>> fetch1AsRows = parquetSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
+ assertEquals(100, fetch1AsRows.getBatch().get().count());
+
+ // 2. Produce new data, extract new data
+ List<GenericRecord> batch2 = Helpers.toGenericRecords(dataGenerator.generateInserts("001", 10000), dataGenerator);
+ Path file2 = new Path(dfsBasePath + "/parquetFiles", "2.parquet");
+ Helpers.saveParquetToDFS(batch2, file2);
+ // Test parquet -> Avro
+ InputBatch<JavaRDD<GenericRecord>> fetch2 =
+ parquetSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+ assertEquals(10000, fetch2.getBatch().get().count());
+ // Test parquet -> Row
+ InputBatch<Dataset<Row>> fetch2AsRows =
+ parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
+ assertEquals(10000, fetch2AsRows.getBatch().get().count());
+
+ // 3. Extract with previous checkpoint => gives same data back (idempotent)
+ InputBatch<Dataset<Row>> fetch3AsRows =
+ parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
+ assertEquals(10000, fetch3AsRows.getBatch().get().count());
+ assertEquals(fetch2AsRows.getCheckpointForNextBatch(), fetch3AsRows.getCheckpointForNextBatch());
+ fetch3AsRows.getBatch().get().registerTempTable("test_dfs_table");
+ Dataset<Row> rowDataset = new SQLContext(jsc.sc()).sql("select * from
test_dfs_table");
+ assertEquals(10000, rowDataset.count());
+
+ // 4. Extract with latest checkpoint => no new data returned
+ InputBatch<JavaRDD<GenericRecord>> fetch4 =
+ parquetSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
+ assertEquals(Option.empty(), fetch4.getBatch());
+
+ // 5. Extract from the beginning
+ InputBatch<JavaRDD<GenericRecord>> fetch5 = parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+ assertEquals(10100, fetch5.getBatch().get().count());
}
}
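[editor's note] Putting the pieces together, usage follows the shape of testParquetDFSSource above: point the source at a DFS directory of Parquet files, wrap it in a SourceFormatAdapter, and pull batches in Avro or Row form, resuming from the returned checkpoint. A condensed sketch, assuming jsc, sparkSession, and a schemaProvider are already set up as the tests do (the directory path is illustrative):

import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParquetIngestSketch {

  // Pull two consecutive batches from a directory of Parquet files.
  public static void run(JavaSparkContext jsc, SparkSession sparkSession, SchemaProvider schemaProvider) {
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.dfs.root", "/data/parquetFiles");
    SourceFormatAdapter adapter =
        new SourceFormatAdapter(new ParquetDFSSource(props, jsc, sparkSession, schemaProvider));
    // First pull: no checkpoint, no size cap.
    InputBatch<Dataset<Row>> first = adapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
    // Later pulls resume from the checkpoint (max file modification time seen so far).
    InputBatch<Dataset<Row>> next =
        adapter.fetchNewDataInRowFormat(Option.of(first.getCheckpointForNextBatch()), Long.MAX_VALUE);
  }
}

When run through HoodieDeltaStreamer itself, the same wiring is typically selected with --source-class org.apache.hudi.utilities.sources.ParquetDFSSource rather than built by hand.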