[Spark-sql]: DF parquet read write multiple tasks

2018-04-02 Thread snjv
Spark : 2.2
Number of cores : 128 ( all allocated to spark)
Filesystem : Alluxio 1.6
Block size on alluxio: 32MB

Input1 size : 586MB ( 150m records with only 1 column as int)
Input2 size : 50MB ( 10m records with only 1 column as int)
 
Input1 is spread across 20 parquet files; each file is 29MB (1 Alluxio block per
file).
Input2 is also spread across 20 parquet files; each file is 2.2MB (1 Alluxio
block per file).

Operation : Read parquet as DF

For Input1 : Number of tasks created is 120
For Input2 : number of tasks created is 20

How is the number of tasks calculated in each case?

Secondly, if I look at the task details UI, some tasks show an "Input Size" of
only a few bytes while for others it is in MB.
Further investigation shows that exactly 20 tasks have an input size of around
29MB, while the remaining 100 tasks read only a few bytes.

We are using parquet-cpp to generate parquet files and then reading those
files in spark.

We want to understand why around 120 tasks are generated (we expected 20), since
it is hurting our core utilization.
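
For reference, the task count for a parquet scan in Spark 2.x is driven by
spark.sql.files.maxPartitionBytes, spark.sql.files.openCostInBytes and the
default parallelism (here, 128 cores) rather than by the Alluxio block count:
the split size is roughly min(maxPartitionBytes, max(openCostInBytes,
totalBytes / defaultParallelism)), and with ~600MB of input over 128 cores that
comes out near 5MB per split, hence roughly 120 tasks. A minimal PySpark sketch
(the Alluxio path is hypothetical) for inspecting and tuning this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parquet_task_check").getOrCreate()

# Split size ~= min(maxPartitionBytes, max(openCostInBytes, totalBytes/defaultParallelism)).
# With many cores the last term gets small, so raising openCostInBytes is what
# raises the floor on the split size (the values below are the defaults).
spark.conf.set("spark.sql.files.maxPartitionBytes", str(128 * 1024 * 1024))
spark.conf.set("spark.sql.files.openCostInBytes", str(4 * 1024 * 1024))

df = spark.read.parquet("alluxio://master:19998/path/to/input1")  # hypothetical path
print(df.rdd.getNumPartitions())  # the number of tasks the scan will launch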

Thanks

Regards
Sanjeev 






[Spark sql]: Re-execution of same operation takes less time than 1st

2018-04-02 Thread snjv
Hi,

When we execute the same operation twice, Spark takes ~40% less time on the
second run than on the first.
Our operation is like this: 
Read 150M rows ( spread in multiple parquet files) into DF
Read 10M rows ( spread in multiple parquet files) into other DF.
Do an intersect operation.

Size of the 150M-row input: 587MB
Size of the 10M-row input: 50MB

If the first execution takes around 20 seconds, the next one takes just 10-12
seconds.
Is there a specific reason for this? Is there an optimization we could use so
that the first run benefits as well?
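
For reference, a likely contributor is caching below Spark (the OS page cache
and/or Alluxio keeping the parquet blocks hot after the first scan) plus JVM
warm-up. A minimal sketch, with hypothetical paths, of making the reuse
explicit inside Spark by caching both DataFrames before the intersect:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("intersect_cache_check").getOrCreate()

big = spark.read.parquet("alluxio://master:19998/input_150m")   # hypothetical path
small = spark.read.parquet("alluxio://master:19998/input_10m")  # hypothetical path

# Cache and materialize once, so repeated runs hit memory instead of
# re-reading and re-decoding the parquet files.
big.cache().count()
small.cache().count()

print(big.intersect(small).count())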

Regards
Sanjeev






How to delete empty columns in df when writing to parquet?

2018-04-02 Thread Junfeng Chen
I am trying to read data from Kafka and write it in Parquet format via Spark
Streaming.
The problem is that the data from Kafka has a variable structure. For example,
app one has columns A, B, C, while app two has columns B, C, D, so the dataframe
I read from Kafka has all of the columns A, B, C, D. When I write the dataframe
to Parquet partitioned by app name, the Parquet files for app one also contain
column D, even though column D is empty and actually contains no data. So how
can I filter out the empty columns when writing the dataframe to Parquet?
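
One possible approach, sketched below under the assumption that "empty" means
all values are null and that each micro-batch ends up as a regular DataFrame
before the write: split the frame by app name and drop the columns that are
entirely null in each split (function name, app column and output path are
placeholders):

from pyspark.sql import functions as F

def write_without_empty_columns(df, app_col, out_base):
    # One parquet directory per app, dropping columns that hold no data for that app.
    apps = [row[0] for row in df.select(app_col).distinct().collect()]
    for app in apps:
        part = df.filter(F.col(app_col) == app)
        # F.count() counts non-null values, so 0 means the column is empty here.
        non_null = part.agg(
            *[F.count(F.col(c)).alias(c) for c in part.columns]
        ).collect()[0].asDict()
        empty_cols = [c for c, n in non_null.items() if n == 0]
        if empty_cols:
            part = part.drop(*empty_cols)
        part.write.mode("overwrite").parquet("{}/app={}".format(out_base, app))

# e.g. write_without_empty_columns(batch_df, "app_name", "/tmp/out")  -- names are placeholders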

Thanks!


Regards,
Junfeng Chen


unsubscribe

2018-04-02 Thread 学生张洪斌






Sent from NetEase Mail Master

Uncaught exception in thread heartbeat-receiver-event-loop-thread

2018-04-02 Thread Shiyuan
Hi,
I got an "Uncaught exception in thread heartbeat-receiver-event-loop-thread"
error. Does this error indicate that some node is too overloaded to be
responsive?  Thanks!

ERROR Utils: Uncaught exception in thread heartbeat-receiver-event-loop-thread
java.lang.NullPointerException
    at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:464)
    at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:439)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6$$anonfun$7.apply(TaskSchedulerImpl.scala:408)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6$$anonfun$7.apply(TaskSchedulerImpl.scala:408)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6.apply(TaskSchedulerImpl.scala:408)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6.apply(TaskSchedulerImpl.scala:407)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
    at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:407)
    at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1295)
    at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)


Re: is there a way of register python UDF using java API?

2018-04-02 Thread Bryan Cutler
Hi Kant,

udfDeterministic should be set to false if the results of your UDF are
non-deterministic, such as when they are produced by random numbers, so that
the Catalyst optimizer will not cache and reuse results.
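
For illustration, on the PySpark side the same flag is what asNondeterministic()
toggles (Spark 2.3+); a minimal sketch with a random-valued UDF (names are
arbitrary):

import random
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.appName("nondeterministic_udf_sketch").getOrCreate()

# Marking the UDF non-deterministic (udfDeterministic = false) keeps Catalyst
# from reusing its results or moving it around the plan during optimization.
rand_udf = udf(lambda: random.random(), DoubleType()).asNondeterministic()
spark.udf.register("my_rand", rand_udf)

spark.sql("SELECT my_rand() AS r FROM range(3)").show()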

On Mon, Apr 2, 2018 at 12:11 PM, kant kodali  wrote:

> Looks like there is spark.udf().registerPython() like below.
>
> public void registerPython(java.lang.String name,
> org.apache.spark.sql.execution.python.UserDefinedPythonFunction udf)
>
>
> can anyone describe what *udfDeterministic *parameter does in the method
> signature below?
>
> public UserDefinedPythonFunction(java.lang.String name, 
> org.apache.spark.api.python.PythonFunction func, 
> org.apache.spark.sql.types.DataType dataType, int pythonEvalType, boolean 
> udfDeterministic) { /* compiled code */ }
>
>
> On Sun, Apr 1, 2018 at 3:46 PM, kant kodali  wrote:
>
>> Hi All,
>>
>> All of our spark code is in Java. I am wondering if there is a way to register
>> python UDF's using the Java API such that the registered UDF's can be used
>> from raw Spark SQL.
>> If there is any other way to achieve this goal please suggest!
>>
>> Thanks
>>
>>
>


Re: is there a way of register python UDF using java API?

2018-04-02 Thread kant kodali
It looks like there is spark.udf().registerPython(), as shown below.

public void registerPython(java.lang.String name,
org.apache.spark.sql.execution.python.UserDefinedPythonFunction udf)


Can anyone describe what the *udfDeterministic* parameter does in the method
signature below?

public UserDefinedPythonFunction(java.lang.String name,
org.apache.spark.api.python.PythonFunction func,
org.apache.spark.sql.types.DataType dataType, int pythonEvalType,
boolean udfDeterministic) { /* compiled code */ }


On Sun, Apr 1, 2018 at 3:46 PM, kant kodali  wrote:

> Hi All,
>
> All of our spark code is in Java. I am wondering if there is a way to register
> python UDF's using the Java API such that the registered UDF's can be used
> from raw Spark SQL.
> If there is any other way to achieve this goal please suggest!
>
> Thanks
>
>


unsubscribe

2018-04-02 Thread purna pradeep
unsubscribe


unsubscribe

2018-04-02 Thread Romero, Saul
unsubscribe


Re: [Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query

2018-04-02 Thread Aakash Basu
Hi all,

The following is the updated code, where I get the avg in a DF, but the
collect() call meant to store the value in a variable and pass it to the final
select query is not working. So avg is currently a dataframe, not a variable
with the value stored in it.
New code -

from pyspark.sql import SparkSession
import time
from pyspark.sql.functions import split, col

class test:

    spark = SparkSession.builder \
        .appName("Stream_Col_Oper_Spark") \
        .getOrCreate()

    data = spark.readStream.format("kafka") \
        .option("startingOffsets", "latest") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test1") \
        .load()

    ID = data.select('value') \
        .withColumn('value', data.value.cast("string")) \
        .withColumn("Col1", split(col("value"), ",").getItem(0)) \
        .withColumn("Col2", split(col("value"), ",").getItem(1)) \
        .drop('value')

    ID.createOrReplaceTempView("transformed_Stream_DF")
    avg = spark.sql("select AVG(Col1) as Avg from transformed_Stream_DF")  # .collect()[0][0]

    aggregate_func = spark.sql(
        "select Col1, Col2, Col2/{0} as Col3 from transformed_Stream_DF".format(avg))  # (Col2/(AVG(Col1)) as Col3)

    # ---For Console Print---
    query1 = avg \
        .writeStream \
        .format("console") \
        .outputMode("complete") \
        .start()

    query = aggregate_func \
        .writeStream \
        .format("console") \
        .start()
    # .outputMode("complete") \
    # ---Console Print ends---

    query1.awaitTermination()
    # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py


If I uncomment the collect() in the above code and use it, I get the
following error -

*pyspark.sql.utils.AnalysisException: u'Queries with streaming sources must
be executed with writeStream.start();;\nkafka'*

Any alternative (better) solution to get this job done would work too.

Any help would be greatly appreciated.
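
One alternative worth noting, assuming an upgrade to Spark 2.4+ is possible:
writeStream.foreachBatch() hands each micro-batch over as a plain DataFrame,
where collect() is allowed, so a running sum/count can be kept on the driver
and used to derive Col3 for the rows of each batch. A minimal sketch (ID is the
parsed streaming DataFrame from the code above; the driver-side state is not
fault-tolerant across restarts):

from pyspark.sql import functions as F

# Running totals kept on the driver across micro-batches (lost on restart).
state = {"sum": 0.0, "count": 0}

def process_batch(batch_df, batch_id):
    agg = batch_df.agg(
        F.sum(F.col("Col1").cast("double")).alias("s"),
        F.count("Col1").alias("c")).collect()[0]
    state["sum"] += agg["s"] or 0.0
    state["count"] += agg["c"]
    if state["count"]:
        running_avg = state["sum"] / state["count"]
        # Divide each row of the current batch by the running average.
        batch_df.withColumn("Col3", F.col("Col2").cast("double") / F.lit(running_avg)) \
                .show(truncate=False)

query = ID.writeStream.foreachBatch(process_batch).start()  # ID: parsed stream from above
query.awaitTermination()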

Thanks,
Aakash.

On Mon, Apr 2, 2018 at 1:01 PM, Aakash Basu 
wrote:

> Hi,
>
> This is a very interesting requirement, where I am getting stuck at a few
> places.
>
> *Requirement* -
>
> Col1Col2
> 1  10
> 2  11
> 3  12
> 4  13
> 5  14
>
>
>
> *I have to calculate avg of col1 and then divide each row of col2 by that
> avg. And, the Avg should be updated with every new data being fed through
> Kafka into Spark Streaming.*
>
> *Avg(Col1) = Running Avg*
> *Col2 = Col2/Avg(Col1)*
>
>
> *Queries* *-*
>
>
> *1) I am currently trying to simply run a inner query inside a query and
> print Avg with other Col value and then later do the calculation. But,
> getting error.*
>
> Query -
>
> select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg 
> from transformed_Stream_DF t
>
> Error -
>
> pyspark.sql.utils.StreamingQueryException: u'Queries with streaming
> sources must be executed with writeStream.start();
>
> Even though, I already have writeStream.start(); in my code, it is
> probably throwing the error because of the inner select query (I think
> Spark is assuming it as another query altogether which require its own
> writeStream.start. Any help?
>
>
> *2) How to go about it? *I have another point in mind, i.e, querying the
> table to get the avg and store it in a variable. In the second query simply
> pass the variable and divide the second column to produce appropriate
> result. But, is it the right approach?
>
> *3) Final question*: How to do the calculation over the entire data and
> not the latest, do I need to keep appending somewhere and repeatedly use
> it? My average and all the rows of the Col2 shall change with every new
> incoming data.
>
>
> *Code -*
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("Stream_Col_Oper_Spark") \
> .getOrCreate()
>
> data = spark.readStream.format("kafka") \
> .option("startingOffsets", "latest") \
> .option("kafka.bootstrap.servers", "localhost:9092") \
> .option("subscribe", "test1") \
> .load()
>
> ID = data.select('value') \
> .withColumn('value', data.value.cast("string")) \
> .withColumn("Col1", split(col("value"), ",").getItem(0)) \
> .withColumn("Col2", split(col("value"), ",").getItem(1)) \
> .drop('value')
>
> ID.createOrReplaceTempView("transformed_Stream_DF")
> aggregate_func = spark.sql(
> "select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) 
> as myAvg from transformed_Stream_DF t")  #  (Col2/(AVG(Col1)) as Col3)")
>
> # ---For Console Print---
>

Re: [Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query

2018-04-02 Thread Aakash Basu
Any help, guys?

On Mon, Apr 2, 2018 at 1:01 PM, Aakash Basu 
wrote:

> Hi,
>
> This is a very interesting requirement, where I am getting stuck at a few
> places.
>
> *Requirement* -
>
> Col1Col2
> 1  10
> 2  11
> 3  12
> 4  13
> 5  14
>
>
>
> *I have to calculate avg of col1 and then divide each row of col2 by that
> avg. And, the Avg should be updated with every new data being fed through
> Kafka into Spark Streaming.*
>
> *Avg(Col1) = Running Avg*
> *Col2 = Col2/Avg(Col1)*
>
>
> *Queries* *-*
>
>
> *1) I am currently trying to simply run a inner query inside a query and
> print Avg with other Col value and then later do the calculation. But,
> getting error.*
>
> Query -
>
> select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg 
> from transformed_Stream_DF t
>
> Error -
>
> pyspark.sql.utils.StreamingQueryException: u'Queries with streaming
> sources must be executed with writeStream.start();
>
> Even though, I already have writeStream.start(); in my code, it is
> probably throwing the error because of the inner select query (I think
> Spark is assuming it as another query altogether which require its own
> writeStream.start. Any help?
>
>
> *2) How to go about it? *I have another point in mind, i.e, querying the
> table to get the avg and store it in a variable. In the second query simply
> pass the variable and divide the second column to produce appropriate
> result. But, is it the right approach?
>
> *3) Final question*: How to do the calculation over the entire data and
> not the latest, do I need to keep appending somewhere and repeatedly use
> it? My average and all the rows of the Col2 shall change with every new
> incoming data.
>
>
> *Code -*
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("Stream_Col_Oper_Spark") \
> .getOrCreate()
>
> data = spark.readStream.format("kafka") \
> .option("startingOffsets", "latest") \
> .option("kafka.bootstrap.servers", "localhost:9092") \
> .option("subscribe", "test1") \
> .load()
>
> ID = data.select('value') \
> .withColumn('value', data.value.cast("string")) \
> .withColumn("Col1", split(col("value"), ",").getItem(0)) \
> .withColumn("Col2", split(col("value"), ",").getItem(1)) \
> .drop('value')
>
> ID.createOrReplaceTempView("transformed_Stream_DF")
> aggregate_func = spark.sql(
> "select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) 
> as myAvg from transformed_Stream_DF t")  #  (Col2/(AVG(Col1)) as Col3)")
>
> # ---For Console Print---
>
> query = aggregate_func \
> .writeStream \
> .format("console") \
> .start()
> # .outputMode("complete") \
> # ---Console Print ends---
>
> query.awaitTermination()
> # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit 
> --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 
> /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
>
>
>
>
> Thanks,
> Aakash.
>


Merge query using spark sql

2018-04-02 Thread Deepak Sharma
I am using Spark to run a merge query against PostgreSQL.
The way it is done now: the data to be merged is saved into Postgres as temp
tables, and the merge queries are then run in Postgres through a Java SQL
connection and statement, so the query itself executes in Postgres.
The queries insert into the source table if a record does not exist in the
source but exists in the temp table, and update it otherwise.
The problem is that both tables have about 400K records, and the whole query
takes 20 hours to run.
Is there any way to do this in Spark itself instead of running the query in PG,
so it can complete in a reasonable time?
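
One possible Spark-side approach, sketched below with hypothetical table names,
JDBC URL and merge key: express the insert-or-update as joins and write the
result back over JDBC to a staging table that can then be swapped in Postgres.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("merge_sketch").getOrCreate()

jdbc_url = "jdbc:postgresql://pg-host:5432/mydb"  # hypothetical
props = {"user": "user", "password": "secret", "driver": "org.postgresql.Driver"}

source = spark.read.jdbc(jdbc_url, "source_table", properties=props)  # hypothetical table
updates = spark.read.jdbc(jdbc_url, "temp_table", properties=props)   # hypothetical table
key = "id"  # hypothetical merge key

# Keep source rows whose key is absent from the temp table, then add every
# temp-table row: matching keys are effectively updated, new keys inserted.
untouched = source.join(updates.select(key), on=key, how="left_anti")
merged = untouched.unionByName(updates)  # Spark 2.3+; plain union() if columns already align

# Write to a staging table and swap/rename inside Postgres afterwards,
# instead of overwriting the live table from Spark.
merged.write.jdbc(jdbc_url, "source_table_staged", mode="overwrite", properties=props)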

-- 
Thanks
Deepak


[Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query

2018-04-02 Thread Aakash Basu
Hi,

This is a very interesting requirement, where I am getting stuck at a few
places.

*Requirement* -

Col1    Col2
1       10
2       11
3       12
4       13
5       14



*I have to calculate the avg of Col1 and then divide each row of Col2 by that
avg. And the avg should be updated with every new piece of data fed through
Kafka into Spark Streaming.*

*Avg(Col1) = Running Avg*
*Col2 = Col2/Avg(Col1)*


*Queries* *-*


*1) I am currently trying to simply run an inner query inside the outer query
and print the avg alongside the other column values, and then do the
calculation later. But I am getting an error.*

Query -

select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF)
as myAvg from transformed_Stream_DF t

Error -

pyspark.sql.utils.StreamingQueryException: u'Queries with streaming sources
must be executed with writeStream.start();

Even though I already have writeStream.start() in my code, it is probably
throwing the error because of the inner select query (I think Spark treats it
as another query altogether, which requires its own writeStream.start()). Any
help?


*2) How to go about it?* Another idea I have is to query the table to get the
avg and store it in a variable, then in a second query simply pass in that
variable and divide the second column by it to produce the appropriate result.
But is that the right approach?

*3) Final question*: How do I do the calculation over the entire data seen so
far and not just the latest batch? Do I need to keep appending the data
somewhere and use it repeatedly? My average, and therefore all the rows of
Col2, should change with every new piece of incoming data.


*Code -*

from pyspark.sql import SparkSession
import time
from pyspark.sql.functions import split, col

class test:

    spark = SparkSession.builder \
        .appName("Stream_Col_Oper_Spark") \
        .getOrCreate()

    data = spark.readStream.format("kafka") \
        .option("startingOffsets", "latest") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test1") \
        .load()

    ID = data.select('value') \
        .withColumn('value', data.value.cast("string")) \
        .withColumn("Col1", split(col("value"), ",").getItem(0)) \
        .withColumn("Col2", split(col("value"), ",").getItem(1)) \
        .drop('value')

    ID.createOrReplaceTempView("transformed_Stream_DF")
    aggregate_func = spark.sql(
        "select t.Col2, (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg from transformed_Stream_DF t")  # (Col2/(AVG(Col1)) as Col3)

    # ---For Console Print---

    query = aggregate_func \
        .writeStream \
        .format("console") \
        .start()
    # .outputMode("complete") \
    # ---Console Print ends---

    query.awaitTermination()
    # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py




Thanks,
Aakash.