This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a64afdf HUDI-528 Handle empty commit in incremental pulling (#1612)
a64afdf is described below
commit a64afdfd17ac974e451bceb877f3d40a9c775253
Author: Gary Li <[email protected]>
AuthorDate: Thu May 14 22:55:25 2020 -0700
HUDI-528 Handle empty commit in incremental pulling (#1612)
---
.../org/apache/hudi/IncrementalRelation.scala | 29 +++++++++-------------
.../apache/hudi/functional/TestDataSource.scala | 8 ++++++
2 files changed, 20 insertions(+), 17 deletions(-)
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 8bb4609..436895b 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -19,9 +19,9 @@ package org.apache.hudi
import org.apache.hadoop.fs.GlobPattern
import org.apache.hadoop.fs.Path
+import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord,
HoodieTableType}
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.util.ParquetUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.table.HoodieTable
@@ -47,8 +47,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
private val log = LogManager.getLogger(classOf[IncrementalRelation])
- val fs = new
Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val metaClient = new
HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath,
true)
+ private val metaClient = new
HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath,
true)
// MOR tables not supported yet
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
throw new HoodieException("Incremental view not implemented yet, for
merge-on-read tables")
@@ -56,7 +55,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
// TODO : Figure out a valid HoodieWriteConfig
private val hoodieTable = HoodieTable.create(metaClient,
HoodieWriteConfig.newBuilder().withPath(basePath).build(),
sqlContext.sparkContext.hadoopConfiguration)
- val commitTimeline =
hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
+ private val commitTimeline =
hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
if (commitTimeline.empty()) {
throw new HoodieException("No instants to incrementally pull")
}
@@ -65,25 +64,21 @@ class IncrementalRelation(val sqlContext: SQLContext,
s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
}
- val lastInstant = commitTimeline.lastInstant().get()
+ private val lastInstant = commitTimeline.lastInstant().get()
- val commitsToReturn = commitTimeline.findInstantsInRange(
+ private val commitsToReturn = commitTimeline.findInstantsInRange(
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY),
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY,
lastInstant.getTimestamp))
.getInstants.iterator().toList
- // use schema from a file produced in the latest instant
- val latestSchema = {
- // use last instant if instant range is empty
- val instant = commitsToReturn.lastOption.getOrElse(lastInstant)
- val latestMeta = HoodieCommitMetadata
- .fromBytes(commitTimeline.getInstantDetails(instant).get,
classOf[HoodieCommitMetadata])
- val metaFilePath =
latestMeta.getFileIdAndFullPaths(basePath).values().iterator().next()
-
AvroConversionUtils.convertAvroSchemaToStructType(ParquetUtils.readAvroSchema(
- sqlContext.sparkContext.hadoopConfiguration, new Path(metaFilePath)))
+ // use schema from latest metadata, if not present, read schema from the data file
+ private val latestSchema = {
+ val schemaUtil = new TableSchemaResolver(metaClient)
+ val tableSchema =
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields);
+ AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
}
- val filters = {
+ private val filters = {
if
(optParams.contains(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY)) {
val filterStr = optParams.getOrElse(
DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY,
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
index fdd02bf..8352485 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
@@ -126,6 +126,14 @@ class TestDataSource {
assertEquals(1, countsPerCommit.length)
assertEquals(firstCommit, countsPerCommit(0).get(0))
+ // Upsert an empty dataFrame
+ val emptyRecords =
DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("002",
0)).toList
+ val emptyDF: Dataset[Row] =
spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
+ emptyDF.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
// pull the latest commit
val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)