yihua commented on code in PR #13503:
URL: https://github.com/apache/hudi/pull/13503#discussion_r2244114437
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala:
##########
@@ -297,61 +298,120 @@ class TestStreamingSource extends StreamTest {
})
}
- test("Test checkpoint translation") {
+ private def testCheckpointTranslation(tableName: String,
+ tableType: HoodieTableType,
+ writeTableVersion: HoodieTableVersion,
+ streamingReadVersions: List[Int]):
Unit = {
Review Comment:
```suggestion
tableType: HoodieTableType,
writeTableVersion:
HoodieTableVersion,
streamingReadVersions: List[Int]):
Unit = {
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala:
##########
@@ -297,61 +298,120 @@ class TestStreamingSource extends StreamTest {
})
}
- test("Test checkpoint translation") {
+ private def testCheckpointTranslation(tableName: String,
+ tableType: HoodieTableType,
+ writeTableVersion: HoodieTableVersion,
+ streamingReadVersions: List[Int]):
Unit = {
withTempDir { inputDir =>
- val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream_ckpt"
+ val tablePath = s"${inputDir.getCanonicalPath}/$tableName"
val metaClient = HoodieTableMetaClient.newTableBuilder()
- .setTableType(COPY_ON_WRITE)
+ .setTableType(tableType)
.setTableName(getTableName(tablePath))
+ .setTableVersion(writeTableVersion)
.setRecordKeyFields("id")
.setPreCombineFields("ts")
.initTable(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf()),
tablePath)
- addData(tablePath, Seq(("1", "a1", "10", "000")))
- addData(tablePath, Seq(("2", "a1", "11", "001")))
- addData(tablePath, Seq(("3", "a1", "12", "002")))
+ // Add initial data
+ addData(tablePath, Seq(("1", "a1", "10", "000")), tableVersion =
writeTableVersion)
+ addData(tablePath, Seq(("2", "a1", "11", "001")), tableVersion =
writeTableVersion)
+ addData(tablePath, Seq(("3", "a1", "12", "002")), tableVersion =
writeTableVersion)
+
+ // Add update for MOR tests
+ if (tableType == MERGE_ON_READ) {
+ addData(tablePath, Seq(("2", "a2_updated", "16", "003")), tableVersion
= writeTableVersion)
+ }
val instants =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.getInstants
- assertEquals(3, instants.size())
+ val expectedInstantCount = if (tableType == MERGE_ON_READ) 4 else 3
+ assertEquals(expectedInstantCount, instants.size())
+
+ val startTimestampIndex = if (tableType == MERGE_ON_READ) 2 else 1
+ val startTimestamp = instants.get(startTimestampIndex).requestedTime
- // If the request time is used, i.e., V1, then the second record is
included in the output.
- // Otherwise, only third record in the output.
- val startTimestamp = instants.get(1).requestedTime
- for (streamingReadTableVersion <-
List(HoodieTableVersion.SIX.versionCode(),
HoodieTableVersion.EIGHT.versionCode())) {
+ for (streamingReadTableVersion <- streamingReadVersions) {
val df = spark.readStream
.format("org.apache.hudi")
.option(START_OFFSET.key, startTimestamp)
- .option(WRITE_TABLE_VERSION.key,
HoodieTableVersion.current().versionCode().toString)
+ .option(WRITE_TABLE_VERSION.key,
writeTableVersion.versionCode().toString)
.option(STREAMING_READ_TABLE_VERSION.key,
streamingReadTableVersion.toString)
.load(tablePath)
.select("id", "name", "price", "ts")
- val expectedRows = if (streamingReadTableVersion ==
HoodieTableVersion.EIGHT.versionCode()) {
- Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12", "002"))
+
+ val expectedRows = if (tableType == MERGE_ON_READ) {
+ if (streamingReadTableVersion ==
HoodieTableVersion.current().versionCode()) {
+ Seq(Row("3", "a1", "12", "002"), Row("2", "a2_updated", "16",
"003"))
+ } else {
+ Seq(Row("2", "a2_updated", "16", "003"))
+ }
} else {
- Seq(Row("3", "a1", "12", "002"))
+ if (streamingReadTableVersion ==
HoodieTableVersion.current().versionCode()) {
+ Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12", "002"))
+ } else {
+ Seq(Row("3", "a1", "12", "002"))
+ }
}
+
testStream(df)(
AssertOnQuery { q => q.processAllAvailable(); true },
- // Start after the first commit
CheckAnswerRows(expectedRows, lastOnly = true, isSorted = false)
)
}
}
}
+ test("Test checkpoint translation") {
+ testCheckpointTranslation(
+ "test_cow_stream_ckpt",
+ COPY_ON_WRITE,
+ HoodieTableVersion.current(),
+ List(HoodieTableVersion.SIX.versionCode(),
HoodieTableVersion.current().versionCode())
+ )
+ }
+
+ test("Test checkpoint translation mor") {
Review Comment:
```suggestion
test("Test checkpoint translation on MOR table") {
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala:
##########
@@ -297,61 +298,120 @@ class TestStreamingSource extends StreamTest {
})
}
- test("Test checkpoint translation") {
+ private def testCheckpointTranslation(tableName: String,
+ tableType: HoodieTableType,
+ writeTableVersion: HoodieTableVersion,
+ streamingReadVersions: List[Int]):
Unit = {
withTempDir { inputDir =>
- val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream_ckpt"
+ val tablePath = s"${inputDir.getCanonicalPath}/$tableName"
val metaClient = HoodieTableMetaClient.newTableBuilder()
- .setTableType(COPY_ON_WRITE)
+ .setTableType(tableType)
.setTableName(getTableName(tablePath))
+ .setTableVersion(writeTableVersion)
.setRecordKeyFields("id")
.setPreCombineFields("ts")
.initTable(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf()),
tablePath)
- addData(tablePath, Seq(("1", "a1", "10", "000")))
- addData(tablePath, Seq(("2", "a1", "11", "001")))
- addData(tablePath, Seq(("3", "a1", "12", "002")))
+ // Add initial data
+ addData(tablePath, Seq(("1", "a1", "10", "000")), tableVersion =
writeTableVersion)
+ addData(tablePath, Seq(("2", "a1", "11", "001")), tableVersion =
writeTableVersion)
+ addData(tablePath, Seq(("3", "a1", "12", "002")), tableVersion =
writeTableVersion)
+
+ // Add update for MOR tests
+ if (tableType == MERGE_ON_READ) {
+ addData(tablePath, Seq(("2", "a2_updated", "16", "003")), tableVersion
= writeTableVersion)
+ }
val instants =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.getInstants
- assertEquals(3, instants.size())
+ val expectedInstantCount = if (tableType == MERGE_ON_READ) 4 else 3
+ assertEquals(expectedInstantCount, instants.size())
+
+ val startTimestampIndex = if (tableType == MERGE_ON_READ) 2 else 1
+ val startTimestamp = instants.get(startTimestampIndex).requestedTime
- // If the request time is used, i.e., V1, then the second record is
included in the output.
- // Otherwise, only third record in the output.
- val startTimestamp = instants.get(1).requestedTime
- for (streamingReadTableVersion <-
List(HoodieTableVersion.SIX.versionCode(),
HoodieTableVersion.EIGHT.versionCode())) {
+ for (streamingReadTableVersion <- streamingReadVersions) {
val df = spark.readStream
.format("org.apache.hudi")
.option(START_OFFSET.key, startTimestamp)
- .option(WRITE_TABLE_VERSION.key,
HoodieTableVersion.current().versionCode().toString)
+ .option(WRITE_TABLE_VERSION.key,
writeTableVersion.versionCode().toString)
.option(STREAMING_READ_TABLE_VERSION.key,
streamingReadTableVersion.toString)
.load(tablePath)
.select("id", "name", "price", "ts")
- val expectedRows = if (streamingReadTableVersion ==
HoodieTableVersion.EIGHT.versionCode()) {
- Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12", "002"))
+
+ val expectedRows = if (tableType == MERGE_ON_READ) {
+ if (streamingReadTableVersion ==
HoodieTableVersion.current().versionCode()) {
+ Seq(Row("3", "a1", "12", "002"), Row("2", "a2_updated", "16",
"003"))
+ } else {
+ Seq(Row("2", "a2_updated", "16", "003"))
+ }
} else {
- Seq(Row("3", "a1", "12", "002"))
+ if (streamingReadTableVersion ==
HoodieTableVersion.current().versionCode()) {
+ Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12", "002"))
+ } else {
+ Seq(Row("3", "a1", "12", "002"))
+ }
}
+
testStream(df)(
AssertOnQuery { q => q.processAllAvailable(); true },
- // Start after the first commit
CheckAnswerRows(expectedRows, lastOnly = true, isSorted = false)
)
}
}
}
+ test("Test checkpoint translation") {
+ testCheckpointTranslation(
+ "test_cow_stream_ckpt",
+ COPY_ON_WRITE,
+ HoodieTableVersion.current(),
+ List(HoodieTableVersion.SIX.versionCode(),
HoodieTableVersion.current().versionCode())
+ )
+ }
+
+ test("Test checkpoint translation mor") {
+ testCheckpointTranslation(
+ "test_mor_stream_ckpt",
+ MERGE_ON_READ,
+ HoodieTableVersion.current(),
+ List(HoodieTableVersion.SIX.versionCode(),
HoodieTableVersion.current().versionCode())
+ )
+ }
+
+ test("Test checkpoint translation v6 base") {
+ testCheckpointTranslation(
+ "test_cow_stream_ckpt_v6",
+ COPY_ON_WRITE,
+ HoodieTableVersion.SIX,
+ List(HoodieTableVersion.SIX.versionCode())
+ )
+ }
+
+ test("Test checkpoint translation v6 base mor") {
Review Comment:
```suggestion
test("Test checkpoint translation on MOR table with table version 6") {
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala:
##########
@@ -297,61 +298,120 @@ class TestStreamingSource extends StreamTest {
})
}
- test("Test checkpoint translation") {
+ private def testCheckpointTranslation(tableName: String,
+ tableType: HoodieTableType,
+ writeTableVersion: HoodieTableVersion,
+ streamingReadVersions: List[Int]):
Unit = {
withTempDir { inputDir =>
- val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream_ckpt"
+ val tablePath = s"${inputDir.getCanonicalPath}/$tableName"
val metaClient = HoodieTableMetaClient.newTableBuilder()
- .setTableType(COPY_ON_WRITE)
+ .setTableType(tableType)
.setTableName(getTableName(tablePath))
+ .setTableVersion(writeTableVersion)
.setRecordKeyFields("id")
.setPreCombineFields("ts")
.initTable(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf()),
tablePath)
- addData(tablePath, Seq(("1", "a1", "10", "000")))
- addData(tablePath, Seq(("2", "a1", "11", "001")))
- addData(tablePath, Seq(("3", "a1", "12", "002")))
+ // Add initial data
+ addData(tablePath, Seq(("1", "a1", "10", "000")), tableVersion =
writeTableVersion)
+ addData(tablePath, Seq(("2", "a1", "11", "001")), tableVersion =
writeTableVersion)
+ addData(tablePath, Seq(("3", "a1", "12", "002")), tableVersion =
writeTableVersion)
+
+ // Add update for MOR tests
+ if (tableType == MERGE_ON_READ) {
+ addData(tablePath, Seq(("2", "a2_updated", "16", "003")), tableVersion
= writeTableVersion)
+ }
val instants =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.getInstants
- assertEquals(3, instants.size())
+ val expectedInstantCount = if (tableType == MERGE_ON_READ) 4 else 3
+ assertEquals(expectedInstantCount, instants.size())
+
+ val startTimestampIndex = if (tableType == MERGE_ON_READ) 2 else 1
+ val startTimestamp = instants.get(startTimestampIndex).requestedTime
- // If the request time is used, i.e., V1, then the second record is
included in the output.
- // Otherwise, only third record in the output.
- val startTimestamp = instants.get(1).requestedTime
- for (streamingReadTableVersion <-
List(HoodieTableVersion.SIX.versionCode(),
HoodieTableVersion.EIGHT.versionCode())) {
+ for (streamingReadTableVersion <- streamingReadVersions) {
val df = spark.readStream
.format("org.apache.hudi")
.option(START_OFFSET.key, startTimestamp)
- .option(WRITE_TABLE_VERSION.key,
HoodieTableVersion.current().versionCode().toString)
+ .option(WRITE_TABLE_VERSION.key,
writeTableVersion.versionCode().toString)
.option(STREAMING_READ_TABLE_VERSION.key,
streamingReadTableVersion.toString)
.load(tablePath)
.select("id", "name", "price", "ts")
- val expectedRows = if (streamingReadTableVersion ==
HoodieTableVersion.EIGHT.versionCode()) {
- Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12", "002"))
+
+ val expectedRows = if (tableType == MERGE_ON_READ) {
+ if (streamingReadTableVersion ==
HoodieTableVersion.current().versionCode()) {
+ Seq(Row("3", "a1", "12", "002"), Row("2", "a2_updated", "16",
"003"))
+ } else {
+ Seq(Row("2", "a2_updated", "16", "003"))
+ }
} else {
- Seq(Row("3", "a1", "12", "002"))
+ if (streamingReadTableVersion ==
HoodieTableVersion.current().versionCode()) {
+ Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12", "002"))
+ } else {
+ Seq(Row("3", "a1", "12", "002"))
+ }
}
+
testStream(df)(
AssertOnQuery { q => q.processAllAvailable(); true },
- // Start after the first commit
CheckAnswerRows(expectedRows, lastOnly = true, isSorted = false)
)
}
}
}
+ test("Test checkpoint translation") {
+ testCheckpointTranslation(
+ "test_cow_stream_ckpt",
+ COPY_ON_WRITE,
+ HoodieTableVersion.current(),
+ List(HoodieTableVersion.SIX.versionCode(),
HoodieTableVersion.current().versionCode())
+ )
+ }
+
+ test("Test checkpoint translation mor") {
+ testCheckpointTranslation(
+ "test_mor_stream_ckpt",
+ MERGE_ON_READ,
+ HoodieTableVersion.current(),
+ List(HoodieTableVersion.SIX.versionCode(),
HoodieTableVersion.current().versionCode())
+ )
+ }
+
+ test("Test checkpoint translation v6 base") {
Review Comment:
```suggestion
test("Test checkpoint translation on COW table with table version 6") {
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala:
##########
@@ -297,61 +298,120 @@ class TestStreamingSource extends StreamTest {
})
}
- test("Test checkpoint translation") {
+ private def testCheckpointTranslation(tableName: String,
+ tableType: HoodieTableType,
+ writeTableVersion: HoodieTableVersion,
+ streamingReadVersions: List[Int]):
Unit = {
withTempDir { inputDir =>
- val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream_ckpt"
+ val tablePath = s"${inputDir.getCanonicalPath}/$tableName"
val metaClient = HoodieTableMetaClient.newTableBuilder()
- .setTableType(COPY_ON_WRITE)
+ .setTableType(tableType)
.setTableName(getTableName(tablePath))
+ .setTableVersion(writeTableVersion)
.setRecordKeyFields("id")
.setPreCombineFields("ts")
.initTable(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf()),
tablePath)
- addData(tablePath, Seq(("1", "a1", "10", "000")))
- addData(tablePath, Seq(("2", "a1", "11", "001")))
- addData(tablePath, Seq(("3", "a1", "12", "002")))
+ // Add initial data
+ addData(tablePath, Seq(("1", "a1", "10", "000")), tableVersion =
writeTableVersion)
+ addData(tablePath, Seq(("2", "a1", "11", "001")), tableVersion =
writeTableVersion)
+ addData(tablePath, Seq(("3", "a1", "12", "002")), tableVersion =
writeTableVersion)
+
+ // Add update for MOR tests
+ if (tableType == MERGE_ON_READ) {
+ addData(tablePath, Seq(("2", "a2_updated", "16", "003")), tableVersion
= writeTableVersion)
+ }
val instants =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.getInstants
- assertEquals(3, instants.size())
+ val expectedInstantCount = if (tableType == MERGE_ON_READ) 4 else 3
+ assertEquals(expectedInstantCount, instants.size())
+
+ val startTimestampIndex = if (tableType == MERGE_ON_READ) 2 else 1
+ val startTimestamp = instants.get(startTimestampIndex).requestedTime
- // If the request time is used, i.e., V1, then the second record is
included in the output.
- // Otherwise, only third record in the output.
- val startTimestamp = instants.get(1).requestedTime
- for (streamingReadTableVersion <-
List(HoodieTableVersion.SIX.versionCode(),
HoodieTableVersion.EIGHT.versionCode())) {
+ for (streamingReadTableVersion <- streamingReadVersions) {
val df = spark.readStream
.format("org.apache.hudi")
.option(START_OFFSET.key, startTimestamp)
- .option(WRITE_TABLE_VERSION.key,
HoodieTableVersion.current().versionCode().toString)
+ .option(WRITE_TABLE_VERSION.key,
writeTableVersion.versionCode().toString)
.option(STREAMING_READ_TABLE_VERSION.key,
streamingReadTableVersion.toString)
.load(tablePath)
.select("id", "name", "price", "ts")
- val expectedRows = if (streamingReadTableVersion ==
HoodieTableVersion.EIGHT.versionCode()) {
- Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12", "002"))
+
+ val expectedRows = if (tableType == MERGE_ON_READ) {
+ if (streamingReadTableVersion ==
HoodieTableVersion.current().versionCode()) {
+ Seq(Row("3", "a1", "12", "002"), Row("2", "a2_updated", "16",
"003"))
+ } else {
+ Seq(Row("2", "a2_updated", "16", "003"))
+ }
} else {
- Seq(Row("3", "a1", "12", "002"))
+ if (streamingReadTableVersion ==
HoodieTableVersion.current().versionCode()) {
+ Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12", "002"))
+ } else {
+ Seq(Row("3", "a1", "12", "002"))
+ }
}
+
testStream(df)(
AssertOnQuery { q => q.processAllAvailable(); true },
- // Start after the first commit
CheckAnswerRows(expectedRows, lastOnly = true, isSorted = false)
)
}
}
}
+ test("Test checkpoint translation") {
Review Comment:
```suggestion
test("Test checkpoint translation on COW table") {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]