mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107705172


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - 
GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - 
GRACE_PERIOD)); // grace period has elapsed, so this record should not be 
restored
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+        verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - 
GRACE_PERIOD);
+        verifyGetNullFromStore("k2");
+    }
+
+    @Test
+    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - 
GRACE_PERIOD)); // this record will be older than grace period by the end of 
the batch, but should still be restored
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+    }
+
+    @Test
+    public void shouldAllowZeroHistoryRetention() {
+        // recreate store with zero history retention
+        store.close();
+        store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, 
SEGMENT_INTERVAL);
+        store.init((StateStoreContext) context, store);
+
+        // put, get, and delete
+        putToStore("k", "v", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", 
BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", 
BASE_TIMESTAMP); // query in "future" is allowed
+
+        // update existing record at same timestamp
+        putToStore("k", "updated", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", 
BASE_TIMESTAMP);
+
+        // put new record version
+        putToStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", 
BASE_TIMESTAMP + 2);
+
+        // query in past (history retention expired) returns null
+        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
+
+        // put in past (grace period expired) does not update the store

Review Comment:
   It was just an idea. It's not a big deal if we don't have the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to