Re: Writing record once after the watermarking interval in Spark Structured Streaming

2018-03-29 Thread Bowden, Chris
The watermark is just a user-provided indicator to Spark that it's ok to drop 
internal state after some period of time. The watermark "interval" doesn't 
directly dictate whether hot rows are sent to a sink; think of a hot row as 
data younger than the watermark. The watermark will, however, prevent cold rows 
(i.e., rows older than the watermark) from being fully processed and sent to 
the sink. There is no notion of requesting that all data be queued and released 
only after the watermark has advanced past the time-based groups in that queue.


If you want to ensure only one row per time-based group is sent to the sink, 
you could get fancy with timeouts and flatMapGroupsWithState. Keep in mind that 
even in this scenario the same row may be sent more than once if a micro-batch 
is reprocessed (this is why it is important for sinks to be idempotent: delivery 
is really at-least-once, which only becomes effectively exactly-once when the 
sink is idempotent).
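
For what it's worth, here is a rough, untested sketch of that
flatMapGroupsWithState approach (the case classes, column names, and the
60-second window length are assumptions for illustration; `events` is a typed
Dataset and `spark.implicits._` is in scope):

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class ChannelBytes(channel: String, windowStart: Timestamp, bytes: Long)
case class WindowTotal(channel: String, windowStart: Timestamp, byteCount: Long)

val totals = events                            // Dataset[ChannelBytes], assumed
  .withWatermark("windowStart", "120 seconds")
  .groupByKey(e => (e.channel, e.windowStart))
  .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.EventTimeTimeout) {
    (key: (String, Timestamp), rows: Iterator[ChannelBytes], state: GroupState[Long]) =>
      if (state.hasTimedOut) {
        // The watermark has passed the timeout: emit one row and drop the state.
        val out = WindowTotal(key._1, key._2, state.get)
        state.remove()
        Iterator(out)
      } else {
        // Accumulate bytes and arm an event-time timeout at the (assumed 60s)
        // window end; assumes data arrives before the watermark passes it.
        state.update(state.getOption.getOrElse(0L) + rows.map(_.bytes).sum)
        state.setTimeoutTimestamp(key._2.getTime + 60 * 1000L)
        Iterator.empty
      }
  }

Even with this, the finalized row can still be replayed if the micro-batch that
emitted it is reprocessed, hence the idempotent-sink caveat above.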


In general, I would assume you care about this fine-grained control because 
your sink is not idempotent.


-Chris


From: karthikjay 
Sent: Thursday, March 29, 2018 5:10:09 PM
To: user@spark.apache.org
Subject: Writing record once after the watermarking interval in Spark 
Structured Streaming

I have the following query:

val ds = dataFrame
  .filter(! $"requri".endsWith(".m3u8"))
  .filter(! $"bserver".contains("trimmer"))
  .withWatermark("time", "120 seconds")
  .groupBy(window(dataFrame.col("time"), "60 seconds"), col("channelName"))
  .agg(sum("bytes")/100 as "byte_count")

How do I implement a foreach writer so that its process method is triggered
only once for every watermarking interval? I.e., in the aforementioned
example, I would get the following:

10.00-10.01 Channel-1 100(bytes)
10.00-10.01 Channel-2 120(bytes)
10.01-10.02 Channel-1 110(bytes)
...






Writing record once after the watermarking interval in Spark Structured Streaming

2018-03-29 Thread karthikjay
I have the following query:

val ds = dataFrame
  .filter(! $"requri".endsWith(".m3u8"))
  .filter(! $"bserver".contains("trimmer"))
  .withWatermark("time", "120 seconds")
  .groupBy(window(dataFrame.col("time"), "60 seconds"), col("channelName"))
  .agg(sum("bytes")/100 as "byte_count")

How do I implement a foreach writer so that its process method is triggered
only once for every watermarking interval? I.e., in the aforementioned
example, I would get the following:

10.00-10.01 Channel-1 100(bytes)
10.00-10.01 Channel-2 120(bytes)
10.01-10.02 Channel-1 110(bytes)
...






Re: Spark on K8s resource staging server timeout

2018-03-29 Thread Jenna Hoole
Unfortunately the other Kubernetes cluster I was using was rebuilt from
scratch yesterday, but the RSS I have up today has pretty uninteresting
logs.

[root@nid6 ~]# kubectl logs
default-spark-resource-staging-server-7669dd57d7-xkvp6

++ id -u

+ myuid=0

++ id -g

+ mygid=0

++ getent passwd 0

+ uidentry=root:x:0:0:root:/root:/bin/ash

+ '[' -z root:x:0:0:root:/root:/bin/ash ']'

+ /sbin/tini -s -- /opt/spark/bin/spark-class
org.apache.spark.deploy.rest.k8s.ResourceStagingServer
/etc/spark-resource-staging-server/resource-staging-server.properties

2018-03-29 18:44:03 INFO  log:192 - Logging initialized @23503ms

2018-03-29 18:44:07 WARN  ContextHandler:1444 -
o.s.j.s.ServletContextHandler@7a55af6b{/,null,null} contextPath ends with /

2018-03-29 18:44:17 WARN  NativeCodeLoader:62 - Unable to load
native-hadoop library for your platform... using builtin-java classes where
applicable

2018-03-29 18:44:21 INFO  SecurityManager:54 - Changing view acls to: root

2018-03-29 18:44:21 INFO  SecurityManager:54 - Changing modify acls to: root

2018-03-29 18:44:21 INFO  SecurityManager:54 - Changing view acls groups to:


2018-03-29 18:44:21 INFO  SecurityManager:54 - Changing modify acls groups
to:

2018-03-29 18:44:21 INFO  SecurityManager:54 - SecurityManager:
authentication disabled; ui acls disabled; users  with view permissions:
Set(root); groups with view permissions: Set(); users  with modify
permissions: Set(root); groups with modify permissions: Set()

2018-03-29 18:44:22 INFO  Server:345 - jetty-9.3.z-SNAPSHOT

2018-03-29 18:44:47 INFO  ContextHandler:781 - Started
o.s.j.s.ServletContextHandler@7a55af6b{/api,null,AVAILABLE}

2018-03-29 18:44:48 INFO  AbstractConnector:270 - Started
ServerConnector@4f8b4bd0{HTTP/1.1,[http/1.1]}{0.0.0.0:1}

2018-03-29 18:44:48 INFO  Server:403 - Started @68600ms

2018-03-29 18:44:48 INFO  ResourceStagingServer:54 - Resource staging
server started on port 1.


-Jenna

On Thu, Mar 29, 2018 at 1:26 PM, Matt Cheah  wrote:

> Hello Jenna,
>
>
>
> Are there any logs from the resource staging server pod? They might show
> something interesting.
>
>
>
> Unfortunately, we haven’t been maintaining the resource staging server
> because we’ve moved all of our effort to the main repository instead of the
> fork. When we consider the submission of local files in the official
> release, we should probably create a mechanism that’s more resilient. Using
> a single HTTP server isn’t ideal – we would ideally like something that’s
> highly available, replicated, etc.
>
>
>
> -Matt Cheah
>
>
>
> *From: *Jenna Hoole 
> *Date: *Thursday, March 29, 2018 at 10:37 AM
> *To: *"user@spark.apache.org" 
> *Subject: *Re: Spark on K8s resource staging server timeout
>
>
>
> I added overkill high timeouts to the OkHttpClient.Builder() in
> RetrofitClientFactory.scala and I don't seem to be timing out anymore.
>
>
>
> val okHttpClientBuilder = new OkHttpClient.Builder()
>
>   .dispatcher(dispatcher)
>
>   .proxy(resolvedProxy)
>
>   .connectTimeout(120, TimeUnit.SECONDS)
>
>   .writeTimeout(120, TimeUnit.SECONDS)
>
>   .readTimeout(120, TimeUnit.SECONDS)
>
>
>
> -Jenna
>
>
>
> On Tue, Mar 27, 2018 at 10:48 AM, Jenna Hoole 
> wrote:
>
> So I'm running into an issue with my resource staging server that's
> producing a stacktrace like Issue 342, but I don't think for the same
> reasons. What's happening is that every time after I start up a resource
> staging server, the first job submitted that uses it will fail with a
> java.net.SocketTimeoutException: timeout, and then every subsequent job
> will run perfectly. Including with different jars and different users.
> It's only ever the first job that fails and it always fails. I know I'm
> also running into Issue 577 in that it takes about three minutes before
> the resource staging server is accessible, but I'm still failing waiting
> over ten minutes or in one case overnight. And I'm just using the examples
> jar, so it's not a super large jar like in Issue 342.
>
>
>
> 

Re: Spark on K8s resource staging server timeout

2018-03-29 Thread Matt Cheah
Hello Jenna,

 

Are there any logs from the resource staging server pod? They might show 
something interesting.

 

Unfortunately, we haven’t been maintaining the resource staging server because 
we’ve moved all of our effort to the main repository instead of the fork. When 
we consider the submission of local files in the official release, we should 
probably create a mechanism that’s more resilient. Using a single HTTP server 
isn’t ideal – we would ideally like something that’s highly available, 
replicated, etc.

 

-Matt Cheah

 

From: Jenna Hoole 
Date: Thursday, March 29, 2018 at 10:37 AM
To: "user@spark.apache.org" 
Subject: Re: Spark on K8s resource staging server timeout

 

I added overkill high timeouts to the OkHttpClient.Builder() in 
RetrofitClientFactory.scala and I don't seem to be timing out anymore. 

 

val okHttpClientBuilder = new OkHttpClient.Builder()

  .dispatcher(dispatcher)

  .proxy(resolvedProxy)

  .connectTimeout(120, TimeUnit.SECONDS)

  .writeTimeout(120, TimeUnit.SECONDS)

  .readTimeout(120, TimeUnit.SECONDS)

 

-Jenna

 

On Tue, Mar 27, 2018 at 10:48 AM, Jenna Hoole  wrote:

So I'm running into an issue with my resource staging server that's producing a 
stacktrace like Issue 342, but I don't think for the same reasons. 
What's happening is that every time after I start up a resource staging server, 
the first job submitted that uses it will fail with a 
java.net.SocketTimeoutException: timeout, and then every subsequent job will 
run perfectly. Including with different jars and different users. It's only 
ever the first job that fails and it always fails. I know I'm also running into 
Issue 577 in that it takes about three minutes before the resource 
staging server is accessible, but I'm still failing waiting over ten minutes or 
in one case overnight. And I'm just using the examples jar, so it's not a super 
large jar like in Issue 342. 

 

This isn't great for our CI process, so has anyone seen anything like this 
before or know how to increase the timeout if it just takes a while on initial 
contact? Using spark.network.timeout has no effect.

 

[jhoole@nid6 spark]$ kubectl get pods | grep jhoole-spark

jhoole-spark-resource-staging-server-6475c8-w5cdm   1/1   Running   0   13m

[jhoole@nid6 spark]$ kubectl get svc | grep jhoole-spark

jhoole-spark-resource-staging-service   NodePort   10.96.143.55   1:30622/TCP   13m

[jhoole@nid6 spark]$ bin/spark-submit --class 
org.apache.spark.examples.SparkPi --conf spark.app.name=spark-pi --conf 
spark.kubernetes.resourceStagingServer.uri=http://192.168.0.1:30622 
./examples/target/scala-2.11/jars/spark-examples_2.11-2.2.0-k8s-0.5.0.jar 

2018-03-27 12:30:13 WARN  NativeCodeLoader:62 - Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable

2018-03-27 12:30:13 INFO  UserGroupInformation:966 - Login successful for user 
jhoole@local using keytab file /security/secrets/jhoole.keytab

2018-03-27 12:30:14 INFO  HadoopStepsOrchestrator:54 - Hadoop Conf directory: 
/etc/hadoop/conf

2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing view acls to: jhoole

2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing modify acls to: jhoole

2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing view acls groups to: 

2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing modify acls groups to: 

2018-03-27 12:30:14 INFO  SecurityManager:54 - SecurityManager: authentication 
disabled; ui acls disabled; users  with view permissions: Set(jhoole); groups 
with view permissions: Set(); users  with modify permissions: Set(jhoole); 
groups with modify permissions: Set()

Exception in thread "main" java.net [java.net].SocketTimeoutException: timeout

at okio.Okio$4.newTimeoutException(Okio.java:230)

at okio.AsyncTimeout.exit(AsyncTimeout.java:285)

at okio.AsyncTimeout$2.read(AsyncTimeout.java:241)

at okio.RealBufferedSource.indexOf(RealBufferedSource.java:345)

at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:217)

at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:211)

at okhttp3.internal.http1.Http1Codec.readResponseHeaders(Http1Codec.java:189)

at 
okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:75)

at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at 
okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)

at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)

at 

Re: Spark on K8s resource staging server timeout

2018-03-29 Thread Jenna Hoole
I added overkill high timeouts to the OkHttpClient.Builder() in
RetrofitClientFactory.scala and I don't seem to be timing out anymore.

val okHttpClientBuilder = new OkHttpClient.Builder()
  .dispatcher(dispatcher)
  .proxy(resolvedProxy)
  .connectTimeout(120, TimeUnit.SECONDS)
  .writeTimeout(120, TimeUnit.SECONDS)
  .readTimeout(120, TimeUnit.SECONDS)

-Jenna

On Tue, Mar 27, 2018 at 10:48 AM, Jenna Hoole  wrote:

> So I'm running into an issue with my resource staging server that's
> producing a stacktrace like Issue 342, but I don't
> think for the same reasons. What's happening is that every time after I
> start up a resource staging server, the first job submitted that uses it
> will fail with a java.net.SocketTimeoutException: timeout, and then every
> subsequent job will run perfectly. Including with different jars and
> different users. It's only ever the first job that fails and it always
> fails. I know I'm also running into Issue 577 in that it
> takes about three minutes before the resource staging server is accessible,
> but I'm still failing waiting over ten minutes or in one case overnight.
> And I'm just using the examples jar, so it's not a super large jar like in
> Issue 342.
>
> This isn't great for our CI process, so has anyone seen anything like this
> before or know how to increase the timeout if it just takes a while on
> initial contact? Using spark.network.timeout has no effect.
>
> [jhoole@nid6 spark]$ kubectl get pods | grep jhoole-spark
>
> jhoole-spark-resource-staging-server-6475c8-w5cdm   1/1   Running
> 0  13m
>
> [jhoole@nid6 spark]$ kubectl get svc | grep jhoole-spark
>
> jhoole-spark-resource-staging-service   NodePort   10.96.143.55   1:30622/TCP 13m
>
> [jhoole@nid6 spark]$ bin/spark-submit --class
> org.apache.spark.examples.SparkPi --conf spark.app.name=spark-pi --conf
> spark.kubernetes.resourceStagingServer.uri=http://192.168.0.1:30622
> ./examples/target/scala-2.11/jars/spark-examples_2.11-2.2.0-k8s-0.5.0.jar
>
> 2018-03-27 12:30:13 WARN  NativeCodeLoader:62 - Unable to load
> native-hadoop library for your platform... using builtin-java classes where
> applicable
>
> 2018-03-27 12:30:13 INFO  UserGroupInformation:966 - Login successful for
> user jhoole@local using keytab file /security/secrets/jhoole.keytab
>
> 2018-03-27 12:30:14 INFO  HadoopStepsOrchestrator:54 - Hadoop Conf
> directory: /etc/hadoop/conf
>
> 2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing view acls to:
> jhoole
>
> 2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing modify acls to:
> jhoole
>
> 2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing view acls groups
> to:
>
> 2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing modify acls
> groups to:
>
> 2018-03-27 12:30:14 INFO  SecurityManager:54 - SecurityManager:
> authentication disabled; ui acls disabled; users  with view permissions:
> Set(jhoole); groups with view permissions: Set(); users  with modify
> permissions: Set(jhoole); groups with modify permissions: Set()
>
> Exception in thread "main" java.net.SocketTimeoutException: timeout
>
> at okio.Okio$4.newTimeoutException(Okio.java:230)
>
> at okio.AsyncTimeout.exit(AsyncTimeout.java:285)
>
> at okio.AsyncTimeout$2.read(AsyncTimeout.java:241)
>
> at okio.RealBufferedSource.indexOf(RealBufferedSource.java:345)
>
> at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:217)
>
> at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:211)
>
> at okhttp3.internal.http1.Http1Codec.readResponseHeaders(
> Http1Codec.java:189)
>
> at okhttp3.internal.http.CallServerInterceptor.intercept(
> CallServerInterceptor.java:75)
>
> at okhttp3.internal.http.RealInterceptorChain.proceed(
> RealInterceptorChain.java:92)
>
> at okhttp3.internal.connection.ConnectInterceptor.intercept(
> ConnectInterceptor.java:45)
>
> at okhttp3.internal.http.RealInterceptorChain.proceed(
> RealInterceptorChain.java:92)
>
> at okhttp3.internal.http.RealInterceptorChain.proceed(
> RealInterceptorChain.java:67)
>
> at okhttp3.internal.cache.CacheInterceptor.intercept(
> CacheInterceptor.java:93)
>
> at okhttp3.internal.http.RealInterceptorChain.proceed(
> RealInterceptorChain.java:92)
>
> at okhttp3.internal.http.RealInterceptorChain.proceed(
> RealInterceptorChain.java:67)
>
> at okhttp3.internal.http.BridgeInterceptor.intercept(
> BridgeInterceptor.java:93)
>
> at okhttp3.internal.http.RealInterceptorChain.proceed(
> RealInterceptorChain.java:92)
>
> at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(
> RetryAndFollowUpInterceptor.java:120)
>
> at okhttp3.internal.http.RealInterceptorChain.proceed(
> RealInterceptorChain.java:92)
>
> at okhttp3.internal.http.RealInterceptorChain.proceed(
> RealInterceptorChain.java:67)
>
> at 

Multiple columns using 'isin' command in pyspark

2018-03-29 Thread Shuporno Choudhury
Hi Spark Users,

I am trying to achieve the 'IN' functionality of SQL using the isin
function in pyspark
Eg: select count(*) from tableA
  where (col1, col2) in ((1, 100),(2, 200), (3,300))

We can certainly have single-column isin statements like:
df.filter(df[0].isin(1,2,3)).count()

But can I use multiple columns in that statement, like:
df.filter((df[0],df[1]).isin((1,100),(2,200),(3,300))).count()

Is this possible to achieve?
Or do I have to create multiple isin statements, merge them using an '&'
condition, and then execute the statement to get the final result?
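
For reference, one workaround (a rough, untested sketch; shown in Scala though
the same Column operators exist in pyspark, and df, col1, col2 are placeholder
names) is to expand the multi-column IN into an OR of per-tuple equality
conjunctions:

val wanted = Seq((1, 100), (2, 200), (3, 300))
// (col1, col2) IN ((1,100), (2,200), (3,300))  ==>
// (col1 = 1 AND col2 = 100) OR (col1 = 2 AND col2 = 200) OR ...
val predicate = wanted
  .map { case (a, b) => df("col1") === a && df("col2") === b }
  .reduce(_ || _)
val count = df.filter(predicate).count()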

Any help would be really appreciated.

-- 
Thanks,
Shuporno Choudhury


spark jdbc postgres query results don't match those of postgres query

2018-03-29 Thread Kevin Peng
I am running into a weird issue in Spark 1.6, which I was wondering if
anyone has encountered before. I am running a simple select query from
Spark using a JDBC connection to postgres:

val POSTGRES_DRIVER: String = "org.postgresql.Driver"
val srcSql = """select total_action_value, last_updated
  from fb_fact_no_seg_20180123 where ad_id = '23842688418150437'"""
val r = sqlContext.read.format("jdbc").options(Map(
  "url" -> jdbcUrl,
  "dbtable" -> s"($srcSql) as src",
  "driver" -> POSTGRES_DRIVER
)).load().coalesce(1).cache()

r.show

+------------------+--------------------+
|total_action_value|        last_updated|
+------------------+--------------------+
|         2743.3301|2018-02-06 00:18:...|
+------------------+--------------------+

From above you see that the result is 2743.3301, but when I run the same
query directly in postgres I get a slightly different answer:

select total_action_value, last_updated from fb_fact_no_seg_20180123
where ad_id = '23842688418150437';

 total_action_value |    last_updated
--------------------+---------------------
            2743.33 | 2018-02-06 00:18:08

As you can see from above, the value is 2743.33. So why is the result
coming from Spark off by .0001? Basically, where is the .0001 coming from,
since in postgres the decimal value is .33?

Thanks,
KP
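
One likely explanation (an assumption here: that the postgres column is
REAL/float4, which Spark's JDBC reader maps to a single-precision FloatType) is
that 2743.33 has no exact 32-bit binary representation, so the nearest
representable float carries the extra digits. A quick Scala illustration:

// The closest 32-bit float to 2743.33 is exactly 2743.330078125, so once the
// value is widened or printed with more precision the trailing .0001 appears.
// Postgres itself rounds REAL values to ~6 significant digits on output, which
// is presumably why it prints 2743.33.
val f: Float = 2743.33f
println(f.toDouble)   // 2743.330078125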


Re: [SparkSQL] SparkSQL performance on small TPCDS tables is very low when compared to Drill or Presto

2018-03-29 Thread Lalwani, Jayesh
It depends on how you have loaded the data. Ideally, if you only have dozens of 
records, your input data should have them in one partition. If the input has 1 
partition and the data is small enough, Spark will keep it in one partition (as 
far as possible).

If you cannot control your data, you need to repartition the data when you load 
it. This will (eventually) cause a shuffle, and all the data will be moved into 
the number of partitions that you specify. Subsequent operations will be on the 
repartitioned dataframe and should use far fewer tasks. A shuffle has costs 
associated with it, so you will need to make a call whether you want to take 
the upfront cost of a shuffle or you want to live with a large number of tasks.
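
For example, a minimal sketch of that trade-off (the table name and partition
counts are made up for illustration; spark is the active SparkSession):

val small = spark.table("store")     // a tiny TPCDS table, assumed
// No shuffle: just merge whatever input splits exist into one partition.
val compact = small.coalesce(1)
// Or pay for one upfront shuffle to get an exact partition count.
val reshuffled = small.repartition(4)
reshuffled.createOrReplaceTempView("store_compact")
spark.sql("SELECT * FROM store_compact").show()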

From: Tin Vu 
Date: Thursday, March 29, 2018 at 10:47 AM
To: "Lalwani, Jayesh" 
Cc: "user@spark.apache.org" 
Subject: Re: [SparkSQL] SparkSQL performance on small TPCDS tables is very low 
when compared to Drill or Presto

You are right. There were too many tasks created. How can we reduce the 
number of tasks?

On Thu, Mar 29, 2018, 7:44 AM Lalwani, Jayesh 
> wrote:
Without knowing too many details, I can only guess. It could be that Spark is 
creating a lot of tasks even though there are only a few records. Creation and 
distribution of tasks has a noticeable overhead on smaller datasets.

You might want to look at the driver logs, or the Spark Application Detail UI.

From: Tin Vu >
Date: Wednesday, March 28, 2018 at 8:04 PM
To: "user@spark.apache.org" 
>
Subject: [SparkSQL] SparkSQL performance on small TPCDS tables is very low when 
compared to Drill or Presto

Hi,

I am executing a benchmark to compare performance of SparkSQL, Apache Drill and 
Presto. My experimental setup:
• TPCDS dataset with scale factor 100 (size 100GB).
• Spark, Drill, and Presto have the same number of workers: 12.
• Each worker has the same allocated amount of memory: 4GB.
• Data is stored by Hive in ORC format.

I executed a very simple SQL query: "SELECT * from table_name"
The issue is that for some small tables (even tables with a few dozen 
records), SparkSQL still required about 7-8 seconds to finish, while Drill and 
Presto needed less than 1 second.
For other large tables with billions of records, SparkSQL's performance was 
reasonable, requiring 20-30 seconds to scan the whole table.
Do you have any idea or reasonable explanation for this issue?

Thanks,








Re: Why doesn't spark use broadcast join?

2018-03-29 Thread Lalwani, Jayesh
Try putting a broadcast hint, as shown here: 
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-hint-framework.html#sql-hints
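
Roughly like this (a sketch only; t1/t2 stand for the DataFrames behind T1/T2
and the join key is a placeholder). Note also that Spark only picks a broadcast
join on its own when its size estimate for the small side falls under
spark.sql.autoBroadcastJoinThreshold (10 MB by default):

import org.apache.spark.sql.functions.broadcast

// DataFrame API hint
val joined = t1.join(broadcast(t2), Seq("id"), "left_anti")

// SQL hint syntax
spark.sql("""
  SELECT /*+ BROADCAST(T2) */ f1, f2, f3
  FROM T1
  LEFT ANTI JOIN T2 ON T1.id = T2.id
""")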

From: Vitaliy Pisarev 
Date: Thursday, March 29, 2018 at 8:42 AM
To: "user@spark.apache.org" 
Subject: Why doesn't spark use broadcast join?

I am looking at the physical plan for the following query:


SELECT f1,f2,f3,...
FROM T1
LEFT ANTI JOIN T2 ON T1.id = T2.id
WHERE  f1 = 'bla'
   AND f2 = 'bla2'
   AND some_date >= date_sub(current_date(), 1)
LIMIT 100
An important detail: the table 'T1' can be very large (hundreds of thousands of 
rows), but table T2 is rather small, at most in the thousands of rows.
In this particular case, the table T2 has 2 rows.

In the physical plan, I see that a SortMergeJoin is performed, despite this 
being a perfect candidate for a broadcast join.

What could be the reason for this?
Is there a way to hint the optimizer to perform a broadcast join in the sql 
syntax?

I am writing this in pyspark and the query itself is over parquets stored in 
Azure blob storage.






Re: [SparkSQL] SparkSQL performance on small TPCDS tables is very low when compared to Drill or Presto

2018-03-29 Thread Tin Vu
You are right. There were too many tasks created. How can we reduce the
number of tasks?

On Thu, Mar 29, 2018, 7:44 AM Lalwani, Jayesh 
wrote:

> Without knowing too many details, I can only guess. It could be that Spark
> is creating a lot of tasks even though there are only a few records. Creation
> and distribution of tasks has a noticeable overhead on smaller datasets.
>
>
>
> You might want to look at the driver logs, or the Spark Application Detail
> UI.
>
>
>
> *From: *Tin Vu 
> *Date: *Wednesday, March 28, 2018 at 8:04 PM
> *To: *"user@spark.apache.org" 
> *Subject: *[SparkSQL] SparkSQL performance on small TPCDS tables is very
> low when compared to Drill or Presto
>
>
>
> Hi,
>
>
>
> I am executing a benchmark to compare performance of SparkSQL, Apache
> Drill and Presto. My experimental setup:
>
> · TPCDS dataset with scale factor 100 (size 100GB).
>
> · Spark, Drill, and Presto have the same number of workers: 12.
>
> · Each worker has the same allocated amount of memory: 4GB.
>
> · Data is stored by Hive in ORC format.
>
> I executed a very simple SQL query: "SELECT * from table_name"
> The issue is that for some small tables (even tables with a few dozen
> records), SparkSQL still required about 7-8 seconds to finish, while Drill
> and Presto needed less than 1 second.
> For other large tables with billions of records, SparkSQL's performance was
> reasonable, requiring 20-30 seconds to scan the whole table.
> Do you have any idea or reasonable explanation for this issue?
>
> Thanks,
>
>
>
>


Re: [SparkSQL] SparkSQL performance on small TPCDS tables is very low when compared to Drill or Presto

2018-03-29 Thread Lalwani, Jayesh
Without knowing too many details, I can only guess. It could be that Spark is 
creating a lot of tasks even though there are only a few records. Creation and 
distribution of tasks has a noticeable overhead on smaller datasets.

You might want to look at the driver logs, or the Spark Application Detail UI.

From: Tin Vu 
Date: Wednesday, March 28, 2018 at 8:04 PM
To: "user@spark.apache.org" 
Subject: [SparkSQL] SparkSQL performance on small TPCDS tables is very low when 
compared to Drill or Presto

Hi,

I am executing a benchmark to compare performance of SparkSQL, Apache Drill and 
Presto. My experimental setup:
· TPCDS dataset with scale factor 100 (size 100GB).
· Spark, Drill, and Presto have the same number of workers: 12.
· Each worker has the same allocated amount of memory: 4GB.
· Data is stored by Hive in ORC format.

I executed a very simple SQL query: "SELECT * from table_name"
The issue is that for some small tables (even tables with a few dozen 
records), SparkSQL still required about 7-8 seconds to finish, while Drill and 
Presto needed less than 1 second.
For other large tables with billions of records, SparkSQL's performance was 
reasonable, requiring 20-30 seconds to scan the whole table.
Do you have any idea or reasonable explanation for this issue?

Thanks,





Stopping StreamingContext

2018-03-29 Thread Sidney Feiner
Hey,
I have a Spark Streaming application processing some events.
Sometimes, I want to stop the application if I get a specific event. I collect 
the executors' results in the driver and, based on those results, I kill the 
StreamingContext using StreamingContext.stop(stopSparkContext=true).
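
For reference, the call looks roughly like this (a minimal sketch; ssc stands 
for the StreamingContext, and the stopGracefully flag is shown only as a 
variant worth trying):

// Stop the StreamingContext and the underlying SparkContext; the graceful
// form lets in-flight batches finish before shutting down.
ssc.stop(stopSparkContext = true, stopGracefully = true)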
When I do that, I can see in the logs that the app is shutting down, closing 
receivers etc.
But when I go to the master's web UI I can still see the app under "Running 
Applications". But if I click it, it says the endpoint doesn't exist.
When I check the open processes on the machine, I can see that the job's 
process is still running.
Am I closing the application wrong?

Those are the logs once I call the stop() method:

2018-03-28 11:59:04 INFO  KafkaProducer:615 - Closing the Kafka producer with 
timeoutMillis = 9223372036854775807 ms.
2018-03-28 11:59:04 INFO  ReceiverTracker:54 - Sent stop signal to all 1 
receivers
2018-03-28 11:59:05 INFO  BlockManagerInfo:54 - Added input-0-1522238344750 in 
memory on i-va-spark1:59059 (size: 1632.0 B, free: 579.2 MB)
2018-03-28 11:59:05 ERROR ReceiverTracker:70 - Deregistered receiver for stream 
0: Stopped by driver
2018-03-28 11:59:05 INFO  BlockManagerInfo:54 - Added input-0-1522238345000 in 
memory on i-va-spark1:59059 (size: 272.0 B, free: 579.2 MB)
2018-03-28 11:59:05 INFO  TaskSetManager:54 - Finished task 0.0 in stage 2.0 
(TID 70) in 30213 ms on i-va-spark1 (executor 0) (1/1)
2018-03-28 11:59:05 INFO  TaskSchedulerImpl:54 - Removed TaskSet 2.0, whose 
tasks have all completed, from pool
2018-03-28 11:59:05 INFO  DAGScheduler:54 - ResultStage 2 (start at 
UserLocationHistoryJob.scala:38) finished in 30.213 s
2018-03-28 11:59:05 INFO  ReceiverTracker:54 - All of the receivers have 
deregistered successfully
2018-03-28 11:59:05 INFO  ReceiverTracker:54 - ReceiverTracker stopped
2018-03-28 11:59:05 INFO  JobGenerator:54 - Stopping JobGenerator immediately
2018-03-28 11:59:05 INFO  RecurringTimer:54 - Stopped timer for JobGenerator 
after time 152223834
2018-03-28 11:59:05 INFO  JobGenerator:54 - Stopped JobGenerator
2018-03-28 11:59:07 INFO  JobScheduler:54 - Stopped JobScheduler
2018-03-28 11:59:07 INFO  StreamingContext:54 - StreamingContext stopped 
successfully
2018-03-28 11:59:07 INFO  BlockManagerInfo:54 - Removed broadcast_5_piece0 on 
10.0.0.243:41976 in memory (size: 2.4 KB, free: 488.4 MB)
2018-03-28 11:59:07 INFO  BlockManagerInfo:54 - Removed broadcast_5_piece0 on 
i-va-spark1:59059 in memory (size: 2.4 KB, free: 579.2 MB)
2018-03-28 11:59:07 INFO  BlockManagerInfo:54 - Removed broadcast_4_piece0 on 
10.0.0.243:41976 in memory (size: 23.9 KB, free: 488.4 MB)
2018-03-28 11:59:07 INFO  BlockManagerInfo:54 - Removed broadcast_4_piece0 on 
i-va-spark1:59059 in memory (size: 23.9 KB, free: 579.2 MB)
2018-03-28 11:59:37 WARN  QueuedThreadPool:178 - 
SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop 
Thread[SparkUI-171-selector-ServerConnectorManager@478b3e9/2,5,main]
2018-03-28 11:59:37 WARN  QueuedThreadPool:178 - 
SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop 
Thread[SparkUI-172-selector-ServerConnectorManager@478b3e9/3,5,main]
2018-03-28 11:59:37 WARN  QueuedThreadPool:178 - 
SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop 
Thread[SparkUI-169-selector-ServerConnectorManager@478b3e9/0,5,main]
2018-03-28 11:59:37 WARN  QueuedThreadPool:178 - 
SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop 
Thread[SparkUI-170-selector-ServerConnectorManager@478b3e9/1,5,main]
2018-03-28 13:22:01 INFO  DiskBlockManager:54 - Shutdown hook called
2018-03-28 13:22:01 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-03-28 13:22:01 INFO  ShutdownHookManager:54 - Deleting directory 
/data/spark/scratch/spark-69a3a8a6-5504-4153-a4c1-059676861581
2018-03-28 13:22:01 INFO  ShutdownHookManager:54 - Deleting directory 
/data/spark/scratch/spark-69a3a8a6-5504-4153-a4c1-059676861581/userFiles-8a970eec-da41-442b-9ccf-1621b9e9e045



Sidney Feiner / SW Developer
M: +972.528197720 / Skype: sidney.feiner.startapp





Best practices for optimizing the structure of parquet schema

2018-03-29 Thread Vitaliy Pisarev
There is a lot of talk that in order to really benefit from fast queries
over parquet and hdfs, we need to make sure that the data is stored in a
manner that is friendly to compression.

Unfortunately, I did not find any specific guidelines or tips online that
describe the dos and don'ts of designing a parquet schema.

I am wondering whether someone here can either share such material or
their own experience regarding this.

For example:

I have the following logical structure that I want to store:

{
  root: [
    [int, int, float, float],
    [int, int, float, float],
    [int, int, float, float],
    ...
  ]
}

This is of course a list of lists. All the sublists are actually vectors of
the same length where the coordinates match in meaning and type.

If I understand correctly, the best way to *store* this structure is by
going for the columnar paradigm, where I will have 4 very long vectors, one
for each coordinate, rather than many short vectors.

What other considerations can I apply?
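
To make the comparison concrete, here is a hedged sketch of the two layouts
being weighed (field names are invented; written as Spark SQL schemas):

import org.apache.spark.sql.types._

// Flattened "four long columns" layout: one row per vector, one column per
// coordinate, so each coordinate is stored as its own homogeneous Parquet
// column chunk and should encode/compress well.
val flat = StructType(Seq(
  StructField("c1", IntegerType, nullable = false),
  StructField("c2", IntegerType, nullable = false),
  StructField("c3", FloatType,   nullable = false),
  StructField("c4", FloatType,   nullable = false)
))

// Nested layout: the whole list of lists in a single array column, where the
// values of all four coordinates end up interleaved in one leaf column (note
// this also forces the int coordinates to float).
val nested = StructType(Seq(
  StructField("root", ArrayType(ArrayType(FloatType)), nullable = false)
))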


Why doesn't spark use broadcast join?

2018-03-29 Thread Vitaliy Pisarev
I am looking at the physical plan for the following query:

SELECT f1,f2,f3,...
FROM T1
LEFT ANTI JOIN T2 ON T1.id = T2.id
WHERE  f1 = 'bla'
   AND f2 = 'bla2'
   AND some_date >= date_sub(current_date(), 1)
LIMIT 100

An important detail: the table 'T1' can be very large (hundreds of
thousands of rows), but table T2 is rather small, at most in the thousands
of rows. In this particular case, the table T2 has 2 rows.

In the physical plan, I see that a SortMergeJoin is performed, despite this
being a perfect candidate for a broadcast join.

What could be the reason for this?
Is there a way to hint the optimizer to perform a broadcast join in the sql
syntax?

I am writing this in pyspark and the query itself is over parquets stored
in Azure blob storage.


[Query] Columnar transformation without Structured Streaming

2018-03-29 Thread Aakash Basu
Hi,

I started my Spark Streaming journey from Structured Streaming using Spark
2.3, where I can easily do Spark SQL transformations on streaming data.

But I want to know: how can I do columnar transformations (like running
aggregations or casts) using the older DStream API? Is there a way? Do I
have to use map on the RDDs and go through the complex transformation steps
myself? Or can I convert a DStream into a DataFrame and do the job?
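
For what it's worth, one common DStream-to-DataFrame pattern looks roughly like
the following (a hedged sketch; Event, the dstream variable, and the column
names are placeholders):

import org.apache.spark.sql.SparkSession

case class Event(channel: String, bytes: Long)

dstream.foreachRDD { rdd =>          // rdd: RDD[Event] for each micro-batch
  val spark = SparkSession.builder.getOrCreate()
  import spark.implicits._
  val df = rdd.toDF()
  df.groupBy($"channel")
    .sum("bytes")
    .withColumnRenamed("sum(bytes)", "byte_count")
    .show()
}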

Appreciations in advance!

Thanks,
Aakash.


[Structured Streaming] Kafka Sink in Spark 2.3

2018-03-29 Thread Lalwani, Jayesh
Hi

I have a custom streaming sink that internally uses 
org.apache.spark.sql.kafka010.KafkaSink. This was working in 2.2. When I 
upgraded to 2.3, I get this exception.

Does spark-sql-Kafka010 work on Spark 2.3?
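
For context, the documented way to write a streaming query to Kafka with that
package looks roughly like this (a sketch only; the broker, topic, and
checkpoint path are placeholders, and df is the streaming DataFrame):

val query = df
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("topic", "events")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-sink")
  .start()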

84705281f4b]] DEBUG com.capitalone.sdp.spark.source.SdpSink - Writing batch to 
Kafka: batchId=0
2018-03-29 06:45:35,628 [stream execution thread for [id = 
512dc41e-9435-4260-92c0-f99114b82bd4, runId = 
d4f9e964-355d-4d7c-af2b-384705281f4b]] ERROR 
org.apache.spark.sql.execution.streaming.MicroBatchExecution - Query [id = 
512dc41e-9435-4260-92c0-f99114b82bd4, runId = 
d4f9e964-355d-4d7c-af2b-384705281f4b] terminated with error
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be 
executed with writeStream.start();;
LogicalRDD [value#939], true

at 
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
at 
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
at 
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
   at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)