johnclara opened a new issue #1637:
URL: https://github.com/apache/iceberg/issues/1637
When a table contains duplicate data file entries, Spark reads fail. This happens for us when we retry a commit that had actually already succeeded.
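For context, here is a rough sketch of the kind of retry logic that can introduce the duplicate (the wrapper and its names are illustrative, not our actual code): if the first `commit()` succeeds on the server but the client sees a transient error, the second attempt appends the same `DataFile` again.
```
import org.apache.iceberg.{DataFile, Table}

// Hypothetical retry wrapper, only to illustrate how a duplicate entry can appear.
def appendWithRetry(table: Table, file: DataFile, maxAttempts: Int = 3): Unit = {
  var attempt = 0
  var committed = false
  while (!committed && attempt < maxAttempts) {
    attempt += 1
    try {
      table.newAppend().appendFile(file).commit()
      committed = true
    } catch {
      // If the first commit actually went through (e.g. the response was lost),
      // this retry appends the same physical file a second time.
      case _: RuntimeException if attempt < maxAttempts => ()
    }
  }
}
```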
The exception:
```
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 7, ip-10-26-15-75.prod.dp.box.net, executor 2): java.lang.IllegalArgumentException: Multiple entries with same key: s3a://box-dp-gbox-us-west-2-prod/gbox.db/enterprise_additional_platform_resource.b25.bootstrap.v25/data/70fd3b2d-41db-4ab1-b583-44a10ad0678b.avro=com.box.dataplatform.iceberg.encryption.DecryptingInputFile@141fc36e and s3a://box-dp-gbox-us-west-2-prod/gbox.db/enterprise_additional_platform_resource.b25.bootstrap.v25/data/70fd3b2d-41db-4ab1-b583-44a10ad0678b.avro=com.box.dataplatform.iceberg.encryption.DecryptingInputFile@2d8cffef
  at org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap.conflictException(ImmutableMap.java:214)
  at org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap.checkNoConflict(ImmutableMap.java:208)
  at org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap.checkNoConflictInKeyBucket(RegularImmutableMap.java:146)
  at org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap.fromEntryArray(RegularImmutableMap.java:109)
  at org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap$Builder.build(ImmutableMap.java:392)
  at org.apache.iceberg.spark.source.BaseDataReader.<init>(BaseDataReader.java:70)
  at org.apache.iceberg.spark.source.RowDataReader.<init>(RowDataReader.java:75)
  at org.apache.iceberg.spark.source.Reader$RowReader.<init>(Reader.java:511)
  at org.apache.iceberg.spark.source.Reader$InternalRowReaderFactory.create(Reader.java:489)
  at org.apache.iceberg.spark.source.Reader$ReadTask.createPartitionReader(Reader.java:439)
  at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
  at org.apache.spark.scheduler.Task.run(Task.scala:121)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
  at org.apache.spark.sql.Dataset$$anonfun$isEmpty$1.apply(Dataset.scala:534)
  at org.apache.spark.sql.Dataset$$anonfun$isEmpty$1.apply(Dataset.scala:533)
  at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
  at org.apache.spark.sql.Dataset.isEmpty(Dataset.scala:533)
  at com.box.dataplatform.cdc.compact.CdcCompactJob.run(CdcCompactJob.scala:62)
  at com.box.dataplatform.spark.context.DPSparkApp$class.runApp(DPSparkApp.scala:29)
  at com.box.dataplatform.spark.context.DPSparkApp$class.$init$(DPSparkApp.scala:11)
  at com.box.dataplatform.cdc.compact.CdcCompactApp$.<init>(CdcCompactApp.scala:5)
  at com.box.dataplatform.cdc.compact.CdcCompactApp$.<clinit>(CdcCompactApp.scala)
```
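For reference, the read side fails because `BaseDataReader` builds an immutable map of input files keyed by file location, and Guava's `ImmutableMap.Builder.build()` rejects duplicate keys. A minimal sketch of that behavior (using plain Guava rather than Iceberg's relocated copy, with made-up paths and placeholder values):
```
import com.google.common.collect.ImmutableMap

// Same key added twice, as happens when one data file has two live entries for a scan task.
val builder = ImmutableMap.builder[String, String]()
builder.put("s3a://bucket/table/data/70fd3b2d.avro", "decryptingInputFileA")
builder.put("s3a://bucket/table/data/70fd3b2d.avro", "decryptingInputFileB")
builder.build() // throws IllegalArgumentException: Multiple entries with same key
```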
The duplicate data file entries:
```
import scala.collection.JavaConverters._
import org.apache.iceberg.{DataFile, Table}

val table = catalog.loadTable(tableId)

def getAllDatafiles(table: Table): List[DataFile] = {
  table.refresh()
  val scan = table.newScan()
  scan.planFiles().iterator().asScala.toList.map(_.file())
}

val files = getAllDatafiles(table)
val result = files.filter(_.path().toString.contains("70fd3b2d-41db-4ab1-b583-44a10ad0678b"))
result.foreach(println)
```
Output:
```
GenericDataFile{content=data, file_path=s3a://box-dp-gbox-us-west-2-prod/gbox.db/enterprise_additional_platform_resource.b25.bootstrap.v25/data/70fd3b2d-41db-4ab1-b583-44a10ad0678b.avro, file_format=AVRO, partition=PartitionData{}, record_count=1, file_size_in_bytes=2184, column_sizes=null, value_counts=null, null_value_counts=null, lower_bounds=null, upper_bounds=null, key_metadata=(redacted), split_offsets=null, equality_ids=null}
GenericDataFile{content=data, file_path=s3a://box-dp-gbox-us-west-2-prod/gbox.db/enterprise_additional_platform_resource.b25.bootstrap.v25/data/70fd3b2d-41db-4ab1-b583-44a10ad0678b.avro, file_format=AVRO, partition=PartitionData{}, record_count=1, file_size_in_bytes=2184, column_sizes=null, value_counts=null, null_value_counts=null, lower_bounds=null, upper_bounds=null, key_metadata=(redacted), split_offsets=null, equality_ids=null}
```
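A more general check, in case other paths are duplicated as well (this helper is just a sketch of ours, not an Iceberg API):
```
import scala.collection.JavaConverters._
import org.apache.iceberg.Table

// Count how many live scan entries each data file path has; anything > 1 will break reads.
def duplicatePaths(table: Table): Map[String, Int] = {
  table.refresh()
  table.newScan().planFiles().iterator().asScala
    .map(_.file().path().toString)
    .toList
    .groupBy(identity)
    .mapValues(_.size)
    .toMap
    .filter { case (_, count) => count > 1 }
}
```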
The snapshots that added the file:
```
val snapshots = table.snapshots.asScala.zipWithIndex.filter { case (snapshot, _) =>
  snapshot.addedFiles.asScala.exists(_.path().toString.contains("70fd3b2d-41db-4ab1-b583-44a10ad0678b"))
}
snapshots.foreach(println)
```
Output:
```
(BaseSnapshot{id=8222145611809773150, timestamp_ms=1603152551291, operation=append, summary={added-data-files=1, added-records=1, changed-partition-count=1, total-records=90, total-data-files=18, total-delete-files=0, total-position-deletes=0, total-equality-deletes=0}, manifest-list=s3a://box-dp-gbox-us-west-2-prod/gbox.db/enterprise_additional_platform_resource.b25.bootstrap.v25/metadata/snap-8222145611809773150-1-59b2b9e0-2ac9-4ed5-9b00-c37ddab5536a.avro},17)
(BaseSnapshot{id=1407926955161475586, timestamp_ms=1603152886550, operation=append, summary={added-data-files=1, added-records=1, changed-partition-count=1, total-records=335, total-data-files=65, total-delete-files=0, total-position-deletes=0, total-equality-deletes=0}, manifest-list=s3a://box-dp-gbox-us-west-2-prod/gbox.db/enterprise_additional_platform_resource.b25.bootstrap.v25/metadata/snap-1407926955161475586-1-14d529df-9343-401c-b33a-513556c14a7f.avro},64)
```
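So the same physical file was appended by two separate append snapshots roughly five and a half minutes apart (indexes 17 and 64 in the snapshot log), which matches the retried-commit theory: the first commit succeeded, and the retry appended the same file again.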