vamossagar12 commented on PR #12756:
URL: https://github.com/apache/kafka/pull/12756#issuecomment-1279768865

   @vpapavas I need your help with this test failure: 
`org.apache.kafka.streams.kstream.internals.KStreamKStreamSelfJoinTest.shouldMatchInnerJoinWithSelfJoinOutOfOrderMessages`
 is failing with the following stack trace:
   
   ```
   java.lang.AssertionError: the number of outputs:[Record{key=A, value=11, 
timestamp=0, headers=RecordHeaders(headers = [], isReadOnly = false)}, 
Record{key=A, value=21, timestamp=9999, headers=RecordHeaders(headers = [], 
isReadOnly = false)}, Record{key=A, value=12, timestamp=9999, 
headers=RecordHeaders(headers = [], isReadOnly = false)}, Record{key=A, 
value=22, timestamp=9999, headers=RecordHeaders(headers = [], isReadOnly = 
false)}, Record{key=B, value=11, timestamp=11000, headers=RecordHeaders(headers 
= [], isReadOnly = false)}, Record{key=A, value=32, timestamp=13000, 
headers=RecordHeaders(headers = [], isReadOnly = false)}, Record{key=A, 
value=23, timestamp=13000, headers=RecordHeaders(headers = [], isReadOnly = 
false)}, Record{key=A, value=33, timestamp=13000, headers=RecordHeaders(headers 
= [], isReadOnly = false)}, Record{key=A, value=42, timestamp=15000, 
headers=RecordHeaders(headers = [], isReadOnly = false)}, Record{key=A, 
value=43, timestamp=15000, headers=RecordHeaders
 (headers = [], isReadOnly = false)}, Record{key=A, value=24, timestamp=15000, 
headers=RecordHeaders(headers = [], isReadOnly = false)}, Record{key=A, 
value=34, timestamp=15000, headers=RecordHeaders(headers = [], isReadOnly = 
false)}, Record{key=A, value=44, timestamp=15000, headers=RecordHeaders(headers 
= [], isReadOnly = false)}, Record{key=C, value=11, timestamp=16000, 
headers=RecordHeaders(headers = [], isReadOnly = false)}, Record{key=D, 
value=11, timestamp=17000, headers=RecordHeaders(headers = [], isReadOnly = 
false)}, Record{key=A, value=55, timestamp=30000, headers=RecordHeaders(headers 
= [], isReadOnly = false)}, Record{key=A, value=53, timestamp=13000, 
headers=RecordHeaders(headers = [], isReadOnly = false)}, Record{key=A, 
value=54, timestamp=15000, headers=RecordHeaders(headers = [], isReadOnly = 
false)}, Record{key=A, value=55, timestamp=6000, headers=RecordHeaders(headers 
= [], isReadOnly = false)}, Record{key=A, value=35, timestamp=13000, 
headers=RecordHeaders(headers
  = [], isReadOnly = false)}, Record{key=A, value=45, timestamp=15000, 
headers=RecordHeaders(headers = [], isReadOnly = false)}]
   Expected: is <20>
        but: was <21>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at org.apache.kafka.test.MockApiProcessor.checkAndClearProcessResult(MockApiProcessor.java:97)
        at org.apache.kafka.streams.kstream.internals.KStreamKStreamSelfJoinTest.shouldMatchInnerJoinWithSelfJoinOutOfOrderMessages(KStreamKStreamSelfJoinTest.java:334)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
        at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
        at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
        at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
        at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
        at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
        at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
        at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
        at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
        at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
        at org.gradle.internal.dispatc…
   ```
   
   This PR is a follow-up to https://github.com/apache/kafka/pull/11211 and 
aims to filter expired records from persistent stores (in-memory stores 
already support this). 11211 was merged recently and then rolled back because 
it broke a couple of tests, and this is one of them. I see that your changes 
were introduced around the same time I made the last commit on the PR. Either 
way, the bit of code I need your help with is this:
   
   ```
   // This is needed so that output records follow timestamp order
   // Join this with self
   if (inputRecordTimestamp < maxRecordTimestamp && !emittedJoinWithSelf) {
       emittedJoinWithSelf = true;
       context().forward(selfRecord);
   }
   ```
   
   To give you more context: because records now expire, the inner join no 
longer returns the self-join records. When serving a fetch call, the stores 
compute the lower bound with this formula:
   
   ```
   return Math.max(from, observedStreamTime - retentionPeriod + 1);
   ```
   
   As a result, the self-join records are dropped from the inner join. But 
because of the condition above, the self-join path still emits the self-join 
record, which causes the test to fail.
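
To make the lower-bound effect concrete, here is a minimal sketch. The class name, helper names, and all numbers are hypothetical and chosen only for illustration; they are not taken from the store implementation or from the failing test. Only the `Math.max` expression mirrors the formula quoted above.

```java
// Sketch only: class, method names, and values are hypothetical, not Kafka code.
final class RetentionBoundSketch {

    // Mirrors the quoted formula: a fetch never starts before the oldest
    // timestamp that is still inside the retention period.
    static long effectiveLowerBound(long from, long observedStreamTime, long retentionPeriod) {
        return Math.max(from, observedStreamTime - retentionPeriod + 1);
    }

    // A record is visible to the fetch only if its timestamp lies in
    // [effective lower bound, to].
    static boolean isFetched(long recordTimestamp, long from, long to,
                             long observedStreamTime, long retentionPeriod) {
        long lowerBound = effectiveLowerBound(from, observedStreamTime, retentionPeriod);
        return recordTimestamp >= lowerBound && recordTimestamp <= to;
    }

    public static void main(String[] args) {
        long observedStreamTime = 30_000L; // assumed stream time
        long retentionPeriod = 20_000L;    // assumed retention
        long lateRecordTimestamp = 6_000L; // assumed out-of-order record

        // The requested range starts at 0, but the bound is raised to 10001.
        System.out.println(effectiveLowerBound(0L, observedStreamTime, retentionPeriod));
        // The late record falls below the raised bound, so the fetch skips it.
        System.out.println(isFetched(lateRecordTimestamp, 0L, 16_000L,
                                     observedStreamTime, retentionPeriod));
    }
}
```

With these assumed values, a record at timestamp 6000 is outside the fetch range once stream time has advanced to 30000, so a store-backed join would not see it, while a code path that forwards the record without consulting the store still would.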
   
   My concern is that, once records are being expired, this logic can make the 
outputs of the inner join and the self-join differ for out-of-order records. 
The way I see it, what the inner join returns now seems correct. WDYT? 
cc @ableegoldman, @cadonna 
   
   
    

