[
https://issues.apache.org/jira/browse/HUDI-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hui An updated HUDI-5253:
-------------------------
Summary: HoodieMergeOnReadTableInputFormat could have duplicate records
issue if it contains delta files while still splittable (was: Fix flaky test
TestHoodieClientOnMergeOnReadStorage#testLogCompactionOnMORTable could have
duplicate records issue)
> HoodieMergeOnReadTableInputFormat could have duplicate records issue if it
> contains delta files while still splittable
> ----------------------------------------------------------------------------------------------------------------------
>
> Key: HUDI-5253
> URL: https://issues.apache.org/jira/browse/HUDI-5253
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Hui An
> Priority: Major
> Labels: pull-request-available
>
> CI runs sometimes fail with an {{IllegalStateException}} reporting a duplicate
> key:
> {code:java}
> java.lang.IllegalStateException: Duplicate key {"_hoodie_commit_time":
> 20221122170106908, "_hoodie_commit_seqno": 20221122170106908_0_48,
> "_hoodie_record_key": 53c028d3-e4b0-4d6f-a041-f09e418c36b3,
> "_hoodie_partition_path": 2016/03/15, "_hoodie_file_name":
> 6a9fff5b-3b75-48ad-85fe-c621c9a2c25d-0, "timestamp": 0, "_row_key":
> 53c028d3-e4b0-4d6f-a041-f09e418c36b3, "partition_path": 2016/03/15, "rider":
> rider-20221122170106908, "driver": driver-20221122170106908, "begin_lat":
> 0.5407076277518825, "begin_lon": 0.39726822192851885, "end_lat":
> 0.49363027135660975, "end_lon": 0.6482366665027408, "distance_in_meters":
> -1534272590, "seconds_since_epoch": 6103867871123100710, "weight":
> 0.38126373, "nation": 7b 62 79 74 65 73 3d 43 61 6e 61 64 61 7d,
> "current_date": 1970-01-17, "current_ts": 1460315658, "height": 0.093258,
> "city_to_state": org.apache.hadoop.io.ArrayWritable@1b28684, "fare":
> org.apache.hadoop.io.ArrayWritable@694b818f, "tip_history":
> org.apache.hadoop.io.ArrayWritable@63cb1a9a, "_hoodie_is_deleted": false}
> at
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
> at java.util.HashMap.merge(HashMap.java:1254)
> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> at
> org.apache.hudi.testutils.GenericRecordValidationTestUtils.assertDataInMORTable(GenericRecordValidationTestUtils.java:95)
> at
> org.apache.hudi.testutils.GenericRecordValidationTestUtils.assertDataInMORTable(GenericRecordValidationTestUtils.java:80)
> at
> org.apache.hudi.client.functional.TestHoodieClientOnMergeOnReadStorage.testLogCompactionOnMORTable(TestHoodieClientOnMergeOnReadStorage.java:187)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
> at
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
> at
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
> at
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
> at
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
> at
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
> at
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
> at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
> at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> at
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
> at
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
> at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
> at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
> at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
> at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
> at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
> at
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
> at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
> at java.util.ArrayList.forEach(ArrayList.java:1259)
> at
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
> at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
> at
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
> at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
> at java.util.ArrayList.forEach(ArrayList.java:1259)
> at
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
> at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
> at
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
> at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
> at
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
> at
> org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
> at
> org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
> at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
> at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
> at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
> at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
> at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
> at
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
> at
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
> at
> com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
> at
> com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
> {code}
> We can easily reproduce this by changing
> {{org.apache.hudi.testutils.HoodieMergeOnReadTestUtils#getRecordsUsingInputFormat}}
> so that it requests more splits:
> {code:java}
> FileInputFormat.setInputPaths(jobConf, String.join(",", inputPaths));
> // Add 3 to the inputPaths to create more splits
> InputSplit[] splits = inputFormat.getSplits(jobConf, inputPaths.size() + 3);
> for (InputSplit split : splits) {
> // ...
> }
> {code}
> We cannot actually allow a path to be splittable if it contains delta files.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)