linliu-code commented on code in PR #18224:
URL: https://github.com/apache/hudi/pull/18224#discussion_r2835540302
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3112,6 +3119,281 @@ public void testJsonKafkaDFSSource() throws Exception {
deltaStreamer.shutdownGracefully();
}
+ /**
+ * Tests DeltaStreamer ingestion from JsonKinesisSource using LocalStack.
+ */
+ @Nested
+ @org.junit.jupiter.api.TestInstance(org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS)
+ class TestKinesisSource {
+
+ private KinesisTestUtils kinesisTestUtils;
+
+ @BeforeAll
+ void initKinesis() {
+ kinesisTestUtils = new KinesisTestUtils().setup();
+ }
+
+ @AfterAll
+ void tearDownKinesis() {
+ if (kinesisTestUtils != null) {
+ kinesisTestUtils.teardown();
+ }
+ }
+
+ @Test
+ public void testJsonKinesisDFSSource() throws Exception {
+ String streamName = "test-kinesis-stream-" + testNum;
+ // Create stream with at least 2 shards to exercise multi-shard reading
+ prepareJsonKinesisDFSFiles(JSON_KINESIS_NUM_RECORDS, true, streamName);
+ prepareJsonKinesisDFSSource(PROPS_FILENAME_TEST_JSON_KINESIS, streamName);
+ String tableBasePath = basePath + "/test_json_kinesis_table" + testNum;
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKinesisSource.class.getName(),
+ Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KINESIS, false,
+ true, 100000, false, null, null, "timestamp", null), jsc);
+
+ // Track commit completion times for incremental queries
+ List<String> commitCompletionTimes = new ArrayList<>();
+
+ // Batch 1: initial ingestion
+ deltaStreamer.sync();
+ assertRecordCount(JSON_KINESIS_NUM_RECORDS, tableBasePath, sqlContext);
+ String commit1 = getLatestCommitInstantTime(tableBasePath);
+ commitCompletionTimes.add(getCommitCompletionTime(tableBasePath));
+ assertRecordsForCommit(tableBasePath, commit1, JSON_KINESIS_NUM_RECORDS);
+
+ int totalRecords = JSON_KINESIS_NUM_RECORDS;
+ // Batch 2
+ int batch2Records = 10;
+ totalRecords += batch2Records;
+ prepareJsonKinesisDFSFiles(batch2Records, false, streamName);
+ deltaStreamer.sync();
+ assertRecordCount(totalRecords, tableBasePath, sqlContext);
+ String commit2 = getLatestCommitInstantTime(tableBasePath);
+ commitCompletionTimes.add(getCommitCompletionTime(tableBasePath));
+ assertRecordsForCommit(tableBasePath, commit2, batch2Records);
+
+ // Batch 3
+ int batch3Records = 8;
+ totalRecords += batch3Records;
+ prepareJsonKinesisDFSFiles(batch3Records, false, streamName);
+ deltaStreamer.sync();
+ assertRecordCount(totalRecords, tableBasePath, sqlContext);
+ String commit3 = getLatestCommitInstantTime(tableBasePath);
+ commitCompletionTimes.add(getCommitCompletionTime(tableBasePath));
+ assertRecordsForCommit(tableBasePath, commit3, batch3Records);
+
+ // Batch 4
+ int batch4Records = 7;
+ totalRecords += batch4Records;
+ prepareJsonKinesisDFSFiles(batch4Records, false, streamName);
+ deltaStreamer.sync();
+ assertRecordCount(totalRecords, tableBasePath, sqlContext);
+ String commit4 = getLatestCommitInstantTime(tableBasePath);
+ commitCompletionTimes.add(getCommitCompletionTime(tableBasePath));
+ assertRecordsForCommit(tableBasePath, commit4, batch4Records);
+ deltaStreamer.shutdownGracefully();
+
+ // Incremental queries for a subset of commits
+ sqlContext.clearCache();
+ String c1 = commitCompletionTimes.get(0);
+ String c2 = commitCompletionTimes.get(1);
+ String c3 = commitCompletionTimes.get(2);
+ String c4 = commitCompletionTimes.get(3);
+
+ // Incremental: first commit only (batch 1)
+ Dataset<Row> incrBatch1 = sqlContext.read().format("org.apache.hudi")
+ .options(hudiOpts)
+ .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
+ .option(DataSourceReadOptions.START_COMMIT().key(), "000")
+ .option(DataSourceReadOptions.END_COMMIT().key(), c1)
+ .load(tableBasePath);
+ assertEquals(JSON_KINESIS_NUM_RECORDS, incrBatch1.count(), "Incremental read for batch 1");
+
+ // Incremental: commits 2 and 3 (batches 2 + 3)
+ Dataset<Row> incrBatches2And3 = sqlContext.read().format("org.apache.hudi")
+ .options(hudiOpts)
+ .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
+ .option(DataSourceReadOptions.START_COMMIT().key(), c1)
+ .option(DataSourceReadOptions.END_COMMIT().key(), c3)
+ .load(tableBasePath);
+ assertEquals(batch2Records + batch3Records, incrBatches2And3.count(), "Incremental read for batches 2 and 3");
+
+ // Incremental: commits 3 and 4 (batches 3 + 4)
+ Dataset<Row> incrBatches3And4 = sqlContext.read().format("org.apache.hudi")
+ .options(hudiOpts)
+ .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
+ .option(DataSourceReadOptions.START_COMMIT().key(), c2)
+ .option(DataSourceReadOptions.END_COMMIT().key(), c4)
+ .load(tableBasePath);
+ assertEquals(batch3Records + batch4Records, incrBatches3And4.count(), "Incremental read for batches 3 and 4");
+
+ // Verify data correctness: read back and check _row_key exists
+ sqlContext.clearCache();
+ Dataset<Row> ds = sqlContext.read().format("org.apache.hudi").load(tableBasePath);
+ assertEquals(totalRecords, ds.count());
+ assertTrue(ds.filter("_row_key is not null").count() > 0);
+ }
+
+ @Test
+ public void testJsonKinesisAggregatedRecords() throws Exception {
+ String streamName = "test-kinesis-stream-agg-" + testNum;
+ kinesisTestUtils.createStream(streamName, 2);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(System.nanoTime());
+ String[] jsonRecords = UtilitiesTestBase.Helpers.jsonifyRecords(
+ dataGenerator.generateInsertsAsPerSchema("000", 6, HoodieTestDataGenerator.TRIP_SCHEMA));
+ kinesisTestUtils.sendAggregatedRecords(streamName, jsonRecords);
+ prepareJsonKinesisDFSSource(PROPS_FILENAME_TEST_JSON_KINESIS, streamName);
+ String tableBasePath = basePath + "/test_json_kinesis_agg_table" + testNum;
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKinesisSource.class.getName(),
+ Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KINESIS, false,
+ true, 100000, false, null, null, "timestamp", null), jsc);
+ deltaStreamer.sync();
+ assertRecordCount(6, tableBasePath, sqlContext);
+ deltaStreamer.shutdownGracefully();
+ testNum++;
+ }
+
+ @Test
+ public void testJsonKinesisShardSplitCheckpoint() throws Exception {
+ String streamName = "test-kinesis-stream-split-" + testNum;
+ kinesisTestUtils.createStream(streamName, 2);
+ prepareJsonKinesisDFSFiles(10, false, streamName);
+ prepareJsonKinesisDFSSource(PROPS_FILENAME_TEST_JSON_KINESIS, streamName);
+ String tableBasePath = basePath + "/test_json_kinesis_split_table" + testNum;
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKinesisSource.class.getName(),
+ Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KINESIS, false,
+ true, 100000, false, null, null, "timestamp", null), jsc);
+ deltaStreamer.sync();
+ assertRecordCount(10, tableBasePath, sqlContext);
+ String checkpointAfterBatch1 = getCheckpointFromLatestCommit(tableBasePath);
+ assertNotNull(checkpointAfterBatch1);
+ assertTrue(checkpointAfterBatch1.startsWith(streamName + ","));
+ assertTrue(checkpointAfterBatch1.contains(":"), "Checkpoint should have shard:seq format");
+
+ kinesisTestUtils.updateShardCount(streamName, 4);
+ prepareJsonKinesisDFSFiles(5, false, streamName);
+ deltaStreamer.sync();
+ assertRecordCount(15, tableBasePath, sqlContext);
+ String checkpointAfterSplit = getCheckpointFromLatestCommit(tableBasePath);
+ assertNotNull(checkpointAfterSplit);
+ assertTrue(checkpointAfterSplit.startsWith(streamName + ","));
+ // Closed parent shards must have lastSeq|endSeq with endSeq <= lastSeq so we can detect
+ // "fully consumed" when the parent expires. LocalStack returns Long.MAX_VALUE; we replace it with lastSeq.
+ assertFalse(checkpointAfterSplit.contains("9223372036854775807"),
+ "Checkpoint should not contain Long.MAX_VALUE as endSeq (parent expiry would fail)");
+ int initialShardCount = KinesisOffsetGen.CheckpointUtils.strToOffsets(checkpointAfterBatch1).size();
+ int shardCountAfterSplit = KinesisOffsetGen.CheckpointUtils.strToOffsets(checkpointAfterSplit).size();
+ assertTrue(shardCountAfterSplit > initialShardCount,
+ "Checkpoint after split should have more shards (" + shardCountAfterSplit
+ + ") than initial (" + initialShardCount + ")");
+ deltaStreamer.shutdownGracefully();
+ testNum++;
+ }
+
+ @Test
+ public void testJsonKinesisShardMergeCheckpoint() throws Exception {
+ String streamName = "test-kinesis-stream-merge-" + testNum;
+ kinesisTestUtils.createStream(streamName, 4);
+ prepareJsonKinesisDFSFiles(8, false, streamName);
+ prepareJsonKinesisDFSSource(PROPS_FILENAME_TEST_JSON_KINESIS, streamName);
+ String tableBasePath = basePath + "/test_json_kinesis_merge_table" + testNum;
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKinesisSource.class.getName(),
+ Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KINESIS, false,
+ true, 100000, false, null, null, "timestamp", null), jsc);
+ deltaStreamer.sync();
+ assertRecordCount(8, tableBasePath, sqlContext);
+ String checkpointAfterBatch1 = getCheckpointFromLatestCommit(tableBasePath);
+ assertNotNull(checkpointAfterBatch1);
+
+ kinesisTestUtils.updateShardCount(streamName, 2);
+ prepareJsonKinesisDFSFiles(4, false, streamName);
+ deltaStreamer.sync();
+ assertRecordCount(12, tableBasePath, sqlContext);
+ String checkpointAfterMerge = getCheckpointFromLatestCommit(tableBasePath);
+ assertNotNull(checkpointAfterMerge);
+ assertTrue(checkpointAfterMerge.startsWith(streamName + ","));
+ int initialShardCount = KinesisOffsetGen.CheckpointUtils.strToOffsets(checkpointAfterBatch1).size();
+ int shardCountAfterMerge = KinesisOffsetGen.CheckpointUtils.strToOffsets(checkpointAfterMerge).size();
+ assertTrue(shardCountAfterMerge > initialShardCount,
Review Comment:
After a merge, new child shards are generated, but the parent shards are still present and have not expired yet, so the shard count grows: ">" rather than "<".
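
For illustration only (not part of this PR's code), a minimal sketch of that reasoning, assuming the checkpoint keeps the "streamName,shardId:seq,..." shape that the assertions above check; the class and helper names below are made up for the example:

```java
import java.util.Arrays;

public class ShardCountSketch {
  // Count shard entries in a checkpoint of the assumed "stream,shard:seq,..." form.
  static long countShards(String checkpoint) {
    String[] parts = checkpoint.split(",");
    // parts[0] is the stream name; every remaining "shard:seq" token is one shard.
    return Arrays.stream(parts).skip(1).filter(p -> p.contains(":")).count();
  }

  public static void main(String[] args) {
    // Before the merge: 4 open shards tracked in the checkpoint.
    String before = "test-stream,shardId-000:10,shardId-001:12,shardId-002:9,shardId-003:11";
    // After merging two shards into a new child: the closed parents are still
    // tracked (until they expire) alongside the new child, so the count grows.
    String after = before + ",shardId-004:3";
    System.out.println(countShards(before)); // 4
    System.out.println(countShards(after));  // 5 -> hence ">" rather than "<"
  }
}
```

Once the closed parent shards age out of the stream's retention window they would drop out of the checkpoint, but at the point this assertion runs they are still tracked.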
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]