[ 
https://issues.apache.org/jira/browse/SPARK-17666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15528038#comment-15528038
 ] 

Apache Spark commented on SPARK-17666:
--------------------------------------

User 'JoshRosen' has created a pull request for this issue:
https://github.com/apache/spark/pull/15271

> take() or isEmpty() on dataset leaks s3a connections
> ----------------------------------------------------
>
>                 Key: SPARK-17666
>                 URL: https://issues.apache.org/jira/browse/SPARK-17666
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>         Environment: ubuntu/centos, java 7, java 8, spark 2.0, java api
>            Reporter: Igor Berman
>            Priority: Critical
>             Fix For: 2.1.0
>
>
> I'm experiencing problems with s3a when working with Parquet files through the Dataset API.
> The symptom of the problem is tasks failing with:
> {code}
> Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
>       at org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:232)
>       at org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
>       at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> {code}
> Checking the CoarseGrainedExecutorBackend process with lsof showed many sockets
> stuck in the CLOSE_WAIT state.
> Reproduction of the problem:
> {code}
> package com.test;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
>
> public class ConnectionLeakTest {
>     public static void main(String[] args) {
>         SparkConf sparkConf = new SparkConf();
>         sparkConf.setMaster("local[*]");
>         sparkConf.setAppName("Test");
>         sparkConf.set("spark.local.dir", "/tmp/spark");
>         sparkConf.set("spark.sql.shuffle.partitions", "2");
>         SparkSession session = SparkSession.builder().config(sparkConf).getOrCreate();
>         // set your credentials for your bucket
>         for (int i = 0; i < 100; i++) {
>             Dataset<Row> df = session
>                     .sqlContext()
>                     .read()
>                     .parquet("s3a://test/*"); // contains multiple snappy-compressed parquet files
>             if (df.rdd().isEmpty()) { // same problem with takeAsList().isEmpty()
>                 System.out.println("Yes");
>             } else {
>                 System.out.println("No");
>             }
>         }
>         System.out.println("Done");
>     }
> }
> {code}
> While the program runs, you can find the pid with jps and run lsof -p <pid> | grep https;
> you'll see a steady growth of connections stuck in CLOSE_WAIT.
> Our way to bypass the problem is to use count() == 0 instead.
> In addition, we've seen that df.dropDuplicates("a", "b").rdd().isEmpty() does not
> trigger the problem either; both alternatives are sketched below.
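> For illustration, a minimal sketch of both alternatives (a hypothetical IsEmptyWorkaround
> helper, not part of the job above; assumes the same Dataset<Row> as in the reproduction):
> {code}
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
>
> public class IsEmptyWorkaround {
>     // Alternative 1: count() scans every partition to completion, so the
>     // s3a input streams get closed; rdd().isEmpty() stops after the first
>     // rows and appears to leave connections open.
>     static boolean isEmptyViaCount(Dataset<Row> df) {
>         return df.count() == 0;
>     }
>
>     // Alternative 2: forcing a shuffle with dropDuplicates before isEmpty()
>     // also avoided the leak in our tests ("a" and "b" are placeholder columns).
>     static boolean isEmptyViaShuffle(Dataset<Row> df) {
>         return df.dropDuplicates("a", "b").rdd().isEmpty();
>     }
> }
> {code}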


