Re: A question about rdd transformation

2017-06-22 Thread Lionel Luffy
Now I've found that the root cause is a wrapper class (a plain AnyRef) that is
not Serializable. But even after I changed it to implement Serializable, the
'rows' iterator still cannot get any data... Any suggestions?

On Fri, Jun 23, 2017 at 10:56 AM, Lionel Luffy  wrote:

> Hi there,
> I'm trying to perform the action below, but it always returns a
> java.io.NotSerializableException in the shuffle task.
> I've checked that Array is serializable. How can I get the data of the rdd
> in newRDD?
>
> step 1: val rdd: RDD[(AnyRef, Array[AnyRef])] = {..}
>
> step 2:   rdd
>   .partitionBy(partitioner)
>   .map(_._2)
>
> step 3: pass rdd to newRDD as prev:
> class newRDD[K, V](
>     xxx,
>     xxx,
>     xxx,
>     prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {
>
>   override protected def getPartitions: Array[Partition] = {...}
>
>   override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
>     ...
>     val rows = firstParent[Array[AnyRef]].iterator(split, context)
>     ...
>   }
> }
>
>
> Thanks,
> LL
>
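
For reference, a minimal Scala sketch of the serialization requirement discussed
above: partitionBy introduces a shuffle, so both the key and the value types must
be serializable. The Wrapper case class and the sample data below are illustrative
stand-ins, not the actual types from this thread.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Hypothetical stand-in for the wrapper type: a case class is Serializable by
// default, so its instances can cross the shuffle that partitionBy introduces.
case class Wrapper(payload: String)

object SerializableWrapperSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("wrapper-sketch").setMaster("local[2]"))

    val rdd = sc.parallelize(Seq(
      (Wrapper("k1"), Array[AnyRef]("a", "b")),
      (Wrapper("k2"), Array[AnyRef]("c"))))

    // partitionBy shuffles, so both key and value must serialize; the array
    // elements here are Strings, which are themselves Serializable.
    val values = rdd.partitionBy(new HashPartitioner(4)).map(_._2)

    values.collect().foreach(arr => println(arr.mkString(",")))
    sc.stop()
  }
}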


A question about rdd transformation

2017-06-22 Thread Lionel Luffy
Hi there,
I'm trying to perform the action below, but it always returns a
java.io.NotSerializableException in the shuffle task.
I've checked that Array is serializable. How can I get the data of the rdd
in newRDD?

step 1: val rdd: RDD[(AnyRef, Array[AnyRef])] = {..}

step 2:   rdd
  .partitionBy(partitioner)
  .map(_._2)

step 3: pass rdd to newRDD as prev:
class newRDD[K, V](
    xxx,
    xxx,
    xxx,
    prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {

  override protected def getPartitions: Array[Partition] = {...}

  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
    ...
    val rows = firstParent[Array[AnyRef]].iterator(split, context)
    ...
  }
}


Thanks,
LL


Re: Using Spark with Local File System/NFS

2017-06-22 Thread Michael Mior
If you put a * in the path, Spark will look for a file or directory named
*. To read all the files in a directory, just remove the star.

--
Michael Mior
michael.m...@gmail.com

On Jun 22, 2017 17:21, "saatvikshah1994"  wrote:

> Hi,
>
> I've downloaded and kept the same set of data files on all my cluster
> nodes, in the same absolute path - say /home/xyzuser/data/*. I am now
> trying to perform an operation (say open(filename).read()) on all these
> files in Spark, but by passing local file paths. I was under the
> assumption that as long as the worker can find the file path, it will be
> able to execute it. However, my Spark tasks fail with the error
> (/home/xyzuser/data/* is not present), and I'm sure it's present on all my
> worker nodes.
>
> If this experiment were successful, I was planning to set up an NFS
> (actually more like a read-only cloud persistent disk connected to my
> cluster nodes in Dataproc) and use that instead.
>
> What exactly is going wrong here?
>
> Thanks
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Using-Spark-with-Local-File-System-NFS-tp28781.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
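
A small Scala sketch of the suggestion above, assuming the data really is present
at the same absolute path on every node: point textFile at the directory itself via
a file:// URI instead of handing worker-side code a literal glob. The path is the
one from the thread; everything else is illustrative.

import org.apache.spark.sql.SparkSession

object LocalPathSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("local-path-sketch").getOrCreate()

    // Reading the directory itself (no "*") picks up every file inside it, and
    // the file:// scheme forces the local filesystem rather than HDFS. Every
    // worker must have the same files at this path for the result to be sane.
    val lines = spark.sparkContext.textFile("file:///home/xyzuser/data")
    println(lines.count())

    spark.stop()
  }
}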


Using Spark with Local File System/NFS

2017-06-22 Thread saatvikshah1994
Hi,

I've downloaded and kept the same set of data files on all my cluster nodes,
in the same absolute path - say /home/xyzuser/data/*. I am now trying to
perform an operation (say open(filename).read()) on all these files in Spark,
but by passing local file paths. I was under the assumption that as long as
the worker can find the file path, it will be able to execute it. However, my
Spark tasks fail with the error (/home/xyzuser/data/* is not present), and I'm
sure it's present on all my worker nodes.

If this experiment were successful, I was planning to set up an NFS (actually
more like a read-only cloud persistent disk connected to my cluster nodes in
Dataproc) and use that instead.

What exactly is going wrong here?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-with-Local-File-System-NFS-tp28781.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark submit - org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner.run error

2017-06-22 Thread Kanagha Kumar
Hi,

I am *intermittently* seeing this error while doing spark-submit with Spark
2.0.2 (Scala 2.11).

I see the same issue reported in
https://issues.apache.org/jira/browse/SPARK-18343 and it appears to be
RESOLVED. I can run successfully most of the time, though, so I'm unsure
whether it is caused by an incorrect version of some submodule.

Jun 22, 2017 11:01:15 PM org.apache.hadoop.mapred.Task taskCleanup
INFO: Runnning cleanup for the task
Jun 22, 2017 11:01:15 PM org.apache.hadoop.metrics2.impl.MetricsSystemImpl
stop
INFO: Stopping MapTask metrics system...
Jun 22, 2017 11:01:15 PM org.apache.hadoop.metrics2.impl.MetricsSystemImpl
stop
INFO: MapTask metrics system stopped.
Jun 22, 2017 11:01:15 PM org.apache.hadoop.metrics2.impl.MetricsSystemImpl
shutdown
INFO: MapTask metrics system shutdown complete.
Jun 22, 2017 11:01:17 PM org.apache.hadoop.mapred.Task run
INFO: Communication exception: java.io.IOException: The client is stopped
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1507)
at org.apache.hadoop.ipc.Client.call(Client.java:1451)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at
org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:242)
at com.sun.proxy.$Proxy9.ping(Unknown Source)
at org.apache.hadoop.mapred.Task$TaskReporter.run(Task.java:767)
at java.lang.Thread.run(Thread.java:748)

Jun 22, 2017 11:01:20 PM org.apache.hadoop.mapred.Task run
INFO: Communication exception: java.io.IOException: The client is stopped
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1507)
at org.apache.hadoop.ipc.Client.call(Client.java:1451)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at
org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:242)
at com.sun.proxy.$Proxy9.ping(Unknown Source)
at org.apache.hadoop.mapred.Task$TaskReporter.run(Task.java:767)
at java.lang.Thread.run(Thread.java:748)

Jun 22, 2017 11:01:23 PM org.apache.hadoop.mapred.Task run
INFO: Communication exception: java.io.IOException: The client is stopped
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1507)
at org.apache.hadoop.ipc.Client.call(Client.java:1451)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at
org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:242)
at com.sun.proxy.$Proxy9.ping(Unknown Source)
at org.apache.hadoop.mapred.Task$TaskReporter.run(Task.java:767)
at java.lang.Thread.run(Thread.java:748)

Jun 22, 2017 11:01:23 PM org.apache.hadoop.mapred.Task logThreadInfo
INFO: Process Thread Dump: Communication exception
12 active threads
Thread 35 (DestroyJavaVM):
  State: RUNNABLE
  Blocked count: 0
  Waited count: 0
  Stack:
Thread 33 (LeaseRenewer:kpra...@kprasad-ltm.internal.salesforce.com:8020):
  State: TIMED_WAITING
  Blocked count: 0
  Waited count: 8
  Stack:
java.lang.Thread.sleep(Native Method)
org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:444)
org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:71)
org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:304)
java.lang.Thread.run(Thread.java:748)
Thread 31 (IPC Client (1041365481) connection to
kprasad-ltm.internal.salesforce.com/10.3.28.87:8020 from kprasad):
  State: TIMED_WAITING
  Blocked count: 2
  Waited count: 3
  Stack:
java.lang.Object.wait(Native Method)
org.apache.hadoop.ipc.Client$Connection.waitForWork(Client.java:933)
org.apache.hadoop.ipc.Client$Connection.run(Client.java:978)
Thread 29 (pool-5-thread-1):
  State: TIMED_WAITING
  Blocked count: 0
  Waited count: 1
  Stack:
java.lang.Thread.sleep(Native Method)
java.lang.Thread.sleep(Thread.java:340)
java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)

gridforce.spark.delegate.SparkScriptRunner$KeepAliveThread.run(SparkScriptRunner.java:622)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)

java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)

java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:748)
Thread 24 (org.apache.hadoop.hdfs.PeerCache@1c80e49b):
  State: TIMED_WAITING
  Blocked count: 0
  Waited count: 23
  Stack:
java.lang.Thread.sleep(Native Method)
org.apache.hadoop.hdfs.PeerCache.run(PeerCache.java:255)
org.apache.hadoop.hdfs.PeerCache.access$000(PeerCache.java:46)
org.apache.hadoop.hdfs.PeerCache$1.run(PeerCache.java:124)
java.lang.Thread.run(Thread.java:748)
Thread 22 (communication thread):
  State: RUNNABLE
  Blocked count: 20
  Waited count: 63
  Stack:
sun.management.ThreadImpl.getThreadInfo1(Native Method)

Re: Exception which using ReduceByKeyAndWindow in Spark Streaming.

2017-06-22 Thread Tathagata Das
Unfortunately, I am out of ideas. I don't know what's going wrong.
If you can, try using Structured Streaming. We are more active on the
Structured Streaming project.

On Thu, Jun 22, 2017 at 4:07 PM, swetha kasireddy  wrote:

> Hi TD,
>
> I am still seeing this issue with any immutable data structure. Any idea
> why this happens? I use scala.collection.immutable.List[String], and my
> reduce and inverse reduce do the following.
>
> visitorSet1 ++visitorSet2
>
>
>
> visitorSet1.filterNot(visitorSet2.contains(_))
>
>
>
> On Wed, Jun 7, 2017 at 8:43 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> I changed the data structure to scala.collection.immutable.Set and I still
>> see the same issue. My key is a String.  I do the following in my reduce
>> and invReduce.
>>
>> visitorSet1 ++visitorSet2.toTraversable
>>
>>
>> visitorSet1 --visitorSet2.toTraversable
>>
>> On Tue, Jun 6, 2017 at 8:22 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Yes, and in general any mutable data structure. You have to use immutable
>>> data structures whose hashCode and equals are consistent enough for them
>>> to be put in a set.
>>>
>>> On Jun 6, 2017 4:50 PM, "swetha kasireddy" 
>>> wrote:
>>>
 Are you suggesting against the usage of HashSet?

 On Tue, Jun 6, 2017 at 3:36 PM, Tathagata Das <
 tathagata.das1...@gmail.com> wrote:

> This may be because HashSet is a mutable data structure, and it
> seems you are actually mutating it in "set1 ++set2". I suggest creating a
> new HashSet in the function (and adding both sets into it), rather than
> mutating one of them.
>
> On Tue, Jun 6, 2017 at 11:30 AM, SRK 
> wrote:
>
>> Hi,
>>
>> I see the following error when I use ReduceByKeyAndWindow in my Spark
>> Streaming app. I use reduce, invReduce and filterFunction as shown
>> below.
>> Any idea as to why I get the error?
>>
>>  java.lang.Exception: Neither previous window has value for key, nor
>> new
>> values found. Are you sure your key class hashes consistently?
>>
>>
>>   def reduceWithHashSet: ((Long, HashSet[String]),
>>       (Long, HashSet[String])) => (Long, HashSet[String]) = {
>>     case ((timeStamp1: Long, set1: HashSet[String]),
>>           (timeStamp2: Long, set2: HashSet[String])) =>
>>       (Math.max(timeStamp1, timeStamp2), set1 ++ set2)
>>   }
>>
>>   def invReduceWithHashSet: ((Long, HashSet[String]),
>>       (Long, HashSet[String])) => (Long, HashSet[String]) = {
>>     case ((timeStamp1: Long, set1: HashSet[String]),
>>           (timeStamp2: Long, set2: HashSet[String])) =>
>>       (Math.max(timeStamp1, timeStamp2), set1.diff(set2))
>>   }
>>
>>   def filterFuncForInvReduce: ((String, (Long, HashSet[String]))) => Boolean = {
>>     case ((metricName: String, (timeStamp: Long, set: HashSet[String]))) =>
>>       set.size > 0
>>   }
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Exception-which-using-ReduceByKeyAndWi
>> ndow-in-Spark-Streaming-tp28748.html
>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>

>>
>
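
A minimal Scala sketch of the reduceByKeyAndWindow pattern under discussion, using
an immutable Set so the reduce and inverse-reduce functions never mutate shared
state. The socket source, window lengths, and checkpoint directory are placeholders,
not taken from the original application.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedVisitorsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("windowed-visitors").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/tmp/windowed-visitors-checkpoint") // required for inverse reduce

    // One "metric,visitor" line per event; the socket source is a placeholder.
    val events = ssc.socketTextStream("localhost", 9999).map { line =>
      val Array(metric, visitor) = line.split(",", 2)
      (metric, (System.currentTimeMillis(), Set(visitor)))
    }

    val reduce = (a: (Long, Set[String]), b: (Long, Set[String])) =>
      (math.max(a._1, b._1), a._2 ++ b._2)
    val invReduce = (a: (Long, Set[String]), b: (Long, Set[String])) =>
      (math.max(a._1, b._1), a._2 -- b._2)
    val filterFunc = (kv: (String, (Long, Set[String]))) => kv._2._2.nonEmpty

    events
      .reduceByKeyAndWindow(reduce, invReduce, Seconds(60), Seconds(5), filterFunc = filterFunc)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}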


Re: Exception which using ReduceByKeyAndWindow in Spark Streaming.

2017-06-22 Thread swetha kasireddy
Hi TD,

I am still seeing this issue with any immutable data structure. Any idea why
this happens? I use scala.collection.immutable.List[String], and my reduce
and inverse reduce do the following.

visitorSet1 ++visitorSet2



visitorSet1.filterNot(visitorSet2.contains(_))



On Wed, Jun 7, 2017 at 8:43 AM, swetha kasireddy 
wrote:

> I changed the data structure to scala.collection.immutable.Set and I still
> see the same issue. My key is a String.  I do the following in my reduce
> and invReduce.
>
> visitorSet1 ++visitorSet2.toTraversable
>
>
> visitorSet1 --visitorSet2.toTraversable
>
> On Tue, Jun 6, 2017 at 8:22 PM, Tathagata Das  > wrote:
>
>> Yes, and in general any mutable data structure. You have to use immutable
>> data structures whose hashCode and equals are consistent enough for them
>> to be put in a set.
>>
>> On Jun 6, 2017 4:50 PM, "swetha kasireddy" 
>> wrote:
>>
>>> Are you suggesting against the usage of HashSet?
>>>
>>> On Tue, Jun 6, 2017 at 3:36 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
This may be because HashSet is a mutable data structure, and it
 seems you are actually mutating it in "set1 ++set2". I suggest creating a
 new HashSet in the function (and adding both sets into it), rather than
 mutating one of them.

 On Tue, Jun 6, 2017 at 11:30 AM, SRK  wrote:

> Hi,
>
> I see the following error when I use ReduceByKeyAndWindow in my Spark
> Streaming app. I use reduce, invReduce and filterFunction as shown
> below.
> Any idea as to why I get the error?
>
>  java.lang.Exception: Neither previous window has value for key, nor
> new
> values found. Are you sure your key class hashes consistently?
>
>
>   def reduceWithHashSet: ((Long, HashSet[String]),
>       (Long, HashSet[String])) => (Long, HashSet[String]) = {
>     case ((timeStamp1: Long, set1: HashSet[String]),
>           (timeStamp2: Long, set2: HashSet[String])) =>
>       (Math.max(timeStamp1, timeStamp2), set1 ++ set2)
>   }
>
>   def invReduceWithHashSet: ((Long, HashSet[String]),
>       (Long, HashSet[String])) => (Long, HashSet[String]) = {
>     case ((timeStamp1: Long, set1: HashSet[String]),
>           (timeStamp2: Long, set2: HashSet[String])) =>
>       (Math.max(timeStamp1, timeStamp2), set1.diff(set2))
>   }
>
>   def filterFuncForInvReduce: ((String, (Long, HashSet[String]))) => Boolean = {
>     case ((metricName: String, (timeStamp: Long, set: HashSet[String]))) =>
>       set.size > 0
>   }
>
>
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Exception-which-using-ReduceByKeyAndWi
> ndow-in-Spark-Streaming-tp28748.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

>>>
>


Re: Merging multiple Pandas dataframes

2017-06-22 Thread Saatvik Shah
Hi Assaf,

Thanks for your suggestion.

I also found one other improvement, which is to iteratively convert the
Pandas DFs to RDDs and take a union of those (similar to the DataFrame
approach). Basically, calling createDataFrame is heavy, and checkpointing of
DataFrames is a brand-new feature. Instead, build one big union of RDDs and
apply createDataFrame only at the end.

Thanks and Regards,
Saatvik

On Wed, Jun 21, 2017 at 2:03 AM, Mendelson, Assaf 
wrote:

> If you do an action, most intermediate calculations would be gone by the
> next iteration.
>
> What I would do is persist every iteration, then after some number of
> iterations (say 5) write to disk and reload. At that point you should call
> unpersist to free the memory, as it is no longer relevant.
>
>
>
> Thanks,
>
>   Assaf.
>
>
>
> *From:* Saatvik Shah [mailto:saatvikshah1...@gmail.com]
> *Sent:* Tuesday, June 20, 2017 8:50 PM
> *To:* Mendelson, Assaf
> *Cc:* user@spark.apache.org
> *Subject:* Re: Merging multiple Pandas dataframes
>
>
>
> Hi Assaf,
>
> Thanks for the suggestion on checkpointing - I'll need to read up more on
> that.
>
> My current implementation seems to be crashing with a GC memory limit
> exceeded error if Im keeping multiple persist calls for a large number of
> files.
>
>
>
> Thus, I was also thinking about the constant calls to persist. Since all
> my operations are Spark transformations (a union of a large number of Spark
> DataFrames built from Pandas dataframes), this entire process of building a
> large Spark DataFrame is essentially one huge transformation. Is it
> necessary to call persist between unions? Shouldn't I instead wait for all
> the unions to complete and call persist at the end?
>
>
>
>
>
> On Tue, Jun 20, 2017 at 2:52 AM, Mendelson, Assaf 
> wrote:
>
> Note that depending on the number of iterations, the query plan for the
> dataframe can become long and this can cause slowdowns (or even crashes).
> A possible solution would be to checkpoint (or simply save and reload the
> dataframe) every once in a while. When reloading from disk, the newly
> loaded dataframe's lineage is just the disk...
>
> Thanks,
>   Assaf.
>
>
> -Original Message-
> From: saatvikshah1994 [mailto:saatvikshah1...@gmail.com]
> Sent: Tuesday, June 20, 2017 2:22 AM
> To: user@spark.apache.org
> Subject: Merging multiple Pandas dataframes
>
> Hi,
>
> I am iteratively receiving a file which can only be opened as a Pandas
> dataframe. For the first such file I receive, I am converting it to a
> Spark dataframe using the 'createDataFrame' utility function. From the next
> file onward, I am converting it and unioning it into the first Spark
> dataframe (the schema always stays the same). After each union, I am
> persisting it in memory (MEMORY_AND_DISK_ONLY level). After I have converted
> all such files into a single Spark dataframe, I am coalescing it, following
> some tips from this Stack Overflow
> post (https://stackoverflow.com/questions/39381183/
> managing-spark-partitions-after-dataframe-unions).
>
> Any suggestions for optimizing this process further?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Merging-multiple-Pandas-dataframes-tp28770.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>
> --
>
> *Saatvik Shah,*
>
> *1st  Year,*
>
> *Masters in the School of Computer Science,*
>
> *Carnegie Mellon University*
>
> *https://saatvikshah1994.github.io/ *
>



-- 
*Saatvik Shah,*
*1st  Year,*
*Masters in the School of Computer Science,*
*Carnegie Mellon University*

*https://saatvikshah1994.github.io/ *
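
A rough Scala sketch of the persist-and-periodically-checkpoint pattern suggested
in this thread (the original workflow is PySpark + Pandas; the nextBatch helper and
all values below are invented for illustration). Dataset.checkpoint(), available
since Spark 2.1, truncates the lineage much like a save-and-reload would.

import org.apache.spark.sql.{DataFrame, SparkSession}

object IterativeUnionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("iterative-union-sketch").master("local[2]").getOrCreate()
    spark.sparkContext.setCheckpointDir("/tmp/union-checkpoints")

    // Stand-in for "the next chunk of data" arriving on each iteration.
    def nextBatch(i: Int): DataFrame =
      spark.range(i * 1000, (i + 1) * 1000).toDF("id")

    var acc = nextBatch(0)
    for (i <- 1 until 20) {
      val updated = acc.union(nextBatch(i)).persist()
      if (i % 5 == 0) {
        // Every few iterations: materialize, truncate the lineage, free the cache.
        val trimmed = updated.checkpoint() // eager by default in Spark 2.1+
        updated.unpersist()
        acc.unpersist()
        acc = trimmed
      } else {
        acc = updated
      }
    }
    println(acc.count())
    spark.stop()
  }
}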


Re: Trouble with PySpark UDFs and SPARK_HOME only on EMR

2017-06-22 Thread Nicholas Chammas
Here’s a repro for a very similar issue where Spark hangs on the UDF, which
I think is related to the SPARK_HOME issue. I posted the repro on the EMR
forum, but in case you can’t access it:

   1. I’m running EMR 5.6.0, Spark 2.1.1, and Python 3.5.1.
   2. Create a simple Python package by creating a directory called udftest.
   3. Inside udftest put an empty __init__.py and a nothing.py.
   4. nothing.py should have the following contents:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def do_nothing(s: int) -> int:
    return s

do_nothing_udf = udf(do_nothing, IntegerType())

   5. From your home directory (the one that contains your udftest package),
   create a ZIP that we will ship to YARN.

pushd udftest/
zip -rq ../udftest.zip *
popd

   6. Start a PySpark shell with our test package.

export PYSPARK_PYTHON=python3
pyspark \
  --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=$PYSPARK_PYTHON" \
  --archives "udftest.zip#udftest"

   7. Now try to use the UDF. It will hang.

from udftest.nothing import do_nothing_udf
spark.range(10).select(do_nothing_udf('id')).show()  # hangs

   8. The strange thing is, if you define the exact same UDF directly in the
   active PySpark shell, it works fine! It’s only when you import it from a
   user-defined module that you see this issue.

​

On Thu, Jun 22, 2017 at 12:08 PM Nick Chammas 
wrote:

> I’m seeing a strange issue on EMR, which I posted about here.
>
> In brief, when I try to import a UDF I’ve defined, Python somehow fails to
> find Spark. This exact code works for me locally and works on our
> on-premises CDH cluster under YARN.
>
> This is the traceback:
>
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 318, in show
> print(self._jdf.showString(n, 20))
>   File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", 
> line 1133, in __call__
>   File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
> return f(*a, **kw)
>   File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 
> 319, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o89.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 
> (TID 3, ip-10-97-35-12.ec2.internal, executor 1): 
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/worker.py",
>  line 161, in main
> func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/worker.py",
>  line 91, in read_udfs
> _, udf = read_single_udf(pickleSer, infile)
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/worker.py",
>  line 78, in read_single_udf
> f, return_type = read_command(pickleSer, infile)
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/worker.py",
>  line 54, in read_command
> command = serializer._read_with_length(file)
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/serializers.py",
>  line 169, in _read_with_length
> return self.loads(obj)
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/serializers.py",
>  line 451, in loads
> return pickle.loads(obj, encoding=encoding)
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/splinkr/person.py",
>  line 7, in 
> from splinkr.util import repartition_to_size
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/splinkr/util.py",
>  line 34, in 
> containsNull=False,
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/sql/functions.py",
>  line 1872, in udf
> return UserDefinedFunction(f, returnType)
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/sql/functions.py",
>  line 1830, in __init__
> self._judf = 

Re: Flume DStream produces 0 records after HDFS node killed

2017-06-22 Thread N B
This issue got resolved.

I was able to trace it to the fact that the driver program's pom.xml was
pulling in Spark 2.1.1, which in turn was pulling in Hadoop 2.2.0. Explicitly
adding dependencies on the Hadoop 2.7.3 libraries resolves it.

The following HDFS API, DatanodeManager.getDatanodeStorageInfos(), is
incompatible between clients using the HDFS 2.2.0 client and an HDFS server
running some later versions. I believe this issue should be raised with the
HDFS project, since the HDFS 2.2.0 APIs were supposed to have been kept API-
and protocol-compatible according to the Hadoop 2.2.0 release blurb:
From http://hadoop.apache.org/releases.html:

"
15 October, 2013: Release 2.2.0 available

Apache Hadoop 2.2.0 is the *GA* release of Apache Hadoop 2.x.

Users are encouraged to immediately move to 2.2.0 since this release is
significantly more stable and is guaranteed to remain compatible in terms
of both APIs and protocols.
"

Thanks
N B


On Tue, Jun 20, 2017 at 11:36 PM, N B  wrote:

> Hadoop version 2.7.3
>
> On Tue, Jun 20, 2017 at 11:12 PM, yohann jardin 
> wrote:
>
>> Which version of Hadoop are you running on?
>>
>> *Yohann Jardin*
>> Le 6/21/2017 à 1:06 AM, N B a écrit :
>>
>> Ok some more info about this issue to see if someone can shine a light on
>> what could be going on. I turned on debug logging for
>> org.apache.spark.streaming.scheduler in the driver process and this is
>> what gets thrown in the logs and keeps throwing it even after the downed
>> HDFS node is restarted. Using Spark 2.1.1 and HDFS 2.7.3 here.
>>
>> 2017-06-20 22:38:11,302 WARN JobGenerator ReceivedBlockTracker.logWarning
>> - Exception thrown while writing record: BatchCleanupEvent(ArrayBuffer())
>> to the WriteAheadLog.
>> org.apache.spark.SparkException: Exception thrown in awaitResult:
>> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
>> at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(
>> BatchedWriteAheadLog.scala:83)
>> at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.wr
>> iteToLog(ReceivedBlockTracker.scala:234)
>> at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cl
>> eanupOldBatches(ReceivedBlockTracker.scala:171)
>> at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanup
>> OldBlocksAndBatches(ReceiverTracker.scala:233)
>> at org.apache.spark.streaming.scheduler.JobGenerator.clearCheck
>> pointData(JobGenerator.scala:287)
>> at org.apache.spark.streaming.scheduler.JobGenerator.org$apache
>> $spark$streaming$scheduler$JobGenerator$$processEvent(
>> JobGenerator.scala:187)
>> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.
>> onReceive(JobGenerator.scala:89)
>> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.
>> onReceive(JobGenerator.scala:88)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> Caused by: 
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException):
>> Missing storageIDs: It is likely that the HDFS client, who made this call,
>> is running in an older version of Hadoop which does not support storageIDs.
>> datanodeID.length=1, src=/vm/spark-checkpoint/recei
>> vedBlockMetadata/log-1497997390799-1497997450799, fileId=0,
>> blk=BP-1450953312-10.0.0.9-1490120290209:blk_1081472520_7731872,
>> clientName=DFSClient_NONMAPREDUCE_-23097586_1, clientMachine=10.0.0.17
>> at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManage
>> r.getDatanodeStorageInfos(DatanodeManager.java:514)
>> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAddit
>> ionalDatanode(FSNamesystem.java:3353)
>> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.get
>> AdditionalDatanode(NameNodeRpcServer.java:759)
>> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServ
>> erSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProto
>> colServerSideTranslatorPB.java:515)
>> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocol
>> Protos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNam
>> enodeProtocolProtos.java)
>> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcIn
>> voker.call(ProtobufRpcEngine.java:616)
>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGro
>> upInformation.java:1698)
>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>
>> at org.apache.hadoop.ipc.Client.call(Client.java:1347)
>> at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(
>> ProtobufRpcEngine.java:206)
>> at com.sun.proxy.$Proxy10.getAdditionalDatanode(Unknown Source)
>> at 
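
An sbt-flavoured sketch of the fix described above (the original fix was made in a
Maven pom.xml, and this exact artifact list is an assumption): the point is simply
to pin the Hadoop client to the version the cluster actually runs instead of the
2.2.0 that Spark pulls in transitively.

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-streaming"       % "2.1.1" % "provided",
  "org.apache.spark"  %% "spark-streaming-flume" % "2.1.1",
  // Pin the HDFS client explicitly so the transitive Hadoop 2.2.0 is not used.
  "org.apache.hadoop"  % "hadoop-client"         % "2.7.3"
)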

Unsubscribe

2017-06-22 Thread LisTree Team
LisTree Team - Big Data Training Team


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





Trouble with PySpark UDFs and SPARK_HOME only on EMR

2017-06-22 Thread Nick Chammas
I’m seeing a strange issue on EMR, which I posted about here.

In brief, when I try to import a UDF I’ve defined, Python somehow fails to
find Spark. This exact code works for me locally and works on our
on-premises CDH cluster under YARN.

This is the traceback:

Traceback (most recent call last):
  File "", line 1, in 
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 318, in show
print(self._jdf.showString(n, 20))
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py",
line 1133, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py",
line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o89.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3
in stage 0.0 (TID 3, ip-10-97-35-12.ec2.internal, executor 1):
org.apache.spark.api.python.PythonException: Traceback (most recent
call last):
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/worker.py",
line 161, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/worker.py",
line 91, in read_udfs
_, udf = read_single_udf(pickleSer, infile)
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/worker.py",
line 78, in read_single_udf
f, return_type = read_command(pickleSer, infile)
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/worker.py",
line 54, in read_command
command = serializer._read_with_length(file)
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/serializers.py",
line 169, in _read_with_length
return self.loads(obj)
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/serializers.py",
line 451, in loads
return pickle.loads(obj, encoding=encoding)
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/splinkr/person.py",
line 7, in 
from splinkr.util import repartition_to_size
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/splinkr/util.py",
line 34, in 
containsNull=False,
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/sql/functions.py",
line 1872, in udf
return UserDefinedFunction(f, returnType)
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/sql/functions.py",
line 1830, in __init__
self._judf = self._create_judf(name)
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/sql/functions.py",
line 1834, in _create_judf
sc = SparkContext.getOrCreate()
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/context.py",
line 310, in getOrCreate
SparkContext(conf=conf or SparkConf())
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/context.py",
line 115, in __init__
SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/context.py",
line 259, in _ensure_initialized
SparkContext._gateway = gateway or launch_gateway(conf)
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_02/pyspark.zip/pyspark/java_gateway.py",
line 77, in launch_gateway
proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
  File "/usr/lib64/python3.5/subprocess.py", line 950, in __init__
restore_signals, start_new_session)
  File "/usr/lib64/python3.5/subprocess.py", line 1544, in _execute_child
raise child_exception_type(errno_num, err_msg)
FileNotFoundError: [Errno 2] No such file or directory: './bin/spark-submit'

Does anyone have clues about what might be going on?

Nick
​





How does Spark deal with Data Skewness?

2017-06-22 Thread Sea aj
Hi everyone,

I have read about some interesting ideas on how to manage skew, but I am not
sure whether any of these techniques are used in the Spark 2.x releases. To
name a few, "Salting the Data" and "Dynamic Repartitioning" are techniques
that have been presented at Spark Summits. I am really curious to know
whether Spark takes care of skew at all.
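
As far as I know, Spark 2.x does not mitigate join skew automatically, so salting
is still done by hand. A minimal Scala sketch of manual key salting follows; the
column names, salt bucket count, and toy tables are all illustrative.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SaltingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("salting-sketch").master("local[2]").getOrCreate()
    import spark.implicits._

    val saltBuckets = 8

    // A skewed fact table: almost every row shares the key "hot".
    val facts = spark.range(1000000)
      .select(when($"id" % 100 === 0, "cold").otherwise("hot").as("key"), $"id".as("value"))
      // Append a random salt so the hot key spreads over saltBuckets hash buckets.
      .withColumn("salted_key", concat_ws("_", $"key", (rand() * saltBuckets).cast("int")))

    // Replicate the small dimension table once per salt value before joining.
    val dims = Seq(("hot", 1), ("cold", 2)).toDF("key", "dim")
      .withColumn("salt", explode(array((0 until saltBuckets).map(i => lit(i)): _*)))
      .withColumn("salted_key", concat_ws("_", $"key", $"salt"))
      .drop("key")

    val joined = facts.join(dims, Seq("salted_key"))
    println(joined.count())
    spark.stop()
  }
}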








Re: Re: Re: spark2.1 kafka0.10

2017-06-22 Thread lk_spark
Thank you, Kumar. I will try it later.

2017-06-22 

lk_spark 



From: Pralabh Kumar
Sent: 2017-06-22 20:20
Subject: Re: Re: spark2.1 kafka0.10
To: "lk_spark"
Cc: "user.spark"

It looks like the replicas for your partition are failing. If you have more
brokers, can you try increasing the replicas, just to make sure at least one
leader is always available?


On Thu, Jun 22, 2017 at 10:34 AM, lk_spark  wrote:

Each topic has 5 partitions, 2 replicas.

2017-06-22 

lk_spark 



From: Pralabh Kumar
Sent: 2017-06-22 17:23
Subject: Re: spark2.1 kafka0.10
To: "lk_spark"
Cc: "user.spark"

How many replicas do you have for this topic?


On Thu, Jun 22, 2017 at 9:19 AM, lk_spark  wrote:

java.lang.IllegalStateException: No current assignment for partition pages-2
 at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264)
 at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:336)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1236)
 at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
 at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
 at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
 at scala.Option.orElse(Option.scala:289)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
 at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
 at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
 at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
 at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
 at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
 at scala.util.Try$.apply(Try.scala:192)
 at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
 at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


2017-06-22 

lk_spark 



发件人:"lk_spark"
发送时间:2017-06-22 11:13
主题:spark2.1 kafka0.10
收件人:"user.spark"
抄送:

Hi all,
when I run the streaming application for a few minutes, I get this error:

17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned 
partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, 
weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4, weibo-4, 
clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4, weibocomment-2, 
profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-1, 
weibo-1, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, 
clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group youedata1 
with generation 3
17/06/22 10:34:56 INFO ConsumerCoordinator: Setting newly assigned partitions 
[comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, weibocomment-0, 

Re: Why my project has this kind of error ?

2017-06-22 Thread satish lalam
Minglei - You could check your JDK path and Scala library settings in the
project structure: open the project view (Alt + 1), then press F4 to open
Project Structure, and look under SDKs and Libraries.

On Mon, Jun 19, 2017 at 10:54 PM, 张明磊  wrote:

> Hello to all,
>
> Below is my issue. I have already rebuilt and reimported my project in
> IntelliJ IDEA, but it still gives me this kind of error. I can build
> without error with Maven; only the IDE gives me this error. Does anyone
> know what happened here?
>
>
> Thanks
> Minglei
>
>
>
>
>
>


Re: Re: spark2.1 kafka0.10

2017-06-22 Thread Pralabh Kumar
It looks like the replicas for your partition are failing. If you have more
brokers, can you try increasing the replicas, just to make sure at least one
leader is always available?

On Thu, Jun 22, 2017 at 10:34 AM, lk_spark  wrote:

> Each topic has 5 partitions, 2 replicas.
>
> 2017-06-22
> --
> lk_spark
> --
>
> *From:* Pralabh Kumar
> *Sent:* 2017-06-22 17:23
> *Subject:* Re: spark2.1 kafka0.10
> *To:* "lk_spark"
> *Cc:* "user.spark"
>
> How many replicas do you have for this topic?
>
> On Thu, Jun 22, 2017 at 9:19 AM, lk_spark  wrote:
>
>> java.lang.IllegalStateException: No current assignment for partition
>> pages-2
>>  at org.apache.kafka.clients.consumer.internals.SubscriptionStat
>> e.assignedState(SubscriptionState.java:264)
>>  at org.apache.kafka.clients.consumer.internals.SubscriptionStat
>> e.needOffsetReset(SubscriptionState.java:336)
>>  at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(
>> KafkaConsumer.java:1236)
>>  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.
>> latestOffsets(DirectKafkaInputDStream.scala:197)
>>  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.
>> compute(DirectKafkaInputDStream.scala:214)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCom
>> pute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCom
>> pute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCom
>> pute$1$$anonfun$1.apply(DStream.scala:340)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCom
>> pute$1$$anonfun$1.apply(DStream.scala:340)
>>  at org.apache.spark.streaming.dstream.DStream.createRDDWithLoca
>> lProperties(DStream.scala:415)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCom
>> pute$1.apply(DStream.scala:335)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCom
>> pute$1.apply(DStream.scala:333)
>>  at scala.Option.orElse(Option.scala:289)
>>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStr
>> eam.scala:330)
>>  at org.apache.spark.streaming.dstream.ForEachDStream.generateJo
>> b(ForEachDStream.scala:48)
>>  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DSt
>> reamGraph.scala:117)
>>  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DSt
>> reamGraph.scala:116)
>>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>> aversableLike.scala:241)
>>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>> aversableLike.scala:241)
>>  at scala.collection.mutable.ResizableArray$class.foreach(Resiza
>> bleArray.scala:59)
>>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>  at scala.collection.TraversableLike$class.flatMap(TraversableLi
>> ke.scala:241)
>>  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>>  at org.apache.spark.streaming.DStreamGraph.generateJobs(DStream
>> Graph.scala:116)
>>  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$
>> 3.apply(JobGenerator.scala:249)
>>  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$
>> 3.apply(JobGenerator.scala:247)
>>  at scala.util.Try$.apply(Try.scala:192)
>>  at org.apache.spark.streaming.scheduler.JobGenerator.generateJo
>> bs(JobGenerator.scala:247)
>>  at org.apache.spark.streaming.scheduler.JobGenerator.org$apache
>> $spark$streaming$scheduler$JobGenerator$$processEvent(
>> JobGenerator.scala:183)
>>  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.
>> onReceive(JobGenerator.scala:89)
>>  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.
>> onReceive(JobGenerator.scala:88)
>>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>> 2017-06-22
>> --
>> lk_spark
>> --
>>
>> *From:* "lk_spark"
>> *Sent:* 2017-06-22 11:13
>> *Subject:* spark2.1 kafka0.10
>> *To:* "user.spark"
>> *Cc:*
>>
>> Hi all,
>> when I run the streaming application for a few minutes, I get this error:
>>
>> 17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned
>> partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1,
>> weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4,
>> weibo-4, clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4,
>> weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0,
>> bizs-0, bizs-3, pages-1, weibo-1, pages-3, clicks-2, weibo-3, clicks-4,
>> comment-2, weibocomment-3, clicks-0, weibocomment-1] for group youedata1
>> 17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1
>> 17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group
>> 

Re: Broadcasts & Storage Memory

2017-06-22 Thread Pralabh Kumar
Hi

Broadcast variables are definitely stored in the spark.memory.storageFraction region.

1. If we go into the code of TorrentBroadcast.scala, its writeBlocks method
navigates through BlockManager to MemoryStore. Deserialization of the
variables occurs in unroll memory and is then transferred to storage memory.

memoryManager.synchronized {
  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
  val success = memoryManager.acquireStorageMemory(blockId, amount,
    MemoryMode.ON_HEAP)
}


So broadcast variables are definitely stored within spark.memory.storageFraction.


Can you explain how you are seeing a smaller amount of memory used on a given
executor for broadcast variables in the UI?

Regards
Pralabh Kumar

On Thu, Jun 22, 2017 at 4:39 AM, Bryan Jeffrey 
wrote:

> Satish,
>
> I agree - that was my impression too. However, I am seeing a smaller amount
> of storage memory used on a given executor than the amount of memory
> required for my broadcast variables. I am wondering if the statistics in the
> UI are incorrect or if the broadcasts are simply not a part of that storage
> memory fraction.
>
> Bryan Jeffrey
>
>
>
>
>
> On Wed, Jun 21, 2017 at 6:48 PM -0400, "satish lalam" <
> satish.la...@gmail.com> wrote:
>
>> My understanding is that it comes from storageFraction. Cached blocks there
>> are immune to eviction, so both persisted RDDs and broadcast variables sit
>> here. Ref
>>
>>
>> On Wed, Jun 21, 2017 at 1:43 PM, Bryan Jeffrey 
>> wrote:
>>
>>> Hello.
>>>
>>> Question: Do broadcast variables stored on executors count as part of
>>> 'storage memory' or other memory?
>>>
>>> A little bit more detail:
>>>
>>> I understand that we have two knobs to control memory allocation:
>>> - spark.memory.fraction
>>> - spark.memory.storageFraction
>>>
>>> My understanding is that spark.memory.storageFraction controls the
>>> amount of memory allocated for cached RDDs.  spark.memory.fraction controls
>>> how much memory is allocated to Spark operations (task serialization,
>>> operations, etc.), w/ the remainder reserved for user data structures,
>>> Spark internal metadata, etc.  This includes the storage memory for cached
>>> RDDs.
>>>
>>> You end up with executor memory that looks like the following:
>>> All memory: 0-100
>>> Spark memory: 0-75
>>> RDD Storage: 0-37
>>> Other Spark: 38-75
>>> Other Reserved: 76-100
>>>
>>> Where do broadcast variables fall into the mix?
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>>
>>
>>
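
Finally, a small Scala sketch of the knobs discussed in this thread, plus a
broadcast large enough that its footprint should be visible under storage memory in
the Executors/Storage tabs of the UI. The fraction values and payload size are
illustrative only.

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastMemorySketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("broadcast-memory-sketch")
      .setMaster("local[2]")
      .set("spark.memory.fraction", "0.75")        // unified execution + storage region
      .set("spark.memory.storageFraction", "0.5")  // portion of that region immune to eviction
    val sc = new SparkContext(conf)

    // Roughly 40 MB of payload, broadcast to every executor.
    val lookup = sc.broadcast((0 until 10000000).toArray)

    val total = sc.parallelize(0 until 1000).map(i => lookup.value(i)).sum()
    println(total)
    sc.stop()
  }
}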