Daniel Darabos created SPARK-8898:
-------------------------------------

             Summary: Jets3t hangs with more than 1 core
                 Key: SPARK-8898
                 URL: https://issues.apache.org/jira/browse/SPARK-8898
             Project: Spark
          Issue Type: Bug
          Components: Input/Output
    Affects Versions: 1.4.0
         Environment: S3
            Reporter: Daniel Darabos


If I have an RDD that reads from S3 ({{newAPIHadoopFile}}), and try to write 
this to S3 ({{saveAsNewAPIHadoopFile}}), it hangs if I have more than 1 core 
per executor.

It sounds like a race condition, but so far I have seen it trigger 100% of the 
time. From a race for taking a limited number of connections I would expect it 
to succeed at least on 1 task at least some of the time. But I never saw a 
single completed task, except when running with 1-core executors.

All executor threads hang with one of the following two stack traces:

{noformat:title=Stack trace 1}
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000007759cae70> (a 
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ConnectionPool)
        at 
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.doGetConnection(MultiThreadedHttpConnectionManager.java:518)
        - locked <0x00000007759cae70> (a 
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ConnectionPool)
        at 
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.getConnectionWithTimeout(MultiThreadedHttpConnectionManager.java:416)
        at 
org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:153)
        at 
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
        at 
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
        at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
        at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
        at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
        at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
        at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
        at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
        at 
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:107)
        at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
        at org.apache.hadoop.fs.s3native.$Proxy8.retrieveMetadata(Unknown 
Source)
        at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1332)
        at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:341)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:851)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:832)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:731)
        at 
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:128)
        at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1030)
        at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1014)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

{noformat:title=Stack trace 2}
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000007759cae70> (a 
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ConnectionPool)
        at 
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.doGetConnection(MultiThreadedHttpConnectionManager.java:518)
        - locked <0x00000007759cae70> (a 
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ConnectionPool)
        at 
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.getConnectionWithTimeout(MultiThreadedHttpConnectionManager.java
:416)
        at 
org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:153)
        at 
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
        at 
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
        at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
        at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestGet(RestS3Service.java:752)
        at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1601)
        at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1544)
        at org.jets3t.service.S3Service.getObject(S3Service.java:2072)
        at org.jets3t.service.S3Service.getObject(S3Service.java:1310)
        at 
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieve(Jets3tNativeFileSystemStore.java:122)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
        at org.apache.hadoop.fs.s3native.$Proxy8.retrieve(Unknown Source)
        at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.open(NativeS3FileSystem.java:564)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:711)
        at 
org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:75)
        at 
org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

This is running the ancient 0.7.1 version of Jets3t that comes with Spark. 
(Theoretically Spark built with newer Hadoop profiles would use Jets3t 0.9.3, 
but I could not get the {{spark-ec2}} script to install one of these builds.) I 
could not find documentation for this version, but based on the current docs 
and the 0.7.1 source code I tried putting this into a {{jets3t.properties}} 
file (which on the classpath of the driver and the executors):

{noformat}
httpclient.max-connections=10000
httpclient.max-connections-per-host=10000
http.connection-manager.max-total=10000
http.connection-manager.max-per-host=10000
{noformat}

It didn't help.

It's very simple to reproduce from the {{spark-shell}}. It's a bit messy 
because I have to create a HadoopConfiguration to pass in the access key and 
the password. But I can add it to the ticket if it would be useful.

I understand that this is probably a Jets3t configuration issue. But I hope 
Spark could use a newer version or provide defaults such that this would work 
better.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to