vamossagar12 commented on PR #12756:
URL: https://github.com/apache/kafka/pull/12756#issuecomment-1279768865
@vpapavas I need your help on this test failure:
org.apache.kafka.streams.kstream.internals.KStreamKStreamSelfJoinTest.shouldMatchInnerJoinWithSelfJoinOutOfOrderMessages
failing with the following stacktrace:
```
java.lang.AssertionError: the number of outputs:[Record{key=A, value=11,
timestamp=0, headers=RecordHeaders(headers = [], isReadOnly = false)},
Record{key=A, value=21, timestamp=9999, headers=RecordHeaders(headers = [],
isReadOnly = false)}, Record{key=A, value=12, timestamp=9999,
headers=RecordHeaders(headers = [], isReadOnly = false)}, Record{key=A,
value=22, timestamp=9999, headers=RecordHeaders(headers = [], isReadOnly =
false)}, Record{key=B, value=11, timestamp=11000, headers=RecordHeaders(headers
= [], isReadOnly = false)}, Record{key=A, value=32, timestamp=13000,
headers=RecordHeaders(headers = [], isReadOnly = false)}, Record{key=A,
value=23, timestamp=13000, headers=RecordHeaders(headers = [], isReadOnly =
false)}, Record{key=A, value=33, timestamp=13000, headers=RecordHeaders(headers
= [], isReadOnly = false)}, Record{key=A, value=42, timestamp=15000,
headers=RecordHeaders(headers = [], isReadOnly = false)}, Record{key=A,
value=43, timestamp=15000, headers=RecordHeaders
(headers = [], isReadOnly = false)}, Record{key=A, value=24, timestamp=15000,
headers=RecordHeaders(headers = [], isReadOnly = false)}, Record{key=A,
value=34, timestamp=15000, headers=RecordHeaders(headers = [], isReadOnly =
false)}, Record{key=A, value=44, timestamp=15000, headers=RecordHeaders(headers
= [], isReadOnly = false)}, Record{key=C, value=11, timestamp=16000,
headers=RecordHeaders(headers = [], isReadOnly = false)}, Record{key=D,
value=11, timestamp=17000, headers=RecordHeaders(headers = [], isReadOnly =
false)}, Record{key=A, value=55, timestamp=30000, headers=RecordHeaders(headers
= [], isReadOnly = false)}, Record{key=A, value=53, timestamp=13000,
headers=RecordHeaders(headers = [], isReadOnly = false)}, Record{key=A,
value=54, timestamp=15000, headers=RecordHeaders(headers = [], isReadOnly =
false)}, Record{key=A, value=55, timestamp=6000, headers=RecordHeaders(headers
= [], isReadOnly = false)}, Record{key=A, value=35, timestamp=13000,
headers=RecordHeaders(headers
= [], isReadOnly = false)}, Record{key=A, value=45, timestamp=15000,
headers=RecordHeaders(headers = [], isReadOnly = false)}]
Expected: is <20>
but: was <21>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at
org.apache.kafka.test.MockApiProcessor.checkAndClearProcessResult(MockApiProcessor.java:97)
at
org.apache.kafka.streams.kstream.internals.KStreamKStreamSelfJoinTest.shouldMatchInnerJoinWithSelfJoinOutOfOrderMessages(KStreamKStreamSelfJoinTest.java:334)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
at
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
at
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
at
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at
org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at
org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at
org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
at
org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
at
org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
at
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at org.gradle.internal.dispatc…
```
This PR is a follow-up to https://github.com/apache/kafka/pull/11211 and it
aims to filter expired records from stateful persistent stores (in-memory stores
already support this). 11211 was merged recently and then rolled back because it
broke a couple of tests, and this is one of them. I see that your changes were
introduced around the same time I made the last commit on the PR. Either way,
it looks like where I need your help is this bit of code:
```
// This is needed so that output records follow timestamp order
// Join this with self
if (inputRecordTimestamp < maxRecordTimestamp && !emittedJoinWithSelf) {
    emittedJoinWithSelf = true;
    context().forward(selfRecord);
}
```
To give you more context: because of record expiry, the inner join is no longer
returning the self-join records. When serving a fetch call, the stores compute
the lower bound using this formula:
```
return Math.max(from, observedStreamTime - retentionPeriod + 1);
```
As a result, the self-join records are dropped from the inner join. But because
of the condition above, the self-join calculation still returns the self-join
record, which causes the test to fail.
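
To make the mismatch concrete, here is a rough sketch in plain Java. It is not
the real store or processor code: `fetch`, `selfJoinForwards`, the `TreeMap`
store and all the numbers are made up for illustration. Only the `Math.max`
expression and the `if` condition are taken from the snippets above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class ExpiryMismatchSketch {

    // Same expression as the lower-bound formula quoted above.
    static long lowerBound(long from, long observedStreamTime, long retentionPeriod) {
        return Math.max(from, observedStreamTime - retentionPeriod + 1);
    }

    // Hypothetical stand-in for a store fetch over [from, to]; not the real store API.
    static List<String> fetch(TreeMap<Long, String> store, long from, long to,
                              long observedStreamTime, long retentionPeriod) {
        long actualFrom = lowerBound(from, observedStreamTime, retentionPeriod);
        if (actualFrom > to) {
            return new ArrayList<>(); // the whole requested range has expired
        }
        return new ArrayList<>(store.subMap(actualFrom, true, to, true).values());
    }

    // Hypothetical mirror of the self-join condition quoted earlier.
    // Note that it has no expiry check at all.
    static boolean selfJoinForwards(long inputRecordTimestamp, long maxRecordTimestamp,
                                    boolean emittedJoinWithSelf) {
        return inputRecordTimestamp < maxRecordTimestamp && !emittedJoinWithSelf;
    }

    public static void main(String[] args) {
        long retentionPeriod = 10_000L;    // made-up value
        long observedStreamTime = 30_000L; // made-up value

        TreeMap<Long, String> store = new TreeMap<>();
        store.put(6_000L, "late");     // out-of-order record, older than the retention horizon
        store.put(25_000L, "recent");

        // Effective lower bound after applying retention.
        System.out.println(lowerBound(0L, observedStreamTime, retentionPeriod));
        // The fetch skips the expired "late" record.
        System.out.println(fetch(store, 0L, 30_000L, observedStreamTime, retentionPeriod));
        // The self-join condition would still forward the same record.
        System.out.println(selfJoinForwards(6_000L, observedStreamTime, false));
    }
}
```

In this sketch the fetch drops the record at 6000 because it falls below the
retention-adjusted lower bound, while the self-join condition still evaluates
to true for it. That is the kind of extra output record I think the test is
tripping over.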
What I want to know is whether this is expected: once records are being expired,
with this logic there's a chance that the output of the inner join and the
self-join will not match for out-of-order records. The way I see it, what the
inner join is returning now seems correct. WDYT? cc @ableegoldman, @cadonna