[ https://issues.apache.org/jira/browse/SPARK-6936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14498247#comment-14498247 ]
Paul Wu commented on SPARK-6936:
--------------------------------
Not sure about HiveContext. I tried to run the following program and got an
exception (env: JDK 1.8 / Spark 1.3). Why did I get this error on HiveContext?
--- exec-maven-plugin:1.2.1:exec (default-cli) @ Spark-Sample ---
Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.$colon$colon.hd$1()Ljava/lang/Object;
    at org.apache.spark.sql.hive.HiveQl$.nodeToPlan(HiveQl.scala:574)
    at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:245)
    at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
    at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:137)
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:237)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:237)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:217)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:249)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:249)
    at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:197)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:249)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:249)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:217)
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:882)
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:882)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:881)
    at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
    at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
    at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
    at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
    at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
    at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:137)
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:237)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:237)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:217)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:249)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:249)
    at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:197)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:249)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:249)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:217)
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:882)
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:882)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:881)
    at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
    at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
    at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:234)
    at org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply(HiveContext.scala:92)
    at org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply(HiveContext.scala:92)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:92)
    at com.att.nwk.spark.sample.SQLHive.main(SQLHive.java:64)
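A NoSuchMethodError on scala.collection.immutable.$colon$colon.hd$1() usually
points to a Scala binary-version mismatch on the classpath (for example, Spark
artifacts built for Scala 2.10 mixed with a 2.11 scala-library, or an
application compiled against a different Scala than it runs with) rather than
anything HiveContext-specific. A minimal sketch of a check, run with the same
classpath as the failing job (ScalaVersionCheck is a hypothetical name):
public class ScalaVersionCheck {
    public static void main(String[] args) {
        // Which Scala runtime did we actually load? It should match the
        // _2.10/_2.11 suffix of the Spark artifacts (e.g. spark-hive_2.10).
        System.out.println(scala.util.Properties.versionString());
        // Which jar did scala-library come from?
        System.out.println(scala.Predef.class.getProtectionDomain()
                .getCodeSource().getLocation());
    }
}
If the printed version disagrees with the Scala suffix of the spark-sql/
spark-hive dependencies, aligning them should make this parser error go away.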
--------------------------------------------------
import com.att.nwk.func.DateToSecond;
import com.att.nwk.func.Greatest;
import com.att.nwk.func.NVL;
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.hive.HiveContext;

/**
 * @author zw251y
 */
public class SQLHive {

    private final static Logger log = Logger.getLogger(SQLHive.class);

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("SQLSample");
        if (PUtil.isWindows) { // PUtil is a local helper class (not shown here)
            sparkConf.setMaster("local[1]");
        }
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        HiveContext sqlContext = new HiveContext(sc.sc());

        // Load a text file and convert each line to a JavaBean.
        JavaRDD<Person> people = sc.textFile("src\\main\\resources\\people.txt")
                .map((String line) -> {
                    String[] parts = line.split(",", -1);
                    if (parts[0].contains("paul")) {
                        return null;
                    }
                    log.debug("line=" + line);
                    Person person = new Person();
                    person.setName(parts[0]);
                    person.setAge(parts[1].isEmpty() ? null : 0.0 + Integer.parseInt(parts[1].trim()));
                    person.setTest(parts[2].isEmpty() ? null : Integer.parseInt(parts[2].trim()));
                    person.setTs(new java.sql.Timestamp(new Date().getTime()));
                    return person;
                });
        people = people.filter((Person t1) -> t1 != null);

        // Apply a schema to an RDD of JavaBeans and register it as a table.
        //DataFrame schemaPeople = sqlContext.applySchema(people, Person.class);
        DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
        schemaPeople.registerTempTable("people");

        sqlContext.udf().register("nvl", new NVL(), DataTypes.DoubleType);
        sqlContext.udf().register("greatest", new Greatest<Comparable>(), DataTypes.DoubleType);
        sqlContext.udf().register("ToSecond", new DateToSecond(), DataTypes.LongType);

        // SQL can be run over RDDs that have been registered as tables.
        DataFrame teenagers = sqlContext.sql("select name from people t2 ");

        // The results of SQL queries are DataFrames and support all the normal RDD operations.
        // The columns of a row in the result can be accessed by ordinal.
        List<String> teenagerNames = teenagers.javaRDD()
                .map((Row row) -> "Name: " + row.get(0)).collect();
        log.info(Arrays.toString(teenagerNames.toArray()));

        String test = "SELECT sum(ToSecond(ts)) from people where ToSecond(ts) > "
                + (new Date().getTime() / 1000 - 10);
        DataFrame df = sqlContext.sql(test);
        List<String> t = df.javaRDD()
                .map((Row row) -> "Second sum:>>>> " + row.get(0)).collect();

        test = "SELECT ts from people where ts > '2011-01-01' ";
        df = sqlContext.sql(test);
        t = df.javaRDD()
                .map((Row row) -> "Second sum:>>>> " + row.get(0)).collect();
        log.info(Arrays.toString(t.toArray()));
    }

    public static class Person implements Serializable {

        private String name;
        private Double age;
        private Integer test;
        private Timestamp ts;

        /**
         * Get the value of ts
         *
         * @return the value of ts
         */
        public Timestamp getTs() {
            return ts;
        }

        /**
         * Set the value of ts
         *
         * @param d new value of ts
         */
        public void setTs(Timestamp d) {
            this.ts = d;
        }

        public Integer getTest() {
            return test;
        }

        public void setTest(Integer test) {
            this.test = test;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Double getAge() {
            return age;
        }

        public void setAge(Double age) {
            this.age = age;
        }
    }
}
> SQLContext.sql() caused deadlock in multi-thread env
> ----------------------------------------------------
>
> Key: SPARK-6936
> URL: https://issues.apache.org/jira/browse/SPARK-6936
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.3.0
> Environment: JDK 1.8.x, RedHat
> Linux version 2.6.32-431.23.3.el6.x86_64
> ([email protected]) (gcc version 4.4.7 20120313 (Red
> Hat 4.4.7-4) (GCC) ) #1 SMP Wed Jul 16 06:12:23 EDT 2014
> Reporter: Paul Wu
> Labels: deadlock, sql, threading
>
> Doing the same query in more than one thread with SQLContext.sql may lead to
> a deadlock. Here is a way to reproduce it (since this is a multi-threading
> issue, the reproduction may or may not be easy):
> 1. Register a relatively big table.
> 2. Create two different classes and, in each, run the same query in a method,
> put the results in a set, and print out the set size.
> 3. Create two threads, each using an object of one of the classes in its run
> method, and start them (see the sketch after this block). For my tests, a
> deadlock can appear within just a few runs.
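A minimal sketch of those steps, assuming a table named "people" has already
been registered on a shared Spark 1.3 SQLContext; the class names (DeadlockRepro,
WorkerA, WorkerB) and the query are hypothetical stand-ins:

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class DeadlockRepro {

    // Step 2: run the same query, collect the results into a set, and print
    // the set size.
    static class WorkerA implements Runnable {
        private final SQLContext ctx;
        WorkerA(SQLContext ctx) { this.ctx = ctx; }

        @Override
        public void run() {
            List<String> rows = ctx.sql("SELECT name FROM people").javaRDD()
                    .map((Row r) -> "" + r.get(0)).collect();
            Set<String> names = new HashSet<>(rows);
            System.out.println("WorkerA set size: " + names.size());
        }
    }

    // A second, separate class doing the same query, per step 2.
    static class WorkerB implements Runnable {
        private final SQLContext ctx;
        WorkerB(SQLContext ctx) { this.ctx = ctx; }

        @Override
        public void run() {
            List<String> rows = ctx.sql("SELECT name FROM people").javaRDD()
                    .map((Row r) -> "" + r.get(0)).collect();
            Set<String> names = new HashSet<>(rows);
            System.out.println("WorkerB set size: " + names.size());
        }
    }

    // Step 3: start both threads against the shared context; with the reported
    // bug, repeated runs can hang with both threads blocked inside sql().
    public static void raceOnce(SQLContext ctx) throws InterruptedException {
        Thread t1 = new Thread(new WorkerA(ctx));
        Thread t2 = new Thread(new WorkerB(ctx));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}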