vinothchandar commented on a change in pull request #2296:
URL: https://github.com/apache/hudi/pull/2296#discussion_r665832216
##########
File path:
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -932,15 +932,10 @@ public void testFilterDupes() throws Exception {
ds2.sync();
mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
tableBasePath, true);
HoodieInstant newLastFinished =
mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
-
assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(),
HoodieTimeline.GREATER_THAN, lastFinished.getTimestamp()
+ // there is not new commit generate for empty commits
Review comment:
We should actually have this generate an empty commit and test for it. If
we don't, the checkpoints won't move.
Consider this scenario: deltastreamer reads from Kafka using a custom
transformer. If the transformer filters out all records from Kafka, we will
have empty input for the write, but the Kafka offsets still have to move ahead.
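To make the concern concrete, here is a minimal sketch in plain Java. It is
a toy model, not Hudi code: it assumes the source checkpoint is stored in
the extra metadata of each completed commit, and the class name, the
`checkpoint` key, and the offset strings are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class EmptyCommitCheckpointSketch {
    // Hypothetical metadata key; stands in for wherever the checkpoint is kept.
    static final String CHECKPOINT_KEY = "checkpoint";

    // Toy timeline: each completed commit is represented by its extra metadata.
    private final List<Map<String, String>> timeline = new ArrayList<>();
    private final boolean skipEmptyCommits;

    EmptyCommitCheckpointSketch(boolean skipEmptyCommits) {
        this.skipEmptyCommits = skipEmptyCommits;
    }

    /** Commits one batch; returns true if a commit was recorded. */
    boolean commit(List<String> records, String checkpoint) {
        if (records.isEmpty() && skipEmptyCommits) {
            // No commit is written, so the new checkpoint is lost with it.
            return false;
        }
        Map<String, String> extraMetadata = new HashMap<>();
        extraMetadata.put(CHECKPOINT_KEY, checkpoint);
        timeline.add(extraMetadata);
        return true;
    }

    /** Checkpoint of the latest completed commit, if any. */
    Optional<String> lastCheckpoint() {
        if (timeline.isEmpty()) {
            return Optional.empty();
        }
        return Optional.ofNullable(timeline.get(timeline.size() - 1).get(CHECKPOINT_KEY));
    }

    public static void main(String[] args) {
        for (boolean skip : new boolean[] {true, false}) {
            EmptyCommitCheckpointSketch writer = new EmptyCommitCheckpointSketch(skip);
            writer.commit(Collections.singletonList("record-1"), "offset=10");
            // The transformer filtered everything out: empty input, newer offsets.
            writer.commit(Collections.<String>emptyList(), "offset=20");
            System.out.println("skipEmptyCommits=" + skip
                + " lastCheckpoint=" + writer.lastCheckpoint().orElse("none"));
        }
    }
}
```

With skipping enabled, the next run would resume from the older offsets and
re-read the same filtered-out records; with empty commits allowed, the
checkpoint advances even though no data was written.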
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##########
@@ -348,4 +348,23 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
}
+
+ @Test def testWithEmptyInput(): Unit = {
+ val inputDF1 =
spark.read.json(spark.sparkContext.parallelize(Seq.empty[String], 1))
+ inputDF1.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ // Empty commit does not has a new commit
Review comment:
empty input, you mean?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -173,6 +173,10 @@ public boolean commitStats(String instantTime,
List<HoodieWriteStat> stats, Opti
public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String,
List<String>> partitionToReplaceFileIds) {
+ // Skip the empty commit
+ if (stats.isEmpty()) {
Review comment:
Let's control this using a new config?
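One possible shape for such a guard, sketched in standalone Java. The config
key, its default, and the helper name are hypothetical and are not existing
Hudi options; the default keeps empty commits so that checkpoints continue
to move unless a user opts out.

```java
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class EmptyCommitConfigSketch {
    // Hypothetical config key and default, made up for this illustration.
    static final String ALLOW_EMPTY_COMMIT_KEY = "example.write.allow.empty.commit";
    static final String DEFAULT_ALLOW_EMPTY_COMMIT = "true";

    /** Returns true only when there is nothing to commit AND empty commits are disabled. */
    static boolean shouldSkipCommit(List<?> stats, Properties props) {
        boolean allowEmpty = Boolean.parseBoolean(
            props.getProperty(ALLOW_EMPTY_COMMIT_KEY, DEFAULT_ALLOW_EMPTY_COMMIT));
        return stats.isEmpty() && !allowEmpty;
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        Properties optedOut = new Properties();
        optedOut.setProperty(ALLOW_EMPTY_COMMIT_KEY, "false");

        List<String> noStats = Collections.emptyList();
        System.out.println("default config skips empty commit: "
            + shouldSkipCommit(noStats, defaults));
        System.out.println("opted-out config skips empty commit: "
            + shouldSkipCommit(noStats, optedOut));
    }
}
```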