C0urante commented on code in PR #13429: URL: https://github.com/apache/kafka/pull/13429#discussion_r1176509940
########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ########## @@ -57,52 +57,96 @@ public void testOffsetTranslation() { // Emit synced downstream offset without dead-reckoning store.sync(tp, 100, 200); - assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150)); + assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150)); // Translate exact offsets store.sync(tp, 150, 251); - assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150)); + assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150)); // Use old offset (5) prior to any sync -> can't translate - assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5)); + assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5)); // Downstream offsets reset store.sync(tp, 200, 10); - assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200)); + assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200)); // Upstream offsets reset store.sync(tp, 20, 20); - assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20)); + assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20)); } } @Test public void testNoTranslationIfStoreNotStarted() { try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { // no offsets exist and store is not started - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100)); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); // read a sync during startup store.sync(tp, 100, 200); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100)); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); // After the store is started all offsets are visible store.start(); - assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0)); - assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100)); - assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200)); + assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0)); + assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100)); + assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200)); } } @Test public void testNoTranslationIfNoOffsetSync() { try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { store.start(); - assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0)); + assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); } } + + @Test + public void testPastOffsetTranslation() { + try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { + long maxOffsetLag = 10; + int offset = 0; + for (; offset <= 1000; offset += maxOffsetLag) { + store.sync(tp, offset, offset); + } + store.start(); + + // After starting but before seeing new offsets, only the latest startup offset can be translated + assertSparseSync(store, 1000, -1); + + for (; offset <= 2000; offset += maxOffsetLag) { + store.sync(tp, offset, offset); + } + + // After seeing new offsets, we still cannot translate earlier than the latest startup offset + assertSparseSync(store, 1000, -1); + + // We can translate offsets between the latest startup offset and the latest offset with variable precision + // Older offsets are less precise and translation ends up farther apart + assertSparseSync(store, 1030, 1000); + assertSparseSync(store, 1540, 1030); + assertSparseSync(store, 1800, 1540); + assertSparseSync(store, 1920, 1800); + assertSparseSync(store, 1990, 1920); + assertSparseSync(store, 2000, 1990); Review Comment: Thanks for the explanation! I like the values here. They illustrate the granularity with which we can translate offsets (and how that granularity decreases as we have to use older and older data) quite nicely. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org