This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c27fb60d31 [HUDI-7503] Not allowing start time earlier than active timeline with CDC queries (#12391)
6c27fb60d31 is described below

commit 6c27fb60d3168cbf9b68fa0ab1593075efd5e416
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Dec 2 02:59:13 2024 -0800

    [HUDI-7503] Not allowing start time earlier than active timeline with CDC queries (#12391)
    
    * Not allowing start time earlier than active timeline with CDC queries
    
    * Fixing tests
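
For context, a CDC read that supplies a start instant looks roughly like the sketch below (Spark Scala). The option keys follow the common Hudi datasource read options and may differ slightly between releases; basePath and startTs are placeholders. With this change, a start instant that has already been archived off the active timeline now fails fast with a HoodieException instead of returning an incomplete change stream:

    // Hypothetical CDC incremental read; basePath and startTs are placeholders.
    val cdcDf = spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.query.incremental.format", "cdc")
      .option("hoodie.datasource.read.begin.instanttime", startTs) // must fall within the active timeline
      .load(basePath)
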
---
 .../hudi/common/table/cdc/HoodieCDCExtractor.java    |  6 ++++++
 .../org/apache/hudi/cdc/CDCFileGroupIterator.scala   | 11 ++++++++++-
 .../main/scala/org/apache/hudi/cdc/CDCRelation.scala |  7 ++-----
 .../hudi/functional/cdc/TestCDCDataFrameSuite.scala  | 20 ++++++++++++--------
 4 files changed, 30 insertions(+), 14 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index 77ad0ed3fab..979af5dee95 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.InstantComparison;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -217,6 +218,11 @@ public class HoodieCDCExtractor {
     try {
       Set<String> requiredActions = new HashSet<>(Arrays.asList(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION, CLUSTERING_ACTION));
       HoodieActiveTimeline activeTimeLine = metaClient.getActiveTimeline();
+      if (instantRange.getStartInstant().isPresent() && !metaClient.getArchivedTimeline().empty()
+          && InstantComparison.compareTimestamps(metaClient.getArchivedTimeline().lastInstant().get().requestedTime(), InstantComparison.GREATER_THAN, instantRange.getStartInstant().get())) {
+        throw new HoodieException("Start instant time " + instantRange.getStartInstant().get()
+            + " for CDC query has to be in the active timeline. Beginning of active timeline " + activeTimeLine.firstInstant().get().requestedTime());
+      }
       this.commits = activeTimeLine.getInstantsAsStream()
           .filter(instant ->
               instant.isCompleted()
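
In plain terms, the new guard above rejects a CDC start instant that is older than the newest archived instant, i.e. a start time that no longer falls inside the active timeline. A rough standalone sketch of the same condition (illustrative names only, not the exact Hudi API; Hudi instant times are fixed-width, zero-padded timestamps, so lexicographic comparison matches chronological order):

    // Illustrative sketch: reject a CDC start time that precedes the end of the archived timeline.
    def validateCdcStart(startTime: Option[String], lastArchived: Option[String], firstActive: String): Unit = {
      for (start <- startTime; archived <- lastArchived) {
        if (archived > start) { // lexicographic comparison is chronological for zero-padded instant times
          throw new IllegalArgumentException(
            s"Start instant time $start for CDC query has to be in the active timeline. " +
              s"Beginning of active timeline $firstActive")
        }
      }
    }
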
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index 63b3d336217..c9516ff249e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -558,5 +558,14 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
       curAvroRecord, avroSchema, payloadProps).get()
   }
 
-  override def close(): Unit = {}
+  override def close(): Unit = {
+    recordIter = Iterator.empty
+    logRecordIter = Iterator.empty
+    beforeImageRecords.clear()
+    afterImageRecords.clear()
+    if (cdcLogRecordIterator != null) {
+      cdcLogRecordIterator.close()
+      cdcLogRecordIterator = null
+    }
+  }
 }
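
The close() implementation above drops buffered record state and closes the underlying CDC log record iterator exactly once. A minimal sketch of the same null-guarded, idempotent close pattern (AutoCloseable stands in for the real log-record iterator type):

    // Sketch of the null-guarded close pattern: safe to call close() repeatedly.
    class ClosableHolder(private var underlying: AutoCloseable) extends AutoCloseable {
      override def close(): Unit = {
        if (underlying != null) {
          underlying.close()  // release the wrapped resource once
          underlying = null   // subsequent close() calls become no-ops
        }
      }
    }
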
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
index f4ec2b916eb..14cfae37b91 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
@@ -26,12 +26,12 @@ import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils._
 import org.apache.hudi.common.table.log.InstantRange
 import org.apache.hudi.common.table.log.InstantRange.RangeType
+import org.apache.hudi.common.table.timeline.{HoodieTimeline, InstantComparison}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.internal.schema.InternalSchema
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{Row, SQLContext, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -202,9 +202,6 @@ object CDCRelation {
     val endCompletionTime = options.getOrElse(DataSourceReadOptions.END_COMMIT.key(),
       getTimestampOfLatestInstant(metaClient)
     )
-    if (startCompletionTime > endCompletionTime) {
-      throw new HoodieException(s"This is not a valid range between $startCompletionTime and $endCompletionTime")
-    }
 
     new CDCRelation(sqlContext, metaClient, startCompletionTime, endCompletionTime, options, rangeType)
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
index e62b48a1c2f..7a481e60cb5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -29,11 +29,12 @@ import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
 import org.apache.hudi.config.HoodieWriteConfig
-
 import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, SaveMode}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
 
@@ -205,7 +206,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
     totalInsertedCnt = 60 + insertedCnt7
     totalUpdatedCnt = updatedCnt7
     totalDeletedCnt = 0
-    allVisibleCDCData = cdcDataFrame((commitTime1.toLong - 1).toString)
+    allVisibleCDCData = cdcDataFrame((commitTime4.toLong - 1).toString)
     assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt, totalDeletedCnt)
 
     // Bulk_Insert Operation With Clean Operation
@@ -225,10 +226,14 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
     val cdcDataOnly8 = cdcDataFrame((commitTime8.toLong - 1).toString)
     assertCDCOpCnt(cdcDataOnly8, 20, 0, 0)
     totalInsertedCnt += 20
-    allVisibleCDCData = cdcDataFrame((commitTime1.toLong - 1).toString)
+    allVisibleCDCData = cdcDataFrame((commitTime4.toLong - 1).toString)
     assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt, totalDeletedCnt)
-  }
 
+    // test start commit time in archived timeline. cdc query should fail
+    assertThrows(classOf[HoodieException], () => {
+      cdcDataFrame((commitTime1.toLong - 1).toString)
+    })
+  }
 
   /**
    * Step1: Insert 100
@@ -632,9 +637,8 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
     assertCDCOpCnt(cdcDataOnly2, insertedCnt2, updatedCnt2, 0)
   }
 
-  @ParameterizedTest
-  @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
-  def testCDCWithAWSDMSPayload(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = {
+  @Test
+  def testCDCWithAWSDMSPayload(): Unit = {
     val options = Map(
       "hoodie.table.name" -> "test",
       "hoodie.datasource.write.recordkey.field" -> "id",
