[ 
https://issues.apache.org/jira/browse/SPARK-6936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14498247#comment-14498247
 ] 

Paul Wu commented on SPARK-6936:
--------------------------------

I am not sure about HiveContext. I tried running the following program and got 
an exception (env: JDK 1.8 / Spark 1.3). Why do I get this error with HiveContext?


-- exec-maven-plugin:1.2.1:exec (default-cli) @ Spark-Sample ---
Exception in thread "main" java.lang.NoSuchMethodError: 
scala.collection.immutable.$colon$colon.hd$1()Ljava/lang/Object;
        at org.apache.spark.sql.hive.HiveQl$.nodeToPlan(HiveQl.scala:574)
        at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:245)
        at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
        at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:137)
        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
        at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:237)
        at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:237)
        at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:217)
        at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:249)
        at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:249)
        at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:197)
        at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:249)
        at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:249)
        at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:217)
        at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:882)
        at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:882)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:881)
        at 
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
        at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
        at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
        at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
        at 
org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
        at 
org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:137)
        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
        at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:237)
        at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:237)
        at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:217)
        at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:249)
        at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:249)
        at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:197)
        at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:249)
        at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:249)
        at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:217)
        at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:882)
        at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:882)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:881)
        at 
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
        at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
        at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:234)
        at 
org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply(HiveContext.scala:92)
        at 
org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply(HiveContext.scala:92)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:92)
        at com.att.nwk.spark.sample.SQLHive.main(SQLHive.java:64)

--------------------------------------------------

import com.att.nwk.func.DateToSecond;
import com.att.nwk.func.Greatest;
import com.att.nwk.func.NVL;
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.hive.HiveContext;

/**
 *
 * @author zw251y
 */
public class SQLHive {

    private final static Logger log = Logger.getLogger(SQLHive.class);

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("SQLSample");
        if (PUtil.isWindows) {
            sparkConf.setMaster("local[1]");
        }

        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        HiveContext sqlContext = new HiveContext(sc.sc());

        // Load a text file and convert each line to a JavaBean.
        JavaRDD<Person> people = 
sc.textFile("src\\main\\resources\\people.txt").map((String line) -> {
            String[] parts = line.split(",", -1);
            if (parts[0].contains("paul")) {
                return null;
            }
            log.debug("line=" + line);
            Person person = new Person();
            person.setName(parts[0]);
            person.setAge(parts[1].isEmpty() ? null : 0.0 + 
Integer.parseInt(parts[1].trim()));
            person.setTest(parts[2].isEmpty() ? null : 
Integer.parseInt(parts[2].trim()));
            person.setTs(new java.sql.Timestamp(new Date().getTime()));

            return person;
        });

        people = people.filter((Person t1) -> t1 != null);

        //Apply a schema to an RDD of JavaBeans and register it as a table.
        //DataFrame schemaPeople = sqlContext.applySchema(people, Person.class);
        DataFrame schemaPeople = sqlContext.createDataFrame(people, 
Person.class);
        schemaPeople.registerTempTable("people");
        sqlContext.udf().register("nvl", new NVL(), DataTypes.DoubleType);
        sqlContext.udf().register("greatest", new Greatest<Comparable>(), 
DataTypes.DoubleType);
        sqlContext.udf().register("ToSecond", new DateToSecond(), 
DataTypes.LongType);
        // SQL can be run over RDDs that have been registered as tables.

        DataFrame teenagers = sqlContext.sql("select  name from people t2 ");

        // The results of SQL queries are DataFrames and support all the normal 
RDD operations.
        // The columns of a row in the result can be accessed by ordinal.
        List<String> teenagerNames = teenagers.javaRDD().map((Row row) -> 
"Name: " + row.get(0)).collect();
        log.info(Arrays.toString(teenagerNames.toArray()));

         
        String test = "SELECT  sum(ToSecond(ts)) from people where ToSecond(ts) 
> " + (new Date().getTime() / 1000 - 10);
        DataFrame df = sqlContext.sql(test);

        List<String> t = df.javaRDD().map((Row row) -> "Second sum:>>>> " + 
row.get(0)).collect();
        
       //org.apache.spark.sql.types.TimestampType p = null;
       //p.
           test = "SELECT  ts  from people where ts > '2011-01-01' " ;
          df = sqlContext.sql(test);

        t = df.javaRDD().map((Row row) -> "Second sum:>>>> " + 
row.get(0)).collect();
        
        //te.saveAsObjectFile("/temp/te");
        log.info(Arrays.toString(t.toArray()));
    }

    public static class Person implements Serializable {

        private String name;
        private Double age;
        private Integer test;
        private Timestamp ts;

        /**
         * Get the value of ts
         *
         * @return the value of ts
         */
        public Timestamp getTs() {
            return ts;
        }

        /**
         * Set the value of ts
         *
         * @param d new value of ts
         */
        public void setTs(Timestamp d) {
            this.ts = d;
        }

        public Integer getTest() {
            return test;
        }

        public void setTest(Integer test) {
            this.test = test;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Double getAge() {
            return age;
        }

        public void setAge(Double age) {
            this.age = age;
        }
    }
}

> SQLContext.sql() caused deadlock in multi-thread env
> ----------------------------------------------------
>
>                 Key: SPARK-6936
>                 URL: https://issues.apache.org/jira/browse/SPARK-6936
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.3.0
>         Environment: JDK 1.8.x, RedHat
> Linux version 2.6.32-431.23.3.el6.x86_64 
> ([email protected]) (gcc version 4.4.7 20120313 (Red 
> Hat 4.4.7-4) (GCC) ) #1 SMP Wed Jul 16 06:12:23 EDT 2014
>            Reporter: Paul Wu
>              Labels: deadlock, sql, threading
>
> Running the same query from more than one thread with SQLContext.sql may lead 
> to a deadlock. Here is a way to reproduce it (since this is a multi-threading 
> issue, the reproduction may or may not be easy).
> 1. Register a relatively big table.
> 2. Create two different classes; in each class, run the same query in a 
> method, put the results in a set, and print out the set size.
> 3. Create two threads, each using an object of one of the classes in its run 
> method. Start the threads. In my tests, a deadlock can occur within just a few runs. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to