igorcalabria opened a new issue #4288:
URL: https://github.com/apache/iceberg/issues/4288


   Using snapshot version `0.13.0-20220307.001124-2` with Spark 3.2:
   
   ```
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 308) (10.103.53.2 executor 1): java.lang.NullPointerException
           at java.base/java.util.Objects.requireNonNull(Objects.java:221)
           at org.apache.iceberg.hadoop.HadoopMetricsContext.counter(HadoopMetricsContext.java:80)
           at org.apache.iceberg.aws.s3.S3OutputStream.<init>(S3OutputStream.java:135)
           at org.apache.iceberg.aws.s3.S3OutputFile.createOrOverwrite(S3OutputFile.java:60)
           at org.apache.iceberg.avro.AvroFileAppender.<init>(AvroFileAppender.java:51)
           at org.apache.iceberg.avro.Avro$WriteBuilder.build(Avro.java:191)
           at org.apache.iceberg.ManifestWriter$V1Writer.newAppender(ManifestWriter.java:281)
           at org.apache.iceberg.ManifestWriter.<init>(ManifestWriter.java:58)
           at org.apache.iceberg.ManifestWriter.<init>(ManifestWriter.java:34)
           at org.apache.iceberg.ManifestWriter$V1Writer.<init>(ManifestWriter.java:260)
           at org.apache.iceberg.ManifestFiles.write(ManifestFiles.java:117)
           at org.apache.iceberg.spark.actions.BaseRewriteManifestsSparkAction.writeManifest(BaseRewriteManifestsSparkAction.java:324)
           at org.apache.iceberg.spark.actions.BaseRewriteManifestsSparkAction.lambda$toManifests$afb7bc39$1(BaseRewriteManifestsSparkAction.java:354)
           at org.apache.spark.sql.Dataset.$anonfun$mapPartitions$1(Dataset.scala:2826)
           at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:201)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:131)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
           at java.base/java.lang.Thread.run(Thread.java:829)
   
   ```
   I think this may have been introduced in https://github.com/apache/iceberg/pull/4050. From a quick glance at the code, it seems the root cause is that `HadoopMetricsContext#statistics` is a transient field. If I understood things correctly, `HadoopMetricsContext` is initialized in `S3FileIO`:
   
   ```java
     @Override
     public void initialize(Map<String, String> properties) {
       this.awsProperties = new AwsProperties(properties);

       // Do not override s3 client if it was provided
       if (s3 == null) {
         this.s3 = AwsClientFactories.from(properties)::s3;
       }

       // Report Hadoop metrics if Hadoop is available
       try {
         DynConstructors.Ctor<MetricsContext> ctor =
             DynConstructors.builder(MetricsContext.class)
                 .hiddenImpl(DEFAULT_METRICS_IMPL, String.class)
                 .buildChecked();
         this.metrics = ctor.newInstance("s3");

         metrics.initialize(properties);
       } catch (NoSuchMethodException | ClassCastException e) {
         LOG.warn("Unable to load metrics class: '{}', falling back to null metrics", DEFAULT_METRICS_IMPL, e);
       }
     }
   ```
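   For context, the field that ends up null appears to be declared along these lines in `HadoopMetricsContext` (paraphrased, not an exact copy of the source):

   ```java
   // Paraphrased: the Hadoop statistics handle is transient, so Java
   // serialization skips it and a deserialized copy sees null here.
   private transient FileSystem.Statistics statistics;
   ```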
   Since `FileIO` is a broadcast variable in `org.apache.iceberg.spark.actions.BaseRewriteManifestsSparkAction`:
   
   ```java
     private static ManifestFile writeManifest(
         List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
         String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
   ...
   ```
   It seems that `HadoopMetricsContext` is serialized along with it, and when it's deserialized on the executor, the `statistics` field is null because it was marked as transient. I'm not entirely sure that's exactly what's happening, but the only way I could reproduce this outside of Spark was by calling `HadoopMetricsContext#counter` with the `statistics` field set to null, and the most obvious way that could happen is after deserialization.
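   To illustrate the mechanism outside of Spark, here is a minimal self-contained sketch (stand-in names, not Iceberg code) showing that a Java serialization round-trip, which is roughly what a broadcast does to the object on its way to an executor, leaves a transient field null on the receiving side:

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   import java.util.Objects;

   public class TransientFieldDemo {

     // Stand-in for HadoopMetricsContext: the real class keeps its
     // FileSystem.Statistics handle in a transient field.
     static class FakeMetricsContext implements Serializable {
       private transient Object statistics = new Object();

       void counter() {
         // Mirrors the requireNonNull seen at HadoopMetricsContext.counter
         Objects.requireNonNull(statistics, "statistics must not be null");
       }
     }

     public static void main(String[] args) throws Exception {
       FakeMetricsContext ctx = new FakeMetricsContext();
       ctx.counter(); // fine: statistics is set on the "driver" side

       // Round-trip through Java serialization
       ByteArrayOutputStream bytes = new ByteArrayOutputStream();
       try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
         out.writeObject(ctx);
       }
       FakeMetricsContext copy;
       try (ObjectInputStream in =
           new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
         copy = (FakeMetricsContext) in.readObject();
       }

       copy.counter(); // throws NullPointerException: transient field was not restored
     }
   }
   ```
   The last call fails with the same kind of `NullPointerException` as the stack trace above.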

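   If that's indeed the cause, one possible direction (a sketch of the idea only, not a tested patch) would be for `HadoopMetricsContext` to resolve the statistics lazily instead of only in `initialize()`, so a deserialized copy can recover it:

   ```java
   // Sketch only: recover the transient statistics on first use after
   // deserialization, instead of assuming initialize() already ran.
   private synchronized FileSystem.Statistics statistics() {
     if (statistics == null) {
       // resolveStatistics() is a hypothetical stand-in for whatever lookup
       // initialize() performs; the scheme is a plain String field, so it
       // survives serialization and can be used to find the statistics again.
       statistics = resolveStatistics(scheme);
     }
     return statistics;
   }
   ```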
