Workarounds for OOM during serialization

2018-02-02 Thread J. McConnell
It would seem that I have hit SPARK-10787, an OOME
during ClosureCleaner#ensureSerializable(). I am trying to run LSH over
SparseVectors consisting of ~4M features with no more than 3K non-zero
values per vector. I am hitting this OOME before the hashes are even
calculated.
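
A minimal sketch of an LSH-over-SparseVector setup of this shape (MinHashLSH, the column names, and the parameters are illustrative assumptions, not necessarily the exact code in question):

import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("lsh-sketch").getOrCreate()

// ~4M-dimensional sparse vectors with only a few non-zero entries each
// (dimensions and values here are toy placeholders).
val df = spark.createDataFrame(Seq(
  (0L, Vectors.sparse(4000000, Array(1, 7, 9), Array(1.0, 1.0, 1.0))),
  (1L, Vectors.sparse(4000000, Array(2, 7, 11), Array(1.0, 1.0, 1.0)))
)).toDF("id", "features")

val mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes")

// Per the report above, the OOME hits before the hash columns are produced.
val model = mh.fit(df)
model.transform(df).show(truncate = false)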

I know the issue is reported and open. My question is whether any decent
workarounds have since been found, or whether anyone has suggestions on how
I can debug whether specific instances of data are causing my issue, so that
I could filter them out before hitting this point.

I have set spark.serializer.objectStreamReset to 1 and it did not help. I
am rather new to Spark and I'm having a tough time following exactly where
in the execution of the job this is occurring and whether there is anything
within my control to avoid it.

Any insights would be greatly appreciated.

Thanks,

- J.


can we expect UUID type in Spark 2.3?

2018-02-02 Thread kant kodali
Hi All,

Can we expect a UUID type in Spark 2.3? It looks like it could help a lot of
downstream sources with their data models.

Thanks!
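
In the meantime, one common workaround is to carry UUIDs as plain strings; a minimal sketch (the UDF and column names are illustrative):

import java.util.UUID
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().appName("uuid-as-string").getOrCreate()
import spark.implicits._

// Until a dedicated UUID type exists, UUIDs can be carried as StringType.
val makeUuid = udf(() => UUID.randomUUID().toString)

val df = Seq("a", "b", "c").toDF("value").withColumn("uuid", makeUuid())
df.show(truncate = false)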


Running Spark 2.2.1 with extra packages

2018-02-02 Thread Conconscious
Hi list,

I have a Spark cluster with 3 nodes. I'm calling spark-shell with some
packages to connect to AWS S3 and Cassandra:

spark-shell \
  --packages org.apache.hadoop:hadoop-aws:2.7.3,com.amazonaws:aws-java-sdk:1.7.4,datastax:spark-cassandra-connector:2.0.6-s_2.11 \
  --conf spark.cassandra.connection.host=10.100.120.100,10.100.120.101 \
  --conf spark.cassandra.auth.username=cassandra \
  --conf spark.cassandra.auth.password=cassandra \
  --master spark://10.100.120.104:7077

Then running this test app:

sc.stop
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.from_json
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._

import java.sql.Timestamp
import java.io.File

import org.apache.commons.io.IOUtils
import java.net.URL
import java.nio.charset.Charset
import org.apache.hadoop.fs

System.setProperty("com.amazonaws.services.s3.enableV4", "true")

val region = "eu-central-1"

val conf = new SparkConf(true).setMaster("local[*]").setAppName("S3 connect")

val sc = new SparkContext(conf)
sc.setLocalProperty("spark.default.parallelism", "30")
sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("com.amazonaws.services.s3.enableV4", "true")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3." + region + ".amazonaws.com")

val sqlContext = new SQLContext(sc)
val s3r = sqlContext.read.json("s3a://mybucket/folder/file.json")
s3r.take(1)

With .setMaster("local[*]") the application runs nice, but removing the
setmaster and let the entire cluster work I'm getting:

WARN TaskSchedulerImpl: Initial job has not accepted any resources;
check your cluster UI to ensure that workers are registered and have
sufficient resources

How can I make my extra packages available to the entire cluster?

Thanks in advance
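
A hedged note: --packages coordinates are resolved on the driver and the resulting jars are added to both the driver and executor classpaths, so the warning above usually points at the workers themselves (not registered with this master, or no free cores/memory) rather than at missing packages. If the packages should be configured once for every application anyway, one option is spark-defaults.conf on the submitting machine (a sketch, reusing the same coordinates as above):

# conf/spark-defaults.conf (sketch)
spark.jars.packages  org.apache.hadoop:hadoop-aws:2.7.3,com.amazonaws:aws-java-sdk:1.7.4,datastax:spark-cassandra-connector:2.0.6-s_2.11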






Re: Apache Spark - Spark Structured Streaming - Watermark usage

2018-02-02 Thread M Singh
Hi Vishnu/Jacek:
Thanks for your responses.
Jacek - At the moment, the current time for my use case is processing time.

Vishnu - The Spark documentation
(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
does indicate that it can dedup using the watermark. So I believe there are more
use cases for the watermark, and that is what I am trying to find.
I am hoping that TD can clarify or point me to the documentation.
Thanks
 

On Wednesday, January 31, 2018 6:37 AM, Vishnu Viswanath wrote:
 

Hi Mans,
Watermark in Spark is used to decide when to clear the state, so if the event is
delayed past the point where the state has already been cleared by Spark, it will
be ignored. I recently wrote a blog post on this:
http://vishnuviswanath.com/spark_structured_streaming.html#watermark

Yes, this state is applicable to aggregations only. If you have only a map
function and don't want to process late records, you could do a filter based on
the event-time field, but I guess you will have to compare it with the processing
time, since there is no API for the user to access the watermark.
-Vishnu
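
A minimal sketch of the filter-on-processing-time idea described above (the source, column name, and 10-minute threshold are all assumptions):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().appName("lag-filter-sketch").getOrCreate()

// Toy streaming source; in a real job this would be Kafka, files, etc.
// The "rate" source produces a "timestamp" column treated here as event time.
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .withColumnRenamed("timestamp", "eventTime")

// Drop records whose event time lags the current processing time by more
// than 10 minutes (no watermark involved, just a plain filter).
val fresh = events.where(expr("eventTime >= current_timestamp() - INTERVAL 10 MINUTES"))

// fresh.writeStream.format("console").start()
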
On Fri, Jan 26, 2018 at 1:14 PM, M Singh wrote:

Hi:
I am trying to filter out records which are lagging behind (based on event
time) by a certain amount of time.
Is the watermark API applicable to this scenario (i.e., filtering lagging
records), or is it only applicable with aggregation? I could not get a clear
understanding from the documentation, which only refers to its usage with
aggregation.

Thanks
Mans



   

[Spark Core] Limit the task duration (and kill it!)

2018-02-02 Thread Thomas Decaux
Hello,

I would like to limit task duration, to prevent big tasks such as « SELECT * FROM
toto », or to limit the CPU time, and then kill the task/job.

Is that possible? (A kind of watchdog.)

Many thanks,

Thomas Decaux
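
There is no built-in per-task time limit that I'm aware of, but a rough sketch of a driver-side watchdog could combine a SparkListener with SparkContext.cancelStage (the timeout, polling interval, and bookkeeping below are assumptions, not an established pattern):

import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

import scala.collection.JavaConverters._

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerStageSubmitted}

// Cancels any stage that has been running longer than maxStageMillis.
class StageWatchdog(sc: SparkContext, maxStageMillis: Long) extends SparkListener {
  private val running = new ConcurrentHashMap[Int, Long]() // stageId -> start time

  override def onStageSubmitted(s: SparkListenerStageSubmitted): Unit =
    running.put(s.stageInfo.stageId, System.currentTimeMillis())

  override def onStageCompleted(s: SparkListenerStageCompleted): Unit =
    running.remove(s.stageInfo.stageId)

  private val reaper = Executors.newSingleThreadScheduledExecutor()
  reaper.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = {
      val now = System.currentTimeMillis()
      for ((stageId, started) <- running.asScala if now - started > maxStageMillis) {
        sc.cancelStage(stageId) // kill the long-running stage
        running.remove(stageId)
      }
    }
  }, 10, 10, TimeUnit.SECONDS)
}

// Usage: kill stages running longer than 10 minutes.
// sc.addSparkListener(new StageWatchdog(sc, 10 * 60 * 1000L))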

Re: spark 2.2.1

2018-02-02 Thread Mihai Iacob
Turns out it was the master recovery directory that was messing things up. What was written there came from Spark 2.0.2, and after replacing the master, the recovery process would fail with that error, but there were no clues that that was what was happening.
 
Regards,

Mihai Iacob
DSX Local - Security, IBM Analytics
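
For reference, the directory in question is the standalone master's FILESYSTEM recovery directory (the FileSystemPersistenceEngine in the trace below). It is configured roughly like this (a sketch; the path is illustrative), and, as described above, state written there by an older Spark version may need to be cleared after upgrading the master:

# conf/spark-env.sh on the master (sketch; path is illustrative)
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/var/spark/recovery"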
 
 
----- Original message -----
From: Bill Schwanitz
To: Mihai Iacob
Cc: User
Subject: Re: spark 2.2.1
Date: Fri, Feb 2, 2018 8:23 AM
What version of java? 
 
On Feb 1, 2018 11:30 AM, "Mihai Iacob"  wrote:

I am setting up a Spark 2.2.1 cluster; however, when I bring up the master and workers (both on Spark 2.2.1) I get this error. I tried Spark 2.2.0 and get the same error. It works fine on Spark 2.0.2. Have you seen this before? Any idea what's wrong?
 
I found this, but it's in a different situation: https://github.com/apache/spark/pull/19802
 
18/02/01 05:07:22 ERROR Utils: Exception encountered
java.io.InvalidClassException: org.apache.spark.rpc.RpcEndpointRef; local class incompatible: stream classdesc serialVersionUID = -1223633663228316618, local class serialVersionUID = 1835832137613908542
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:687)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:563)
        at org.apache.spark.deploy.master.WorkerInfo$$anonfun$readObject$1.apply$mcV$sp(WorkerInfo.scala:52)
        at org.apache.spark.deploy.master.WorkerInfo$$anonfun$readObject$1.apply(WorkerInfo.scala:51)
        at org.apache.spark.deploy.master.WorkerInfo$$anonfun$readObject$1.apply(WorkerInfo.scala:51)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
        at org.apache.spark.deploy.master.WorkerInfo.readObject(WorkerInfo.scala:51)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:433)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.deploy.master.FileSystemPersistenceEngine.org$apache$spark$deploy$master$FileSystemPersistenceEngine$$deserializeFromFile(FileSystemPersistenceEngine.scala:80)
        at org.apache.spark.deploy.master.FileSystemPersistenceEngine$$anonfun$read$1.apply(FileSystemPersistenceEngine.scala:56)
        at org.apache.spark.deploy.master.FileSystemPersistenceEngine$$anonfun$read$1.apply(FileSystemPersistenceEngine.scala:56)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.deploy.master.FileSystemPersistenceEngine.read(FileSystemPersistenceEngine.scala:56)
        at org.apache.spark.deploy.master.PersistenceEngine$$anonfun$readPersistedData$1.apply(PersistenceEngine.scala:87)
        at org.apache.spark.deploy.master.PersistenceEngine$$anonfun$readPersistedData$1.apply(PersistenceEngine.scala:86)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:316)
       packet_write_wait: Connection to 9.30.118.193 port 22: Broken 

Re: spark 2.2.1

2018-02-02 Thread Bill Schwanitz
What version of java?

On Feb 1, 2018 11:30 AM, "Mihai Iacob"  wrote:

> I am setting up a spark 2.2.1 cluster, however, when I bring up the master
> and workers (both on spark 2.2.1) I get this error. I tried spark 2.2.0 and
> get the same error. It works fine on spark 2.0.2. Have you seen this
> before, any idea what's wrong?
>
> I found this, but it's in a different situation:
> https://github.com/apache/spark/pull/19802
>
>
> 18/02/01 05:07:22 ERROR Utils: Exception encountered
>
> java.io.InvalidClassException: org.apache.spark.rpc.RpcEndpointRef; local
> class incompatible: stream classdesc serialVersionUID =
> -1223633663228316618, local class serialVersionUID = 1835832137613908542
>
> [same stack trace as quoted in the previous message]
>
>
>
> Regards,
>
> *Mihai Iacob*
> DSX Local  - Security, IBM Analytics
>


Re: Kryo serialization failed: Buffer overflow : Broadcast Join

2018-02-02 Thread Pralabh Kumar
I am using spark 2.1.0

On Fri, Feb 2, 2018 at 5:08 PM, Pralabh Kumar 
wrote:

> Hi
>
> I am performing a broadcast join where my small table is 1 GB. I am
> getting the following error.
>
> I am using
>
>
> org.apache.spark.SparkException: Kryo serialization failed: Buffer
> overflow. Available: 0, required: 28869232. To avoid this, increase
> spark.kryoserializer.buffer.max value
>
>
>
> I increased the value to
>
> spark.conf.set("spark.kryoserializer.buffer.max", "2g")
>
>
> But I am still getting the error.
>
> Please help
>
> Thx
>


Kryo serialization failed: Buffer overflow : Broadcast Join

2018-02-02 Thread Pralabh Kumar
Hi

I am performing a broadcast join where my small table is 1 GB. I am getting
the following error.

I am using


org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow.
Available: 0, required: 28869232. To avoid this, increase
spark.kryoserializer.buffer.max value



I increased the value to

spark.conf.set("spark.kryoserializer.buffer.max", "2g")


But I am still getting the error.

Please help

Thx
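
One hedged note, since the full context is not shown: the Kryo serializer is created when the driver and executors start, so spark.kryoserializer.buffer.max generally needs to be set before the SparkSession/SparkContext exists (e.g. via spark-submit --conf or the session builder), rather than at runtime with spark.conf.set. Also, this setting has an upper bound just under 2 GB, so "2g" itself may be rejected. A minimal sketch:

import org.apache.spark.sql.SparkSession

// Sizes are illustrative; buffer.max is capped (values of 2g and above may be rejected).
val spark = SparkSession.builder()
  .appName("broadcast-join")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryoserializer.buffer.max", "1g")
  .getOrCreate()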


Re: Prefer Structured Streaming over Spark Streaming (DStreams)?

2018-02-02 Thread Biplob Biswas
Great to hear two different viewpoints, and thanks a lot for your input
Michael. For now, our application performs an ETL process where it reads
data from Kafka, stores it in HBase, then performs basic enhancement and
pushes data out on a Kafka topic.

We have a conflict of opinion here, as a few people want to go with DStreams,
stating that it provides the primitive RDD abstraction and that its
functionality is better and easier than Structured Streaming. We don't have
any event-time requirement and are not using any windowing mechanism; just
some basic grouping, enhancement, and storing.

That's why the question was directed towards Structured Streaming vs.
DStreams.

Also, when you say,

> Structured streaming is a completely new implementation that does not use
> DStreams at all, but instead directly runs jobs using RDDs

I understand it doesn't use DStreams, but I thought Structured Streaming
runs jobs on RDDs via DataFrames, and in the future, if the RDD abstraction
needs to be switched out, it will be done by replacing RDDs with something
else. Please correct me if I understood this wrong.

Thanks & Regards
Biplob Biswas
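
For concreteness, a minimal sketch of a Kafka-in/Kafka-out Structured Streaming job of the shape described above (broker addresses, topic names, the checkpoint path, and the "enhancement" step are assumptions; it also requires the spark-sql-kafka-0-10 package, and the HBase write is omitted):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, upper}

val spark = SparkSession.builder().appName("kafka-etl-sketch").getOrCreate()

// Read from Kafka (source options are illustrative).
val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events-in")
  .load()

// Stand-in for "basic enhancement": treat the value as a string and transform it.
val enhanced = input
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .withColumn("value", upper(col("value")))

// Write back to Kafka; each query can be started and stopped individually.
val query = enhanced.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "events-out")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-etl-sketch")
  .start()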

On Thu, Feb 1, 2018 at 12:12 AM, Michael Armbrust 
wrote:

> At this point I recommend that new applications are built using structured
> streaming. The engine was GA-ed as of Spark 2.2 and I know of several very
> large (trillions of records) production jobs that are running in Structured
> Streaming.  All of our production pipelines at databricks are written using
> structured streaming as well.
>
> Regarding the comparison with RDDs: The situation here is different than
> when thinking about batch DataFrames vs. RDDs.  DataFrames are "just" a
> higher-level abstraction on RDDs.  Structured streaming is a completely new
> implementation that does not use DStreams at all, but instead directly runs
> jobs using RDDs.  The advantages over DStreams include:
>  - The ability to start and stop individual queries (rather than needing
> to start/stop a separate StreamingContext)
>  - The ability to upgrade your stream and still start from an existing
> checkpoint
>  - Support for working with Spark SQL data sources (json, parquet, etc)
>  - Higher level APIs (DataFrames and SQL) and lambda functions (Datasets)
>  - Support for event time aggregation
>
> At this point, with the addition of mapGroupsWithState and
> flatMapGroupsWithState, I think we should be at feature parity with
> DStreams (and the state store does incremental checkpoints that are more
> efficient than the DStream store).  However if there are applications you
> are having a hard time porting over, please let us know!
>
> On Wed, Jan 31, 2018 at 5:42 AM, vijay.bvp  wrote:
>
>> Here are my two cents; experts, please correct me if I'm wrong.
>>
>> It's important to understand why one over the other, and for what kind of
>> use case. There might be some time in the future where the low-level APIs
>> are abstracted away and become legacy, but for now the RDD API is Spark's
>> core, low-level API; all higher-level APIs ultimately translate to RDDs,
>> and RDDs are immutable.
>>
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations
>> These are the things that are not supported, and this list needs to be
>> validated against the use case you have.
>>
>> From my experience, Structured Streaming is still new, and the DStreams
>> API is a mature API. Some things are missing or need more exploration:
>>
>> Watermarking/windowing based on the number of records in a particular
>> window.
>>
>> Assuming you have a watermark and windowing on the event time of the data,
>> the resultant DataFrame is a grouped dataset; the only thing you can do is
>> run aggregate functions. You can't simply use that output as another
>> DataFrame and manipulate it. There is a custom aggregator, but I feel it is
>> limited.
>>
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#arbitrary-stateful-operations
>> There is an option to do stateful operations, using GroupState, where the
>> function gets an iterator of events for that window. This is the closest
>> access to the StateStore a developer could get.
>> This arbitrary state that the programmer can keep across invocations has
>> its limitations, such as: how much state can we keep? Is that state stored
>> in driver memory? What happens if the Spark job fails; is this state
>> checkpointed and restored?
>>
>> thanks
>> Vijay
>>
>>
>>
>>
>>
>
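
As a concrete illustration of the GroupState / arbitrary-stateful-operations point above, a minimal sketch of mapGroupsWithState keeping a running count per key (the Event type and the counting logic are assumptions, and timeouts are omitted for brevity):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.GroupState

case class Event(key: String, value: Long)

val spark = SparkSession.builder().appName("group-state-sketch").getOrCreate()
import spark.implicits._

// Toy bounded input; in a streaming job this would come from readStream.
val events = Seq(Event("a", 1), Event("a", 2), Event("b", 5)).toDS()

// Running count per key; in a streaming query the state persists across triggers.
def updateCount(key: String, values: Iterator[Event], state: GroupState[Long]): (String, Long) = {
  val newCount = state.getOption.getOrElse(0L) + values.size
  state.update(newCount)
  (key, newCount)
}

val counts = events
  .groupByKey(_.key)
  .mapGroupsWithState(updateCount _)

counts.show()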