Multiple queries on same stream

2017-08-08 Thread Raghavendra Pandey
I am using structured streaming to evaluate multiple rules on the same
running stream.
I have two options to do that. One is to use foreach and evaluate all the
rules on each row.
The other option is to express the rules in the Spark SQL DSL and run
multiple queries.
I was wondering whether option 1 will result in better performance, even
though only option 2 gets Catalyst optimization.
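
For reference, a minimal Scala sketch of option 2, where each rule is a DataFrame
DSL query started against the same source stream (the rate source and the rule
predicates below are placeholders, not the actual rules):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("multi-rule-stream").getOrCreate()
import spark.implicits._

// one shared source stream; the rate source just keeps the example self-contained
val events = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

// each rule is expressed in the DSL and started as its own streaming query
val rule1 = events.filter($"value" % 2 === 0)
val rule2 = events.filter($"value" > 100)

val q1 = rule1.writeStream.format("console").queryName("rule1").start()
val q2 = rule2.writeStream.format("console").queryName("rule2").start()

spark.streams.awaitAnyTermination()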

Thanks
Raghav


Unsubscribe

2017-08-08 Thread Chandrashekhar Vaidya

From: Sumit Saraswat [mailto:sumitxapa...@gmail.com] 
Sent: Tuesday, August 8, 2017 10:33 PM
To: user@spark.apache.org
Subject: Unsubscribe

 

Unsubscribe


Reusing dataframes for streaming (spark 1.6)

2017-08-08 Thread Ashwin Raju
Hi,

We've built a batch application on Spark 1.6.1. I'm looking into how to run
the same code as a streaming (DStream based) application. This is using
pyspark.

In the batch application, we have a sequence of transforms that read from
file, do dataframe operations, then write to file. I was hoping to swap out
the read from file with textFileStream, then use the dataframe operations
as is. This would mean that if we change the batch pipeline, so long as it
is a sequence of dataframe operations, the streaming version can just reuse
the code.

Looking at the sql_network_wordcount example, it looks like I'd have to do
DStream.foreachRDD, convert the
passed in RDD into a dataframe and then do my sequence of dataframe
operations. However, that list of dataframe operations looks to be
hardcoded into the process method. Is there any way to pass in a function
that takes a dataframe as input and returns a dataframe?

What I see from the example:

words.foreachRDD(process)

def process(time, rdd):
    # create dataframe from RDD
    # hardcoded operations on the dataframe

What I would like to do instead:

def process(time, rdd):
    # create dataframe from RDD - input_df
    # output_df = dataframe_pipeline_fn(input_df)

-ashwin


Re: Question about 'Structured Streaming'

2017-08-08 Thread Michael Armbrust
>
> 1) Parsing data/Schema creation: The Bro IDS logs have an 8-line header
> that contains the 'schema' for the data; each log (http/dns/etc.) will have
> different columns with different data types. So would I create a specific
> CSV reader inherited from the general one?  Also I'm assuming this would
> need to be in Scala/Java? (I suck at both of those :)
>

This is a good question. What I have seen others do is actually run
different streams for the different log types.  This way you can customize
the schema to the specific log type.

Even without using Scala/Java, you could also use the text data source
(assuming the logs are newline delimited) and then write the parser for
each line in Python.  There will be a performance penalty here, though.


> 2) Dynamic Tailing: Do the CSV/TSV data sources support dynamic tailing
> and handle log rotations?
>

The file-based sources work by tracking which files have been processed and
then scanning (optionally using glob patterns) for new files.  There are two
assumptions here: files are immutable when they arrive, and files always
have a unique name. If files are deleted, we ignore that, so you are okay
to rotate them out.

The full pipeline that I have seen often involves the logs getting uploaded
to something like S3.  This is nice because you get atomic visibility of
files that have already been rotated.  So I wouldn't really call this
dynamic tailing, but we do support looking for new files at some
location.
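
For illustration, a minimal Scala sketch of one such stream for a single log type
over a directory of rotated files (paths, delimiter and columns are made up; a
real Bro schema would be written out per log type):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("bro-conn-stream").getOrCreate()

// hand-written schema for one specific log type (column names are illustrative)
val connSchema = new StructType()
  .add("ts", DoubleType)
  .add("uid", StringType)
  .add("orig_h", StringType)
  .add("resp_h", StringType)
  .add("orig_bytes", LongType)

// the file source tracks processed files and scans the glob for new, immutable files;
// files that are rotated out (deleted) are simply ignored
val conn = spark.readStream
  .schema(connSchema)
  .option("sep", "\t")
  .csv("/data/bro/conn/conn*.log")

val query = conn.writeStream
  .format("parquet")
  .option("path", "/data/parquet/conn")
  .option("checkpointLocation", "/data/checkpoints/conn")
  .start()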


StructuredStreaming: java.util.concurrent.TimeoutException: Cannot fetch record for offset

2017-08-08 Thread aravias
Hi,
We have a structured streaming app consuming data from Kafka and writing to
S3.
I keep getting this timeout exception whenever the executors are configured
to run with more than one core per executor. If someone can share any info
related to this, it would be great.
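
(For illustration, a Scala sketch of the kind of setup described, with placeholder
broker, topic and paths. One knob that is sometimes raised for this particular
error is the Kafka source's poll timeout; treat that as an assumption to verify,
not a confirmed fix.)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("kafka-to-s3").getOrCreate()

val records = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder broker
  .option("subscribe", "my_topic")                    // placeholder topic
  // per-fetch poll timeout used by the Kafka source's cached consumer
  .option("kafkaConsumer.pollTimeoutMs", "120000")
  .load()

val query = records.selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("parquet")
  .option("path", "s3a://my-bucket/output")           // placeholder paths
  .option("checkpointLocation", "s3a://my-bucket/checkpoints")
  .start()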



17/08/08 21:58:28 WARN TaskSetManager: Lost task 2.1 in stage 0.0 (TID 21,
ip-10-120-1-44.ec2.internal, executor 6):
java.util.concurrent.TimeoutException: Cannot fetch record for offset
362976936 in 12 milliseconds
at
org.apache.spark.sql.kafka010.CachedKafkaConsumer.fetchData(CachedKafkaConsumer.scala:208)
at
org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
at
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:158)
at
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:149)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
at
com.homeaway.omnihub.task.SparkStreamingTask.lambda$transform$349866b9$1(SparkStreamingTask.java:113)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:2245)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:2245)
at
org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:186)
at
org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:183)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)






How to get the lag of a column in a spark streaming dataframe?

2017-08-08 Thread Prashanth Kumar Murali
I have data streaming into my spark scala application in this format

id    mark1 mark2 mark3 time
uuid1 100   200   300   Tue Aug  8 14:06:02 PDT 2017
uuid1 100   200   300   Tue Aug  8 14:06:22 PDT 2017
uuid2 150   250   350   Tue Aug  8 14:06:32 PDT 2017
uuid2 150   250   350   Tue Aug  8 14:06:52 PDT 2017
uuid2 150   250   350   Tue Aug  8 14:06:58 PDT 2017

I have it read into columns id, mark1, mark2, mark3 and time. The time is
converted to datetime format as well. I want to get this grouped by id and
get the lag for mark1 which gives the previous row's mark1 value. Something
like this:

id    mark1 mark2 mark3 prev_mark time
uuid1 100   200   300   null  Tue Aug  8 14:06:02 PDT 2017
uuid1 100   200   300   100   Tue Aug  8 14:06:22 PDT 2017
uuid2 150   250   350   null  Tue Aug  8 14:06:32 PDT 2017
uuid2 150   250   350   150   Tue Aug  8 14:06:52 PDT 2017
uuid2 150   250   350   150   Tue Aug  8 14:06:58 PDT 2017

Consider the dataframe to be markDF. I have tried:

val window = Window.partitionBy("uuid").orderBy("timestamp")
val newerDF = newDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))

which fails, saying non-time windows cannot be applied on streaming/appending
datasets/frames.

I have also tried:

val window = Window.partitionBy("uuid").orderBy("timestamp").rowsBetween(-10, 10)
val newerDF = newDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))

to get a window over a few rows, which did not work either. The streaming
window, something like window("timestamp", "10 minutes"), cannot be used to
compute the lag. I am super confused about how to do this. Any help would be
awesome!!
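
For reference, the event-time window mentioned above, built on markDF with the
column names from the snippets, looks roughly like this; it groups rows into time
buckets and aggregates within them, which is why it cannot produce the previous
row's mark1 the way lag() would:

import org.apache.spark.sql.functions._

// illustrative only: 10-minute event-time buckets per id, aggregated,
// with no access to the prior row
val bucketed = markDF
  .groupBy(col("id"), window(col("timestamp"), "10 minutes"))
  .agg(avg(col("mark1")).as("avg_mark1"))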


Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2017-08-08 Thread dcam
Considering the @transient annotations and the work done in the instance
initializer, not much state is really being broadcast to the executors. It
might be simpler to just create these instances on the executors, rather
than trying to broadcast them?
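
For what it's worth, a minimal Scala sketch of that pattern (class and variable
names are made up): each executor JVM builds its own instance lazily inside a
singleton, so nothing has to be serialized or broadcast from the driver.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

// hypothetical executor-side holder: the lazy val is initialized once per JVM, on first use
object ExecutorSide {
  lazy val hadoopConf: Configuration = new Configuration()
}

object CreateOnExecutorExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("create-on-executor").getOrCreate()
    val result = spark.sparkContext
      .parallelize(Seq("a", "bb", "ccc"))
      .mapPartitions { iter =>
        // constructed locally on the executor, never shipped from the driver
        val conf = ExecutorSide.hadoopConf
        iter.map(s => (s, s.length, conf.size()))
      }
    result.collect().foreach(println)
  }
}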






Re: Question about 'Structured Streaming'

2017-08-08 Thread Brian Wylie
I can see your point that you don't really want an external process being
used for the streaming data source. Okay, so on the CSV/TSV front, I have
two follow-up questions:

1) Parsing data/Schema creation: The Bro IDS logs have an 8-line header that
contains the 'schema' for the data; each log (http/dns/etc.) will have
different columns with different data types. So would I create a specific
CSV reader inherited from the general one?  Also I'm assuming this would
need to be in Scala/Java? (I suck at both of those :)

2) Dynamic Tailing: Do the CSV/TSV data sources support dynamic tailing
and handle log rotations?

Thanks and BTW your Spark Summit talks are really well done and
informative. You're an excellent speaker.

-Brian

On Tue, Aug 8, 2017 at 2:09 PM, Michael Armbrust 
wrote:

> Cool stuff! A pattern I have seen is to use our CSV/TSV or JSON support to
> read Bro logs, rather than a Python library.  This is likely to have much
> better performance since we can do all of the parsing on the JVM without
> having to flow it through an external Python process.
>
> On Tue, Aug 8, 2017 at 9:35 AM, Brian Wylie 
> wrote:
>
>> Hi All,
>>
>> I've read the new information about Structured Streaming in Spark, looks
>> super great.
>>
>> Resources that I've looked at
>> - https://spark.apache.org/docs/latest/streaming-programming-guide.html
>> - https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
>> - https://spark.apache.org/docs/latest/streaming-custom-receivers.html
>> - http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html
>>
>> + YouTube videos from Spark Summit 2016/2017
>>
>> So finally getting to my question:
>>
>> I have Python code that yields a Python generator... this is a great
>> streaming approach within Python. I've used it for network packet
>> processing and a bunch of other stuff. I'd love to simply hook up this
>> generator (that yields python dictionaries) along with a schema definition
>> to create an  'unbounded DataFrame' as discussed in
>> https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
>>
>> Possible approaches:
>> - Make a custom receiver in Python: https://spark.apache.org/docs/latest/streaming-custom-receivers.html
>> - Use Kafka (this is definitely possible and good but overkill for my use
>> case)
>> - Send data out a socket and use socketTextStream to pull back in (seems
>> a bit silly to me)
>> - Other???
>>
>> Since Python Generators so naturally fit into streaming pipelines I'd
>> think that this would be straightforward to 'couple' a python generator
>> into a Spark structured streaming pipeline..
>>
>> I've put together a small notebook just to give a concrete example
>> (streaming Bro IDS network data) https://github.com/Kitware/BroThon/blob/master/notebooks/Bro_IDS_to_Spark.ipynb
>>
>> Any thoughts/suggestions/pointers are greatly appreciated.
>>
>> -Brian
>>
>>
>


[Spark Structured Streaming]: truncated Parquet after driver crash or kill

2017-08-08 Thread dcam
Hello list


We have a Spark application that performs a set of ETLs: reading messages
from a Kafka topic, categorizing them, and writing the contents out as
Parquet files on HDFS. After writing, we are querying the data from HDFS
using Presto's hive integration. We are having problems because the Parquet
files are frequently truncated after the Spark driver is killed or crashes.

The meat of the (Scala) Spark jobs looks like this:
Spark
  .openSession()
  .initKafkaStream("our_topic")
  .filter(...)
  .map(...)
  .coalesce(1)
  .writeStream
  .trigger(ProcessingTime("1 hours"))
  .outputMode("append")
  .queryName("MyETL")
  .format("parquet")
  .option("path", path)
  .start()

Is it expected that Parquet files could be truncated during crashes? 

Sometimes the files are only 4 bytes long, sometimes they are longer but
still too short to be valid Parquet files. Presto detects the short files
and refuses to query the entire table. I hoped the write out of the files
would be transactional, so that incomplete files would not be output.

We can fix crashes as they come up, but we will always need to kill the job
periodically to deploy new versions of the code. We want to run the
application as a long lived process that is continually reading from the
Kafka queue and writing out to HDFS for archival purposes.


Thanks,
Dave Cameron







Re: Question about 'Structured Streaming'

2017-08-08 Thread Michael Armbrust
Cool stuff! A pattern I have seen is to use our CSV/TSV or JSON support to
read Bro logs, rather than a Python library.  This is likely to have much
better performance since we can do all of the parsing on the JVM without
having to flow it through an external Python process.

On Tue, Aug 8, 2017 at 9:35 AM, Brian Wylie  wrote:

> Hi All,
>
> I've read the new information about Structured Streaming in Spark, looks
> super great.
>
> Resources that I've looked at
> - https://spark.apache.org/docs/latest/streaming-programming-guide.html
> - https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
> - https://spark.apache.org/docs/latest/streaming-custom-receivers.html
> - http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html
>
> + YouTube videos from Spark Summit 2016/2017
>
> So finally getting to my question:
>
> I have Python code that yields a Python generator... this is a great
> streaming approach within Python. I've used it for network packet
> processing and a bunch of other stuff. I'd love to simply hook up this
> generator (that yields python dictionaries) along with a schema definition
> to create an  'unbounded DataFrame' as discussed in
> https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
>
> Possible approaches:
> - Make a custom receiver in Python: https://spark.apache.org/docs/latest/streaming-custom-receivers.html
> - Use Kafka (this is definitely possible and good but overkill for my use
> case)
> - Send data out a socket and use socketTextStream to pull back in (seems a
> bit silly to me)
> - Other???
>
> Since Python Generators so naturally fit into streaming pipelines I'd
> think that this would be straightforward to 'couple' a python generator
> into a Spark structured streaming pipeline..
>
> I've put together a small notebook just to give a concrete example
> (streaming Bro IDS network data) https://github.com/Kitware/BroThon/blob/master/notebooks/Bro_IDS_to_Spark.ipynb
>
> Any thoughts/suggestions/pointers are greatly appreciated.
>
> -Brian
>
>


Unsubscribe

2017-08-08 Thread Benjamin Soemartopo



From: john_test_test 
Sent: Wednesday, August 9, 2017 3:09:44 AM
To: user@spark.apache.org
Subject: speculative execution in spark

Is it possible to somehow take advantage of the already processed portion
of a failed task, so that speculative execution reassigns only what is left
of the original task to another node? If yes, how can I read it from memory?






Re: Spark Streaming job statistics

2017-08-08 Thread KhajaAsmath Mohammed
No. Will take a look now.

On Tue, Aug 8, 2017 at 1:47 PM, Riccardo Ferrari  wrote:

> Hi,
>
> Have you tried to check the "Streaming" tab menu?
>
> Best,
>
> On Tue, Aug 8, 2017 at 4:15 PM, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Hi,
>>
>> I am running a Spark streaming job which receives data from Azure IoT Hub.
>> I am not sure if the connection was successful and whether it is receiving
>> any data. Does the input column show how much data it has read if the
>> connection was successful?
>>
>> [image: Inline image 1]
>>
>
>


Re: Spark Streaming job statistics

2017-08-08 Thread Riccardo Ferrari
Hi,

Have you tried to check the "Streaming" tab menu?

Best,

On Tue, Aug 8, 2017 at 4:15 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I am running a Spark streaming job which receives data from Azure IoT Hub. I
> am not sure if the connection was successful and whether it is receiving any
> data. Does the input column show how much data it has read if the connection
> was successful?
>
> [image: Inline image 1]
>


Re: count exceed int.MaxValue

2017-08-08 Thread Vadim Semenov
Scala doesn't support ranges with more than Int.MaxValue elements:
https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/immutable/Range.scala?utf8=✓#L89

You can create two RDDs and union them:

scala> val rdd = sc.parallelize(1L to Int.MaxValue.toLong).union(sc.parallelize(1L to Int.MaxValue.toLong))
rdd: org.apache.spark.rdd.RDD[Long] = UnionRDD[10] at union at <console>:24

scala> rdd.count
[Stage 0:>  (0 + 4) / 8]


Also instead of creating the range on the driver, you can create your RDD
in parallel:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val numberOfParts = 100
val numberOfElementsInEachPart = Int.MaxValue.toDouble / 100

val rdd = sc.parallelize(1 to numberOfParts).flatMap(partNum => {
  val begin = ((partNum - 1) * numberOfElementsInEachPart).toLong
  val end = (partNum * numberOfElementsInEachPart).toLong
  begin to end
})

// Exiting paste mode, now interpreting.

numberOfParts: Int = 100
numberOfElementsInEachPart: Double = 2.147483647E7
rdd: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[15] at flatMap at <console>:31

scala> rdd.count
res10: Long = 2147483747

On Tue, Aug 8, 2017 at 1:26 PM, makoto  wrote:

> Hello,
> I'd like to count more than Int.MaxValue. But I encountered the following
> error.
>
> scala> val rdd = sc.parallelize(1L to Int.MaxValue*2.toLong)
> rdd: org.apache.spark.rdd.RDD[Long] = ParallelCollectionRDD[28] at
> parallelize at :24
>
> scala> rdd.count
> java.lang.IllegalArgumentException: More than Int.MaxValue elements.
>   at scala.collection.immutable.NumericRange$.check$1(
> NumericRange.scala:304)
>   at scala.collection.immutable.NumericRange$.count(
> NumericRange.scala:314)
>   at scala.collection.immutable.NumericRange.numRangeElements$
> lzycompute(NumericRange.scala:52)
>   at scala.collection.immutable.NumericRange.numRangeElements(
> NumericRange.scala:51)
>   at scala.collection.immutable.NumericRange.length(NumericRange.scala:54)
>   at org.apache.spark.rdd.ParallelCollectionRDD$.slice(
> ParallelCollectionRDD.scala:145)
>   at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(
> ParallelCollectionRDD.scala:97)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
>   at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
>   ... 48 elided
>
> How can I avoid the error ?
> A similar problem is as follows:
> scala> rdd.reduce((a,b)=> (a + b))
> java.lang.IllegalArgumentException: More than Int.MaxValue elements.
>   at scala.collection.immutable.NumericRange$.check$1(
> NumericRange.scala:304)
>   at scala.collection.immutable.NumericRange$.count(
> NumericRange.scala:314)
>   at scala.collection.immutable.NumericRange.numRangeElements$
> lzycompute(NumericRange.scala:52)
>   at scala.collection.immutable.NumericRange.numRangeElements(
> NumericRange.scala:51)
>   at scala.collection.immutable.NumericRange.length(NumericRange.scala:54)
>   at org.apache.spark.rdd.ParallelCollectionRDD$.slice(
> ParallelCollectionRDD.scala:145)
>   at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(
> ParallelCollectionRDD.scala:97)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
>   at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
>   ... 48 elided
>
>
>


count exceed int.MaxValue

2017-08-08 Thread makoto
Hello,
I'd like to count more than Int.MaxValue. But I encountered the following
error.

scala> val rdd = sc.parallelize(1L to Int.MaxValue*2.toLong)
rdd: org.apache.spark.rdd.RDD[Long] = ParallelCollectionRDD[28] at parallelize at <console>:24

scala> rdd.count
java.lang.IllegalArgumentException: More than Int.MaxValue elements.
  at
scala.collection.immutable.NumericRange$.check$1(NumericRange.scala:304)
  at scala.collection.immutable.NumericRange$.count(NumericRange.scala:314)
  at
scala.collection.immutable.NumericRange.numRangeElements$lzycompute(NumericRange.scala:52)
  at
scala.collection.immutable.NumericRange.numRangeElements(NumericRange.scala:51)
  at scala.collection.immutable.NumericRange.length(NumericRange.scala:54)
  at
org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:145)
  at
org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
  ... 48 elided

How can I avoid the error ?
A similar problem is as follows:
scala> rdd.reduce((a,b)=> (a + b))
java.lang.IllegalArgumentException: More than Int.MaxValue elements.
  at
scala.collection.immutable.NumericRange$.check$1(NumericRange.scala:304)
  at scala.collection.immutable.NumericRange$.count(NumericRange.scala:314)
  at
scala.collection.immutable.NumericRange.numRangeElements$lzycompute(NumericRange.scala:52)
  at
scala.collection.immutable.NumericRange.numRangeElements(NumericRange.scala:51)
  at scala.collection.immutable.NumericRange.length(NumericRange.scala:54)
  at
org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:145)
  at
org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
  ... 48 elided


Fwd: Python question about 'Structured Streaming'

2017-08-08 Thread Brian Wylie
Hi All,

I've read the new information about Structured Streaming in Spark, looks
super great.

Resources that I've looked at
- https://spark.apache.org/docs/latest/streaming-programming-guide.html
- https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
- https://spark.apache.org/docs/latest/streaming-custom-receivers.html
- http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html

+ YouTube videos from Spark Summit 2016/2017

So finally getting to my question:

I have Python code that yields a Python generator... this is a great
streaming approach within Python. I've used it for network packet
processing and a bunch of other stuff. I'd love to simply hook up this
generator (that yields python dictionaries) along with a schema definition
to create an 'unbounded DataFrame' as discussed in
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html

Possible approaches:
- Make a custom receiver in Python: https://spark.apache.org/docs/latest/streaming-custom-receivers.html
- Use Kafka (this is definitely possible and good but overkill for my use
case)
- Send data out a socket and use socketTextStream to pull back in (seems a
bit silly to me)
- Other???

Since Python Generators so naturally fit into streaming pipelines I'd think
that this would be straightforward to 'couple' a python generator into a
Spark structured streaming pipeline..

I've put together a small notebook just to give a concrete example
(streaming Bro IDS network data) https://github.com/Kitware/BroThon/blob/master/notebooks/Bro_IDS_to_Spark.ipynb

Any thoughts/suggestions/pointers are greatly appreciated.

-Brian


speculative execution in spark

2017-08-08 Thread john_test_test
Is it possible to somehow take advantage of the already processed portion
of a failed task, so that speculative execution reassigns only what is left
of the original task to another node? If yes, how can I read it from memory?






Unsubscribe

2017-08-08 Thread Sumit Saraswat
Unsubscribe


Question about 'Structured Streaming'

2017-08-08 Thread Brian Wylie
Hi All,

I've read the new information about Structured Streaming in Spark, looks
super great.

Resources that I've looked at
- https://spark.apache.org/docs/latest/streaming-programming-guide.html
- https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
- https://spark.apache.org/docs/latest/streaming-custom-receivers.html
- http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html

+ YouTube videos from Spark Summit 2016/2017

So finally getting to my question:

I have Python code that yields a Python generator... this is a great
streaming approach within Python. I've used it for network packet
processing and a bunch of other stuff. I'd love to simply hook up this
generator (that yields python dictionaries) along with a schema definition
to create an  'unbounded DataFrame' as discussed in
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html

Possible approaches:
- Make a custom receiver in Python:
https://spark.apache.org/docs/latest/streaming-custom-receivers.html
- Use Kafka (this is definitely possible and good but overkill for my use
case)
- Send data out a socket and use socketTextStream to pull back in (seems a
bit silly to me)
- Other???

Since Python Generators so naturally fit into streaming pipelines I'd think
that this would be straightforward to 'couple' a python generator into a
Spark structured streaming pipeline..

I've put together a small notebook just to give a concrete example
(streaming Bro IDS network data)
https://github.com/Kitware/BroThon/blob/master/notebooks/Bro_IDS_to_Spark.ipynb

Any thoughts/suggestions/pointers are greatly appreciated.

-Brian


unsubscribe

2017-08-08 Thread Ramesh Krishnan
unsubscribe


IndexOutOfBoundException in catalyst when doing multiple approxDistinctCount

2017-08-08 Thread AssafMendelson
Hi,

I am doing a large number of aggregations on a dataframe (without groupBy) to 
get some statistics. As part of this I am doing an approx_count_distinct(c, 
0.01)
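
(For concreteness, a minimal Scala sketch of that kind of multi-column
aggregation; the data and column names here are made up for illustration.)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("approx-distinct-stats").getOrCreate()
import spark.implicits._

// made-up data; the real job runs many such aggregates over a large dataframe
val df = Seq(("a", 1), ("b", 2), ("a", 3)).toDF("c1", "c2")

// one approx_count_distinct per column, all in a single agg() with no groupBy
val aggs = df.columns.map(c => approx_count_distinct(col(c), 0.01).as(s"${c}_distinct"))
df.agg(aggs.head, aggs.tail: _*).show()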
Everything works fine but when I do the same aggregation a second time (for 
each column) I get the following error:



[Stage 2:>  (0 + 2) / 
2][WARN] org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Error 
calculating stats of compiled class.
java.lang.IndexOutOfBoundsException: Index: 4355, Size: 1
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at 
org.codehaus.janino.util.ClassFile.getConstantPoolInfo(ClassFile.java:556)
at 
org.codehaus.janino.util.ClassFile.getConstantUtf8(ClassFile.java:572)
at 
org.codehaus.janino.util.ClassFile.loadAttribute(ClassFile.java:1513)
at 
org.codehaus.janino.util.ClassFile.loadAttributes(ClassFile.java:644)
at 
org.codehaus.janino.util.ClassFile.loadFields(ClassFile.java:623)
at org.codehaus.janino.util.ClassFile.<init>(ClassFile.java:280)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:996)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:993)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.recordCompilationStats(CodeGenerator.scala:993)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:961)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1027)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1024)
at 
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at 
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at 
org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at 
org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
at 
org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
at 
org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
at 
org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:906)
at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:412)
at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:366)
at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:32)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:890)
at 
org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:130)
at 
org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:140)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator.generateResultProjection(AggregationIterator.scala:235)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator.<init>(AggregationIterator.scala:266)
at 
org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.<init>(SortBasedAggregationIterator.scala:39)
at 
org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:86)
at 
org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:77)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at 

Spark Streaming job statistics

2017-08-08 Thread KhajaAsmath Mohammed
Hi,

I am running a Spark streaming job which receives data from Azure IoT Hub. I
am not sure if the connection was successful and whether it is receiving any
data. Does the input column show how much data it has read if the connection
was successful?

[image: Inline image 1]


Re:

2017-08-08 Thread Ramesh Krishnan
unsubscribe

On Mon, Aug 7, 2017 at 2:57 PM, Sumit Saraswat 
wrote:

> Unsubscribe
>


Re: [SS] Console sink not supporting recovering from checkpoint location? Why?

2017-08-08 Thread Jacek Laskowski
Hi Michael,

That reflects my sentiments so well. Thanks for having confirmed my thoughts!

https://issues.apache.org/jira/browse/SPARK-21667

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Aug 8, 2017 at 12:37 AM, Michael Armbrust
 wrote:
> I think there is really no good reason for this limitation.
>
> On Mon, Aug 7, 2017 at 2:58 AM, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> While exploring checkpointing with kafka source and console sink I've
>> got the exception:
>>
>> // today's build from the master
>> scala> spark.version
>> res8: String = 2.3.0-SNAPSHOT
>>
>> scala> val q = records.
>>  |   writeStream.
>>  |   format("console").
>>  |   option("truncate", false).
>>  |   option("checkpointLocation", "/tmp/checkpoint"). // <--
>> checkpoint directory
>>  |   trigger(Trigger.ProcessingTime(10.seconds)).
>>  |   outputMode(OutputMode.Update).
>>  |   start
>> org.apache.spark.sql.AnalysisException: This query does not support
>> recovering from checkpoint location. Delete /tmp/checkpoint/offsets to
>> start over.;
>>   at
>> org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:222)
>>   at
>> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>>   at
>> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:284)
>>   ... 61 elided
>>
>> The "trigger" is the change
>> https://issues.apache.org/jira/browse/SPARK-16116 and this line in
>> particular
>> https://github.com/apache/spark/pull/13817/files#diff-d35e8fce09686073f81de598ed657de7R277.
>>
>> Why is this needed? I can't think of a use case where console sink
>> could not recover from checkpoint location (since all the information
>> is available). I'm lost on it and would appreciate some help (to
>> recover :))
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>




Re: Matrix multiplication and cluster / partition / blocks configuration

2017-08-08 Thread Phillip Henry
Hi, John.

I've had similar problems. IIRC, the driver was GCing madly. I don't know
why the driver was doing so much work but I quickly implemented an
alternative approach. The code I wrote belongs to my client but I wrote
something that should be equivalent. It can be found at:

https://github.com/PhillHenry/Algernon

It's not terribly complicated and you could roll-your-own if you prefer
(the rough idea can be found at
http://javaagile.blogspot.co.at/2016/11/an-alternative-approach-to-matrices-in.html).
But anyway, I got good performance this way.
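
For reference, the stock MLlib route John describes (a BlockMatrix multiplied by
its transpose) looks roughly like the sketch below; the sizes, block dimensions
and random data are placeholders rather than a tuned configuration:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("block-matrix-sketch").getOrCreate()
val sc = spark.sparkContext

// tiny stand-in for the 100k x 500k matrix
val rows = sc.parallelize(0L until 1000L).map { i =>
  IndexedRow(i, Vectors.dense(Array.fill(500)(scala.util.Random.nextDouble())))
}

val a = new IndexedRowMatrix(rows).toBlockMatrix(100, 100).cache()
val gram = a.multiply(a.transpose)  // A * A^T: 1000 x 1000 here, 100k x 100k in John's case
println(s"${gram.numRows()} x ${gram.numCols()}")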

Phill


On Thu, May 11, 2017 at 10:12 PM, John Compitello 
wrote:

> Hey all,
>
> I’ve found myself in a position where I need to do a relatively large
> matrix multiply (at least, compared to what I normally have to do). I’m
> looking to multiply a 100k by 500k dense matrix by its transpose to yield
> 100k by 100k matrix. I’m trying to do this on Google Cloud, so I don’t have
> any real limits on cluster size or memory. However, I have no idea where to
> begin as far as number of cores / number of partitions / how big to make
> the block size for best performance. Is there anywhere where Spark users
> collect optimal configurations for methods relative to data input size?
> Does anyone have any suggestions? I’ve tried throwing 900 cores at a 100k
> by 100k matrix multiply with 1000 by 1000 sized blocks, and that seemed to
> hang forever and eventually fail.
>
> Thanks ,
>
> John
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: tuning - Spark data serialization for cache() ?

2017-08-08 Thread Kazuaki Ishizaki
For DataFrame (and Dataset) cache(), neither Java nor Kryo serialization
is used. There is no way to use Java or Kryo serialization for the
in-memory data of DataFrame.cache() or Dataset.cache().
Are you talking about serialization to disk?  In my previous mail, I talked
only about the in-memory case.
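
A minimal sketch of what that means in practice (names are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder.appName("cache-sketch").getOrCreate()

val df = spark.range(0, 1000000).selectExpr("id", "id % 7 AS bucket")

// DataFrame/Dataset cache() stores data in Spark SQL's own compressed columnar format;
// the spark.serializer setting (Java vs Kryo) does not change this in-memory representation
df.cache()
df.count()   // materializes the cache

// a different storage level can still be requested explicitly via persist()
df.unpersist()
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
df.count()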

Regards, 
Kazuaki Ishizaki



From:   Ofir Manor 
To: Kazuaki Ishizaki 
Cc: user 
Date:   2017/08/08 03:12
Subject:Re: tuning - Spark data serialization for cache() ?



Thanks a lot for the quick pointer!
So, is the advice I linked to in the official Spark 2.2 documentation
misleading? You are saying that Spark 2.2 does not use Java
serialization? And the tip to switch to Kryo is also outdated?

Ofir Manor
Co-Founder & CTO | Equalum
Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io

On Mon, Aug 7, 2017 at 8:47 PM, Kazuaki Ishizaki  
wrote:
For Dataframe (and Dataset), cache() already uses fast 
serialization/deserialization with data compression schemes.

We already identified some performance issues regarding cache(). We are 
working for alleviating these issues in 
https://issues.apache.org/jira/browse/SPARK-14098.
We expect that these PRs will be integrated into Spark 2.3.

Kazuaki Ishizaki



From:Ofir Manor 
To:user 
Date:2017/08/08 02:04
Subject:tuning - Spark data serialization for cache() ?




Hi,
I'm using Spark 2.2, and have a big batch job, using dataframes (with 
built-in, basic types). It references the same intermediate dataframe 
multiple times, so I wanted to try to cache() that and see if it helps, 
both in memory footprint and performance.

Now, the Spark 2.2 tuning page (
http://spark.apache.org/docs/latest/tuning.html) clearly says:
1. The default Spark serialization is Java serialization.
2. It is recommended to switch to Kryo serialization.
3. "Since Spark 2.0.0, we internally use Kryo serializer when shuffling 
RDDs with simple types, arrays of simple types, or string type".

Now, I remember that around the 2.0 launch, there was discussion of a third
serialization format that is much more performant and compact (Encoders?),
but it is not referenced in the tuning guide and its Scala doc is not very
clear to me. Specifically, Databricks shared some graphs etc. of how much
better it is than Kryo and Java serialization - see Encoders here:
https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html


So, is that relevant to cache()? If so, how can I enable it - and is it 
for MEMORY_AND_DISK_ONLY or MEMORY_AND_DISK_SER?

I tried to play with some other variations, like enabling Kryo per the
tuning guide instructions, but didn't see any impact on the cached
dataframe size (same tens of GBs in the UI). So any tips around that?

Thanks.
Ofir Manor
Co-Founder & CTO | Equalum
Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io