Myasuka commented on a change in pull request #11482: [FLINK-16581][table] Minibatch deduplication lack state TTL bug fix
URL: https://github.com/apache/flink/pull/11482#discussion_r409339136
 
 

 ##########
 File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateKeepFirstRowFunctionTest.java
 ##########
 @@ -71,4 +71,41 @@ public void testKeepFirstRowWithGenerateRetraction() throws Exception {
                testHarness.close();
        }
 
+       @Test
+       public void testKeepFirstRowWithStateTtl() throws Exception {
+               MiniBatchDeduplicateKeepFirstRowFunction func = new MiniBatchDeduplicateKeepFirstRowFunction(
+                       typeSerializer, minTime.toMilliseconds());
+               OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func);
+               testHarness.setup();
+               testHarness.open();
+               testHarness.processElement(record("book", 1L, 12));
+               testHarness.processElement(record("book", 2L, 11));
+               // output is empty because the bundle has not been triggered yet
+               Assert.assertTrue(testHarness.getOutput().isEmpty());
+               testHarness.processElement(record("book", 1L, 13));
+
+               testHarness.setStateTtlProcessingTime(30);
+               // incremental cleanup is eventual: the more state accesses, the more expired state gets cleaned up
+               for (long i = 3; i < 30; i++) {
+                       testHarness.processElement(record("book", i, 20));
 
 Review comment:
   Since we chose `ReturnExpiredIfNotCleanedUp` for better performance, verifying that the data has been physically deleted is a topic tied to the cleanup-strategy implementation of each state backend. For RocksDB, we have to rely on compaction of the SST files containing those keys. For the heap keyed state backend, how many records get fetched per cleanup pass depends on the key groups (i.e. how many state maps exist) in one state backend. Both knobs are sketched below.
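   For context only, here is a minimal sketch of what those two cleanup strategies look like with the generic `StateTtlConfig` builder. The retention time and cleanup sizes are made-up values, and this is the DataStream-level API rather than whatever the blink runtime wires up internally:

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class TtlCleanupSketch {

	public static StateTtlConfig ttlConfig() {
		return StateTtlConfig
			.newBuilder(Time.minutes(10)) // hypothetical retention time
			// Heap backend: lazily clean up a few entries on each state access,
			// so expired state is only removed eventually, as more keys are touched.
			.cleanupIncrementally(10, false)
			// RocksDB backend: expired entries are physically dropped only once
			// compaction rewrites the SST files that contain them.
			.cleanupInRocksdbCompactFilter(1000L)
			.build();
	}
}
```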
   I think the state backend module should guarantee that correctness, and it is out of scope for the SQL module. If the state backend ever changes its TTL cleanup implementation, a SQL-module test that verifies physical deletion might become unstable.
   In a nutshell, we do not need to ensure here that the data has been physically deleted; verifying that the state in `MiniBatchDeduplicateKeepFirstRowFunction` has been configured with TTL is enough. Alternatively, if we used the `NeverReturnExpired` strategy, things would be much simpler here.
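   To make that last point concrete, a minimal sketch of the two visibility settings, again using the generic `StateTtlConfig` builder purely for illustration (the retention time is a made-up value):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class TtlVisibilitySketch {

	// What this PR opts for: a read may still return state that is expired but
	// not yet cleaned up, so a test cannot deterministically assert that an
	// expired row is gone.
	static final StateTtlConfig RELAXED = StateTtlConfig
		.newBuilder(Time.minutes(1)) // hypothetical retention time
		.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
		.build();

	// The stricter alternative: expired state is never returned, even before it
	// has been physically deleted, which makes test assertions deterministic at
	// some extra cost per read.
	static final StateTtlConfig STRICT = StateTtlConfig
		.newBuilder(Time.minutes(1))
		.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
		.build();
}
```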
