fcvr1010 opened a new issue #3055:
URL: https://github.com/apache/iceberg/issues/3055
Hi, I've run into some strange behaviour that I can't really explain, and I'd
appreciate your feedback.
I'm running 5 concurrent PySpark 3.0 jobs on AWS EMR whose code is
essentially the following:
```python
import time

# `customer` is a job parameter; each job uses a different value.
# TABLE_NAME points at the shared, customer-partitioned Iceberg table.
data = [(customer, "abcde")]
df = (
    spark.createDataFrame(data, ["customer", "data"])
    .repartition(1)
    .sortWithinPartitions("customer")
    .cache()
)
df.count()  # materialize the cached DataFrame before timing the writes

trials = []
for i in range(20):
    start = time.time()
    df.write.saveAsTable(TABLE_NAME, mode="append")
    end = time.time()
    trials.append(end - start)
    time.sleep(1)
```
Basically, what I want to measure is the throughput (in commits per minute) we
can get when writing concurrently to the same table but to different
partitions: the table I write to is partitioned by `customer`, and each job
writes to its own partition.
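The commits-per-minute figure is then derived offline from the `trials` list, roughly along these lines (just a sketch of the arithmetic, not part of the job itself):
```python
import statistics

# Rough conversion of the per-commit wall-clock times collected in `trials`
# into a commits-per-minute figure for a single job.
mean_commit_seconds = statistics.mean(trials)
commits_per_minute = 60.0 / mean_commit_seconds
print(f"mean commit time: {mean_commit_seconds:.2f}s "
      f"-> ~{commits_per_minute:.1f} commits/minute per job")
```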
I have set up my environment in AWS as per the [Iceberg
docs](https://iceberg.apache.org/aws/), in particular using the Glue Catalog
with DynamoDB as the lock manager. I'm using Iceberg 0.12.
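For reference, the catalog is configured following the pattern from those docs; a sketch of what that looks like in PySpark (the catalog name, warehouse path and lock table name below are placeholders rather than my exact values, and the Iceberg Spark runtime / AWS bundle jars are supplied via `spark-submit`):
```python
from pyspark.sql import SparkSession

# Sketch of the Glue Catalog + DynamoDB lock manager setup from the Iceberg AWS docs.
# "glue_catalog", the warehouse path and the lock table name are placeholders.
spark = (
    SparkSession.builder
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-bucket/warehouse")
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.lock-impl", "org.apache.iceberg.aws.glue.DynamoLockManager")
    .config("spark.sql.catalog.glue_catalog.lock.table", "myIcebergLockTable")
    .getOrCreate()
)
```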
What I noticed is that after some time (typically minutes), one or more of
the jobs hang. To understand where, I ran `sudo -u
hadoop jstack ApplicationMasterPID`. This is, I think, the relevant portion of
the output:
```
"iceberg-worker-pool-71" #382 daemon prio=5 os_prio=0 tid=0x00007f0c55c30000
nid=0xbf9e waiting on condition [0x00007f0c3ab48000]
"iceberg-worker-pool-70" #381 daemon prio=5 os_prio=0 tid=0x00007f0c55c2e000
nid=0xbf9d waiting on condition [0x00007f0c3ac49000]
[… several more like this]
"Thread-6" #34 daemon prio=5 os_prio=0 tid=0x00007f0c6c028800 nid=0xb08a
runnable [0x00007f0c584ad000]
java.lang.Thread.State: RUNNABLE
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at
sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3454)
at
sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3447)
at
sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3435)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at
software.amazon.awssdk.services.s3.checksums.ChecksumValidatingInputStream.read(ChecksumValidatingInputStream.java:125)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at
software.amazon.awssdk.core.io.SdkFilterInputStream.read(SdkFilterInputStream.java:66)
at org.apache.iceberg.aws.s3.S3InputStream.read(S3InputStream.java:93)
at
org.apache.iceberg.avro.AvroIO$AvroInputStreamAdapter.read(AvroIO.java:120)
at
org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:61)
at
org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37)
at
org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
at
org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:237)
at org.apache.iceberg.ManifestLists.read(ManifestLists.java:46)
at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:127)
at org.apache.iceberg.BaseSnapshot.allManifests(BaseSnapshot.java:141)
at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:307)
at
org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:194)
at
org.apache.iceberg.spark.source.SparkWrite.access$1200(SparkWrite.java:87)
at
org.apache.iceberg.spark.source.SparkWrite$BatchAppend.commit(SparkWrite.java:251)
at
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:396)
at
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
at
org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:253)
at
org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:259)
at
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
- locked <0x0000000730b3fa68> (a
org.apache.spark.sql.execution.datasources.v2.AppendDataExec)
at
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
at
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:54)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at
org.apache.spark.sql.execution.SparkPlan$$Lambda$1848/1701024125.apply(Unknown
Source)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at
org.apache.spark.sql.execution.SparkPlan$$Lambda$1849/213791705.apply(Unknown
Source)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:124)
- locked <0x000000072f18b268> (a
org.apache.spark.sql.execution.QueryExecution)
at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:123)
at
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
at
org.apache.spark.sql.DataFrameWriter$$Lambda$3022/93072724.apply(Unknown Source)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
at
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:132)
at
org.apache.spark.sql.execution.SQLExecution$$$Lambda$1668/344209694.apply(Unknown
Source)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
at
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:132)
at
org.apache.spark.sql.execution.SQLExecution$$$Lambda$1667/2017859747.apply(Unknown
Source)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:248)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:131)
at
org.apache.spark.sql.execution.SQLExecution$$$Lambda$1663/388091508.apply(Unknown
Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
at
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:660)
at
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:596)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
```
The full output is here:
[stack.log](https://github.com/apache/iceberg/files/7082344/stack.log)
So, if I understand correctly, all of Iceberg's worker-pool threads are idle,
waiting for work, while `Thread-6` is actually running, trying to read data
from S3 in order to perform the commit. I presume that data is Iceberg
metadata (the manifest list, judging by the frames above). Now, if that is
correct, why would such a read hang? Should I perhaps try a different library?
Any guidance is deeply appreciated. Also, please let me know if there are
better ways to get more information about the blocked threads.