This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6c27fb60d31 [HUDI-7503] Not allowing start time earlier than active timeline with CDC queries (#12391)
6c27fb60d31 is described below
commit 6c27fb60d3168cbf9b68fa0ab1593075efd5e416
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Dec 2 02:59:13 2024 -0800
[HUDI-7503] Not allowing start time earlier than active timeline with CDC queries (#12391)
* Not allowing start time earlier than active timeline with CDC queries
* Fixing tests
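For context, a CDC incremental query is issued through the Spark datasource roughly as below (a minimal Scala sketch, assuming a CDC-enabled Hudi table at basePath; startTs and basePath are placeholders). With this change, a begin instant that has already been archived fails fast with a HoodieException instead of returning incomplete change data:

    // Incremental query in CDC format; the begin instant must now fall
    // within the active timeline.
    val cdcDf = spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.query.incremental.format", "cdc")
      .option("hoodie.datasource.read.begin.instanttime", startTs)
      .load(basePath)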
---
.../hudi/common/table/cdc/HoodieCDCExtractor.java | 6 ++++++
.../org/apache/hudi/cdc/CDCFileGroupIterator.scala | 11 ++++++++++-
.../main/scala/org/apache/hudi/cdc/CDCRelation.scala | 7 ++-----
.../hudi/functional/cdc/TestCDCDataFrameSuite.scala | 20 ++++++++++++--------
4 files changed, 30 insertions(+), 14 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index 77ad0ed3fab..979af5dee95 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
@@ -217,6 +218,11 @@ public class HoodieCDCExtractor {
try {
Set<String> requiredActions = new HashSet<>(Arrays.asList(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION, CLUSTERING_ACTION));
HoodieActiveTimeline activeTimeLine = metaClient.getActiveTimeline();
+      if (instantRange.getStartInstant().isPresent() && !metaClient.getArchivedTimeline().empty()
+          && InstantComparison.compareTimestamps(metaClient.getArchivedTimeline().lastInstant().get().requestedTime(), InstantComparison.GREATER_THAN, instantRange.getStartInstant().get())) {
+        throw new HoodieException("Start instant time " + instantRange.getStartInstant().get()
+            + " for CDC query has to be in the active timeline. Beginning of active timeline " + activeTimeLine.firstInstant().get().requestedTime());
+      }
this.commits = activeTimeLine.getInstantsAsStream()
.filter(instant ->
instant.isCompleted()
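The guard above rejects any start instant that is older than the newest archived instant, i.e. one that no longer falls inside the active timeline. A simplified Scala sketch of the same predicate (a hypothetical helper, not part of the patch; Hudi instant times in the same format compare lexicographically):

    // True only when no archived instant is newer than the requested start.
    def startInstantInActiveTimeline(startTs: String, lastArchivedTs: Option[String]): Boolean =
      lastArchivedTs.forall(archived => archived <= startTs)

    startInstantInActiveTimeline("20241202000000000", Some("20241201000000000"))  // true: query proceeds
    startInstantInActiveTimeline("20241130000000000", Some("20241201000000000"))  // false: extractor throws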
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index 63b3d336217..c9516ff249e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -558,5 +558,14 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
curAvroRecord, avroSchema, payloadProps).get()
}
- override def close(): Unit = {}
+ override def close(): Unit = {
+ recordIter = Iterator.empty
+ logRecordIter = Iterator.empty
+ beforeImageRecords.clear()
+ afterImageRecords.clear()
+ if (cdcLogRecordIterator != null) {
+ cdcLogRecordIterator.close()
+ cdcLogRecordIterator = null
+ }
+ }
}
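The new close() releases buffered state and follows the usual idempotent-cleanup pattern: clear collections, close the underlying resource once, and null the reference so repeated close() calls are no-ops. A generic Scala sketch of the same pattern (hypothetical class, not Hudi API):

    import java.io.Closeable

    // Hypothetical holder illustrating the idempotent cleanup used in close() above.
    class BufferedIterator(private var source: Closeable) extends AutoCloseable {
      private val buffer = scala.collection.mutable.ArrayBuffer.empty[String]
      override def close(): Unit = {
        buffer.clear()          // drop buffered records eagerly
        if (source != null) {   // null guard makes a second close() a no-op
          source.close()
          source = null
        }
      }
    }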
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
index f4ec2b916eb..14cfae37b91 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
@@ -26,12 +26,12 @@ import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
import org.apache.hudi.common.table.cdc.HoodieCDCUtils._
import org.apache.hudi.common.table.log.InstantRange
import org.apache.hudi.common.table.log.InstantRange.RangeType
+import org.apache.hudi.common.table.timeline.{HoodieTimeline, InstantComparison}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.internal.schema.InternalSchema
-
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -202,9 +202,6 @@ object CDCRelation {
val endCompletionTime = options.getOrElse(DataSourceReadOptions.END_COMMIT.key(),
getTimestampOfLatestInstant(metaClient)
)
- if (startCompletionTime > endCompletionTime) {
- throw new HoodieException(s"This is not a valid range between $startCompletionTime and $endCompletionTime")
- }
new CDCRelation(sqlContext, metaClient, startCompletionTime,
endCompletionTime, options, rangeType)
}
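The start/end ordering check removed here no longer lives in CDCRelation; validation of the start instant now happens in HoodieCDCExtractor against the active timeline, as shown above. A caller that still wants the old eager ordering check could apply something like this hypothetical guard (completion times in the same yyyyMMddHHmmssSSS format sort lexicographically):

    // Hypothetical caller-side pre-check, not part of the patch.
    def validateRange(startCompletionTime: String, endCompletionTime: String): Unit =
      require(startCompletionTime <= endCompletionTime,
        s"Not a valid range between $startCompletionTime and $endCompletionTime")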
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
index e62b48a1c2f..7a481e60cb5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -29,11 +29,12 @@ import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
import org.apache.hudi.config.HoodieWriteConfig
-
import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
+import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
@@ -205,7 +206,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
totalInsertedCnt = 60 + insertedCnt7
totalUpdatedCnt = updatedCnt7
totalDeletedCnt = 0
- allVisibleCDCData = cdcDataFrame((commitTime1.toLong - 1).toString)
+ allVisibleCDCData = cdcDataFrame((commitTime4.toLong - 1).toString)
assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt,
totalDeletedCnt)
// Bulk_Insert Operation With Clean Operation
@@ -225,10 +226,14 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
val cdcDataOnly8 = cdcDataFrame((commitTime8.toLong - 1).toString)
assertCDCOpCnt(cdcDataOnly8, 20, 0, 0)
totalInsertedCnt += 20
- allVisibleCDCData = cdcDataFrame((commitTime1.toLong - 1).toString)
+ allVisibleCDCData = cdcDataFrame((commitTime4.toLong - 1).toString)
assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt,
totalDeletedCnt)
- }
+ // test start commit time in archived timeline. cdc query should fail
+ assertThrows(classOf[HoodieException], () => {
+ cdcDataFrame((commitTime1.toLong - 1).toString)
+ })
+ }
/**
* Step1: Insert 100
@@ -632,9 +637,8 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
assertCDCOpCnt(cdcDataOnly2, insertedCnt2, updatedCnt2, 0)
}
- @ParameterizedTest
- @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
- def testCDCWithAWSDMSPayload(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = {
+ @Test
+ def testCDCWithAWSDMSPayload(): Unit = {
val options = Map(
"hoodie.table.name" -> "test",
"hoodie.datasource.write.recordkey.field" -> "id",