zhangyue19921010 commented on code in PR #13592:
URL: https://github.com/apache/hudi/pull/13592#discussion_r2230584449
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java:
##########
@@ -462,6 +462,47 @@ public void testStopWithSavepointAndRestore() throws
Exception {
}
}
+ @Test
+ public void testCheckpointRestoreWithLimit() throws Exception {
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ conf.setInteger(FlinkOptions.READ_SPLITS_LIMIT, 2);
+ StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+ OperatorSubtaskState state;
+ try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness =
createHarness(function)) {
+ harness.setup();
+ harness.open();
+ CountDownLatch latch = new CountDownLatch(2);
+ CollectingSourceContext sourceContext = new
CollectingSourceContext(latch);
+ runAsync(sourceContext, function);
+ assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should
finish splits generation");
+ state = harness.snapshot(1, 1);
+ // Stop the stream task.
+ function.close();
+ assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should
finish splits generation");
+ assertThat("Should produce the expected splits",
sourceContext.getPartitionPaths(), is("par1,par2"));
+ assertTrue(sourceContext.splits.stream().allMatch(split ->
split.getInstantRange().isPresent()),
+ "All instants should have range limit");
+ }
+ TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+ StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(conf);
+ try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness =
createHarness(function2)) {
+ harness.setup();
+ // Recover to process the remaining snapshots.
+ harness.initializeState(state);
+ harness.open();
+ CountDownLatch latch = new CountDownLatch(6);
+ CollectingSourceContext sourceContext = new
CollectingSourceContext(latch);
+ runAsync(sourceContext, function2);
+ assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should
finish splits generation");
+ // Stop the stream task.
+ function.close();
Review Comment:
This closes the wrong function; it should be `function2.close();`.
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java:
##########
@@ -462,6 +462,47 @@ public void testStopWithSavepointAndRestore() throws
Exception {
}
}
+ @Test
+ public void testCheckpointRestoreWithLimit() throws Exception {
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ conf.setInteger(FlinkOptions.READ_SPLITS_LIMIT, 2);
+ StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+ OperatorSubtaskState state;
+ try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness =
createHarness(function)) {
+ harness.setup();
+ harness.open();
+ CountDownLatch latch = new CountDownLatch(2);
+ CollectingSourceContext sourceContext = new
CollectingSourceContext(latch);
+ runAsync(sourceContext, function);
+ assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should
finish splits generation");
+ state = harness.snapshot(1, 1);
+ // Stop the stream task.
+ function.close();
+ assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should
finish splits generation");
+ assertThat("Should produce the expected splits",
sourceContext.getPartitionPaths(), is("par1,par2"));
+ assertTrue(sourceContext.splits.stream().allMatch(split ->
split.getInstantRange().isPresent()),
+ "All instants should have range limit");
+ }
+ TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+ StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(conf);
+ try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness =
createHarness(function2)) {
+ harness.setup();
+ // Recover to process the remaining snapshots.
+ harness.initializeState(state);
+ harness.open();
+ CountDownLatch latch = new CountDownLatch(6);
+ CollectingSourceContext sourceContext = new
CollectingSourceContext(latch);
+ runAsync(sourceContext, function2);
+ assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should
finish splits generation");
+ // Stop the stream task.
+ function.close();
+ assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should
finish splits generation");
+ assertThat("Should produce the expected splits",
sourceContext.getPartitionPaths(), is("par1,par2,par3,par3,par4,par4"));
Review Comment:
Should `"par1,par2,par3,par3,par4,par4"` be `"par1,par2,par3,par4"` instead?
`getPartitionPaths` de-duplicates the partition paths across the collected splits.
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java:
##########
@@ -462,6 +462,47 @@ public void testStopWithSavepointAndRestore() throws
Exception {
}
}
+ @Test
+ public void testCheckpointRestoreWithLimit() throws Exception {
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ conf.setInteger(FlinkOptions.READ_SPLITS_LIMIT, 2);
+ StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+ OperatorSubtaskState state;
+ try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness =
createHarness(function)) {
+ harness.setup();
+ harness.open();
+ CountDownLatch latch = new CountDownLatch(2);
+ CollectingSourceContext sourceContext = new
CollectingSourceContext(latch);
+ runAsync(sourceContext, function);
+ assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should
finish splits generation");
+ state = harness.snapshot(1, 1);
+ // Stop the stream task.
+ function.close();
+ assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should
finish splits generation");
+ assertThat("Should produce the expected splits",
sourceContext.getPartitionPaths(), is("par1,par2"));
+ assertTrue(sourceContext.splits.stream().allMatch(split ->
split.getInstantRange().isPresent()),
+ "All instants should have range limit");
+ }
+ TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+ StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(conf);
+ try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness =
createHarness(function2)) {
+ harness.setup();
+ // Recover to process the remaining snapshots.
+ harness.initializeState(state);
+ harness.open();
+ CountDownLatch latch = new CountDownLatch(6);
+ CollectingSourceContext sourceContext = new
CollectingSourceContext(latch);
+ runAsync(sourceContext, function2);
+ assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should
finish splits generation");
Review Comment:
A checkpoint needs to be triggered after `runAsync`, e.g. `state = harness.snapshot(2, 1);`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]