igorcalabria opened a new issue #4288:
URL: https://github.com/apache/iceberg/issues/4288
Using snapshot version `0.13.0-20220307.001124-2` with Spark 3.2.
```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 308) (10.103.53.2 executor 1): java.lang.NullPointerException
    at java.base/java.util.Objects.requireNonNull(Objects.java:221)
    at org.apache.iceberg.hadoop.HadoopMetricsContext.counter(HadoopMetricsContext.java:80)
    at org.apache.iceberg.aws.s3.S3OutputStream.<init>(S3OutputStream.java:135)
    at org.apache.iceberg.aws.s3.S3OutputFile.createOrOverwrite(S3OutputFile.java:60)
    at org.apache.iceberg.avro.AvroFileAppender.<init>(AvroFileAppender.java:51)
    at org.apache.iceberg.avro.Avro$WriteBuilder.build(Avro.java:191)
    at org.apache.iceberg.ManifestWriter$V1Writer.newAppender(ManifestWriter.java:281)
    at org.apache.iceberg.ManifestWriter.<init>(ManifestWriter.java:58)
    at org.apache.iceberg.ManifestWriter.<init>(ManifestWriter.java:34)
    at org.apache.iceberg.ManifestWriter$V1Writer.<init>(ManifestWriter.java:260)
    at org.apache.iceberg.ManifestFiles.write(ManifestFiles.java:117)
    at org.apache.iceberg.spark.actions.BaseRewriteManifestsSparkAction.writeManifest(BaseRewriteManifestsSparkAction.java:324)
    at org.apache.iceberg.spark.actions.BaseRewriteManifestsSparkAction.lambda$toManifests$afb7bc39$1(BaseRewriteManifestsSparkAction.java:354)
    at org.apache.spark.sql.Dataset.$anonfun$mapPartitions$1(Dataset.scala:2826)
    at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:201)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
```
I think this may have been introduced in https://github.com/apache/iceberg/pull/4050. From a quick glance at the code, the root cause seems to be that `HadoopMetricsContext#statistics` is a transient field. If I understood things correctly, `HadoopMetricsContext` is initialized in `S3FileIO`:
```java
@Override
public void initialize(Map<String, String> properties) {
  this.awsProperties = new AwsProperties(properties);
  // Do not override s3 client if it was provided
  if (s3 == null) {
    this.s3 = AwsClientFactories.from(properties)::s3;
  }
  // Report Hadoop metrics if Hadoop is available
  try {
    DynConstructors.Ctor<MetricsContext> ctor =
        DynConstructors.builder(MetricsContext.class).hiddenImpl(DEFAULT_METRICS_IMPL, String.class).buildChecked();
    this.metrics = ctor.newInstance("s3");
    metrics.initialize(properties);
  } catch (NoSuchMethodException | ClassCastException e) {
    LOG.warn("Unable to load metrics class: '{}', falling back to null metrics", DEFAULT_METRICS_IMPL, e);
  }
}
```
`FileIO` is a broadcast variable in `org.apache.iceberg.spark.actions.BaseRewriteManifestsSparkAction`:
```java
private static ManifestFile writeManifest(
    List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
    String location, int format, PartitionSpec spec, StructType sparkType)
    throws IOException {
  ...
```
It seems that `HadoopMetricsContext` is serialized along with that broadcast, and when it is deserialized on the executor the `statistics` field is null because it is marked transient. I'm not completely sure that is exactly what's happening, but the only way I could reproduce this outside of Spark was by calling `HadoopMetricsContext#counter` with the statistics field set to null, and the most obvious way that can happen is after deserialization.
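
To illustrate the suspected failure mode, here is a minimal, self-contained sketch (hypothetical class and field names, not the actual Iceberg code) of how a transient field that is only populated in an `initialize()` call ends up null after a Java serialization round trip, which is roughly what broadcasting the `FileIO` to executors does:
```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for HadoopMetricsContext: the "statistics" field is
// transient, so it only exists in the JVM that called initialize().
class TransientMetricsContext implements Serializable {
  private transient AtomicLong statistics; // dropped during serialization

  public void initialize() {
    this.statistics = new AtomicLong(); // only runs on the driver side
  }

  public void incrementWriteBytes(long bytes) {
    // After deserialization statistics is null, so this throws the same kind of
    // NullPointerException seen at Objects.requireNonNull in the stack trace.
    Objects.requireNonNull(statistics, "statistics must not be null").addAndGet(bytes);
  }
}

public class TransientFieldRepro {
  public static void main(String[] args) throws Exception {
    TransientMetricsContext ctx = new TransientMetricsContext();
    ctx.initialize();
    ctx.incrementWriteBytes(10); // works on the "driver" side

    // Simulate what broadcasting does to the value: serialize, then deserialize.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(ctx);
    }
    TransientMetricsContext onExecutor;
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      onExecutor = (TransientMetricsContext) in.readObject();
    }

    // statistics was transient, so it is null here; this throws NullPointerException.
    onExecutor.incrementWriteBytes(10);
  }
}
```
If that is indeed what's going on, any executor-side code path that reaches the deserialized metrics context's counters would hit the same `NullPointerException` from `Objects.requireNonNull`.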