Myasuka commented on a change in pull request #11482: [FLINK-16581][table]
Minibatch deduplication lack state TTL bug fix
URL: https://github.com/apache/flink/pull/11482#discussion_r409339136
##########
File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateKeepFirstRowFunctionTest.java
##########
@@ -71,4 +71,41 @@ public void testKeepFirstRowWithGenerateRetraction() throws Exception {
 		testHarness.close();
 	}
+	@Test
+	public void testKeepFirstRowWithStateTtl() throws Exception {
+		MiniBatchDeduplicateKeepFirstRowFunction func = new MiniBatchDeduplicateKeepFirstRowFunction(typeSerializer, minTime.toMilliseconds());
+		OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func);
+		testHarness.setup();
+		testHarness.open();
+		testHarness.processElement(record("book", 1L, 12));
+		testHarness.processElement(record("book", 2L, 11));
+		// output is empty because the bundle has not been triggered yet
+		Assert.assertTrue(testHarness.getOutput().isEmpty());
+		testHarness.processElement(record("book", 1L, 13));
+
+		testHarness.setStateTtlProcessingTime(30);
+		// incremental cleanup is eventual: more state accesses guarantee more expired state is cleaned
+		for (long i = 3; i < 30; i++) {
+			testHarness.processElement(record("book", i, 20));
Review comment:
Since we choose `ReturnExpiredIfNotCleanedUp` for better performance, verifying that the data has been physically deleted is a topic tied to the cleanup-strategy implementation of each state backend. For RocksDB, we have to rely on the compaction of the SST files containing those keys. For the heap keyed state backend, the number of records fetched per cleanup pass depends on the key groups (i.e. how many state maps exist) in one state backend.
I think the state backend module should guarantee this correctness, and it is out of scope for the SQL module. If the state backend ever changes its TTL cleanup implementation, a test in the SQL module that verifies physical deletion might become unstable.
In a nutshell, we do not need to ensure the data has been physically deleted here; verifying that the state in `MiniBatchDeduplicateKeepFirstRowFunction` has been configured with TTL is enough. Alternatively, if we used the `NeverReturnExpired` strategy, things would be much simpler here.
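To make the distinction concrete, here is a minimal, hypothetical model (not Flink code; `TtlValueStore` and all of its members are invented for illustration) of why `ReturnExpiredIfNotCleanedUp` can still return an expired value until some eventual cleanup pass runs, while `NeverReturnExpired` filters it on read:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the two TTL state-visibility modes discussed above.
// All names here are invented for illustration; this is not Flink's API.
public class TtlValueStore {

    enum Visibility { RETURN_EXPIRED_IF_NOT_CLEANED_UP, NEVER_RETURN_EXPIRED }

    private final Map<String, Long> writeTimestamps = new HashMap<>();
    private final Map<String, String> values = new HashMap<>();
    private final long ttlMillis;
    private final Visibility visibility;

    TtlValueStore(long ttlMillis, Visibility visibility) {
        this.ttlMillis = ttlMillis;
        this.visibility = visibility;
    }

    void put(String key, String value, long now) {
        values.put(key, value);
        writeTimestamps.put(key, now);
    }

    // NEVER_RETURN_EXPIRED filters an expired value out on read;
    // RETURN_EXPIRED_IF_NOT_CLEANED_UP keeps it visible until a lazy,
    // eventual cleanup pass physically removes it.
    String get(String key, long now) {
        Long written = writeTimestamps.get(key);
        if (written == null) {
            return null;
        }
        boolean expired = now - written >= ttlMillis;
        if (expired && visibility == Visibility.NEVER_RETURN_EXPIRED) {
            return null;
        }
        return values.get(key);
    }

    // Stands in for eventual cleanup (e.g. RocksDB compaction or incremental
    // heap cleanup); afterwards expired entries are gone in both modes.
    void cleanUp(long now) {
        writeTimestamps.entrySet().removeIf(e -> now - e.getValue() >= ttlMillis);
        values.keySet().retainAll(writeTimestamps.keySet());
    }

    public static void main(String[] args) {
        TtlValueStore lazy = new TtlValueStore(10, Visibility.RETURN_EXPIRED_IF_NOT_CLEANED_UP);
        lazy.put("book", "row-1", 0);
        System.out.println(lazy.get("book", 30)); // expired but not cleaned: row-1
        lazy.cleanUp(30);
        System.out.println(lazy.get("book", 30)); // after cleanup: null

        TtlValueStore strict = new TtlValueStore(10, Visibility.NEVER_RETURN_EXPIRED);
        strict.put("book", "row-1", 0);
        System.out.println(strict.get("book", 30)); // null even before cleanup
    }
}
```

This is why a SQL-module test under `ReturnExpiredIfNotCleanedUp` cannot reliably assert physical deletion: visibility of an expired entry depends on whether the backend's cleanup pass has happened to run yet.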