fcvr1010 opened a new issue #3055:
URL: https://github.com/apache/iceberg/issues/3055


   Hi, I ran into some strange behaviour that I cannot really explain, and I'd appreciate your feedback.
   
   I'm running 5 concurrent PySpark 3.0 jobs on AWS EMR whose code is 
essentially the following:
   
   ```python
   # `customer` is a variable of the job; each job uses a different value.
   # TABLE_NAME is the fully-qualified name of the customer-partitioned Iceberg table,
   # and `spark` is the active SparkSession.
   import time

   data = [(customer, "abcde")]
   df = (
       spark.createDataFrame(data, ["customer", "data"])
       .repartition(1)
       .sortWithinPartitions("customer")
       .cache()
   )
   df.count()  # materialize the cached DataFrame before timing the writes

   trials = []
   for i in range(20):
       start = time.time()
       df.write.saveAsTable(TABLE_NAME, mode="append")
       end = time.time()
       trials.append(end - start)
       time.sleep(1)
   ```
   
   Basically, what I want to do is measure the throughput (in commits per minute) we can get when writing concurrently to the same table but to different partitions; the table I write to is partitioned by `customer`.
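   
   For reference, the table is laid out roughly like this (a minimal sketch only; the catalog, database, and table names are placeholders, not my actual ones):
   
   ```python
   # Sketch of the target table: an Iceberg table partitioned by `customer`,
   # so each of the 5 concurrent jobs appends into a different partition.
   spark.sql("""
       CREATE TABLE IF NOT EXISTS glue_catalog.db.events (
           customer string,
           data     string
       )
       USING iceberg
       PARTITIONED BY (customer)
   """)
   ```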
   
   I have set up my environment in AWS as per the [Iceberg docs](https://iceberg.apache.org/aws/), in particular using the Glue Catalog and a DynamoDB table to obtain the lock on the catalog during commits. I'm using Iceberg 0.12.
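   
   Concretely, the catalog is configured roughly as below, following the AWS docs (a sketch only; the catalog name, warehouse path, and lock table name are placeholders for my actual values):
   
   ```python
   from pyspark.sql import SparkSession
   
   # Sketch of the Glue Catalog + DynamoDB lock setup from the Iceberg AWS docs.
   # "glue_catalog", the warehouse path, and the lock table name are placeholders.
   spark = (
       SparkSession.builder
       .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
       .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
       .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
       .config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-bucket/warehouse")
       .config("spark.sql.catalog.glue_catalog.lock-impl", "org.apache.iceberg.aws.glue.DynamoLockManager")
       .config("spark.sql.catalog.glue_catalog.lock.table", "my_iceberg_lock_table")
       .getOrCreate()
   )
   ```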
   
   What I noticed is that after some time (typically minutes), one or more of the jobs would hang. To understand at which point, I ran `sudo -u hadoop jstack ApplicationMasterPID`. This is, I think, the relevant portion of the output:
   
   ```
   "iceberg-worker-pool-71" #382 daemon prio=5 os_prio=0 tid=0x00007f0c55c30000 
nid=0xbf9e waiting on condition [0x00007f0c3ab48000]
   "iceberg-worker-pool-70" #381 daemon prio=5 os_prio=0 tid=0x00007f0c55c2e000 
nid=0xbf9d waiting on condition [0x00007f0c3ac49000]
   [… several more like this]
   
   "Thread-6" #34 daemon prio=5 os_prio=0 tid=0x00007f0c6c028800 nid=0xb08a 
runnable [0x00007f0c584ad000]
      java.lang.Thread.State: RUNNABLE
        at java.io.FilterInputStream.read(FilterInputStream.java:133)
        at 
sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3454)
        at 
sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3447)
        at 
sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3435)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at 
software.amazon.awssdk.services.s3.checksums.ChecksumValidatingInputStream.read(ChecksumValidatingInputStream.java:125)
        at java.io.FilterInputStream.read(FilterInputStream.java:133)
        at 
software.amazon.awssdk.core.io.SdkFilterInputStream.read(SdkFilterInputStream.java:66)
        at org.apache.iceberg.aws.s3.S3InputStream.read(S3InputStream.java:93)
        at 
org.apache.iceberg.avro.AvroIO$AvroInputStreamAdapter.read(AvroIO.java:120)
        at 
org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:61)
        at 
org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
        at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
        at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37)
        at 
org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
        at 
org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:237)
        at org.apache.iceberg.ManifestLists.read(ManifestLists.java:46)
        at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:127)
        at org.apache.iceberg.BaseSnapshot.allManifests(BaseSnapshot.java:141)
        at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:307)
        at 
org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:194)
        at 
org.apache.iceberg.spark.source.SparkWrite.access$1200(SparkWrite.java:87)
        at 
org.apache.iceberg.spark.source.SparkWrite$BatchAppend.commit(SparkWrite.java:251)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:396)
        at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
        at 
org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:253)
        at 
org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:259)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
        - locked <0x0000000730b3fa68> (a 
org.apache.spark.sql.execution.datasources.v2.AppendDataExec)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:54)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at 
org.apache.spark.sql.execution.SparkPlan$$Lambda$1848/1701024125.apply(Unknown 
Source)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at 
org.apache.spark.sql.execution.SparkPlan$$Lambda$1849/213791705.apply(Unknown 
Source)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:124)
        - locked <0x000000072f18b268> (a 
org.apache.spark.sql.execution.QueryExecution)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:123)
        at 
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
        at 
org.apache.spark.sql.DataFrameWriter$$Lambda$3022/93072724.apply(Unknown Source)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
        at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:132)
        at 
org.apache.spark.sql.execution.SQLExecution$$$Lambda$1668/344209694.apply(Unknown
 Source)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:132)
        at 
org.apache.spark.sql.execution.SQLExecution$$$Lambda$1667/2017859747.apply(Unknown
 Source)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:248)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:131)
        at 
org.apache.spark.sql.execution.SQLExecution$$$Lambda$1663/388091508.apply(Unknown
 Source)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
        at 
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:660)
        at 
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:596)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
   ```
   
   The full output is here: 
[stack.log](https://github.com/apache/iceberg/files/7082344/stack.log)
   
   So, if I understand correctly, all of Iceberg's worker-pool threads are idle waiting for work, while `Thread-6` is actually running and is trying to read data from S3 in order to perform the commit. I presume the data being read is Iceberg metadata (the stack goes through `ManifestLists.read`). Now, if that is correct, why would such a read hang? Should I perhaps try a different library? Any guidance is deeply appreciated. Also, please let me know if there are better ways to get more information about the blocked threads.
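   
   In case it helps, this is roughly how I could script the same `jstack` capture at intervals to see how long `Thread-6` stays stuck in the same read (just a sketch; the ApplicationMaster PID below is a placeholder):
   
   ```python
   # Sketch: take a thread dump of the ApplicationMaster every 30 seconds and
   # save it, so consecutive dumps show whether Thread-6 is stuck in one read.
   import subprocess
   import time
   
   AM_PID = "12345"  # placeholder for the actual ApplicationMaster PID
   
   for n in range(10):
       dump = subprocess.run(
           ["sudo", "-u", "hadoop", "jstack", AM_PID],
           capture_output=True, text=True, check=True,
       ).stdout
       with open(f"stack-{n}.log", "w") as f:
           f.write(dump)
       time.sleep(30)
   ```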

