johnclara opened a new issue #1637:
URL: https://github.com/apache/iceberg/issues/1637


   When a table contains duplicate data files, Spark reads fail. This
happens for us when we retry a commit that had already succeeded.
   
   The exception:
   ```
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 7, ip-10-26-15-75.prod.dp.box.net, executor 2): java.lang.IllegalArgumentException: Multiple entries with same key: s3a://box-dp-gbox-us-west-2-prod/gbox.db/enterprise_additional_platform_resource.b25.bootstrap.v25/data/70fd3b2d-41db-4ab1-b583-44a10ad0678b.avro=com.box.dataplatform.iceberg.encryption.DecryptingInputFile@141fc36e and s3a://box-dp-gbox-us-west-2-prod/gbox.db/enterprise_additional_platform_resource.b25.bootstrap.v25/data/70fd3b2d-41db-4ab1-b583-44a10ad0678b.avro=com.box.dataplatform.iceberg.encryption.DecryptingInputFile@2d8cffef
        at org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap.conflictException(ImmutableMap.java:214)
        at org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap.checkNoConflict(ImmutableMap.java:208)
        at org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap.checkNoConflictInKeyBucket(RegularImmutableMap.java:146)
        at org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap.fromEntryArray(RegularImmutableMap.java:109)
        at org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap$Builder.build(ImmutableMap.java:392)
        at org.apache.iceberg.spark.source.BaseDataReader.<init>(BaseDataReader.java:70)
        at org.apache.iceberg.spark.source.RowDataReader.<init>(RowDataReader.java:75)
        at org.apache.iceberg.spark.source.Reader$RowReader.<init>(Reader.java:511)
        at org.apache.iceberg.spark.source.Reader$InternalRowReaderFactory.create(Reader.java:489)
        at org.apache.iceberg.spark.source.Reader$ReadTask.createPartitionReader(Reader.java:439)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   
   Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
        at org.apache.spark.sql.Dataset$$anonfun$isEmpty$1.apply(Dataset.scala:534)
        at org.apache.spark.sql.Dataset$$anonfun$isEmpty$1.apply(Dataset.scala:533)
        at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
        at org.apache.spark.sql.Dataset.isEmpty(Dataset.scala:533)
        at com.box.dataplatform.cdc.compact.CdcCompactJob.run(CdcCompactJob.scala:62)
        at com.box.dataplatform.spark.context.DPSparkApp$class.runApp(DPSparkApp.scala:29)
        at com.box.dataplatform.spark.context.DPSparkApp$class.$init$(DPSparkApp.scala:11)
        at com.box.dataplatform.cdc.compact.CdcCompactApp$.<init>(CdcCompactApp.scala:5)
        at com.box.dataplatform.cdc.compact.CdcCompactApp$.<clinit>(CdcCompactApp.scala)
   ```
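
The trace shows the failure coming from `ImmutableMap$Builder.build`, called in the `BaseDataReader` constructor, which rejects a repeated key. The following is a minimal plain-Scala sketch of that strict behavior, not Iceberg's or Guava's actual code; `StrictMapDemo`, `buildStrict`, and the sample paths are hypothetical names made up for illustration.

```scala
object StrictMapDemo {
  // Hypothetical helper: builds a map but, like the builder in the trace,
  // refuses to accept the same key twice.
  def buildStrict[K, V](entries: Seq[(K, V)]): Map[K, V] =
    entries.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
      if (acc.contains(k)) {
        throw new IllegalArgumentException(
          s"Multiple entries with same key: $k=${acc(k)} and $k=$v")
      }
      acc + (k -> v)
    }

  def main(args: Array[String]): Unit = {
    // Two entries that reference the same (made-up) file path collide on the key.
    val entries = Seq(
      "s3a://bucket/data/a.avro" -> "inputFile1",
      "s3a://bucket/data/a.avro" -> "inputFile2")
    val outcome = scala.util.Try(buildStrict(entries))
    println(outcome.isFailure)
  }
}
```

Under this reading, any scan task that lists the same file path twice would trip the check, regardless of the file's contents.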
   
   The duplicate data file entries returned by a table scan:
   ```
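   import scala.collection.JavaConverters._
   import org.apache.iceberg.{DataFile, Table}

   val table = catalog.loadTable(tableId)

   // Refresh the table, plan a full scan, and collect the data file of every scan task.
   def getAllDatafiles(table: Table): List[DataFile] = {
       table.refresh()
       val scan = table.newScan()
       scan.planFiles().iterator().asScala.toList.map(_.file())
   }

   val files = getAllDatafiles(table)
   // Keep only the entries for the file named in the exception.
   val result = files.filter(_.path().toString.contains("70fd3b2d-41db-4ab1-b583-44a10ad0678b"))
   result.foreach(println(_))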
   
   Output:
   GenericDataFile{content=data, file_path=s3a://box-dp-gbox-us-west-2-prod/gbox.db/enterprise_additional_platform_resource.b25.bootstrap.v25/data/70fd3b2d-41db-4ab1-b583-44a10ad0678b.avro, file_format=AVRO, partition=PartitionData{}, record_count=1, file_size_in_bytes=2184, column_sizes=null, value_counts=null, null_value_counts=null, lower_bounds=null, upper_bounds=null, key_metadata=(redacted), split_offsets=null, equality_ids=null}
   GenericDataFile{content=data, file_path=s3a://box-dp-gbox-us-west-2-prod/gbox.db/enterprise_additional_platform_resource.b25.bootstrap.v25/data/70fd3b2d-41db-4ab1-b583-44a10ad0678b.avro, file_format=AVRO, partition=PartitionData{}, record_count=1, file_size_in_bytes=2184, column_sizes=null, value_counts=null, null_value_counts=null, lower_bounds=null, upper_bounds=null, key_metadata=(redacted), split_offsets=null, equality_ids=null}
   ```
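
The check above targets one known file name. To look for any repeated path, the paths can be grouped and counted. This is a small sketch in plain Scala; `DuplicatePaths` and `duplicates` are hypothetical names, and the sample paths are made up.

```scala
object DuplicatePaths {
  // Returns each path that occurs more than once, with its occurrence count.
  def duplicates(paths: Seq[String]): Map[String, Int] =
    paths.groupBy(identity).collect {
      case (path, occurrences) if occurrences.size > 1 => path -> occurrences.size
    }

  def main(args: Array[String]): Unit = {
    val paths = Seq("data/a.avro", "data/b.avro", "data/a.avro")
    duplicates(paths).foreach { case (path, n) =>
      println(s"$path appears $n times")
    }
  }
}
```

With the `files` list from the snippet above, the input would be `files.map(_.path().toString)`.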
   
   The snapshots that added this file:
   ```
   val snapshots = table.snapshots.asScala.zipWithIndex.filter { case (k, i) =>
     k.addedFiles.asScala.exists(_.path().toString.contains("70fd3b2d-41db-4ab1-b583-44a10ad0678b"))
   }
   snapshots.foreach(println(_))
   
   Output:
   (BaseSnapshot{id=8222145611809773150, timestamp_ms=1603152551291, operation=append, summary={added-data-files=1, added-records=1, changed-partition-count=1, total-records=90, total-data-files=18, total-delete-files=0, total-position-deletes=0, total-equality-deletes=0}, manifest-list=s3a://box-dp-gbox-us-west-2-prod/gbox.db/enterprise_additional_platform_resource.b25.bootstrap.v25/metadata/snap-8222145611809773150-1-59b2b9e0-2ac9-4ed5-9b00-c37ddab5536a.avro},17)
   (BaseSnapshot{id=1407926955161475586, timestamp_ms=1603152886550, operation=append, summary={added-data-files=1, added-records=1, changed-partition-count=1, total-records=335, total-data-files=65, total-delete-files=0, total-position-deletes=0, total-equality-deletes=0}, manifest-list=s3a://box-dp-gbox-us-west-2-prod/gbox.db/enterprise_additional_platform_resource.b25.bootstrap.v25/metadata/snap-1407926955161475586-1-14d529df-9343-401c-b33a-513556c14a7f.avro},64)
   ```
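
Both snapshots are `append` operations that each add the same file, which is consistent with a retried commit. One possible writer-side guard, sketched here in plain Scala, is to drop paths that are already in the table before appending. This is an assumption about how a caller might work around the problem, not an Iceberg API: `RetryGuard` and `pathsToAppend` are hypothetical names, the set of committed paths is assumed to be available to the writer, and the sketch does not address concurrent writers.

```scala
object RetryGuard {
  // Keeps only candidate paths that are not yet committed, preserving order
  // and dropping repeats within the candidate list itself.
  def pathsToAppend(committed: Set[String], candidates: Seq[String]): Seq[String] =
    candidates.distinct.filterNot(committed.contains)

  def main(args: Array[String]): Unit = {
    // Made-up paths: a.avro was committed by the first attempt.
    val committed = Set("data/a.avro")
    val retryBatch = Seq("data/a.avro", "data/b.avro")
    println(pathsToAppend(committed, retryBatch))
  }
}
```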

