How to use disk instead of just InMemoryRelation when using the JDBC datasource in Spark SQL?

2018-04-11 Thread Louis Hust
We want to extract data from MySQL and do the computation in Spark SQL.
The SQL explain output looks like below.


REGIONKEY#177,N_COMMENT#178] PushedFilters: [], ReadSchema:
struct
  +- *(20) Sort [r_regionkey#203 ASC NULLS FIRST], false, 0
 +- Exchange(coordinator id: 266374831)
hashpartitioning(r_regionkey#203, 200), coordinator[target post-shuffle
partition size: 67108864]
+- *(19) Project [R_REGIONKEY#203]
   +- *(19) Filter ((isnotnull(r_name#204) &&
(r_name#204 = AFRICA)) && isnotnull(r_regionkey#203))
  +- InMemoryTableScan [R_REGIONKEY#203,
r_name#204], [isnotnull(r_name#204), (r_name#204 = AFRICA),
isnotnull(r_regionkey#203)]
+- InMemoryRelation [R_REGIONKEY#203,
R_NAME#204, R_COMMENT#205], true, 1, StorageLevel(disk, memory, 1
replicas)
  +- *(1) Scan JDBCRelation(region)
[numPartitions=1] [R_REGIONKEY#203,R_NAME#204,R_COMMENT#205] PushedFilters:
[], ReadSchema: struct


As you can see, all JDBCRelations are converted to InMemoryRelation. Because the JDBC
table is so big that all of the data cannot fit into memory, an OOM occurs.
Is there some option to make Spark SQL use disk when memory is not enough?
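
One option worth trying (a minimal sketch only, not verified against this plan; the URL, credentials and table name are placeholders, and `spark` stands for the SparkSession): persist the JDBC DataFrame with an explicitly disk-backed storage level before registering and querying it, rather than relying on the default cache level.

import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.storage.StorageLevel;

Properties props = new Properties();
props.put("user", "user");
props.put("password", "password");

// Read the MySQL table and cache it on disk instead of in memory.
Dataset<Row> region = spark.read().jdbc("jdbc:mysql://host:3306/tpch", "region", props);
region.persist(StorageLevel.DISK_ONLY());   // or MEMORY_AND_DISK_SER(), or skip caching entirely
region.createOrReplaceTempView("region");

Whether this avoids the OOM depends on why the InMemoryRelation is there in the first place (an explicit cache() call, or some layer caching the JDBC tables on your behalf).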


Re: Spark is only using one worker machine when more are available

2018-04-11 Thread 宋源栋
Hi,
1. Spark version: 2.3.0
2. JDK: Oracle JDK 1.8
3. OS version: CentOS 6.8
4. spark-env.sh: null
5. Spark session config:

SparkSession.builder().appName("DBScale")
        .config("spark.sql.crossJoin.enabled", "true")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.scheduler.mode", "FAIR")
        .config("spark.executor.memory", "1g")
        .config("spark.executor.cores", 1)
        .config("spark.driver.memory", "20")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.executor.extraJavaOptions",
                "-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC " +
                "-verbose:gc -XX:+PrintGCDetails " +
                "-XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy")
        .master(this.spark_master)
        .getOrCreate();

6. Core code:

for (SparksqlTableInfo tableInfo : this.dbtable) { // this loop reads data from mysql
    String dt = "(" + tableInfo.sql + ")" + tableInfo.tmp_table_name;
    String[] pred = new String[tableInfo.partition_num];
    if (tableInfo.partition_num > 0) {
        for (int j = 0; j < tableInfo.partition_num; j++) {
            String str = "some where clause to split mysql table into many partitions";
            pred[j] = str;
        }
        Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, pred, connProp); // this.url is mysql-jdbc-url (mysql://XX.XX.XX.XX:)
        jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
    } else {
        logger.warn("[\033[32m" + "partition_num == 0" + "\033[0m]");
        Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, connProp);
        jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
    }
}

// Then run a query and write the result set to mysql
Dataset<Row> result = ss.sql(this.sql);
result.explain(true);
connProp.put("rewriteBatchedStatements", "true");
connProp.put("sessionVariables", "sql_log_bin=off");
result.write().jdbc(this.dst_url, this.dst_table, connProp);

--
From: Jhon Anderson Cardenas Diaz
Sent: April 11, 2018 (Wednesday) 22:42
To: 宋源栋
Cc: user
Subject: Re: Spark is only using one worker machine when more are available
Hi, could you please share the environment variable values that you are
setting when you run the jobs, the Spark version, and other details?
Btw, you should take a look at SPARK_WORKER_INSTANCES and SPARK_WORKER_CORES if
you are using spark 2.0.0.

Regards.

2018-04-11 4:10 GMT-05:00 宋源栋 :


Hi all,
I have a standalone-mode Spark cluster without HDFS, with 10 machines, each of
which has 40 CPU cores and 128G RAM.
My application is a Spark SQL application that reads data from the database
"tpch_100g" in MySQL and runs TPC-H queries. When loading tables from MySQL to
Spark, I split the biggest table "lineitem" into 600 partitions.

When my application runs, there are only 40 executors (spark.executor.memory =
1g, spark.executor.cores = 1) on the executors page of the Spark application web UI, and all
executors are on the same machine. It is far too slow because all tasks are
running in parallel on only one machine.







NullPointerException error when using repartition

2018-04-11 Thread Junfeng Chen
I wrote a program to read some JSON data from Kafka, with the purpose of saving
it as Parquet files on HDFS.
Here is my code:

> JavaInputDStream stream = ...
> JavaDStream rdd = stream.map...
> rdd.repartition(taskNum).foreachRDD((VoidFunction) stringjavardd -> {
>     Dataset df = spark.read().json(stringjavardd); // convert json to df
>     JavaRDD rowJavaRDD = df.javaRDD().map...  // add some new fields
>     StructType type = df.schema()...; // construct new type for the newly added fields
>     Dataset newdf = ...; // build the new dataframe from rowJavaRDD and type
>     newdf.repartition(taskNum).write().mode(SaveMode.Append)
>         .partitionBy("appname").parquet(savepath); // save to parquet
> })



However, if I remove the repartition call on newdf in the Parquet-writing
stage, the program always throws a NullPointerException in the JSON-conversion
line:

Java.lang.NullPointerException
>  at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
> ...


This seems to make no sense: the Parquet-writing operation should be in a
different stage from the JSON-transforming operation.
So how can I solve it? Thanks!

Regard,
Junfeng Chen


Re: Broadcasting huge array or persisting on HDFS to read on executors - both not working

2018-04-11 Thread surender kumar
Right, this is what I did when I said I tried to persist it and create an RDD out
of it to sample from. But how do I do that for each user? You have one RDD of users on
one hand and an RDD of items on the other. How do I go from here? Am I missing
something trivial?

On Thursday, 12 April, 2018, 2:10:51 AM IST, Matteo Cossu 
 wrote:  
 
 Why broadcasting this list then? You should use an RDD or DataFrame. For 
example, RDD has a method sample() that returns a random sample from it.
On 11 April 2018 at 22:34, surender kumar  wrote:

I'm using PySpark. I have a list of 1 million items (all float values) and 1
million users. For each user I want to randomly sample some items from the item
list. Broadcasting the item list results in an OutOfMemory error on the driver,
even after setting driver memory up to 10G. I tried to persist this array on disk
but I'm not able to figure out a way to read it on the workers.
Any suggestion would be appreciated.

  

Re: A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

2018-04-11 Thread Shiyuan
Here it is :
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2991198123660769/823198936734135/866038034322120/latest.html


On Wed, Apr 11, 2018 at 10:55 AM, Alessandro Solimando <
alessandro.solima...@gmail.com> wrote:

> Hi Shiyuan,
> can you show us the output of ¨explain¨ over df (as a last step)?
>
> On 11 April 2018 at 19:47, Shiyuan  wrote:
>
>> Variable name binding is a python thing, and Spark should not care how
>> the variable is named. What matters is the dependency graph. Spark fails to
>> handle this dependency graph correctly for which I am quite surprised: this
>> is just a simple combination of three very common sql operations.
>>
>>
>> On Tue, Apr 10, 2018 at 9:03 PM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi Shiyuan,
>>>
>>> I do not know whether I am right, but I would prefer to avoid
>>> expressions in Spark as:
>>>
>>> df = <>
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan  wrote:
>>>
 Here is the pretty print of the physical plan which reveals some
 details about what causes the bug (see the lines highlighted in bold):
 WithColumnRenamed() fails to update the dependency graph correctly:


 'Resolved attribute(s) kk#144L missing from
 ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118,
 score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in
 the operation: kk. Please check if the right attribute(s) are used

 Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
 +- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
:- Project [ID#64, score#67, LABEL#65, kk#73L]
:  +- Join Inner, (ID#64 = ID#99)
: :- Project [ID#64, score#67, LABEL#65, kk#73L]
: :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
: : +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
: +- Project [ID#99]
:+- Filter (nL#90L > cast(1 as bigint))
:   +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100)
 AS nL#90L]
:  +- Project [ID#99, score#102, LABEL#100, kk#73L]
: +- Project [ID#99, LABEL#100, k#101L AS kk#73L,
 score#102]
:+- LogicalRDD [ID#99, LABEL#100, k#101L,
 score#102]
+- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
   +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS
 count#118L]
  +- Project [ID#135, score#138, LABEL#136, kk#128L]
 +- Join Inner, (ID#135 = ID#99)
:- Project [ID#135, score#138, LABEL#136, kk#128L]
:  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L,
 score#138]*
: +- LogicalRDD [ID#135, LABEL#136, k#137L,
 score#138]
+- Project [ID#99]
   +- Filter (nL#90L > cast(1 as bigint))
  +- Aggregate [ID#99], [ID#99, count(distinct
 LABEL#100) AS nL#90L]
 +- *!Project [ID#99, score#102, LABEL#100,
 kk#128L]*
+-* Project [ID#99, LABEL#100, k#101L AS
 kk#73L, score#102]*
   +- LogicalRDD [ID#99, LABEL#100, k#101L,
 score#102]

 Here is the code which generates the error:

 import pyspark.sql.functions as F
 from pyspark.sql import Row
 df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
                             Row(score=1.0, ID='abc', LABEL=False, k=3)]) \
        .withColumnRenamed("k", "kk").select("ID", "score", "LABEL", "kk")
 df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
 df = df.join(df_t.select("ID"), ["ID"])
 df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
 df = df.join(df_sw, ["ID", "kk"])


 On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan  wrote:

> The Spark warning about Row instead of Dict is not the culprit. The
> problem still persists after I use Row instead of Dict to generate the
> dataframe.
>
> Here is the explain() output regarding the reassignment of df that Gourav
> suggested running. The plans look the same except that the serial numbers
> following the columns are different (e.g. ID#7273 vs. ID#7344).
>
> this is the output of df.explain() after df =
> df.join(df_t.select("ID"),["ID"])
> == Physical Plan == *(6) Project [ID#7273, score#7276, LABEL#7274,
> kk#7281L] +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner :- *(2) Sort
> [ID#7273 ASC NULLS FIRST], false, 0 : +- Exchange 
> hashpartitioning(ID#7273,
> 200) : +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS
> kk#7281L] : +- *(1) Filter isnotnull(ID#7273) : +- *(1) Scan
> 

Re: Broadcasting huge array or persisting on HDFS to read on executors - both not working

2018-04-11 Thread Matteo Cossu
Why broadcasting this list then? You should use an RDD or DataFrame. For
example, RDD has a method sample() that returns a random sample from it.
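
For instance, a minimal sketch of that idea (written in Java here, although the thread uses PySpark; `spark` is the SparkSession and the Parquet path is a placeholder):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Keep the 1M items as a distributed Dataset instead of a broadcast list,
// and draw a random sample from it on the cluster.
Dataset<Row> items = spark.read().parquet("hdfs:///path/to/items");   // hypothetical item source
Dataset<Row> itemSample = items.sample(false, 0.001, 42L);            // ~0.1% sample, fixed seed
itemSample.show();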

On 11 April 2018 at 22:34, surender kumar 
wrote:

> I'm using PySpark.
> I have a list of 1 million items (all float values) and 1 million users. For
> each user I want to randomly sample some items from the item list.
> Broadcasting the item list results in an OutOfMemory error on the driver,
> even after setting driver memory up to 10G. I tried to persist this array on
> disk but I'm not able to figure out a way to read it on the workers.
>
> Any suggestion would be appreciated.
>


Broadcasting huge array or persisting on HDFS to read on executors - both not working

2018-04-11 Thread surender kumar
I'm using PySpark. I have a list of 1 million items (all float values) and 1
million users. For each user I want to randomly sample some items from the item
list. Broadcasting the item list results in an OutOfMemory error on the driver,
even after setting driver memory up to 10G. I tried to persist this array on disk
but I'm not able to figure out a way to read it on the workers.
Any suggestion would be appreciated.

Re: Not able to access Pyspark into Jupyter notebook

2018-04-11 Thread Dylan Guedes
Well... could you post the log or any errors that occur?

I used this pyspark jupyter notebook

and it worked great.

On Wed, Apr 11, 2018 at 12:36 AM, @Nandan@ 
wrote:

> Hi Users,
>
> Currently, I am trying to use Apache Spark 2.2.0 by using a Jupyter
> notebook but not able to achieve it.
>
> I am using Ubuntu 17.10.
> I can able to use pyspark in command line as well as spark-shell . Please
> give some ideas.
>
> Thanks.
> Nandan Priyadarshi
>


Re: A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

2018-04-11 Thread Alessandro Solimando
Hi Shiyuan,
can you show us the output of ¨explain¨ over df (as a last step)?

On 11 April 2018 at 19:47, Shiyuan  wrote:

> Variable name binding is a python thing, and Spark should not care how the
> variable is named. What matters is the dependency graph. Spark fails to
> handle this dependency graph correctly for which I am quite surprised: this
> is just a simple combination of three very common sql operations.
>
>
> On Tue, Apr 10, 2018 at 9:03 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi Shiyuan,
>>
>> I do not know whether I am right, but I would prefer to avoid expressions
>> in Spark as:
>>
>> df = <>
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan  wrote:
>>
>>> Here is the pretty print of the physical plan which reveals some details
>>> about what causes the bug (see the lines highlighted in bold):
>>> WithColumnRenamed() fails to update the dependency graph correctly:
>>>
>>>
>>> 'Resolved attribute(s) kk#144L missing from
>>> ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118,
>>> score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in
>>> the operation: kk. Please check if the right attribute(s) are used
>>>
>>> Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
>>> +- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
>>>:- Project [ID#64, score#67, LABEL#65, kk#73L]
>>>:  +- Join Inner, (ID#64 = ID#99)
>>>: :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>>: :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
>>>: : +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
>>>: +- Project [ID#99]
>>>:+- Filter (nL#90L > cast(1 as bigint))
>>>:   +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100)
>>> AS nL#90L]
>>>:  +- Project [ID#99, score#102, LABEL#100, kk#73L]
>>>: +- Project [ID#99, LABEL#100, k#101L AS kk#73L,
>>> score#102]
>>>:+- LogicalRDD [ID#99, LABEL#100, k#101L,
>>> score#102]
>>>+- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
>>>   +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS
>>> count#118L]
>>>  +- Project [ID#135, score#138, LABEL#136, kk#128L]
>>> +- Join Inner, (ID#135 = ID#99)
>>>:- Project [ID#135, score#138, LABEL#136, kk#128L]
>>>:  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L,
>>> score#138]*
>>>: +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
>>>+- Project [ID#99]
>>>   +- Filter (nL#90L > cast(1 as bigint))
>>>  +- Aggregate [ID#99], [ID#99, count(distinct
>>> LABEL#100) AS nL#90L]
>>> +- *!Project [ID#99, score#102, LABEL#100,
>>> kk#128L]*
>>>+-* Project [ID#99, LABEL#100, k#101L AS
>>> kk#73L, score#102]*
>>>   +- LogicalRDD [ID#99, LABEL#100, k#101L,
>>> score#102]
>>>
>>> Here is the code which generates the error:
>>>
>>> import pyspark.sql.functions as F
>>> from pyspark.sql import Row
>>> df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
>>>                             Row(score=1.0, ID='abc', LABEL=False, k=3)]) \
>>>        .withColumnRenamed("k", "kk").select("ID", "score", "LABEL", "kk")
>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
>>> df = df.join(df_t.select("ID"), ["ID"])
>>> df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
>>> df = df.join(df_sw, ["ID", "kk"])
>>>
>>>
>>> On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan  wrote:
>>>
 The Spark warning about Row instead of Dict is not the culprit. The
 problem still persists after I use Row instead of Dict to generate the
 dataframe.

 Here is the explain() output regarding the reassignment of df that Gourav
 suggested running. The plans look the same except that the serial numbers
 following the columns are different (e.g. ID#7273 vs. ID#7344).

 this is the output of df.explain() after df =
 df.join(df_t.select("ID"),["ID"])
 == Physical Plan == *(6) Project [ID#7273, score#7276, LABEL#7274,
 kk#7281L] +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner :- *(2) Sort
 [ID#7273 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7273,
 200) : +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS
 kk#7281L] : +- *(1) Filter isnotnull(ID#7273) : +- *(1) Scan
 ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276] +- *(5) Sort
 [ID#7303 ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7303] +- *(5)
 Filter (nL#7295L > 1) +- *(5) HashAggregate(keys=[ID#7303],
 functions=[finalmerge_count(distinct merge count#7314L) AS
 count(LABEL#7304)#7294L]) +- Exchange hashpartitioning(ID#7303, 200) +-
 *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct

Re: A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

2018-04-11 Thread Shiyuan
Variable name binding is a python thing, and Spark should not care how the
variable is named. What matters is the dependency graph. Spark fails to
handle this dependency graph correctly for which I am quite surprised: this
is just a simple combination of three very common sql operations.


On Tue, Apr 10, 2018 at 9:03 PM, Gourav Sengupta 
wrote:

> Hi Shiyuan,
>
> I do not know whether I am right, but I would prefer to avoid expressions
> in Spark as:
>
> df = <>
>
>
> Regards,
> Gourav Sengupta
>
> On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan  wrote:
>
>> Here is the pretty print of the physical plan which reveals some details
>> about what causes the bug (see the lines highlighted in bold):
>> WithColumnRenamed() fails to update the dependency graph correctly:
>>
>>
>> 'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
>> in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
>> with the same name appear in the operation: kk. Please check if the right
>> attribute(s) are used
>>
>> Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
>> +- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
>>:- Project [ID#64, score#67, LABEL#65, kk#73L]
>>:  +- Join Inner, (ID#64 = ID#99)
>>: :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>: :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
>>: : +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
>>: +- Project [ID#99]
>>:+- Filter (nL#90L > cast(1 as bigint))
>>:   +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS
>> nL#90L]
>>:  +- Project [ID#99, score#102, LABEL#100, kk#73L]
>>: +- Project [ID#99, LABEL#100, k#101L AS kk#73L,
>> score#102]
>>:+- LogicalRDD [ID#99, LABEL#100, k#101L,
>> score#102]
>>+- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
>>   +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS
>> count#118L]
>>  +- Project [ID#135, score#138, LABEL#136, kk#128L]
>> +- Join Inner, (ID#135 = ID#99)
>>:- Project [ID#135, score#138, LABEL#136, kk#128L]
>>:  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L,
>> score#138]*
>>: +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
>>+- Project [ID#99]
>>   +- Filter (nL#90L > cast(1 as bigint))
>>  +- Aggregate [ID#99], [ID#99, count(distinct
>> LABEL#100) AS nL#90L]
>> +- *!Project [ID#99, score#102, LABEL#100,
>> kk#128L]*
>>+-* Project [ID#99, LABEL#100, k#101L AS
>> kk#73L, score#102]*
>>   +- LogicalRDD [ID#99, LABEL#100, k#101L,
>> score#102]
>>
>> Here is the code which generates the error:
>>
>> import pyspark.sql.functions as F
>> from pyspark.sql import Row
>> df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
>>                             Row(score=1.0, ID='abc', LABEL=False, k=3)]) \
>>        .withColumnRenamed("k", "kk").select("ID", "score", "LABEL", "kk")
>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
>> df = df.join(df_t.select("ID"), ["ID"])
>> df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
>> df = df.join(df_sw, ["ID", "kk"])
>>
>>
>> On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan  wrote:
>>
>>> The Spark warning about Row instead of Dict is not the culprit. The
>>> problem still persists after I use Row instead of Dict to generate the
>>> dataframe.
>>>
>>> Here is the explain() output regarding the reassignment of df that Gourav
>>> suggested running. The plans look the same except that the serial numbers
>>> following the columns are different (e.g. ID#7273 vs. ID#7344).
>>>
>>> this is the output of df.explain() after df =
>>> df.join(df_t.select("ID"),["ID"])
>>> == Physical Plan == *(6) Project [ID#7273, score#7276, LABEL#7274,
>>> kk#7281L] +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner :- *(2) Sort
>>> [ID#7273 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7273,
>>> 200) : +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS
>>> kk#7281L] : +- *(1) Filter isnotnull(ID#7273) : +- *(1) Scan
>>> ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276] +- *(5) Sort
>>> [ID#7303 ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7303] +- *(5)
>>> Filter (nL#7295L > 1) +- *(5) HashAggregate(keys=[ID#7303],
>>> functions=[finalmerge_count(distinct merge count#7314L) AS
>>> count(LABEL#7304)#7294L]) +- Exchange hashpartitioning(ID#7303, 200) +-
>>> *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct
>>> LABEL#7304) AS count#7314L]) +- *(4) HashAggregate(keys=[ID#7303,
>>> LABEL#7304], functions=[]) +- Exchange hashpartitioning(ID#7303,
>>> LABEL#7304, 200) +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304],
>>> functions=[]) +- *(3) Project [ID#7303, 

Re: cache OS memory and spark usage of it

2018-04-11 Thread yncxcw
hi, Raúl 

(1)&(2) Yes, the OS needs some pressure to release it. For example, if you
have a total of 16GB RAM in your machine, you read a file of 8GB and
immediately close it. Now the page cache holds the 8GB of file data. Then
you start a program requesting memory from the OS; the OS will release the page
cache once your request goes beyond 8GB.

(3) I think you can configure your JVM with a maximum heap size of 14GB
(-Xmx) and leave 2GB of memory for the OS. You will have memory elasticity with
this configuration. The JVM will increase its memory allocation from the OS as
new objects are created, but it is bounded by 14GB, which will not cause
memory swapping. For example, if your application only needs 8GB of memory,
then the remaining 8GB can be used for the page cache, improving your IO performance.
Otherwise, if your application needs 14GB of memory, the JVM will force the OS
to release almost all of the page cache. In that situation, your IO performance may
not be as good, but you can hold more data (e.g., RDDs) in your application.
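
A minimal sketch of that split expressed as a Spark property (an illustrative number only, assuming a 16GB executor machine):

# bound the executor heap so the OS keeps roughly 2GB for the page cache
spark.executor.memory=14g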


Wei






Re: Spark is only using one worker machine when more are available

2018-04-11 Thread Jhon Anderson Cardenas Diaz
Hi, could you please share the environment variable values that you are
setting when you run the jobs, the Spark version, and other details?
Btw, you should take a look at SPARK_WORKER_INSTANCES and SPARK_WORKER_CORES
if you are using spark 2.0.0.

Regards.
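
For reference, those two settings live in conf/spark-env.sh on each worker machine. A minimal sketch (the values are placeholders, not a recommendation):

# conf/spark-env.sh on every worker (illustrative values only)
SPARK_WORKER_INSTANCES=1   # number of worker processes started per machine
SPARK_WORKER_CORES=40      # cores each worker offers to applications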

2018-04-11 4:10 GMT-05:00 宋源栋 :

>
>
> Hi all,
>
> I have a standalone-mode Spark cluster without HDFS, with 10 machines,
> each of which has 40 CPU cores and 128G RAM.
>
> My application is a Spark SQL application that reads data from the database
> "tpch_100g" in MySQL and runs TPC-H queries. When loading tables from MySQL to
> Spark, I split the biggest table "lineitem" into 600 partitions.
>
> When my application runs, there are only 40 executors (spark.executor.memory
> = 1g, spark.executor.cores = 1) on the executors page of the Spark application web UI,
> and all executors are on the same machine. It is far too slow because all tasks
> are running in parallel on only one machine.
>
>
>
>


Structured Streaming outputs a lot of small files with Append Mode

2018-04-11 Thread feng wang
Hi,
I have seen the doc in Spark 2.2 about Structured Streaming:

> Append mode (default) - This is the default mode, where only the new rows
> added to the Result Table since the last trigger will be outputted to the
> sink. This is supported for only those queries where rows added to the
> Result Table is never going to change. Hence, this mode guarantees that
> each row will be output only once (assuming fault-tolerant sink). For
> example, queries with only select, where, map, flatMap, filter, join,
> etc. will support Append mode.

So I tried to output a streaming DataFrame to HDFS with the sample code below, but I get
many small files in the target output path:

>
> val df = spark.readStream.option("sep", ",").option("header", true)
>   .option("quote", "\"").csv(inputpath)
> val flow: DataFrame => DataFrame = _.select("name")  // I also tried df.withColumn()
> val Data: DataFrame = flow(df)
> val query: StreamingQuery = Data.writeStream
>   .format("csv")
>   .option("header", "true")
>   .option("format", "append")
>   .option("path", output)
>   .option("checkpointLocation", "/tmp/checkout")
>   .outputMode(OutputMode.Append())
>   .start()
> query.processAllAvailable()

I found that there were 4 executors in the Mesos web UI for the duration of the job.

My questions are generic:

1.  Is it a bug in Append mode? I mean, why not write all records to one
file with append mode?

2.  Is there any way to write all records to one file, other than using `hadoop
getmerge` or `Data.coalesce(1).writeStream.xx`? Repartitioning to 1 partition to
generate 1 output file does not work so well. (A sketch of one possible workaround follows below.)
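
One hedged workaround, not taken from this thread's replies: coalesce before writeStream and trigger less often, so each micro-batch produces fewer, larger files. Written in Java here; `Data` and `output` mirror the names in the code above, and the 10-minute interval is arbitrary.

import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

StreamingQuery query = Data.coalesce(1)                 // one task, hence one file, per micro-batch
        .writeStream()
        .format("csv")
        .option("header", "true")
        .option("path", output)
        .option("checkpointLocation", "/tmp/checkout")
        .trigger(Trigger.ProcessingTime("10 minutes"))  // fewer, larger micro-batches
        .outputMode(OutputMode.Append())
        .start();

This still produces one file per trigger, not a single file overall; the file sink does not append to an existing file.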


Spark is only using one worker machine when more are available

2018-04-11 Thread 宋源栋


Hi all,
I have a standalone-mode Spark cluster without HDFS, with 10 machines, each of
which has 40 CPU cores and 128G RAM.
My application is a Spark SQL application that reads data from the database
"tpch_100g" in MySQL and runs TPC-H queries. When loading tables from MySQL to
Spark, I split the biggest table "lineitem" into 600 partitions.

When my application runs, there are only 40 executors (spark.executor.memory =
1g, spark.executor.cores = 1) on the executors page of the Spark application web UI, and all
executors are on the same machine. It is far too slow because all tasks are
running in parallel on only one machine.





How to filter the datetime in a dataset with java code please?

2018-04-11 Thread 1427357...@qq.com
HI  all,

I want to filter the data by the datetime.
In MySQL, the column is of DATETIME type, named A.
I wrote my code like:
import java.util.Date;
newX.filter(newX.col("A").isNull().or(newX.col("A").lt(new Date()))).show();

I got error :
Exception in thread "main" java.lang.RuntimeException: Unsupported literal type 
class java.util.Date Wed Apr 11 16:17:31 CST 2018
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:77)
at 
org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:163)
at 
org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:163)
at scala.util.Try.getOrElse(Try.scala:79)
at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:162)
at org.apache.spark.sql.functions$.typedLit(functions.scala:113)
at org.apache.spark.sql.functions$.lit(functions.scala:96)
at org.apache.spark.sql.Column.$less(Column.scala:384)
at org.apache.spark.sql.Column.lt(Column.scala:399)
at Main.main(Main.java:38)

How should I filter the datetime in the dataset filter, please?
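
One way that commonly works (a sketch only, not verified against this exact schema): java.util.Date is not a supported literal type, but java.sql.Timestamp (or java.sql.Date) is, so build the literal from one of those, or let Spark supply the current time:

import java.sql.Timestamp;
import org.apache.spark.sql.functions;

// compare against a java.sql.Timestamp literal instead of java.util.Date
Timestamp now = new Timestamp(System.currentTimeMillis());
newX.filter(newX.col("A").isNull().or(newX.col("A").lt(functions.lit(now)))).show();

// or let Spark compute the current time itself
newX.filter(newX.col("A").isNull().or(newX.col("A").lt(functions.current_timestamp()))).show();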






1427357...@qq.com


Re: Re: how to use the sql join in java please

2018-04-11 Thread 1427357...@qq.com
Hi  yucai,

It works well now.
Thanks.



1427357...@qq.com
 
From: Yu, Yucai
Date: 2018-04-11 16:01
To: 1427357...@qq.com; spark users
Subject: Re: how to use the sql join in java please
Do you really want to do a cartesian product on those two tables?
If yes, you can set spark.sql.crossJoin.enabled=true.
 
Thanks,
Yucai
 
From: "1427357...@qq.com" <1427357...@qq.com>
Date: Wednesday, April 11, 2018 at 3:16 PM
To: spark users
Subject: how to use the sql join in java please
 
Hi  all,
 
I write java code to join two table.
My code looks like:
 
SparkSession ss = 
SparkSession.builder().master("local[4]").appName("testSql").getOrCreate();

Properties properties = new Properties();
properties.put("user","A");
properties.put("password","B");
String url = 
"jdbc:mysql://xxx:/xxx?useUnicode=true=gbk=convertToNull=UTC";
Dataset data_busi_hour = ss.read().jdbc(url, "A", properties);
data_busi_hour.show();
//newemployee.printSchema();

Dataset t_pro_ware_partner_rela = ss.read().jdbc(url, "B", 
properties);

Dataset newX  = t_pro_ware_partner_rela.join(data_busi_hour);
newX.show();
 
I get a error  like below:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Detected 
cartesian product for INNER join between logical plans
Relation[ XXX   FIRST_ORG_ARCHNAME#80,... 11 more fields] 
JDBCRelation(t_pro_ware_partner_rela) [numPartitions=1]
and
Relation[id#0L,project_code#1,project_name#2] JDBCRelation(data_busi_hour) 
[numPartitions=1]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
at 
org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$21.applyOrElse(Optimizer.scala:1124)
at 
org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$21.applyOrElse(Optimizer.scala:1121)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at 
org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$.apply(Optimizer.scala:1121)
at 
org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$.apply(Optimizer.scala:1103)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
at 
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at 
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
at 

Re: How to submit some code segment to existing SparkContext

2018-04-11 Thread Saisai Shao
Maybe you can try Livy (http://livy.incubator.apache.org/).

Thanks
Jerry

2018-04-11 15:46 GMT+08:00 杜斌 :

> Hi,
>
> Is there any way to submit a code segment to an existing SparkContext?
> Just like a web backend that sends some user code to Spark to run: since the
> initial SparkContext takes time to create, I just want to execute some code or Spark
> SQL and get the result quickly.
>
> Thanks,
> Bin
>


Re: how to use the sql join in java please

2018-04-11 Thread Yu, Yucai
Do you really want to do a cartesian product on those two tables?
If yes, you can set spark.sql.crossJoin.enabled=true.

Thanks,
Yucai
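
A sketch of the two usual ways out (the join column used for t_pro_ware_partner_rela is hypothetical here; pick whatever key the two tables actually share):

// 1) Give the join an equality condition so it is no longer a cartesian product.
Dataset<Row> joined = t_pro_ware_partner_rela.join(
        data_busi_hour,
        t_pro_ware_partner_rela.col("project_code")
                .equalTo(data_busi_hour.col("project_code")));

// 2) If a cartesian product really is intended, say so explicitly...
Dataset<Row> cross = t_pro_ware_partner_rela.crossJoin(data_busi_hour);
// ...or enable it globally, as suggested above:
ss.conf().set("spark.sql.crossJoin.enabled", "true");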

From: "1427357...@qq.com" <1427357...@qq.com>
Date: Wednesday, April 11, 2018 at 3:16 PM
To: spark users
Subject: how to use the sql join in java please

Hi  all,

I write java code to join two table.
My code looks like:


SparkSession ss = SparkSession.builder().master("local[4]").appName("testSql").getOrCreate();

Properties properties = new Properties();
properties.put("user", "A");
properties.put("password", "B");
String url = "jdbc:mysql://xxx:/xxx?useUnicode=true=gbk=convertToNull=UTC";
Dataset<Row> data_busi_hour = ss.read().jdbc(url, "A", properties);
data_busi_hour.show();
//newemployee.printSchema();

Dataset<Row> t_pro_ware_partner_rela = ss.read().jdbc(url, "B", properties);

Dataset<Row> newX = t_pro_ware_partner_rela.join(data_busi_hour);
newX.show();

I get a error  like below:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Detected 
cartesian product for INNER join between logical plans
Relation[ XXX   FIRST_ORG_ARCHNAME#80,... 11 more fields] 
JDBCRelation(t_pro_ware_partner_rela) [numPartitions=1]
and
Relation[id#0L,project_code#1,project_name#2] JDBCRelation(data_busi_hour) 
[numPartitions=1]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
at 
org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$21.applyOrElse(Optimizer.scala:1124)
at 
org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$21.applyOrElse(Optimizer.scala:1121)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at 
org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$.apply(Optimizer.scala:1121)
at 
org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$.apply(Optimizer.scala:1103)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
at 
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at 
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at 

How to submit some code segment to existing SparkContext

2018-04-11 Thread 杜斌
Hi,

Is there any way to submit a code segment to an existing SparkContext?
Just like a web backend that sends some user code to Spark to run: since the
initial SparkContext takes time to create, I just want to execute some code or Spark
SQL and get the result quickly.

Thanks,
Bin


Re: Issue with map function in Spark 2.2.0

2018-04-11 Thread ayan guha
As the error says clearly, the FL_DATE column has a different format than you
are expecting. Modify your date format mask appropriately.

On Wed, 11 Apr 2018 at 5:12 pm, @Nandan@ 
wrote:

> Hi ,
> I am not able to use .map function in Spark.
>
> My codes are as below :-
>
> *1) Create Parse function:-*
>
> from datetime import datetime
> from collections import namedtuple
> fields =
> ('date','airline','flightnum','origin','dest','dep','dep_delay','arv','arv_delay','airtime','distance')
> Flight = namedtuple('Flight',fields,verbose=True)
> DATE_FMT = "%y-%m-%d"
> TIME_FMT = "%H%M"
> def parse(row) :
> row[0] = datetime.strptime(row[0], DATE_FMT).date()
> row[5] = datetime.strptime(row[5], TIME_FMT).time()
> row[6] = float(row[6])
> row[7] = datetime.strptime(row[7], TIME_FMT).time()
> row[8] = float(row[8])
> row[9] = float(row[9])
> row[10] = float(row[10])
> return Flight(*row[:11])
>
> *2) Using Parse to parse my RDD*
>
> flightsParsedMap = flights.map(lambda x: x.split(',')).map(parse)
>
> *3) Checking Parsed RDD *
> flightsParsedMap
> *Output is :-  *
>
> *PythonRDD[8] at RDD at PythonRDD.scala:48*
> *4) Checking first row :-*
>
> flightsParsedMap.first()
> Here i am getting issue:-
>
>
>
> ---Py4JJavaError
>  Traceback (most recent call 
> last) in ()> 1 
> flightsParsedMap.first()
> C:\spark\spark\python\pyspark\rdd.py in first(self)   1374 
> ValueError: RDD is empty   1375 """-> 1376 rs = self.take(1)  
>  1377 if rs:   1378 return rs[0]
> C:\spark\spark\python\pyspark\rdd.py in take(self, num)   13561357
>  p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))-> 
> 1358 res = self.context.runJob(self, takeUpToNumLeft, p)   1359   
>  1360 items += res
> C:\spark\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, 
> partitions, allowLocal)999 # SparkContext#runJob.   1000 
> mappedRDD = rdd.mapPartitions(partitionFunc)-> 1001 port = 
> self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)   
> 1002 return list(_load_from_socket(port, 
> mappedRDD._jrdd_deserializer))   1003
> C:\spark\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in 
> __call__(self, *args)   1158 answer = 
> self.gateway_client.send_command(command)   1159 return_value = 
> get_return_value(-> 1160 answer, self.gateway_client, 
> self.target_id, self.name)   11611162 for temp_arg in temp_args:
> C:\spark\spark\python\pyspark\sql\utils.py in deco(*a, **kw) 61 def 
> deco(*a, **kw): 62 try:---> 63 return f(*a, **kw) 
> 64 except py4j.protocol.Py4JJavaError as e: 65 s = 
> e.java_exception.toString()
> C:\spark\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in 
> get_return_value(answer, gateway_client, target_id, name)318  
>raise Py4JJavaError(319 "An error occurred while 
> calling {0}{1}{2}.\n".--> 320 format(target_id, ".", 
> name), value)321 else:322 raise Py4JError(
> Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 
> (TID 9, localhost, executor driver): 
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File "C:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, 
> in main
>   File "C:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, 
> in process
>   File "C:\spark\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 
> 372, in dump_stream
> vs = list(itertools.islice(iterator, batch))
>   File "C:\spark\spark\python\pyspark\rdd.py", line 1354, in takeUpToNumLeft
> yield next(iterator)
>   File "", line 8, in parse
>   File "C:\ProgramData\Anaconda3\lib\_strptime.py", line 565, in 
> _strptime_datetime
> tt, fraction = _strptime(data_string, format)
>   File "C:\ProgramData\Anaconda3\lib\_strptime.py", line 362, in _strptime
> (data_string, format))
> ValueError: time data '"FL_DATE"' does not match format '%y-%m-%d'
>
>   at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
>   at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
>   at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
>   at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
>   at 
> 

how to use the sql join in java please

2018-04-11 Thread 1427357...@qq.com
Hi  all,

I write java code to join two table.
My code looks like:

SparkSession ss = SparkSession.builder().master("local[4]").appName("testSql").getOrCreate();

Properties properties = new Properties();
properties.put("user", "A");
properties.put("password", "B");
String url = "jdbc:mysql://xxx:/xxx?useUnicode=true=gbk=convertToNull=UTC";
Dataset<Row> data_busi_hour = ss.read().jdbc(url, "A", properties);
data_busi_hour.show();
//newemployee.printSchema();

Dataset<Row> t_pro_ware_partner_rela = ss.read().jdbc(url, "B", properties);

Dataset<Row> newX = t_pro_ware_partner_rela.join(data_busi_hour);
newX.show();

I get a error  like below:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Detected 
cartesian product for INNER join between logical plans
Relation[ XXX   FIRST_ORG_ARCHNAME#80,... 11 more fields] 
JDBCRelation(t_pro_ware_partner_rela) [numPartitions=1]
and
Relation[id#0L,project_code#1,project_name#2] JDBCRelation(data_busi_hour) 
[numPartitions=1]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
at 
org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$21.applyOrElse(Optimizer.scala:1124)
at 
org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$21.applyOrElse(Optimizer.scala:1121)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at 
org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$.apply(Optimizer.scala:1121)
at 
org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$.apply(Optimizer.scala:1103)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
at 
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at 
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at 

Issue with map function in Spark 2.2.0

2018-04-11 Thread @Nandan@
Hi ,
I am not able to use .map function in Spark.

My codes are as below :-

*1) Create Parse function:-*

from datetime import datetime
from collections import namedtuple
fields =
('date','airline','flightnum','origin','dest','dep','dep_delay','arv','arv_delay','airtime','distance')
Flight = namedtuple('Flight',fields,verbose=True)
DATE_FMT = "%y-%m-%d"
TIME_FMT = "%H%M"
def parse(row) :
row[0] = datetime.strptime(row[0], DATE_FMT).date()
row[5] = datetime.strptime(row[5], TIME_FMT).time()
row[6] = float(row[6])
row[7] = datetime.strptime(row[7], TIME_FMT).time()
row[8] = float(row[8])
row[9] = float(row[9])
row[10] = float(row[10])
return Flight(*row[:11])

*2) Using Parse to parse my RDD*

flightsParsedMap = flights.map(lambda x: x.split(',')).map(parse)

*3) Checking Parsed RDD *
flightsParsedMap
*Output is :-  *

*PythonRDD[8] at RDD at PythonRDD.scala:48*
*4) Checking first row :-*

flightsParsedMap.first()
Here i am getting issue:-



---Py4JJavaError
Traceback (most recent call
last) in ()> 1
flightsParsedMap.first()
C:\spark\spark\python\pyspark\rdd.py in first(self)   1374
ValueError: RDD is empty   1375 """-> 1376 rs =
self.take(1)   1377 if rs:   1378 return rs[0]
C:\spark\spark\python\pyspark\rdd.py in take(self, num)   13561357
p = range(partsScanned, min(partsScanned + numPartsToTry,
totalParts))-> 1358 res = self.context.runJob(self,
takeUpToNumLeft, p)   13591360 items += res
C:\spark\spark\python\pyspark\context.py in runJob(self, rdd,
partitionFunc, partitions, allowLocal)999 #
SparkContext#runJob.   1000 mappedRDD =
rdd.mapPartitions(partitionFunc)-> 1001 port =
self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd,
partitions)   1002 return list(_load_from_socket(port,
mappedRDD._jrdd_deserializer))   1003
C:\spark\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in
__call__(self, *args)   1158 answer =
self.gateway_client.send_command(command)   1159 return_value
= get_return_value(-> 1160 answer, self.gateway_client,
self.target_id, self.name)   11611162 for temp_arg in
temp_args:
C:\spark\spark\python\pyspark\sql\utils.py in deco(*a, **kw) 61
 def deco(*a, **kw): 62 try:---> 63 return
f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e:
 65 s = e.java_exception.toString()
C:\spark\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in
get_return_value(answer, gateway_client, target_id, name)318
  raise Py4JJavaError(319 "An error
occurred while calling {0}{1}{2}.\n".--> 320
format(target_id, ".", name), value)321 else:322
  raise Py4JError(
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0
in stage 8.0 (TID 9, localhost, executor driver):
org.apache.spark.api.python.PythonException: Traceback (most recent
call last):
  File "C:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line
229, in main
  File "C:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line
224, in process
  File "C:\spark\spark\python\lib\pyspark.zip\pyspark\serializers.py",
line 372, in dump_stream
vs = list(itertools.islice(iterator, batch))
  File "C:\spark\spark\python\pyspark\rdd.py", line 1354, in takeUpToNumLeft
yield next(iterator)
  File "", line 8, in parse
  File "C:\ProgramData\Anaconda3\lib\_strptime.py", line 565, in
_strptime_datetime
tt, fraction = _strptime(data_string, format)
  File "C:\ProgramData\Anaconda3\lib\_strptime.py", line 362, in _strptime
(data_string, format))
ValueError: time data '"FL_DATE"' does not match format '%y-%m-%d'

at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at 
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at 

Re: cache OS memory and spark usage of it

2018-04-11 Thread Jose Raul Perez Rodriguez

It was helpful.

So the OS needs to feel some pressure from applications requesting
memory before it frees some of the memory cache?

Exactly under which circumstances does the OS free that memory to give it to
the applications requesting it?

I mean, if the total memory is 16GB and 10GB are used for the OS cache, how
can the JVM obtain memory from that?


Thanks,


On 11/04/18 01:36, yncxcw wrote:

hi, Raúl

First, most of the OS memory cache is used by the page cache,
which the OS uses for caching recent read/write I/O.

I think the understanding of the OS memory cache should be discussed from two
different perspectives. From the perspective of
user-space (e.g., a Spark application), it is not used, since Spark is not
allocating memory from this part of memory.
However, from the perspective of the OS, it is actually used, because the memory
pages are already allocated for caching the
I/O pages. For each I/O request, the OS always allocates memory pages to
cache it, expecting that these cached I/O pages can be reused in the near future.
Recall what happens when you use vim/emacs to open a large file. It is pretty slow when you
open it the first time. But it will be much faster when you close it and
open it again immediately, because the file was cached in the file cache the
first time you opened it.

It is hard for Spark to use this part of memory, because this part of the
memory is managed by the OS and is transparent to applications. The only thing
you can do is to continuously allocate memory from the OS (by
malloc()); at a certain point the OS senses memory pressure and
will voluntarily release the page cache to satisfy your memory
allocation. Another thing is that the memory limit of Spark is bounded by the
maximum JVM heap size, so the memory requests from your Spark application are
actually handled by the JVM, not the OS.


Hope this answer can help you!


Wei







How do I implement forEachWriter in structured streaming so that the connection is created once per partition?

2018-04-11 Thread SRK
Hi,

How do I implement ForeachWriter in Structured Streaming so that the connection
is created once per partition and updates are done in a batch, just like with
foreachPartition on RDDs?

Thanks for the help!
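
A sketch of one way to do it with the ForeachWriter API (the JDBC URL, credentials and INSERT statement are placeholders): open() runs once per partition per trigger, so the connection is created there, rows are buffered with addBatch(), and the batch is flushed in close().

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;

class JdbcBatchForeachWriter extends ForeachWriter<Row> {
    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public boolean open(long partitionId, long version) {
        try {
            // called once per partition per trigger: create the connection here
            connection = DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pass");
            statement = connection.prepareStatement("INSERT INTO sink(k, v) VALUES (?, ?)");
            return true;
        } catch (Exception e) {
            return false;   // returning false skips this partition
        }
    }

    @Override
    public void process(Row row) {
        try {
            statement.setString(1, row.getString(0));
            statement.setString(2, row.getString(1));
            statement.addBatch();                      // buffer instead of writing row by row
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close(Throwable error) {
        try {
            if (error == null) statement.executeBatch();   // flush once at the end of the partition
            connection.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

// usage: df.writeStream().foreach(new JdbcBatchForeachWriter()).start();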






Does structured streaming support Spark Kafka Direct?

2018-04-11 Thread SRK
hi,

We have code based on Spark Kafka Direct in production and we want to port
this code to Structured Streaming. Does Structured Streaming support Spark
Kafka Direct? What are the configs for parallelism and scalability in
Structured Streaming? With Spark Kafka Direct, the number of Kafka partitions
takes care of parallelism when doing the consumption. Is it the same case
with Structured Streaming?

Thanks for the help in Advance!
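
As far as I know the Kafka source in Structured Streaming behaves like the direct stream in that it creates one Spark partition per Kafka topic partition. A sketch of the source (option names from the spark-sql-kafka-0-10 connector; brokers and topic are placeholders, and `spark` is the SparkSession):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> kafkaStream = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
        .option("subscribe", "my_topic")
        .option("maxOffsetsPerTrigger", "10000")   // caps the records read per micro-batch
        .load();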


