[
https://issues.apache.org/jira/browse/HUDI-2774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan updated HUDI-2774:
--------------------------------------
Description:
Setup:
Started deltastreamer with parquet dfs source. source folder did not have any
data as such. Enabled async clustering with below props
```
hoodie.clustering.async.max.commits=2
hoodie.clustering.plan.strategy.sort.columns=type,id
```
Added 1 file to the source folder. and deltastreamer failed during this. commit
went through fine. looks like 1st replace commit also went through fine. but
deltastreamer failed. I need to understand why deltastreamer tries to schedule
a 2nd replace commit as well. It runs in continuous mode and goes into next
round immediately and there is no more data to sync.
clustering plan seems to be same in both replace commit requested meta files
{code:java}
^@&<93>c%^Z<F1><81>9%<E6>-^K<EF><AC><FC>A^B<BA>^G^B^NCLUSTER^B^B^B^B^B^B<EC>^Afile:/tmp/hudi-deltastreamer-gh-mw/PushEvent/2542ddef-0169-4978-9b1b-84977d6141cf-0_0-49-161_20211116130523827.parquet^B^@^BL2542ddef-0169-4978-9b1b-84977d6141cf-0^B^RPushEvent^B^@^@^B^@^B
^^TOTAL_LOG_FILES^@^@^@^@^@^@^@^@^VTOTAL_IO_MB^@^@^@^@^@^@^@^@
TOTAL_IO_READ_MB^@^@^@^@^@^@^@^@(TOTAL_LOG_FILES_SIZE^@^@^@^@^@^@^@^@"TOTAL_IO_WRITE_MB^@^@^@^@^@^@^@^@^@^@^B^@^B^@^B^B<A0>^Aorg.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy^B^BXhoodie.clustering.plan.strategy.sort.columns^Ntype,id^@^@^B^B^@^@^B^B^A^B^@^@^B&<93>c%^Z<F1><81>9%<E6>-^K<EF><AC><FC>A{code}
{code:java}
^@<FB>^L%b<C3>3<85><D7><<BB><A3><B1><BA>
<A9><89>^B<BA>^G^B^NCLUSTER^B^B^B^B^B^B<EC>^Afile:/tmp/hudi-deltastreamer-gh-mw/PushEvent/2542ddef-0169-4978-9b1b-84977d6141cf-0_0-49-161_20211116130523827.parquet^B^@^BL2542ddef-0169-4978-9b1b-84977d6141cf-0^B^RPushEvent^B^@^@^B^@^B
^^TOTAL_LOG_FILES^@^@^@^@^@^@^@^@^VTOTAL_IO_MB^@^@^@^@^@^@^@^@
TOTAL_IO_READ_MB^@^@^@^@^@^@^@^@(TOTAL_LOG_FILES_SIZE^@^@^@^@^@^@^@^@"TOTAL_IO_WRITE_MB^@^@^@^@^@^@^@^@^@^@^B^@^B^@^B^B<A0>^Aorg.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy^B^BXhoodie.clustering.plan.strategy.sort.columns^Ntype,id^@^@^B^B^@^@^B^B^A^B^@^@^B<FB>^L%b<C3>3<85><D7><<BB><A3><B1><BA>
<A9><89> {code}
timeline
!Screen Shot 2021-11-16 at 12.42.20 PM.png!
stacktrace:
{code:java}
21/11/16 13:05:20 WARN HoodieDeltaStreamer: Next round
21/11/16 13:05:20 WARN DeltaSync: Extra metadata :: 20211116130512915,
20211116130512915.commit, = [schema, deltastreamer.checkpoint.key]
21/11/16 13:05:23 WARN HoodieDeltaStreamer: Starting async clustering service
if required 111
21/11/16 13:05:27 WARN HoodieDeltaStreamer: Scheduled async clustering for
instant: 20211116130526895
21/11/16 13:05:27 WARN HoodieDeltaStreamer: Next round
21/11/16 13:05:27 WARN DeltaSync: Extra metadata :: 20211116130523827,
20211116130523827.commit, = [schema, deltastreamer.checkpoint.key]
21/11/16 13:05:27 WARN HoodieDeltaStreamer: Scheduled async clustering for
instant: 20211116130527394
21/11/16 13:05:27 WARN HoodieDeltaStreamer: Next round
21/11/16 13:05:27 WARN DeltaSync: Extra metadata :: 20211116130523827,
20211116130523827.commit, = [schema, deltastreamer.checkpoint.key]
21/11/16 13:05:28 ERROR Executor: Exception in task 0.0 in stage 74.0 (TID 176)
java.lang.IllegalStateException: Duplicate key
[==>20211116130526895__replacecommit__INFLIGHT]
at
java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
at java.util.HashMap.merge(HashMap.java:1254)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at
java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at
java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:270)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at
java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at
java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:270)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at
org.apache.hudi.common.util.ClusteringUtils.getAllFileGroupsInPendingClusteringPlans(ClusteringUtils.java:127)
at
org.apache.hudi.common.table.view.AbstractTableFileSystemView.init(AbstractTableFileSystemView.java:113)
at
org.apache.hudi.common.table.view.HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:106)
at
org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:100)
at
org.apache.hudi.common.table.view.FileSystemViewManager.createInMemoryFileSystemView(FileSystemViewManager.java:168)
at
org.apache.hudi.common.table.view.FileSystemViewManager.lambda$createViewManager$5fcdabfe$1(FileSystemViewManager.java:259)
at
org.apache.hudi.common.table.view.FileSystemViewManager.lambda$getFileSystemView$1(FileSystemViewManager.java:111)
at
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
at
org.apache.hudi.common.table.view.FileSystemViewManager.getFileSystemView(FileSystemViewManager.java:110)
at org.apache.hudi.table.HoodieTable.getSliceView(HoodieTable.java:277)
at
org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy.getFileSlicesEligibleForClustering(ClusteringPlanStrategy.java:77)
at
org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy.getFileSlicesEligibleForClustering(SparkSizeBasedClusteringPlanStrategy.java:118)
at
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy.lambda$generateClusteringPlan$4e6aac78$1(PartitionAwareClusteringPlanStrategy.java:79)
at
org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:134)
at
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
at
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1334)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/11/16 13:05:28 WARN TaskSetManager: Lost task 0.0 in stage 74.0 (TID 176,
localhost, executor driver): java.lang.IllegalStateException: Duplicate key
[==>20211116130526895__replacecommit__INFLIGHT] {code}
was:
Setup:
Started deltastreamer with parquet dfs source. source folder did not have any
data as such. Enabled async clustering with below props
```
hoodie.clustering.async.max.commits=2
hoodie.clustering.plan.strategy.sort.columns=type,id
```
Added 1 file to the source folder. and deltastreamer failed during this. commit
went through fine. looks like 1st replace commit also went through fine. but
deltastreamer failed. I need to understand why deltastreamer tries to schedule
a 2nd replace commit as well. It runs in continuous mode and goes into next
round immediately and there is no more data to sync.
clustering plan seems to be same in both replace commit requested meta files
{code:java}
^@&<93>c%^Z<F1><81>9%<E6>-^K<EF><AC><FC>A^B<BA>^G^B^NCLUSTER^B^B^B^B^B^B<EC>^Afile:/tmp/hudi-deltastreamer-gh-mw/PushEvent/2542ddef-0169-4978-9b1b-84977d6141cf-0_0-49-161_20211116130523827.parquet^B^@^BL2542ddef-0169-4978-9b1b-84977d6141cf-0^B^RPushEvent^B^@^@^B^@^B
^^TOTAL_LOG_FILES^@^@^@^@^@^@^@^@^VTOTAL_IO_MB^@^@^@^@^@^@^@^@
TOTAL_IO_READ_MB^@^@^@^@^@^@^@^@(TOTAL_LOG_FILES_SIZE^@^@^@^@^@^@^@^@"TOTAL_IO_WRITE_MB^@^@^@^@^@^@^@^@^@^@^B^@^B^@^B^B<A0>^Aorg.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy^B^BXhoodie.clustering.plan.strategy.sort.columns^Ntype,id^@^@^B^B^@^@^B^B^A^B^@^@^B&<93>c%^Z<F1><81>9%<E6>-^K<EF><AC><FC>A{code}
{code:java}
^@<FB>^L%b<C3>3<85><D7><<BB><A3><B1><BA>
<A9><89>^B<BA>^G^B^NCLUSTER^B^B^B^B^B^B<EC>^Afile:/tmp/hudi-deltastreamer-gh-mw/PushEvent/2542ddef-0169-4978-9b1b-84977d6141cf-0_0-49-161_20211116130523827.parquet^B^@^BL2542ddef-0169-4978-9b1b-84977d6141cf-0^B^RPushEvent^B^@^@^B^@^B
^^TOTAL_LOG_FILES^@^@^@^@^@^@^@^@^VTOTAL_IO_MB^@^@^@^@^@^@^@^@
TOTAL_IO_READ_MB^@^@^@^@^@^@^@^@(TOTAL_LOG_FILES_SIZE^@^@^@^@^@^@^@^@"TOTAL_IO_WRITE_MB^@^@^@^@^@^@^@^@^@^@^B^@^B^@^B^B<A0>^Aorg.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy^B^BXhoodie.clustering.plan.strategy.sort.columns^Ntype,id^@^@^B^B^@^@^B^B^A^B^@^@^B<FB>^L%b<C3>3<85><D7><<BB><A3><B1><BA>
<A9><89> {code}
timeline
!Screen Shot 2021-11-16 at 12.42.20 PM.png!
> Async Clustering via deltstreamer fails with IllegalStateException: Duplicate
> key [==>20211116123724586__replacecommit__INFLIGHT]
> ---------------------------------------------------------------------------------------------------------------------------------
>
> Key: HUDI-2774
> URL: https://issues.apache.org/jira/browse/HUDI-2774
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: sivabalan narayanan
> Assignee: Sagar Sumit
> Priority: Blocker
> Fix For: 0.10.0
>
> Attachments: Screen Shot 2021-11-16 at 12.42.20 PM.png
>
>
> Setup:
> Started deltastreamer with parquet dfs source. source folder did not have any
> data as such. Enabled async clustering with below props
> ```
> hoodie.clustering.async.max.commits=2
> hoodie.clustering.plan.strategy.sort.columns=type,id
> ```
> Added 1 file to the source folder. and deltastreamer failed during this.
> commit went through fine. looks like 1st replace commit also went through
> fine. but deltastreamer failed. I need to understand why deltastreamer tries
> to schedule a 2nd replace commit as well. It runs in continuous mode and
> goes into next round immediately and there is no more data to sync.
>
> clustering plan seems to be same in both replace commit requested meta files
> {code:java}
> ^@&<93>c%^Z<F1><81>9%<E6>-^K<EF><AC><FC>A^B<BA>^G^B^NCLUSTER^B^B^B^B^B^B<EC>^Afile:/tmp/hudi-deltastreamer-gh-mw/PushEvent/2542ddef-0169-4978-9b1b-84977d6141cf-0_0-49-161_20211116130523827.parquet^B^@^BL2542ddef-0169-4978-9b1b-84977d6141cf-0^B^RPushEvent^B^@^@^B^@^B
> ^^TOTAL_LOG_FILES^@^@^@^@^@^@^@^@^VTOTAL_IO_MB^@^@^@^@^@^@^@^@
> TOTAL_IO_READ_MB^@^@^@^@^@^@^@^@(TOTAL_LOG_FILES_SIZE^@^@^@^@^@^@^@^@"TOTAL_IO_WRITE_MB^@^@^@^@^@^@^@^@^@^@^B^@^B^@^B^B<A0>^Aorg.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy^B^BXhoodie.clustering.plan.strategy.sort.columns^Ntype,id^@^@^B^B^@^@^B^B^A^B^@^@^B&<93>c%^Z<F1><81>9%<E6>-^K<EF><AC><FC>A{code}
>
> {code:java}
> ^@<FB>^L%b<C3>3<85><D7><<BB><A3><B1><BA>
> <A9><89>^B<BA>^G^B^NCLUSTER^B^B^B^B^B^B<EC>^Afile:/tmp/hudi-deltastreamer-gh-mw/PushEvent/2542ddef-0169-4978-9b1b-84977d6141cf-0_0-49-161_20211116130523827.parquet^B^@^BL2542ddef-0169-4978-9b1b-84977d6141cf-0^B^RPushEvent^B^@^@^B^@^B
> ^^TOTAL_LOG_FILES^@^@^@^@^@^@^@^@^VTOTAL_IO_MB^@^@^@^@^@^@^@^@
> TOTAL_IO_READ_MB^@^@^@^@^@^@^@^@(TOTAL_LOG_FILES_SIZE^@^@^@^@^@^@^@^@"TOTAL_IO_WRITE_MB^@^@^@^@^@^@^@^@^@^@^B^@^B^@^B^B<A0>^Aorg.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy^B^BXhoodie.clustering.plan.strategy.sort.columns^Ntype,id^@^@^B^B^@^@^B^B^A^B^@^@^B<FB>^L%b<C3>3<85><D7><<BB><A3><B1><BA>
> <A9><89> {code}
>
> timeline
> !Screen Shot 2021-11-16 at 12.42.20 PM.png!
>
> stacktrace:
> {code:java}
> 21/11/16 13:05:20 WARN HoodieDeltaStreamer: Next round
> 21/11/16 13:05:20 WARN DeltaSync: Extra metadata :: 20211116130512915,
> 20211116130512915.commit, = [schema, deltastreamer.checkpoint.key]
> 21/11/16 13:05:23 WARN HoodieDeltaStreamer: Starting async clustering service
> if required 111
> 21/11/16 13:05:27 WARN HoodieDeltaStreamer: Scheduled async clustering for
> instant: 20211116130526895
> 21/11/16 13:05:27 WARN HoodieDeltaStreamer: Next round
> 21/11/16 13:05:27 WARN DeltaSync: Extra metadata :: 20211116130523827,
> 20211116130523827.commit, = [schema, deltastreamer.checkpoint.key]
> 21/11/16 13:05:27 WARN HoodieDeltaStreamer: Scheduled async clustering for
> instant: 20211116130527394
> 21/11/16 13:05:27 WARN HoodieDeltaStreamer: Next round
> 21/11/16 13:05:27 WARN DeltaSync: Extra metadata :: 20211116130523827,
> 20211116130523827.commit, = [schema, deltastreamer.checkpoint.key]
> 21/11/16 13:05:28 ERROR Executor: Exception in task 0.0 in stage 74.0 (TID
> 176)
> java.lang.IllegalStateException: Duplicate key
> [==>20211116130526895__replacecommit__INFLIGHT]
> at
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
> at java.util.HashMap.merge(HashMap.java:1254)
> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.Iterator.forEachRemaining(Iterator.java:116)
> at
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
> at
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
> at
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:270)
> at java.util.Iterator.forEachRemaining(Iterator.java:116)
> at
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
> at
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
> at
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:270)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at
> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at
> org.apache.hudi.common.util.ClusteringUtils.getAllFileGroupsInPendingClusteringPlans(ClusteringUtils.java:127)
> at
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.init(AbstractTableFileSystemView.java:113)
> at
> org.apache.hudi.common.table.view.HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:106)
> at
> org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:100)
> at
> org.apache.hudi.common.table.view.FileSystemViewManager.createInMemoryFileSystemView(FileSystemViewManager.java:168)
> at
> org.apache.hudi.common.table.view.FileSystemViewManager.lambda$createViewManager$5fcdabfe$1(FileSystemViewManager.java:259)
> at
> org.apache.hudi.common.table.view.FileSystemViewManager.lambda$getFileSystemView$1(FileSystemViewManager.java:111)
> at
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
> at
> org.apache.hudi.common.table.view.FileSystemViewManager.getFileSystemView(FileSystemViewManager.java:110)
> at org.apache.hudi.table.HoodieTable.getSliceView(HoodieTable.java:277)
> at
> org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy.getFileSlicesEligibleForClustering(ClusteringPlanStrategy.java:77)
> at
> org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy.getFileSlicesEligibleForClustering(SparkSizeBasedClusteringPlanStrategy.java:118)
> at
> org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy.lambda$generateClusteringPlan$4e6aac78$1(PartitionAwareClusteringPlanStrategy.java:79)
> at
> org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:134)
> at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
> at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> at scala.collection.AbstractIterator.to(Iterator.scala:1334)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
> at
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
> at
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:123)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 21/11/16 13:05:28 WARN TaskSetManager: Lost task 0.0 in stage 74.0 (TID 176,
> localhost, executor driver): java.lang.IllegalStateException: Duplicate key
> [==>20211116130526895__replacecommit__INFLIGHT] {code}
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)