This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ed745df  [HUDI-40] Add parquet support for the Delta Streamer (#949)
ed745df is described below

commit ed745dfdbf254bfc2ec6d9c7baed8ccbf571abab
Author: YanJia-Gary-Li <[email protected]>
AuthorDate: Wed Oct 16 21:11:59 2019 -0700

    [HUDI-40] Add parquet support for the Delta Streamer (#949)
---
 .../deltastreamer/SourceFormatAdapter.java         | 15 ++++
 .../hudi/utilities/sources/ParquetDFSSource.java   | 60 ++++++++++++++++
 .../hudi/utilities/sources/ParquetSource.java      | 34 +++++++++
 .../org/apache/hudi/utilities/sources/Source.java  |  2 +-
 .../apache/hudi/utilities/UtilitiesTestBase.java   | 35 +++++++++
 .../hudi/utilities/sources/TestDFSSource.java      | 83 ++++++++++++++++++++--
 6 files changed, 224 insertions(+), 5 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index 4df948e..e44ba53 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.sources.AvroSource;
 import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.JsonSource;
+import org.apache.hudi.utilities.sources.ParquetSource;
 import org.apache.hudi.utilities.sources.RowSource;
 import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
@@ -59,6 +60,8 @@ public final class SourceFormatAdapter {
     switch (source.getSourceType()) {
       case AVRO:
         return ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
+      case PARQUET:
+        return ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
       case JSON: {
         InputBatch<JavaRDD<String>> r = ((JsonSource) 
source).fetchNext(lastCkptStr, sourceLimit);
         AvroConvertor convertor = new 
AvroConvertor(r.getSchemaProvider().getSourceSchema());
@@ -99,6 +102,18 @@ public final class SourceFormatAdapter {
                         .orElse(null)),
             r.getCheckpointForNextBatch(), r.getSchemaProvider());
       }
+      case PARQUET: {
+        InputBatch<JavaRDD<GenericRecord>> r = ((ParquetSource) 
source).fetchNext(lastCkptStr, sourceLimit);
+        Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
+        return new InputBatch<>(
+            Option
+                .ofNullable(
+                    r.getBatch()
+                        .map(rdd -> 
AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
+                            source.getSparkSession()))
+                        .orElse(null)),
+            r.getCheckpointForNextBatch(), r.getSchemaProvider());
+      }
       case JSON: {
         InputBatch<JavaRDD<String>> r = ((JsonSource) 
source).fetchNext(lastCkptStr, sourceLimit);
         Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
new file mode 100644
index 0000000..22ac3f9
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
+import org.apache.parquet.avro.AvroParquetInputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * DFS Source that reads parquet data
+ */
+public class ParquetDFSSource extends ParquetSource {
+
+  private final DFSPathSelector pathSelector;
+
+  public ParquetDFSSource(TypedProperties props, JavaSparkContext 
sparkContext, SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+    this.pathSelector = new DFSPathSelector(props, 
this.sparkContext.hadoopConfiguration());
+  }
+
+  @Override
+  protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> 
lastCkptStr, long sourceLimit) {
+    Pair<Option<String>, String> selectPathsWithMaxModificationTime =
+        pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, 
sourceLimit);
+    return selectPathsWithMaxModificationTime.getLeft()
+        .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), 
selectPathsWithMaxModificationTime.getRight()))
+        .orElseGet(() -> new InputBatch<>(Option.empty(), 
selectPathsWithMaxModificationTime.getRight()));
+  }
+
+  private JavaRDD<GenericRecord> fromFiles(String pathStr) {
+    JavaPairRDD<Void, GenericRecord> avroRDD = 
sparkContext.newAPIHadoopFile(pathStr, AvroParquetInputFormat.class,
+        Void.class, GenericRecord.class, sparkContext.hadoopConfiguration());
+    return avroRDD.values();
+  }
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java
new file mode 100644
index 0000000..edcc688
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class ParquetSource extends Source<JavaRDD<GenericRecord>> {
+
+  public ParquetSource(TypedProperties props, JavaSparkContext sparkContext, 
SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider, 
SourceType.PARQUET);
+  }
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index ea57f4b..0ed1e6c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -34,7 +34,7 @@ public abstract class Source<T> implements Serializable {
   protected static volatile Logger log = LogManager.getLogger(Source.class);
 
   public enum SourceType {
-    JSON, AVRO, ROW
+    JSON, AVRO, ROW, PARQUET
   }
 
   protected transient TypedProperties props;
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
index 2125483..46b0dab 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
@@ -23,23 +23,31 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
+import java.util.ArrayList;
 import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.service.server.HiveServer2;
+import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.TestRawTripPayload;
 import org.apache.hudi.common.minicluster.HdfsTestService;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.TypedProperties;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveClient;
 import org.apache.hudi.hive.util.HiveTestService;
 import org.apache.hudi.utilities.sources.TestDataSource;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
@@ -178,6 +186,15 @@ public class UtilitiesTestBase {
       os.close();
     }
 
+    public static void saveParquetToDFS(List<GenericRecord> records, Path 
targetFile) throws IOException {
+      try (ParquetWriter<GenericRecord> writer = 
AvroParquetWriter.<GenericRecord>builder(targetFile)
+          
.withSchema(HoodieTestDataGenerator.avroSchema).withConf(HoodieTestUtils.getDefaultHadoopConf()).build())
 {
+        for (GenericRecord record : records) {
+          writer.write(record);
+        }
+      }
+    }
+
     public static TypedProperties setupSchemaOnDFS() throws IOException {
       UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", 
dfs, dfsBasePath + "/source.avsc");
       TypedProperties props = new TypedProperties();
@@ -185,6 +202,24 @@ public class UtilitiesTestBase {
       return props;
     }
 
+    public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, 
HoodieTestDataGenerator dataGenerator) {
+      try {
+        Option<IndexedRecord> recordOpt = 
hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
+        return (GenericRecord) recordOpt.get();
+      } catch (IOException e) {
+        return null;
+      }
+    }
+
+    public static List<GenericRecord> toGenericRecords(List<HoodieRecord> 
hoodieRecords,
+        HoodieTestDataGenerator dataGenerator) {
+      List<GenericRecord> records = new ArrayList<GenericRecord>();
+      for (HoodieRecord hoodieRecord : hoodieRecords) {
+        records.add(toGenericRecord(hoodieRecord, dataGenerator));
+      }
+      return records;
+    }
+
     public static String toJsonString(HoodieRecord hr) {
       try {
         return ((TestRawTripPayload) hr.getData()).getJsonData();
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
index 9ee3285..4d4fafb 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
@@ -19,10 +19,15 @@
 package org.apache.hudi.utilities.sources;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.List;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
@@ -41,7 +46,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
- * Basic tests against all subclasses of {@link JsonDFSSource}
+ * Basic tests against all subclasses of {@link JsonDFSSource} and {@link 
ParquetDFSSource}
  */
 public class TestDFSSource extends UtilitiesTestBase {
 
@@ -82,11 +87,17 @@ public class TestDFSSource extends UtilitiesTestBase {
     assertEquals(Option.empty(), 
jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
     
UtilitiesTestBase.Helpers.saveStringsToDFS(Helpers.jsonifyRecords(dataGenerator.generateInserts("000",
 100)), dfs,
         dfsBasePath + "/jsonFiles/1.json");
-    assertEquals(Option.empty(), 
jsonSource.fetchNewDataInAvroFormat(Option.empty(), 10).getBatch());
-    InputBatch<JavaRDD<GenericRecord>> fetch1 = 
jsonSource.fetchNewDataInAvroFormat(Option.empty(), 1000000);
+    // Test respecting sourceLimit
+    int sourceLimit = 10;
+    RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new 
Path(dfsBasePath + "/jsonFiles/1.json"), true);
+    FileStatus file1Status = files.next();
+    assertTrue(file1Status.getLen() > sourceLimit);
+    assertEquals(Option.empty(), 
jsonSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch());
+    // Test json -> Avro
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = 
jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
     assertEquals(100, fetch1.getBatch().get().count());
     // Test json -> Row format
-    InputBatch<Dataset<Row>> fetch1AsRows = 
jsonSource.fetchNewDataInRowFormat(Option.empty(), 1000000);
+    InputBatch<Dataset<Row>> fetch1AsRows = 
jsonSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
     assertEquals(100, fetch1AsRows.getBatch().get().count());
     // Test Avro -> Row format
     Dataset<Row> fetch1Rows = 
AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
@@ -113,5 +124,69 @@ public class TestDFSSource extends UtilitiesTestBase {
     InputBatch<JavaRDD<GenericRecord>> fetch4 =
         
jsonSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()),
 Long.MAX_VALUE);
     assertEquals(Option.empty(), fetch4.getBatch());
+
+    // 5. Extract from the beginning
+    InputBatch<JavaRDD<GenericRecord>> fetch5 = 
jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(10100, fetch5.getBatch().get().count());
+  }
+
+  @Test
+  public void testParquetDFSSource() throws IOException {
+    dfs.mkdirs(new Path(dfsBasePath + "/parquetFiles"));
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + 
"/parquetFiles");
+    ParquetSource parquetDFSSource = new ParquetDFSSource(props, jsc, 
sparkSession, schemaProvider);
+    SourceFormatAdapter parquetSource = new 
SourceFormatAdapter(parquetDFSSource);
+
+    // 1. Extract without any checkpoint => get all the data, respecting 
sourceLimit
+    assertEquals(Option.empty(), 
parquetSource.fetchNewDataInAvroFormat(Option.empty(), 
Long.MAX_VALUE).getBatch());
+    List<GenericRecord> batch1 = 
Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100), 
dataGenerator);
+    Path file1 = new Path(dfsBasePath + "/parquetFiles", "1.parquet");
+    Helpers.saveParquetToDFS(batch1, file1);
+    // Test respecting sourceLimit
+    int sourceLimit = 10;
+    RemoteIterator<LocatedFileStatus> files = dfs.listFiles(file1, true);
+    FileStatus file1Status = files.next();
+    assertTrue(file1Status.getLen() > sourceLimit);
+    assertEquals(Option.empty(), 
parquetSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch());
+    // Test parquet -> Avro
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = 
parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(100, fetch1.getBatch().get().count());
+    // Test parquet -> Row
+    InputBatch<Dataset<Row>> fetch1AsRows = 
parquetSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(100, fetch1AsRows.getBatch().get().count());
+
+    // 2. Produce new data, extract new data
+    List<GenericRecord> batch2 = 
Helpers.toGenericRecords(dataGenerator.generateInserts("001", 10000), 
dataGenerator);
+    Path file2 = new Path(dfsBasePath + "/parquetFiles", "2.parquet");
+    Helpers.saveParquetToDFS(batch2, file2);
+    // Test parquet -> Avro
+    InputBatch<JavaRDD<GenericRecord>> fetch2 =
+        
parquetSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()),
 Long.MAX_VALUE);
+    assertEquals(10000, fetch2.getBatch().get().count());
+    // Test parquet -> Row
+    InputBatch<Dataset<Row>> fetch2AsRows =
+        
parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()),
 Long.MAX_VALUE);
+    assertEquals(10000, fetch2AsRows.getBatch().get().count());
+
+    // 3. Extract with previous checkpoint => gives same data back (idempotent)
+    InputBatch<Dataset<Row>> fetch3AsRows =
+        
parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()),
 Long.MAX_VALUE);
+    assertEquals(10000, fetch3AsRows.getBatch().get().count());
+    assertEquals(fetch2AsRows.getCheckpointForNextBatch(), 
fetch3AsRows.getCheckpointForNextBatch());
+    fetch3AsRows.getBatch().get().registerTempTable("test_dfs_table");
+    Dataset<Row> rowDataset = new SQLContext(jsc.sc()).sql("select * from 
test_dfs_table");
+    assertEquals(10000, rowDataset.count());
+
+    // 4. Extract with latest checkpoint => no new data returned
+    InputBatch<JavaRDD<GenericRecord>> fetch4 =
+        
parquetSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()),
 Long.MAX_VALUE);
+    assertEquals(Option.empty(), fetch4.getBatch());
+
+    // 5. Extract from the beginning
+    InputBatch<JavaRDD<GenericRecord>> fetch5 = 
parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(10100, fetch5.getBatch().get().count());
   }
 }

Reply via email to