Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-23 Thread N B
Thanks for the response, Conor. I tried with those settings and for a while
it seemed like it was cleaning up shuffle files after itself. However,
exactly 5 hours later it started throwing exceptions and eventually
stopped working again. A sample stack trace is below. What is curious about
5 hours is that I set the cleaner ttl to 5 hours after changing the max
window size to 1 hour (down from 6 hours in order to test). It also stopped
cleaning the shuffle files after this started happening.

Any idea why this could be happening?

2015-04-22 17:39:52,040 ERROR Executor task launch worker-989
Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
java.lang.Exception: Could not compute split, block input-0-1429706099000 not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Thanks
NB


On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell conor.fenn...@altocloud.com
wrote:

 Hi,


 We set the spark.cleaner.ttl to some reasonable time and also
 set spark.streaming.unpersist=true.


 Those together cleaned up the shuffle files for us.


 -Conor
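
 For reference, a minimal sketch of those two settings (the ttl value below is
 only an example; it just needs to be comfortably longer than the largest
 window in use):

 import org.apache.spark.SparkConf;

 SparkConf conf = new SparkConf()
     .setAppName("MyStreamingApp")              // hypothetical app name
     .set("spark.cleaner.ttl", "18000")         // e.g. 5 hours, expressed in seconds
     .set("spark.streaming.unpersist", "true"); // let Spark unpersist generated RDDs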

 On Tue, Apr 21, 2015 at 8:18 AM, N B nb.nos...@gmail.com wrote:

 We already do have a cron job in place to clean just the shuffle files.
 However, what I would really like to know is whether there is a proper
 way of telling Spark to clean up these files once it's done with them?

 Thanks
 NB


 On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele gangele...@gmail.com
  wrote:

 Write a cron job for this, like the ones below

 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
 32 * * * *  find /tmp -type d -cmin +1440 -name spark-*-*-* -prune
 -exec rm -rf {} \+
 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
 +1440 -name spark-*-*-* -prune -exec rm -rf {} \+


 On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

 Hi all,

 I had posed this query as part of a different thread but did not get a
 response there. So creating a new thread hoping to catch someone's
 attention.

 We are experiencing this issue of shuffle files being left behind and
 not being cleaned up by Spark. Since this is a Spark streaming application,
 it is expected to stay up indefinitely, so shuffle files not being cleaned
 up is a big problem right now. Our max window size is 6 hours, so we have
 set up a cron job to clean up shuffle files older than 12 hours otherwise
 it will eat up all our disk space.

 Please see the following. It seems the non-cleaning of shuffle files is
 being documented in 1.3.1.

 https://github.com/apache/spark/pull/5074/files
 https://issues.apache.org/jira/browse/SPARK-5836


 Also, for some reason, the following JIRAs that were reported as
 functional issues were closed as Duplicates of the above Documentation bug.
 Does this mean that this issue won't be tackled at all?

 https://issues.apache.org/jira/browse/SPARK-3563
 https://issues.apache.org/jira/browse/SPARK-4796
 https://issues.apache.org/jira/browse/SPARK-6011

 Any further insight into whether this is being looked into and
 meanwhile how to handle shuffle files will be greatly appreciated.

 Thanks
 NB










Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-21 Thread N B
We already do have a cron job in place to clean just the shuffle files.
However, what I would really like to know is whether there is a proper
way of telling Spark to clean up these files once it's done with them?

Thanks
NB


On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Write a cron job for this, like the ones below

 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
 32 * * * *  find /tmp -type d -cmin +1440 -name spark-*-*-* -prune -exec
 rm -rf {} \+
 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
 +1440 -name spark-*-*-* -prune -exec rm -rf {} \+


 On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

 Hi all,

 I had posed this query as part of a different thread but did not get a
 response there. So creating a new thread hoping to catch someone's
 attention.

 We are experiencing this issue of shuffle files being left behind and not
 being cleaned up by Spark. Since this is a Spark streaming application, it
 is expected to stay up indefinitely, so shuffle files not being cleaned up
 is a big problem right now. Our max window size is 6 hours, so we have set
 up a cron job to clean up shuffle files older than 12 hours otherwise it
 will eat up all our disk space.

 Please see the following. It seems the non-cleaning of shuffle files is
 being documented in 1.3.1.

 https://github.com/apache/spark/pull/5074/files
 https://issues.apache.org/jira/browse/SPARK-5836


 Also, for some reason, the following JIRAs that were reported as
 functional issues were closed as Duplicates of the above Documentation bug.
 Does this mean that this issue won't be tackled at all?

 https://issues.apache.org/jira/browse/SPARK-3563
 https://issues.apache.org/jira/browse/SPARK-4796
 https://issues.apache.org/jira/browse/SPARK-6011

 Any further insight into whether this is being looked into and meanwhile
 how to handle shuffle files will be greatly appreciated.

 Thanks
 NB








Shuffle files not cleaned up (Spark 1.2.1)

2015-04-20 Thread N B
Hi all,

I had posed this query as part of a different thread but did not get a
response there. So creating a new thread hoping to catch someone's
attention.

We are experiencing this issue of shuffle files being left behind and not
being cleaned up by Spark. Since this is a Spark streaming application, it
is expected to stay up indefinitely, so shuffle files not being cleaned up
is a big problem right now. Our max window size is 6 hours, so we have set
up a cron job to clean up shuffle files older than 12 hours; otherwise they
will eat up all our disk space.

Please see the following. It seems the non-cleaning of shuffle files is
being documented in 1.3.1.

https://github.com/apache/spark/pull/5074/files
https://issues.apache.org/jira/browse/SPARK-5836


Also, for some reason, the following JIRAs that were reported as functional
issues were closed as Duplicates of the above Documentation bug. Does this
mean that this issue won't be tackled at all?

https://issues.apache.org/jira/browse/SPARK-3563
https://issues.apache.org/jira/browse/SPARK-4796
https://issues.apache.org/jira/browse/SPARK-6011

Any further insight into whether this is being looked into and meanwhile
how to handle shuffle files will be greatly appreciated.

Thanks
NB


Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-24 Thread N B
Hi TD,

That little experiment helped a bit. This time we did not see any
exceptions for about 16 hours, but eventually it did throw the same
exceptions as before. The cleaning of the shuffle files also stopped well
before these exceptions happened, about 7-1/2 hours after startup.

I am not quite sure how to proceed from here. Any suggestions on how to
avoid these errors?

Thanks
NB


On Fri, Apr 24, 2015 at 12:57 AM, N B nb.nos...@gmail.com wrote:

 Hi TD,

 That may very well have been the case. There may be some delay on our
 output side. I have made a change just for testing that sends the output
 nowhere. I will see if that helps get rid of these errors. Then we can try
 to find out how we can optimize so that we do not lag.

 Questions: How can we ever be sure that a lag, even a temporary one, never
 occurs in the future? Also, shouldn't Spark avoid cleaning up any temp files
 that it knows it might still need in the (near?) future?

 Thanks
 Nikunj


 On Thu, Apr 23, 2015 at 12:29 PM, Tathagata Das t...@databricks.com
 wrote:

 What was the state of your streaming application? Was it falling behind
 with a large increasing scheduling delay?

 TD

 On Thu, Apr 23, 2015 at 11:31 AM, N B nb.nos...@gmail.com wrote:

 Thanks for the response, Conor. I tried with those settings and for a
 while it seemed like it was cleaning up shuffle files after itself.
 However, exactly 5 hours later it started throwing exceptions and
 eventually stopped working again. A sample stack trace is below. What is
 curious about 5 hours is that I set the cleaner ttl to 5 hours after
 changing the max window size to 1 hour (down from 6 hours in order to
 test). It also stopped cleaning the shuffle files after this started
 happening.

 Any idea why this could be happening?

 2015-04-22 17:39:52,040 ERROR Executor task launch worker-989
 Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
 java.lang.Exception: Could not compute split, block
 input-0-1429706099000 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 Thanks
 NB


 On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell 
 conor.fenn...@altocloud.com wrote:

 Hi,


 We set the spark.cleaner.ttl to some reasonable time and also
 set spark.streaming.unpersist=true.


 Those together cleaned up the shuffle files for us.


 -Conor

 On Tue, Apr 21, 2015 at 8:18 AM, N B nb.nos...@gmail.com wrote:

 We already do have a cron job in place to clean just the shuffle
 files. However, what I would really like to know is whether there is a
 proper way of telling spark to clean up these files once its done with
 them?

 Thanks
 NB


 On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele 
 gangele...@gmail.com wrote:

 Write a cron job for this, like the ones below

 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {}
 \+
 32 * * * *  find /tmp -type d -cmin +1440 -name spark-*-*-* -prune
 -exec rm -rf {} \+
 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d
 -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+


 On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

 Hi all,

 I had posed this query as part of a different thread but did not get
 a response there. So creating a new thread hoping to catch someone's
 attention.

 We are experiencing this issue of shuffle files being left behind
 and not being cleaned up by Spark. Since this is a Spark streaming
 application, it is expected to stay up indefinitely, so shuffle files 
 not
 being cleaned up is a big problem right now. Our max window size is 6
 hours, so we have set up a cron job to clean up shuffle files older 
 than 12
 hours otherwise it will eat up all our disk space.

 Please see the following. It seems the non-cleaning of shuffle files
 is being documented in 1.3.1.

 https://github.com/apache/spark/pull/5074/files
 https://issues.apache.org/jira/browse/SPARK

Re: Broadcast variables can be rebroadcast?

2015-05-15 Thread N B
Thanks Ilya. Does one have to call broadcast again once the underlying data
is updated in order to get the changes visible on all nodes?

Thanks
NB


On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com wrote:

 The broadcast variable is like a pointer. If the underlying data changes
 then the changes will be visible throughout the cluster.
 On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote:

 Hello,

 Once a broadcast variable is created using sparkContext.broadcast(), can
 it
 ever be updated again? The use case is for something like the underlying
 lookup data changing over time.

 Thanks
 NB




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Broadcast variables can be rebroadcast?

2015-05-19 Thread N B
Hi Imran,

If I understood you correctly, you are suggesting to simply call broadcast
again from the driver program. This is exactly what I am hoping will work
as I have the Broadcast data wrapped up and I am indeed (re)broadcasting
the wrapper over again when the underlying data changes. However,
documentation seems to suggest that one cannot re-broadcast. Is my
understanding accurate?
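
For context, the wrapping looks roughly like the sketch below; the class and
method names are illustrative only, not the actual code:

import java.io.Serializable;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

// Illustrative sketch only: when the underlying lookup data changes, a NEW
// broadcast is created and the reference is swapped, so work scheduled
// afterwards sees the new value.
public class BroadcastWrapper<T> implements Serializable {
    private transient JavaSparkContext jsc;
    private transient volatile Broadcast<T> broadcast;

    public BroadcastWrapper(JavaSparkContext jsc, T initialValue) {
        this.jsc = jsc;
        this.broadcast = jsc.broadcast(initialValue);
    }

    // "Re-broadcast": unpersist the old value and broadcast the new one.
    public synchronized void update(T newValue) {
        broadcast.unpersist();            // optional; old blocks also go away once GC'ed
        broadcast = jsc.broadcast(newValue);
    }

    public T value() {
        return broadcast.value();
    }
}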

Thanks
NB


On Mon, May 18, 2015 at 6:24 PM, Imran Rashid iras...@cloudera.com wrote:

 Rather than updating the broadcast variable, can't you simply create a
 new one?  When the old one can be gc'ed in your program, it will also get
 gc'ed from spark's cache (and all executors).

 I think this will make your code *slightly* more complicated, as you need
 to add in another layer of indirection for which broadcast variable to use,
 but not too bad.  Eg., from

 var myBroadcast = sc.broadcast( ...)
 (0 to 20).foreach{ iteration =>
   //  ... some rdd operations that involve myBroadcast ...
   myBroadcast.update(...) // wrong! don't update a broadcast variable
 }

 instead do something like:

 def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = {
  ...
 }

 var myBroadcast = sc.broadcast(...)
 (0 to 20).foreach { iteration =>
   oneIteration(myRDD, myBroadcast)
   myBroadcast = sc.broadcast(...) // create a NEW broadcast here, with whatever you need to update it
 }

 On Sat, May 16, 2015 at 2:01 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ayan. Can we rebroadcast after updating in the driver?

 Thanks
 NB.


 On Fri, May 15, 2015 at 6:40 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 A broadcast variable is shipped to the executors used by a transformation the
 first time it is accessed in that transformation. It will NOT be updated
 subsequently, even if the value has changed. However, a new value will be
 shipped to any new executor that comes into play after the value has changed.
 Because of this, changing the value of a broadcast variable is not a good
 idea, as it can create inconsistency within the cluster. From the documentation:

  In addition, the object v should not be modified after it is broadcast
 in order to ensure that all nodes get the same value of the broadcast
 variable
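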


 On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ilya. Does one have to call broadcast again once the underlying
 data is updated in order to get the changes visible on all nodes?

 Thanks
 NB


 On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com
 wrote:

 The broadcast variable is like a pointer. If the underlying data
 changes then the changes will be visible throughout the cluster.
 On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote:

 Hello,

 Once a broadcast variable is created using sparkContext.broadcast(),
 can it
 ever be updated again? The use case is for something like the
 underlying
 lookup data changing over time.

 Thanks
 NB




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 Best Regards,
 Ayan Guha






Re: Timeout errors from Akka in Spark 1.2.1

2015-04-16 Thread N B
Hi Guillaume,

Interesting that you brought up Shuffle. In fact we are experiencing this
issue of shuffle files being left behind and not being cleaned up. Since
this is a Spark streaming application, it is expected to stay up
indefinitely, so shuffle files being left is a big problem right now. Since
our max window size is 6 hours, we have set a cron job to clean up shuffle
files older than 12 hours; otherwise they will eat up all our disk space.

Please see the following. It seems the non-cleaning of shuffle files is
being documented in 1.3.1.

https://github.com/apache/spark/pull/5074/files
https://issues.apache.org/jira/browse/SPARK-5836


Also, for some reason, the following issues had been opened but were closed
as Duplicates of the above Documentation bug.

https://issues.apache.org/jira/browse/SPARK-3563
https://issues.apache.org/jira/browse/SPARK-4796
https://issues.apache.org/jira/browse/SPARK-6011

Any further insight into how to handle shuffle files will be greatly
appreciated.

Thanks
NB




On Fri, Apr 10, 2015 at 12:33 AM, Guillaume Pitel 
guillaume.pi...@exensa.com wrote:

  Hi,

 From my experience, the main causes of timeout are related to file
 cleanup, for instance after a shuffle. If your parallelism is very high and
 you didn't set the consolidate option, there are many files created by
 Spark, and when they are cleaned up, the calls start timing out. So you
 may find your solution by monitoring the folder where Spark stores its
 shuffles.

 Guillaume

 Thanks TD. I believe that might have been the issue. Will try for a few
 days after passing in the GC option on the java command line when we start
 the process.

  Thanks for your timely help.
 NB

 On Wed, Apr 8, 2015 at 6:08 PM, Tathagata Das t...@databricks.com wrote:

 Yes, in local mode the driver and executor will be the same process.
 And in that case the Java options in the SparkConf configuration will not
 work.

 On Wed, Apr 8, 2015 at 1:44 PM, N B nb.nos...@gmail.com wrote:

 Since we are running in local mode, won't all the executors be in the
 same JVM as the driver?

  Thanks
  NB

 On Wed, Apr 8, 2015 at 1:29 PM, Tathagata Das t...@databricks.com
 wrote:

 It does take effect on the executors, not on the driver. Which is okay
 because the executors have all the data and therefore the GC issues, not so
 much the driver usually. If you want to be doubly sure, print the JVM flags
 (e.g. http://stackoverflow.com/questions/10486375/print-all-jvm-flags)

 However, the GC I was referring to, which initiates the RDD and shuffle
 cleanup, was the GC on the driver. Thought I would clarify.

  TD

 On Wed, Apr 8, 2015 at 1:23 PM, N B nb.nos...@gmail.com wrote:

 Hi TD,

 Thanks for the response. Since you mentioned GC, this got me thinking.

  Given that we are running in local mode (all in a single JVM) for
 now, does the option spark.executor.extraJavaOptions set to
 -XX:+UseConcMarkSweepGC inside SparkConf object take effect at all 
 before
 we use it to create the StreamingContext? I ask because that is what we 
 are
 doing right now. If not, perhaps we have not been running with the
 Concurrent Mark Sweep at all and is that recommended instead of forcing GC
 periodically?

  Thanks
  NB


 On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das t...@databricks.com
 wrote:

 There are a couple of options. Increase timeout (see Spark
 configuration).

  Also see past mails in the mailing list.

  Another option you may try (I have a gut feeling that it may work, but I
 am not sure) is calling GC on the driver periodically. The cleaning up of
 stuff is tied to GCing of RDD objects and regular cleaning may help keep
 things clean more rigorously rather than in unpredictable bursts of GC
 activity.

  Let us know how it works out.

  TD

 On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com
 wrote:

 I have a standalone and local Spark streaming process where we are
 reading inputs using FlumeUtils. Our longest window size is 6 hours. 
 After
 about a day and a half of running without any issues, we start seeing
 Timeout errors while cleaning up input blocks. This seems to cause 
 reading
 from Flume to cease.


  ERROR sparkDriver-akka.actor.default-dispatcher-78
 BlockManagerSlaveActor.logError - Error in removing block
 input-0-1428182594000
 org.apache.spark.SparkException: Error sending message [message =
 UpdateBlockInfo(BlockManagerId(driver, localhost,
 55067),input-0-1428182594000,StorageLevel(false, false, false, false,
 1),0,0,0)]
  at
 org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
  at
 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
  at
 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
  at org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385)
  at
 org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361

Re: Timeout errors from Akka in Spark 1.2.1

2015-04-08 Thread N B
Since we are running in local mode, won't all the executors be in the same
JVM as the driver?

Thanks
NB

On Wed, Apr 8, 2015 at 1:29 PM, Tathagata Das t...@databricks.com wrote:

 It does take effect on the executors, not on the driver. Which is okay
 because the executors have all the data and therefore the GC issues, not so
 much the driver usually. If you want to be doubly sure, print the JVM flags
 (e.g. http://stackoverflow.com/questions/10486375/print-all-jvm-flags)

 However, the GC I was referring to, which initiates the RDD and shuffle
 cleanup, was the GC on the driver. Thought I would clarify.

 TD

 On Wed, Apr 8, 2015 at 1:23 PM, N B nb.nos...@gmail.com wrote:

 Hi TD,

 Thanks for the response. Since you mentioned GC, this got me thinking.

 Given that we are running in local mode (all in a single JVM) for now,
 does the option spark.executor.extraJavaOptions set to
 -XX:+UseConcMarkSweepGC inside the SparkConf object take effect at all before
 we use it to create the StreamingContext? I ask because that is what we are
 doing right now. If not, perhaps we have not been running with the
 Concurrent Mark Sweep at all and is that recommended instead of forcing GC
 periodically?

 Thanks
 NB


 On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das t...@databricks.com
 wrote:

 There are a couple of options. Increase timeout (see Spark
 configuration).

 Also see past mails in the mailing list.

 Another option you may try (I have a gut feeling that it may work, but I am
 not sure) is calling GC on the driver periodically. The cleaning up of
 stuff is tied to GCing of RDD objects and regular cleaning may help keep
 things clean more rigorously rather than in unpredictable bursts of GC
 activity.

 Let us know how it works out.

 TD
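
 For anyone trying this, a rough sketch of what calling GC on the driver
 periodically could look like in the driver program (the one-minute interval
 is arbitrary):

 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;

 // Periodically request a GC on the driver JVM so that unreferenced RDD and
 // shuffle metadata becomes eligible for Spark's cleanup.
 ScheduledExecutorService gcScheduler = Executors.newSingleThreadScheduledExecutor();
 gcScheduler.scheduleAtFixedRate(System::gc, 1, 1, TimeUnit.MINUTES);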

 On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com
 wrote:

 I have a standalone and local Spark streaming process where we are
 reading inputs using FlumeUtils. Our longest window size is 6 hours. After
 about a day and a half of running without any issues, we start seeing
 Timeout errors while cleaning up input blocks. This seems to cause reading
 from Flume to cease.


 ERROR sparkDriver-akka.actor.default-dispatcher-78
 BlockManagerSlaveActor.logError - Error in removing block
 input-0-1428182594000
 org.apache.spark.SparkException: Error sending message [message =
 UpdateBlockInfo(BlockManagerId(driver, localhost,
 55067),input-0-1428182594000,StorageLevel(false, false, false, false,
 1),0,0,0)]
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
 at
 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
 at
 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
 at org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385)
 at
 org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361)
 at
 org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out
 after [30 seconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
 at
 scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
 at
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
 at scala.concurrent.Await$.result(package.scala:107)
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
 ... 17 more

Re: Timeout errors from Akka in Spark 1.2.1

2015-04-08 Thread N B
Thanks TD. I believe that might have been the issue. Will try for a few
days after passing in the GC option on the java command line when we start
the process.

Thanks for your timely help.
NB

On Wed, Apr 8, 2015 at 6:08 PM, Tathagata Das t...@databricks.com wrote:

 Yes, in local mode the driver and executor will be the same process.
 And in that case the Java options in the SparkConf configuration will not
 work.

 On Wed, Apr 8, 2015 at 1:44 PM, N B nb.nos...@gmail.com wrote:

 Since we are running in local mode, won't all the executors be in the
 same JVM as the driver?

 Thanks
 NB

 On Wed, Apr 8, 2015 at 1:29 PM, Tathagata Das t...@databricks.com
 wrote:

 It does take effect on the executors, not on the driver. Which is okay
 because the executors have all the data and therefore the GC issues, not so
 much the driver usually. If you want to be doubly sure, print the JVM flags
 (e.g. http://stackoverflow.com/questions/10486375/print-all-jvm-flags)

 However, the GC I was referring to, which initiates the RDD and shuffle
 cleanup, was the GC on the driver. Thought I would clarify.

 TD

 On Wed, Apr 8, 2015 at 1:23 PM, N B nb.nos...@gmail.com wrote:

 Hi TD,

 Thanks for the response. Since you mentioned GC, this got me thinking.

 Given that we are running in local mode (all in a single JVM) for now,
 does the option spark.executor.extraJavaOptions set to
 -XX:+UseConcMarkSweepGC inside SparkConf object take effect at all before
 we use it to create the StreamingContext? I ask because that is what we are
 doing right now. If not, perhaps we have not been running with the
 Concurrent Mark Sweep at all and is that recommended instead of forcing GC
 periodically?

 Thanks
 NB


 On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das t...@databricks.com
 wrote:

 There are a couple of options. Increase timeout (see Spark
 configuration).

 Also see past mails in the mailing list.

 Another option you may try (I have a gut feeling that it may work, but I am
 not sure) is calling GC on the driver periodically. The cleaning up of
 stuff is tied to GCing of RDD objects and regular cleaning may help keep
 things clean more rigorously rather than in unpredictable bursts of GC
 activity.

 Let us know how it works out.

 TD

 On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com
 wrote:

 I have a standalone and local Spark streaming process where we are
 reading inputs using FlumeUtils. Our longest window size is 6 hours. 
 After
 about a day and a half of running without any issues, we start seeing
 Timeout errors while cleaning up input blocks. This seems to cause 
 reading
 from Flume to cease.


 ERROR sparkDriver-akka.actor.default-dispatcher-78
 BlockManagerSlaveActor.logError - Error in removing block
 input-0-1428182594000
 org.apache.spark.SparkException: Error sending message [message =
 UpdateBlockInfo(BlockManagerId(driver, localhost,
 55067),input-0-1428182594000,StorageLevel(false, false, false, false,
 1),0,0,0)]
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
 at
 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
 at
 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
 at org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385)
 at
 org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361)
 at
 org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out
 after [30 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223

Re: Broadcast variables can be rebroadcast?

2015-05-19 Thread N B
Thanks Imran. That does help clarify. I believe I had it right all along, then,
but was confused by the documentation talking about never changing the
broadcast variables.

I've tried it on a local-mode process so far and it does seem to work as
intended. When (and if!) we start running on a real cluster, I hope this
holds up.

Thanks
NB


On Tue, May 19, 2015 at 6:25 AM, Imran Rashid iras...@cloudera.com wrote:

 hmm, I guess it depends on the way you look at it.  In a way, I'm saying
 that Spark does *not* have any built-in auto-re-broadcast if you try to
 mutate a broadcast variable.  Instead, you should create something new, and
 just broadcast it separately.  Then just have all the code you have
 operating on your RDDs look at the new broadcast variable.

 But I guess there is another way to look at it -- you are creating new
 broadcast variables each time, but they all point to the same underlying
 mutable data structure.  So in a way, you are rebroadcasting the same
 underlying data structure.

 Let me expand my example from earlier a little bit more:


 def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = {
  ...
 }

 // this is a val, because the data structure itself is mutable
 val myMutableDataStructure = ...
 // this is a var, because you will create new broadcasts
 var myBroadcast = sc.broadcast(myMutableDataStructure)
 (0 to 20).foreach { iteration =>
   oneIteration(myRDD, myBroadcast)
   // update your mutable data structure in place
   myMutableDataStructure.update(...)
   // ... but that doesn't affect the broadcast variables living out on the cluster,
   // so we need to create a new one

   // this line is not required -- the broadcast var will automatically get unpersisted
   // when a gc cleans up the old broadcast on the driver, but I'm including it here for
   // completeness, in case you want to more proactively clean up old blocks if you are
   // low on space
   myBroadcast.unpersist()

   // now we create a new broadcast which has the updated data in our mutable data structure
   myBroadcast = sc.broadcast(myMutableDataStructure)
 }


 hope this clarifies things!

 Imran

 On Tue, May 19, 2015 at 3:06 AM, N B nb.nos...@gmail.com wrote:

 Hi Imran,

 If I understood you correctly, you are suggesting to simply call
 broadcast again from the driver program. This is exactly what I am hoping
 will work as I have the Broadcast data wrapped up and I am indeed
 (re)broadcasting the wrapper over again when the underlying data changes.
 However, documentation seems to suggest that one cannot re-broadcast. Is my
 understanding accurate?

 Thanks
 NB


 On Mon, May 18, 2015 at 6:24 PM, Imran Rashid iras...@cloudera.com
 wrote:

 Rather than updating the broadcast variable, can't you simply create a
 new one?  When the old one can be gc'ed in your program, it will also get
 gc'ed from spark's cache (and all executors).

 I think this will make your code *slightly* more complicated, as you
 need to add in another layer of indirection for which broadcast variable to
 use, but not too bad.  Eg., from

 var myBroadcast = sc.broadcast( ...)
 (0 to 20).foreach{ iteration =>
   //  ... some rdd operations that involve myBroadcast ...
   myBroadcast.update(...) // wrong! don't update a broadcast variable
 }

 instead do something like:

 def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit
 = {
  ...
 }

 var myBroadcast = sc.broadcast(...)
 (0 to 20).foreach { iteration =>
   oneIteration(myRDD, myBroadcast)
   myBroadcast = sc.broadcast(...) // create a NEW broadcast here, with whatever you need to update it
 }

 On Sat, May 16, 2015 at 2:01 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ayan. Can we rebroadcast after updating in the driver?

 Thanks
 NB.


 On Fri, May 15, 2015 at 6:40 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 A broadcast variable is shipped to the executors used by a transformation the
 first time it is accessed in that transformation. It will NOT be updated
 subsequently, even if the value has changed. However, a new value will be
 shipped to any new executor that comes into play after the value has changed.
 Because of this, changing the value of a broadcast variable is not a good
 idea, as it can create inconsistency within the cluster. From the documentation:

  In addition, the object v should not be modified after it is
 broadcast in order to ensure that all nodes get the same value of the
 broadcast variable


 On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ilya. Does one have to call broadcast again once the
 underlying data is updated in order to get the changes visible on all 
 nodes?

 Thanks
 NB


 On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com
 wrote:

 The broadcast variable is like a pointer. If the underlying data
 changes then the changes will be visible throughout the cluster.
 On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote:

 Hello,

 Once a broadcast variable is created using

Re: Possible to combine all RDDs from a DStream batch into one?

2015-07-15 Thread N B
Hi Jon,

In Spark streaming, 1 batch = 1 RDD. Essentially, the terms are used
interchangeably. If you are trying to collect multiple batches across a
DStream into a single RDD, look at the window() operations.
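
For example, something along these lines (a rough sketch; the stream name and
durations are placeholders):

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;

// With window duration equal to slide duration the windows do not overlap,
// so each input batch contributes to exactly one windowed RDD.
JavaDStream<String> windowed =
    dStream.window(Durations.minutes(5), Durations.minutes(5));

windowed.foreachRDD(rdd -> {
    // called once per window, with an RDD covering all the batches in it
    // ... processing code ...
    return null; // the Function<JavaRDD<T>, Void> form of foreachRDD expects this
});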

Hope this helps
Nikunj


On Wed, Jul 15, 2015 at 7:00 PM, Jon Chase jon.ch...@gmail.com wrote:

 I should note that the amount of data in each batch is very small, so I'm
 not concerned with performance implications of grouping into a single RDD.

 On Wed, Jul 15, 2015 at 9:58 PM, Jon Chase jon.ch...@gmail.com wrote:

 I'm currently doing something like this in my Spark Streaming program
 (Java):

 dStream.foreachRDD((rdd, batchTime) -> {
   log.info("processing RDD from batch {}", batchTime);

   // my rdd processing code

 });

 Instead of having my rdd processing code called once for each RDD in the
 batch, is it possible to essentially group all of the RDDs from the batch
 into a single RDD and single partition and therefore operate on all of the
 elements in the batch at once?

 My goal here is to do an operation exactly once for every batch.  As I
 understand it, foreachRDD is going to do the operation once for each RDD in
 the batch, which is not what I want.

 I've looked at DStream.repartition(int), but the docs make it sound like
 it only changes the number of partitions in the batch's existing RDDs, not
 the number of RDDs.





Re: Spark streaming Processing time keeps increasing

2015-07-16 Thread N B
Thanks Akhil. For doing reduceByKeyAndWindow, one has to have checkpointing
enabled, so yes, we do have it enabled. But not the Write Ahead Log, because we
don't have a need for recovery and we do not recover the process state on
restart.
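
For reference, the checkpointing piece is just the usual one-liner on the
streaming context (the directory below is a placeholder, not our real path):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Checkpointing is required for reduceByKeyAndWindow with an inverse function.
JavaStreamingContext jssc = new JavaStreamingContext(new SparkConf(), Durations.minutes(1));
jssc.checkpoint("/tmp/spark-checkpoints"); // placeholder directory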

I don't know if IO Wait fully explains the increasing processing time.
Below is a full minute of 'sar' output every 2 seconds. The iowait values
don't seem too bad to me except for a brief small spike in the middle.
Also, how does one explain the continued degradation of processing time
even beyond the largest window interval?

Thanks
Nikunj


$ sar 2 30
Linux 3.13.0-48-generic (ip-X-X-X-X)    07/16/2015    _x86_64_    (16 CPU)

01:11:14 AM     CPU     %user   %nice   %system  %iowait  %steal   %idle
01:11:16 AM     all     66.70    0.03    11.10     0.03     0.00   22.13
01:11:18 AM     all     79.99    0.00    10.81     0.00     0.03    9.17
01:11:20 AM     all     62.66    0.03    10.84     0.00     0.03   26.43
01:11:22 AM     all     68.59    0.00    10.83     0.00     0.10   20.49
01:11:24 AM     all     77.74    0.00    10.83     0.00     0.03   11.40
01:11:26 AM     all     65.01    0.00    10.83     0.03     0.07   24.06
01:11:28 AM     all     66.33    0.00    10.87     0.00     0.03   22.77
01:11:30 AM     all     72.38    0.03    12.48     0.54     0.06   14.50
01:11:32 AM     all     68.35    0.00    12.98     7.46     0.03   11.18
01:11:34 AM     all     75.94    0.03    14.02     3.27     0.03    6.71
01:11:36 AM     all     68.60    0.00    14.34     2.76     0.03   14.27
01:11:38 AM     all     61.99    0.03    13.34     0.07     0.07   24.51
01:11:40 AM     all     52.21    0.03    12.79     1.04     0.13   33.79
01:11:42 AM     all     37.91    0.03    12.43     0.03     0.10   49.48
01:11:44 AM     all     26.92    0.00    11.68     0.14     0.10   61.16
01:11:46 AM     all     24.86    0.00    12.07     0.00     0.10   62.97
01:11:48 AM     all     25.49    0.00    11.96     0.00     0.10   62.45
01:11:50 AM     all     21.16    0.00    12.35     0.03     0.14   66.32
01:11:52 AM     all     29.89    0.00    12.06     0.03     0.10   57.91
01:11:54 AM     all     26.77    0.00    11.81     0.00     0.10   61.32
01:11:56 AM     all     25.34    0.03    11.81     0.03     0.14   62.65
01:11:58 AM     all     22.42    0.00    12.60     0.00     0.10   64.88
01:12:00 AM     all     30.27    0.00    12.10     0.03     0.14   57.46
01:12:02 AM     all     80.59    0.00    10.58     0.35     0.03    8.44
01:12:04 AM     all     49.05    0.00    12.89     0.66     0.07   37.32
01:12:06 AM     all     31.21    0.03    13.54     6.54     0.17   48.50
01:12:08 AM     all     31.66    0.00    13.26     6.30     0.10   48.67
01:12:10 AM     all     36.19    0.00    12.87     3.04     0.14   47.76
01:12:12 AM     all     82.63    0.03    10.60     0.00     0.03    6.70
01:12:14 AM     all     77.72    0.00    10.66     0.00     0.03   11.59
Average:        all     52.22    0.01    12.04     1.08     0.08   34.58


On Thu, Jul 16, 2015 at 12:44 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 What is your data volume? Do you have checkpointing/WAL enabled? In that
 case make sure you have SSD disks, as this behavior is mainly due
 to IO wait.

 Thanks
 Best Regards

 On Thu, Jul 16, 2015 at 8:43 AM, N B nb.nos...@gmail.com wrote:

 Hello,

 We have a Spark streaming application and the problem that we are
 encountering is that the batch processing time keeps on increasing and
 eventually causes the application to start lagging. I am hoping that
 someone here can point me to any underlying cause of why this might happen.

 The batch interval is 1 minute as of now and the app does some maps,
 filters, joins and reduceByKeyAndWindow operations. All the reduces are
 invertible functions and so we do provide the inverse-reduce functions in
 all those. The largest window size we have is 1 hour right now. When the
 app is started, we see that the batch processing time is between 20 and 30
 seconds. It keeps creeping up slowly and by the time it hits the 1 hour
 mark, it is somewhere around 35-40 seconds. Somewhat expected and still not
 bad!

 I would expect that since the largest window we have is 1 hour long, the
 application should stabilize around the 1 hour mark and start processing
 subsequent batches within that 35-40 second zone. However, that is not what
 is happening. The processing time still keeps increasing and eventually in
 a few hours it exceeds 1 minute mark and then starts lagging. Eventually
 the lag builds up and becomes in minutes at which point we have to restart
 the system.

 Any pointers on why this could be happening and what we can do to
 troubleshoot further?

 Thanks
 Nikunj





Re: Spark streaming Processing time keeps increasing

2015-07-16 Thread N B
Hi TD,

Yes, we do have the invertible function provided. However, I am not sure I
understood how to use the filterFunction. Is there an example somewhere
showing its usage?

The header comment on the function says :

* @param filterFunc function to filter expired key-value pairs;
*   only pairs that satisfy the function are retained
*   set this to null if you do not want to filter

These are the questions I am confused about:

1. The code comment seems to imply that the filterFunc is only used to
figure out which key-value pairs are used to form the window, but how
does it actually help expire the old data?

2. Shouldn't the values that are falling off of the window period
automatically be removed without the need for an additional filter
function?

3. Which side of the key-value pairs are passed to this function? The
ones that are coming in or the ones that are going out of window or
both?

4. The key-value pairs in use in a particular reduceByKeyAndWindow
operation may not have  the requisite info (such as a timestamp or
similar eg if its aggregated data) to help determine whether to return
true or false. What is the semantic expected here?


As always, thanks for your help

Nikunj





On Thu, Jul 16, 2015 at 1:16 AM, Tathagata Das t...@databricks.com wrote:

 Make sure you provide the filterFunction with the invertible
 reduceByKeyAndWindow. Otherwise none of the keys will get removed, and the
 key space will continue to increase. This is what is leading to the lag. So
 use the filtering function to filter out the keys that are not needed any
 more.
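
 A rough sketch of the kind of call being discussed, based on the 1.x Java API
 (the stream, durations and partition count below are placeholders):

 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import scala.Tuple2;

 // 'counts' is assumed to be a JavaPairDStream<String, Long> of per-key counts.
 Function2<Long, Long, Long> add = (a, b) -> a + b;                        // forward reduce
 Function2<Long, Long, Long> subtract = (a, b) -> a - b;                   // inverse reduce
 Function<Tuple2<String, Long>, Boolean> keepNonZero = kv -> kv._2() > 0;  // expire dead keys

 JavaPairDStream<String, Long> windowedCounts = counts.reduceByKeyAndWindow(
     add,
     subtract,
     Durations.minutes(60),  // window duration
     Durations.minutes(1),   // slide duration
     numPartitions,          // this overload also needs a partitioner or a partition count
     keepNonZero);           // keys for which this returns false are dropped from the window state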

 On Thu, Jul 16, 2015 at 12:44 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 What is your data volume? Do you have checkpointing/WAL enabled? In that
 case make sure you have SSD disks, as this behavior is mainly due
 to IO wait.

 Thanks
 Best Regards

 On Thu, Jul 16, 2015 at 8:43 AM, N B nb.nos...@gmail.com wrote:

 Hello,

 We have a Spark streaming application and the problem that we are
 encountering is that the batch processing time keeps on increasing and
 eventually causes the application to start lagging. I am hoping that
 someone here can point me to any underlying cause of why this might happen.

 The batch interval is 1 minute as of now and the app does some maps,
 filters, joins and reduceByKeyAndWindow operations. All the reduces are
 invertible functions and so we do provide the inverse-reduce functions in
 all those. The largest window size we have is 1 hour right now. When the
 app is started, we see that the batch processing time is between 20 and 30
 seconds. It keeps creeping up slowly and by the time it hits the 1 hour
 mark, it is somewhere around 35-40 seconds. Somewhat expected and still not
 bad!

 I would expect that since the largest window we have is 1 hour long, the
 application should stabilize around the 1 hour mark and start processing
 subsequent batches within that 35-40 second zone. However, that is not what
 is happening. The processing time still keeps increasing and eventually in
 a few hours it exceeds 1 minute mark and then starts lagging. Eventually
 the lag builds up and becomes in minutes at which point we have to restart
 the system.

 Any pointers on why this could be happening and what we can do to
 troubleshoot further?

 Thanks
 Nikunj






Re: Spark streaming Processing time keeps increasing

2015-07-17 Thread N B
Hi TD,

Thanks for the response. I do believe I understand the concept and the need
for the filter function now. I made the requisite code changes and am keeping
it running overnight to see the effect. Hopefully this should fix our
issue.

However, there was one place where I encountered a followup issue and had
to disable that reduce operation for the moment in order to proceed with my
testing for the rest of the changes.

This particular reduceByKeyAndWindow operates on key-value pairs of type
<String, HashSet<Long>>. Once the size of a HashSet drops to 0, we remove the
corresponding key with the filter function specified as

(p -> !p._2().isEmpty())

That looks about right to me. However, soon after the first slide occurs in
this window, it's throwing the following exceptions and aborting that batch.
The stack trace is below. I am not quite sure what to make of it (perhaps
partly due to the late hour :-D ). Any idea what could be wrong here? As
far as I know, String and HashSet<Long> should hash quite consistently.
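
For reference, the shape of the forward/inverse pair in play here is roughly
the sketch below (a sketch with made-up names, not the actual code):

import java.util.HashSet;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

// Forward reduce: union of the two sets.
Function2<HashSet<Long>, HashSet<Long>, HashSet<Long>> union = (a, b) -> {
    HashSet<Long> out = new HashSet<>(a);
    out.addAll(b);
    return out;
};

// "Inverse" reduce: set difference. Note that this is not a true inverse of
// union when the same value also arrived in another batch still inside the window.
Function2<HashSet<Long>, HashSet<Long>, HashSet<Long>> difference = (a, b) -> {
    HashSet<Long> out = new HashSet<>(a);
    out.removeAll(b);
    return out;
};

// Filter that drops keys whose set has become empty.
Function<Tuple2<String, HashSet<Long>>, Boolean> keepNonEmpty = p -> !p._2().isEmpty();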

Also, if there is no way to avoid this issue, I am thinking of rewriting
that part of the code to use a foldByKey or combineByKey operation instead
of reduceByKey.

Thanks
Nikunj


java.lang.Exception: Neither previous window has value for key, nor new
values found. Are you sure your key class hashes consistently?
at
org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:147)
at
org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:134)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:276)
at
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)




On Fri, Jul 17, 2015 at 12:39 AM, Tathagata Das t...@databricks.com wrote:

 Responses inline.

 On Thu, Jul 16, 2015 at 9:27 PM, N B nb.nos...@gmail.com wrote:

 Hi TD,

 Yes, we do have the invertible function provided. However, I am not sure
 I understood how to use the filterFunction. Is there an example
 somewhere showing its usage?

 The header comment on the function says :

 * @param filterFunc function to filter expired key-value pairs;
 *   only pairs that satisfy the function are retained
 *   set this to null if you do not want to filter

Re: Counting distinct values for a key?

2015-07-19 Thread N B
Hi Jerry,

It does not work directly for 2 reasons:

1. I am trying to do this using Spark Streaming (windowed DStreams), and the
DataFrames API does not work with Streaming yet.

2. The query equivalent has a distinct embedded in it, i.e. I am looking
to achieve the equivalent of

SELECT key, count(distinct(value)) from table group by key

Thanks
Nikunj


On Sun, Jul 19, 2015 at 2:28 PM, Jerry Lam chiling...@gmail.com wrote:

 You mean this does not work?

 SELECT key, count(value) from table group by key



 On Sun, Jul 19, 2015 at 2:28 PM, N B nb.nos...@gmail.com wrote:

 Hello,

 How do I go about performing the equivalent of the following SQL clause
 in Spark Streaming? I will be using this on a Windowed DStream.

 SELECT key, count(distinct(value)) from table group by key;

 so for example, given the following dataset in the table:

  key | value
 -+---
  k1  | v1
  k1  | v1
  k1  | v2
  k1  | v3
  k1  | v3
  k2  | vv1
  k2  | vv1
  k2  | vv2
  k2  | vv2
  k2  | vv2
  k3  | vvv1
  k3  | vvv1

 the result will be:

  key | count
 -+---
  k1  | 3
  k2  | 2
  k3  | 1

 Thanks
 Nikunj





Re: Counting distinct values for a key?

2015-07-19 Thread N B
Hi Suyog,

That code outputs the following:

key2 val22 : 1
key1 val1 : 2
key2 val2 : 2

while the output I want to achieve would have been (with your example):

key1 : 2
key2 : 2

because there are 2 distinct types of values for each key (regardless of
their actual duplicate counts, hence the use of the DISTINCT keyword in
the query equivalent).
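
For what it's worth, on a plain pair RDD (not the windowed DStream this is
really about), the count I am after could be sketched roughly like this,
using the dataset from my original example ('jsc' is an assumed
JavaSparkContext):

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

JavaPairRDD<String, String> pairs = jsc.parallelizePairs(Arrays.asList(
    new Tuple2<String, String>("k1", "v1"), new Tuple2<String, String>("k1", "v1"),
    new Tuple2<String, String>("k1", "v2"), new Tuple2<String, String>("k1", "v3"),
    new Tuple2<String, String>("k1", "v3"), new Tuple2<String, String>("k2", "vv1"),
    new Tuple2<String, String>("k2", "vv1"), new Tuple2<String, String>("k2", "vv2"),
    new Tuple2<String, String>("k3", "vvv1")));

JavaPairRDD<String, Integer> distinctCounts = pairs
    .distinct()                                             // drop duplicate (key, value) pairs
    .mapToPair(t -> new Tuple2<String, Integer>(t._1(), 1)) // one 1 per remaining pair
    .reduceByKey((a, b) -> a + b);                          // => (k1, 3), (k2, 2), (k3, 1)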

Thanks
Nikunj


On Sun, Jul 19, 2015 at 2:37 PM, suyog choudhari suyogchoudh...@gmail.com
wrote:

 public static void main(String[] args) {

   SparkConf sparkConf = new SparkConf().setAppName("CountDistinct");

   JavaSparkContext jsc = new JavaSparkContext(sparkConf);

   List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();

   list.add(new Tuple2<String, String>("key1", "val1"));
   list.add(new Tuple2<String, String>("key1", "val1"));
   list.add(new Tuple2<String, String>("key2", "val2"));
   list.add(new Tuple2<String, String>("key2", "val2"));
   list.add(new Tuple2<String, String>("key2", "val22"));

   JavaPairRDD<String, Integer> rdd = jsc.parallelize(list).mapToPair(t ->
       new Tuple2<String, Integer>(t._1 + " " + t._2, 1));

   JavaPairRDD<String, Integer> rdd2 = rdd.reduceByKey((c1, c2) -> c1 + c2);

   List<Tuple2<String, Integer>> output = rdd2.collect();

   for (Tuple2<?, ?> tuple : output) {
     System.out.println(tuple._1() + " : " + tuple._2());
   }
 }

 On Sun, Jul 19, 2015 at 2:28 PM, Jerry Lam chiling...@gmail.com wrote:

 You mean this does not work?

 SELECT key, count(value) from table group by key



 On Sun, Jul 19, 2015 at 2:28 PM, N B nb.nos...@gmail.com wrote:

 Hello,

 How do I go about performing the equivalent of the following SQL clause
 in Spark Streaming? I will be using this on a Windowed DStream.

 SELECT key, count(distinct(value)) from table group by key;

 so for example, given the following dataset in the table:

  key | value
 -+---
  k1  | v1
  k1  | v1
  k1  | v2
  k1  | v3
  k1  | v3
  k2  | vv1
  k2  | vv1
  k2  | vv2
  k2  | vv2
  k2  | vv2
  k3  | vvv1
  k3  | vvv1

 the result will be:

  key | count
 -+---
  k1  | 3
  k2  | 2
  k3  | 1

 Thanks
 Nikunj






Counting distinct values for a key?

2015-07-19 Thread N B
Hello,

How do I go about performing the equivalent of the following SQL clause in
Spark Streaming? I will be using this on a Windowed DStream.

SELECT key, count(distinct(value)) from table group by key;

so for example, given the following dataset in the table:

 key | value
-+---
 k1  | v1
 k1  | v1
 k1  | v2
 k1  | v3
 k1  | v3
 k2  | vv1
 k2  | vv1
 k2  | vv2
 k2  | vv2
 k2  | vv2
 k3  | vvv1
 k3  | vvv1

the result will be:

 key | count
-+---
 k1  | 3
 k2  | 2
 k3  | 1

Thanks
Nikunj


Re: Spark streaming Processing time keeps increasing

2015-07-19 Thread N B
Hi TD,

Yay! Thanks for the help. That solved our issue of ever-increasing
processing time. I added filter functions to all our reduceByKeyAndWindow()
operations and now it's been stable for over 2 days already! :-)

One small piece of feedback about the API, though. The overload that accepts
the filter function also expects either a partitioner or a number of
partitions. There should be another overload that accepts just the filter
function in addition to the other parameters and uses the default parallelism
already set on the SparkContext. I ended up doing that in our code, but it
would be nice to have it as an overloaded call as well.
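
What I ended up doing is roughly the following (a sketch; 'jssc', 'counts'
and the reduce/filter functions are placeholders):

// Work around the missing overload: look up the default parallelism ourselves
// and pass it in explicitly.
int numPartitions = jssc.sparkContext().defaultParallelism();

JavaPairDStream<String, Long> windowedCounts = counts.reduceByKeyAndWindow(
    add, subtract, Durations.minutes(60), Durations.minutes(1),
    numPartitions, keepNonZero);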

Also, I am still trying to fix the issue in the reduceByKeyAndWindow
operation that throws those exceptions. However, I do believe I know what
is wrong there, as the logic itself may be wrong. I am looking for an
alternative way of doing it (preferably using pure Spark transformations
only) and will pose that question on this mailing list separately.

Thanks
Nikunj


On Fri, Jul 17, 2015 at 2:45 AM, N B nb.nos...@gmail.com wrote:

 Hi TD,

 Thanks for the response. I do believe I understand the concept and the
 need for the filterfunction now. I made the requisite code changes and
 keeping it running overnight to see the effect of it. Hopefully this should
 fix our issue.

 However, there was one place where I encountered a followup issue and had
 to disable that reduce operation for the moment in order to proceed with my
 testing for the rest of the changes.

 This particular reduceByKeyAndWindow operates on key-value pairs of type
 <String, HashSet<Long>>. Once the size of a HashSet drops to 0, we remove the
 corresponding key with the filter function specified as

 (p -> !p._2().isEmpty())

 That looks about right to me. However, soon after the first slide occurs
 in this window, it's throwing the following exceptions and aborting that
 batch. The stack trace is below. I am not quite sure what to make of it
 (perhaps partly due to the late hour :-D ). Any idea what could be wrong
 here? As far as I know, String and HashSet<Long> should hash quite
 consistently.

 Also, if there is no way to avoid this issue, I am thinking of rewriting
 that part of the code to use a foldByKey or combineByKey operation instead
 of reduceByKey.

 Thanks
 Nikunj


 java.lang.Exception: Neither previous window has value for key, nor new
 values found. Are you sure your key class hashes consistently?
 at
 org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:147)
 at
 org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:134)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
 at
 org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:276)
 at
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
 at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244

Re: Counting distinct values for a key?

2015-07-20 Thread N B
Hi Jerry,

In fact, the HashSet approach is what we took earlier. However, this did not
work with a windowed DStream (i.e. if we provide a forward and inverse
reduce operation). The reason is that the inverse reduce tries to remove
values that may still exist elsewhere in the window and should not have
been removed. We discovered the logic error recently: basically, this is
not an invertible function.

I thought of doing this without an inverse reduce function but even over a
15 minute window, this is going to be an expensive operation.

Instead, I have put in place a solution using a HashMap where I keep the
actual counts around, and in the inverse reduce we decrement the counts. At
the end we remove the keys for which the value is 0 and then take the size of
the map (i.e. how many keys it has with non-zero counts). This
gives us the distinct count.

I was hoping that there is a more straightforward way of doing this within
Spark itself without having to resort to a hack like this.
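
A rough Java sketch of the count-map workaround described above; the stream and class names are placeholders, and the per-key value is assumed to be a HashMap of value to count:

// Assumed imports: org.apache.spark.api.java.function.Function2,
// org.apache.spark.streaming.api.java.JavaPairDStream, java.util.HashMap

// Forward reduce: merge the per-value counts from two maps.
Function2<HashMap<String, Long>, HashMap<String, Long>, HashMap<String, Long>> mergeCounts =
    (m1, m2) -> {
        HashMap<String, Long> out = new HashMap<>(m1);
        m2.forEach((v, c) -> out.merge(v, c, Long::sum));
        return out;
    };

// Inverse reduce: subtract the counts of the batch that is leaving the window,
// and drop values whose count has reached zero.
Function2<HashMap<String, Long>, HashMap<String, Long>, HashMap<String, Long>> subtractCounts =
    (m1, m2) -> {
        HashMap<String, Long> out = new HashMap<>(m1);
        m2.forEach((v, c) -> out.merge(v, -c, Long::sum));
        out.values().removeIf(c -> c <= 0L);
        return out;
    };

// windowedCounts (placeholder) would be the JavaPairDStream<String, HashMap<String, Long>> produced by
// reduceByKeyAndWindow(mergeCounts, subtractCounts, windowDur, slideDur, numPartitions, t -> !t._2().isEmpty()).
// Distinct count per key = number of values whose count is still non-zero.
JavaPairDStream<String, Integer> distinctCounts = windowedCounts.mapValues(HashMap::size);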

Thanks
Nikunj



On Sun, Jul 19, 2015 at 6:13 PM, Jerry Lam chiling...@gmail.com wrote:

 Hi Nikunj,

 Sorry, I totally misread your question.
 I think you need to first groupByKey (get all values of the same key
 together), then follow with mapValues (probably put the values into a set and
 then take its size, because you want a distinct count).

 HTH,

 Jerry

 Sent from my iPhone

 On 19 Jul, 2015, at 8:48 pm, N B nb.nos...@gmail.com wrote:

 Hi Suyog,

 That code outputs the following:

 key2 val22 : 1
 key1 val1 : 2
 key2 val2 : 2

 while the output I want to achieve would have been (with your example):

 key1 : 2
 key2 : 2

 because there are 2 distinct types of values for each key ( regardless of
 their actual duplicate counts .. hence the use of the DISTINCT keyword in
 the query equivalent ).

 Thanks
 Nikunj


 On Sun, Jul 19, 2015 at 2:37 PM, suyog choudhari suyogchoudh...@gmail.com
  wrote:

 public static void main(String[] args) {
   SparkConf sparkConf = new SparkConf().setAppName("CountDistinct");
   JavaSparkContext jsc = new JavaSparkContext(sparkConf);

   List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
   list.add(new Tuple2<String, String>("key1", "val1"));
   list.add(new Tuple2<String, String>("key1", "val1"));
   list.add(new Tuple2<String, String>("key2", "val2"));
   list.add(new Tuple2<String, String>("key2", "val2"));
   list.add(new Tuple2<String, String>("key2", "val22"));

   JavaPairRDD<String, Integer> rdd = jsc.parallelize(list)
       .mapToPair(t -> new Tuple2<String, Integer>(t._1 + " " + t._2, 1));

   JavaPairRDD<String, Integer> rdd2 = rdd.reduceByKey((c1, c2) -> c1 + c2);

   List<Tuple2<String, Integer>> output = rdd2.collect();
   for (Tuple2<?, ?> tuple : output) {
     System.out.println(tuple._1() + " : " + tuple._2());
   }
 }

 On Sun, Jul 19, 2015 at 2:28 PM, Jerry Lam chiling...@gmail.com wrote:

 You mean this does not work?

 SELECT key, count(value) from table group by key



 On Sun, Jul 19, 2015 at 2:28 PM, N B nb.nos...@gmail.com wrote:

 Hello,

 How do I go about performing the equivalent of the following SQL clause
 in Spark Streaming? I will be using this on a Windowed DStream.

 SELECT key, count(distinct(value)) from table group by key;

 so for example, given the following dataset in the table:

  key | value
 -+---
  k1  | v1
  k1  | v1
  k1  | v2
  k1  | v3
  k1  | v3
  k2  | vv1
  k2  | vv1
  k2  | vv2
  k2  | vv2
  k2  | vv2
  k3  | vvv1
  k3  | vvv1

 the result will be:

  key | count
 -+---
  k1  | 3
  k2  | 2
  k3  | 1

 Thanks
 Nikunj







Re: Is HDFS required for Spark streaming?

2015-09-05 Thread N B
Hi TD,

Thanks!

So our application does turn on checkpoints but we do not recover upon
application restart (we just blow the checkpoint directory away first and
re-create the StreamingContext) as we don't have a real need for that type
of recovery. However, because the application does reduceByKeyAndWindow
operations, checkpointing has to be turned on. Do you think this scenario
will also only work with HDFS, or would having local directories suffice?

Thanks
Nikunj



On Fri, Sep 4, 2015 at 3:09 PM, Tathagata Das <t...@databricks.com> wrote:

> Shuffle spills will use local disk, HDFS not needed.
> Spark and Spark Streaming checkpoint info WILL NEED HDFS for
> fault-tolerance. So that stuff can be recovered even if the spark cluster
> nodes go down.
>
> TD
>
> On Fri, Sep 4, 2015 at 2:45 PM, N B <nb.nos...@gmail.com> wrote:
>
>> Hello,
>>
>> We have a Spark Streaming program that is currently running on a single
>> node in "local[n]" master mode. We currently give it local directories for
>> Spark's own state management etc. The input is streaming from network/flume
>> and output is also to network/kafka etc, so the process as such does not
>> need any distributed file system.
>>
>> Now, we do want to start distributing this procesing across a few
>> machines and make a real cluster out of it. However, I am not sure if HDFS
>> is a hard requirement for that to happen. I am thinking about the Shuffle
>> spills, DStream/RDD persistence and checkpoint info. Do any of these
>> require the state to be shared via HDFS? Are there other alternatives that
>> can be utilized if state sharing is accomplished via the file system only.
>>
>> Thanks
>> Nikunj
>>
>>
>


Is HDFS required for Spark streaming?

2015-09-04 Thread N B
Hello,

We have a Spark Streaming program that is currently running on a single
node in "local[n]" master mode. We currently give it local directories for
Spark's own state management etc. The input is streaming from network/flume
and output is also to network/kafka etc, so the process as such does not
need any distributed file system.

Now, we do want to start distributing this processing across a few machines
and make a real cluster out of it. However, I am not sure if HDFS is a hard
requirement for that to happen. I am thinking about the Shuffle spills,
DStream/RDD persistence and checkpoint info. Do any of these require the
state to be shared via HDFS? Are there other alternatives that can be
utilized if state sharing is accomplished via the file system only.

Thanks
Nikunj


Re: Is HDFS required for Spark streaming?

2015-09-09 Thread N B
Thanks Cody and TD.

If we do run with local directories, I suppose the checkpoint operation
will write the various partitions of an RDD into their own local dirs (of
course). So what's the worst that can happen in case of a node failure?
Will the streaming batches continue to process (i.e. does the lost
checkpointed data get recovered or recreated?) or will the entire
application start seeing errors from that point onwards?

Thanks
Nikunj


On Tue, Sep 8, 2015 at 11:54 AM, Tathagata Das <t...@databricks.com> wrote:

> You can use local directories in that case but it is not recommended and
> not a well-test code path (so I have no idea what can happen).
>
> On Tue, Sep 8, 2015 at 6:59 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Yes, local directories will be sufficient
>>
>> On Sat, Sep 5, 2015 at 10:44 AM, N B <nb.nos...@gmail.com> wrote:
>>
>>> Hi TD,
>>>
>>> Thanks!
>>>
>>> So our application does turn on checkpoints but we do not recover upon
>>> application restart (we just blow the checkpoint directory away first and
>>> re-create the StreamingContext) as we don't have a real need for that type
>>> of recovery. However, because the application does reduceeByKeyAndWindow
>>> operations, checkpointing has to be turned on. Do you think this scenario
>>> will also only work with HDFS or having local directories suffice?
>>>
>>> Thanks
>>> Nikunj
>>>
>>>
>>>
>>> On Fri, Sep 4, 2015 at 3:09 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> Shuffle spills will use local disk, HDFS not needed.
>>>> Spark and Spark Streaming checkpoint info WILL NEED HDFS for
>>>> fault-tolerance. So that stuff can be recovered even if the spark cluster
>>>> nodes go down.
>>>>
>>>> TD
>>>>
>>>> On Fri, Sep 4, 2015 at 2:45 PM, N B <nb.nos...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> We have a Spark Streaming program that is currently running on a
>>>>> single node in "local[n]" master mode. We currently give it local
>>>>> directories for Spark's own state management etc. The input is streaming
>>>>> from network/flume and output is also to network/kafka etc, so the process
>>>>> as such does not need any distributed file system.
>>>>>
>>>>> Now, we do want to start distributing this procesing across a few
>>>>> machines and make a real cluster out of it. However, I am not sure if HDFS
>>>>> is a hard requirement for that to happen. I am thinking about the Shuffle
>>>>> spills, DStream/RDD persistence and checkpoint info. Do any of these
>>>>> require the state to be shared via HDFS? Are there other alternatives that
>>>>> can be utilized if state sharing is accomplished via the file system only.
>>>>>
>>>>> Thanks
>>>>> Nikunj
>>>>>
>>>>>
>>>>
>>>
>>
>


Tungsten and Spark Streaming

2015-09-09 Thread N B
Hello,

How can we start taking advantage of the performance gains made under
Project Tungsten in Spark 1.5 for a Spark Streaming program?

From what I understand, this is available by default for DataFrames. But
for a program written using Spark Streaming, would we see any potential
gains "out of the box" in 1.5, or will we have to rewrite some portions of
the application code to realize that benefit?

Any insight/documentation links etc in this regard will be appreciated.

Thanks
Nikunj


Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-26 Thread N B
Hi Dibyendu,

I am not sure I understand completely. But are you suggesting that
currently there is no way to enable Checkpoint directory to be in Tachyon?

Thanks
Nikunj


On Fri, Sep 25, 2015 at 11:49 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Hi,
>
> Recently I was working on a PR to use Tachyon as OFF_HEAP store for Spark
> Streaming and make sure Spark Streaming can recover from Driver failure and
> recover the blocks from Tachyon.
>
> The motivation for this PR is:
>
> If Streaming application stores the blocks OFF_HEAP, it may not need any
> WAL like feature to recover from Driver failure. As long as the writing of
> blocks to Tachyon from Streaming receiver is durable, it should be
> recoverable from Tachyon directly on Driver failure.
> This can solve the issue of expensive WAL write and duplicating the blocks
> both in MEMORY and also WAL and also guarantee end to end No-Data-Loss
> channel using OFF_HEAP store.
>
> https://github.com/apache/spark/pull/8817
>
> This PR still under review . But having done various fail over testing in
> my environment , I see this PR worked perfectly fine without any data loss
> . Let see what TD and other have to say on this PR .
>
> Below is the configuration I used to test this PR ..
>
>
> Spark : 1.6 from Master
> Tachyon : 0.7.1
>
> SparkConfiguration Details :
>
> SparkConf conf = new SparkConf().setAppName("TestTachyon")
> .set("spark.streaming.unpersist", "true")
> .set("spark.local.dir", "/mnt1/spark/tincan")
> .set("tachyon.zookeeper.address","10.252.5.113:2182")
> .set("tachyon.usezookeeper","true")
> .set("spark.externalBlockStore.url", "tachyon-ft://
> ip-10-252-5-113.asskickery.us:19998")
> .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
> .set("spark.externalBlockStore.folderName","pearson")
> .set("spark.externalBlockStore.dirId", "subpub")
>
> .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown","true");
>
> JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(
> 1));
>
> String checkpointDirectory = "hdfs://
> 10.252.5.113:9000/user/hadoop/spark/wal";
>
> jsc.checkpoint(checkpointDirectory);
>
>
> //I am using the My Receiver Based Consumer (
> https://github.com/dibbhatt/kafka-spark-consumer) . But
> KafkaUtil.CreateStream will also work
>
> JavaDStream unionStreams = ReceiverLauncher.launch(
> jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
>
>
>
>
> Regards,
> Dibyendu
>
> On Sat, Sep 26, 2015 at 11:59 AM, N B <nb.nos...@gmail.com> wrote:
>
>> Hi Dibyendu,
>>
>> How does one go about configuring spark streaming to use tachyon as its
>> place for storing checkpoints? Also, can one do this with tachyon running
>> on a completely different node than where spark processes are running?
>>
>> Thanks
>> Nikunj
>>
>>
>> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Hi Tathagata,
>>>
>>> Thanks for looking into this. Further investigating I found that the
>>> issue is with Tachyon does not support File Append. The streaming receiver
>>> which writes to WAL when failed, and again restarted, not able to append to
>>> same WAL file after restart.
>>>
>>> I raised this with Tachyon user group, and Haoyuan told that within 3
>>> months time Tachyon file append will be ready. Will revisit this issue
>>> again then .
>>>
>>> Regards,
>>> Dibyendu
>>>
>>>
>>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> Looks like somehow the file size reported by the FSInputDStream of
>>>> Tachyon's FileSystem interface, is returning zero.
>>>>
>>>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>
>>>>> Just to follow up this thread further .
>>>>>
>>>>> I was doing some fault tolerant testing of Spark Streaming with
>>>>> Tachyon as OFF_HEAP block store. As I said in earlier email, I could able
>>>>> to solve the BlockNotFound exception when I used Hierarchical Storage
>>>>> of Tachyon ,  which is good.
>>>>>
>>>>> I continue doing some testing around 

Re: Weird worker usage

2015-09-26 Thread N B
Hello,

Does anyone have an insight into what could be the issue here?

Thanks
Nikunj


On Fri, Sep 25, 2015 at 10:44 AM, N B <nb.nos...@gmail.com> wrote:

> Hi Akhil,
>
> I do have 25 partitions being created. I have set
> the spark.default.parallelism property to 25. Batch size is 30 seconds and
> block interval is 1200 ms which also gives us roughly 25 partitions from
> the input stream. I can see 25 partitions being created and used in the
> Spark UI also. Its just that those tasks are waiting for cores on N1 to get
> free before being scheduled while N2 is sitting idle.
>
> The cluster configuration is:
>
> N1: 2 workers, 6 cores and 16gb memory => 12 cores on the first node.
> N2: 2 workers, 8 cores and 16 gb memory => 16 cores on the second node.
>
> for a grand total of 28 cores. But it still does most of the processing on
> N1 (divided among the 2 workers running) but almost completely disregarding
> N2 until its the final stage where data is being written out to
> Elasticsearch. I am not sure I understand the reason behind it not
> distributing more partitions to N2 to begin with and use it effectively.
> Since there are only 12 cores on N1 and 25 total partitions, shouldn't it
> send some of those partitions to N2 as well?
>
> Thanks
> Nikunj
>
>
> On Fri, Sep 25, 2015 at 5:28 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Parallel tasks totally depends on the # of partitions that you are
>> having, if you are not receiving sufficient partitions (partitions > total
>> # cores) then try to do a .repartition.
>>
>> Thanks
>> Best Regards
>>
>> On Fri, Sep 25, 2015 at 1:44 PM, N B <nb.nos...@gmail.com> wrote:
>>
>>> Hello all,
>>>
>>> I have a Spark streaming application that reads from a Flume Stream,
>>> does quite a few maps/filters in addition to a few reduceByKeyAndWindow and
>>> join operations before writing the analyzed output to ElasticSearch inside
>>> a foreachRDD()...
>>>
>>> I recently started to run this on a 2 node cluster (Standalone) with the
>>> driver program directly submitting to Spark master on the same host. The
>>> way I have divided the resources is as follows:
>>>
>>> N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores each
>>> worker)
>>> N2: 2 spark workers (16 gb + 8 cores each worker).
>>>
>>> The application works just fine but it is underusing N2 completely. It
>>> seems to use N1 (note that both executors on N1 get used) for all the
>>> analytics but when it comes to writing to Elasticsearch, it does divide the
>>> data around into all 4 executors which then write to ES on a separate host.
>>>
>>> I am puzzled as to why the data is not being distributed evenly from the
>>> get go into all 4 executors and why would it only do so in the final step
>>> of the pipeline which seems counterproductive as well?
>>>
>>> CPU usage on N1 is near the peak while on N2 is < 10% of overall
>>> capacity.
>>>
>>> Any help in getting the resources more evenly utilized on N1 and N2 is
>>> welcome.
>>>
>>> Thanks in advance,
>>> Nikunj
>>>
>>>
>>
>


Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-26 Thread N B
Hi Dibyendu,

Thanks. I believe I understand why it has been an issue using S3 for
checkpoints based on your explanation. But does this limitation apply only
if recovery is needed in case of driver failure?

What if we are not interested in recovery after a driver failure? However,
just for the purposes of running streaming pipelines that do
reduceByKeyAndWindow() and updateStateByKey() operations, a checkpoint
directory needs to be configured.

Do you think this usage will also run into issues if an S3 location is
provided for the checkpoint directory? We will not use it to do any
explicit recovery like I stated above.
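
For context, a minimal sketch of the configuration being described: a checkpoint directory is set only because the windowed/stateful operations require one, and the receiver WAL stays off. Names and paths are placeholders.

SparkConf conf = new SparkConf()
    .setAppName("StreamingWithWindows")
    // Off by default; shown only to make explicit that we do not use the receiver WAL.
    .set("spark.streaming.receiver.writeAheadLog.enable", "false");

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));

// Required because reduceByKeyAndWindow()/updateStateByKey() need a checkpoint directory,
// even though we never restore from it. Placeholder path; could point at HDFS, S3 or a local dir.
jssc.checkpoint("hdfs://namenode:9000/user/spark/checkpoints");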

Thanks
Nikunj



On Sat, Sep 26, 2015 at 3:13 AM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> In Spark Streaming , Checkpoint Directory is used for two purpose
>
> 1. Metadata checkpointing
>
> 2. Data checkpointing
>
> If you enable WAL to recover from Driver failure, Spark Streaming will
> also write the Received Blocks in WAL which stored in checkpoint directory.
>
> For streaming solution to recover from any failure without any data loss ,
> you need to enable Meta Data Check pointing and WAL.  You do not need to
> enable Data Check pointing.
>
> From my experiments and the PR I mentioned , I configured the Meta Data
> Check Pointing in HDFS , and stored the Received Blocks OFF_HEAP.  And I
> did not use any WAL . The PR I proposed would recover from Driver fail-over
> without using any WAL like feature because Blocks are already available in
> Tachyon. The Meta Data Checkpoint helps to recover the meta data about past
> received blocks.
>
> Now the question is , can I configure Tachyon as my Metadata Checkpoint
> location ? I tried that , and Streaming application writes the
> receivedBlockMeataData to Tachyon, but on driver failure, it can not
> recover the received block meta data from Tachyon. I sometime see Zero size
> files in Tachyon checkpoint location , and it can not recover past events .
> I need to understand what is the issue of storing meta data in Tachyon .
> That needs a different JIRA I guess.
>
> Let me know I am able to explain the current scenario around Spark
> Streaming and Tachyon .
>
> Regards,
> Dibyendu
>
>
>
>
> On Sat, Sep 26, 2015 at 1:04 PM, N B <nb.nos...@gmail.com> wrote:
>
>> Hi Dibyendu,
>>
>> I am not sure I understand completely. But are you suggesting that
>> currently there is no way to enable Checkpoint directory to be in Tachyon?
>>
>> Thanks
>> Nikunj
>>
>>
>> On Fri, Sep 25, 2015 at 11:49 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Recently I was working on a PR to use Tachyon as OFF_HEAP store for
>>> Spark Streaming and make sure Spark Streaming can recover from Driver
>>> failure and recover the blocks form Tachyon.
>>>
>>> The The Motivation for this PR is  :
>>>
>>> If Streaming application stores the blocks OFF_HEAP, it may not need any
>>> WAL like feature to recover from Driver failure. As long as the writing of
>>> blocks to Tachyon from Streaming receiver is durable, it should be
>>> recoverable from Tachyon directly on Driver failure.
>>> This can solve the issue of expensive WAL write and duplicating the
>>> blocks both in MEMORY and also WAL and also guarantee end to end
>>> No-Data-Loss channel using OFF_HEAP store.
>>>
>>> https://github.com/apache/spark/pull/8817
>>>
>>> This PR still under review . But having done various fail over testing
>>> in my environment , I see this PR worked perfectly fine without any data
>>> loss . Let see what TD and other have to say on this PR .
>>>
>>> Below is the configuration I used to test this PR ..
>>>
>>>
>>> Spark : 1.6 from Master
>>> Tachyon : 0.7.1
>>>
>>> SparkConfiguration Details :
>>>
>>> SparkConf conf = new SparkConf().setAppName("TestTachyon")
>>> .set("spark.streaming.unpersist", "true")
>>> .set("spark.local.dir", "/mnt1/spark/tincan")
>>> .set("tachyon.zookeeper.address","10.252.5.113:2182")
>>> .set("tachyon.usezookeeper","true")
>>> .set("spark.externalBlockStore.url", "tachyon-ft://
>>> ip-10-252-5-113.asskickery.us:19998")
>>> .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
>>> .set("spark.externalBlockStore.folderName","pearson")
>>> .set("spark.extern

Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-26 Thread N B
I wanted to add that we are not configuring the WAL in our scenario.

Thanks again,
Nikunj


On Sat, Sep 26, 2015 at 11:35 AM, N B <nb.nos...@gmail.com> wrote:

> Hi Dibyendu,
>
> Thanks. I believe I understand why it has been an issue using S3 for
> checkpoints based on your explanation. But does this limitation apply only
> if recovery is needed in case of driver failure?
>
> What if we are not interested in recovery after a driver failure. However,
> just for the purposes of running streaming pipelines that do
> reduceByKeyAndWindow() and updateStateByKey() operations it needs to have a
> checkpoint directory configured.
>
> Do you think this usage will also run into issues if an S3 location is
> provided for the checkpoint directory. We will not use it to do any
> explicit recovery like I stated above.
>
> Thanks
> Nikunj
>
>
>
> On Sat, Sep 26, 2015 at 3:13 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> In Spark Streaming , Checkpoint Directory is used for two purpose
>>
>> 1. Metadata checkpointing
>>
>> 2. Data checkpointing
>>
>> If you enable WAL to recover from Driver failure, Spark Streaming will
>> also write the Received Blocks in WAL which stored in checkpoint directory.
>>
>> For streaming solution to recover from any failure without any data loss
>> , you need to enable Meta Data Check pointing and WAL.  You do not need to
>> enable Data Check pointing.
>>
>> From my experiments and the PR I mentioned , I configured the Meta Data
>> Check Pointing in HDFS , and stored the Received Blocks OFF_HEAP.  And I
>> did not use any WAL . The PR I proposed would recover from Driver fail-over
>> without using any WAL like feature because Blocks are already available in
>> Tachyon. The Meta Data Checkpoint helps to recover the meta data about past
>> received blocks.
>>
>> Now the question is , can I configure Tachyon as my Metadata Checkpoint
>> location ? I tried that , and Streaming application writes the
>> receivedBlockMeataData to Tachyon, but on driver failure, it can not
>> recover the received block meta data from Tachyon. I sometime see Zero size
>> files in Tachyon checkpoint location , and it can not recover past events .
>> I need to understand what is the issue of storing meta data in Tachyon .
>> That needs a different JIRA I guess.
>>
>> Let me know I am able to explain the current scenario around Spark
>> Streaming and Tachyon .
>>
>> Regards,
>> Dibyendu
>>
>>
>>
>>
>> On Sat, Sep 26, 2015 at 1:04 PM, N B <nb.nos...@gmail.com> wrote:
>>
>>> Hi Dibyendu,
>>>
>>> I am not sure I understand completely. But are you suggesting that
>>> currently there is no way to enable Checkpoint directory to be in Tachyon?
>>>
>>> Thanks
>>> Nikunj
>>>
>>>
>>> On Fri, Sep 25, 2015 at 11:49 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Recently I was working on a PR to use Tachyon as OFF_HEAP store for
>>>> Spark Streaming and make sure Spark Streaming can recover from Driver
>>>> failure and recover the blocks form Tachyon.
>>>>
>>>> The The Motivation for this PR is  :
>>>>
>>>> If Streaming application stores the blocks OFF_HEAP, it may not need
>>>> any WAL like feature to recover from Driver failure. As long as the writing
>>>> of blocks to Tachyon from Streaming receiver is durable, it should be
>>>> recoverable from Tachyon directly on Driver failure.
>>>> This can solve the issue of expensive WAL write and duplicating the
>>>> blocks both in MEMORY and also WAL and also guarantee end to end
>>>> No-Data-Loss channel using OFF_HEAP store.
>>>>
>>>> https://github.com/apache/spark/pull/8817
>>>>
>>>> This PR still under review . But having done various fail over testing
>>>> in my environment , I see this PR worked perfectly fine without any data
>>>> loss . Let see what TD and other have to say on this PR .
>>>>
>>>> Below is the configuration I used to test this PR ..
>>>>
>>>>
>>>> Spark : 1.6 from Master
>>>> Tachyon : 0.7.1
>>>>
>>>> SparkConfiguration Details :
>>>>
>>>> SparkConf conf = new SparkConf().setAppName("TestTachyon")
>>>> .set("spark.streaming.unpersist", "true")

Re: Weird worker usage

2015-09-25 Thread N B
Hi Akhil,

I do have 25 partitions being created. I have set
the spark.default.parallelism property to 25. Batch size is 30 seconds and
block interval is 1200 ms which also gives us roughly 25 partitions from
the input stream. I can see 25 partitions being created and used in the
Spark UI also. It's just that those tasks are waiting for cores on N1 to get
free before being scheduled while N2 is sitting idle.

The cluster configuration is:

N1: 2 workers, 6 cores and 16gb memory => 12 cores on the first node.
N2: 2 workers, 8 cores and 16 gb memory => 16 cores on the second node.

for a grand total of 28 cores. But it still does most of the processing on
N1 (divided among the 2 workers running) but almost completely disregarding
N2 until the final stage where data is being written out to
Elasticsearch. I am not sure I understand the reason behind it not
distributing more partitions to N2 to begin with and use it effectively.
Since there are only 12 cores on N1 and 25 total partitions, shouldn't it
send some of those partitions to N2 as well?
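
For reference, a small sketch of the knobs discussed above, using the same numbers; the input stream variable and its element type are placeholders:

// 30 s batch / 1.2 s block interval => 30000 / 1200 = 25 blocks, i.e. 25 partitions per receiver batch.
SparkConf conf = new SparkConf()
    .set("spark.default.parallelism", "25")
    .set("spark.streaming.blockInterval", "1200ms");   // plain milliseconds on older versions

// If the block-based partitioning still leaves N2 idle, an explicit repartition
// (as suggested in the quoted reply below) can force the data across all 28 cores:
JavaDStream<Event> spread = inputStream.repartition(28);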

Thanks
Nikunj


On Fri, Sep 25, 2015 at 5:28 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Parallel tasks totally depends on the # of partitions that you are having,
> if you are not receiving sufficient partitions (partitions > total # cores)
> then try to do a .repartition.
>
> Thanks
> Best Regards
>
> On Fri, Sep 25, 2015 at 1:44 PM, N B <nb.nos...@gmail.com> wrote:
>
>> Hello all,
>>
>> I have a Spark streaming application that reads from a Flume Stream, does
>> quite a few maps/filters in addition to a few reduceByKeyAndWindow and join
>> operations before writing the analyzed output to ElasticSearch inside a
>> foreachRDD()...
>>
>> I recently started to run this on a 2 node cluster (Standalone) with the
>> driver program directly submitting to Spark master on the same host. The
>> way I have divided the resources is as follows:
>>
>> N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores each
>> worker)
>> N2: 2 spark workers (16 gb + 8 cores each worker).
>>
>> The application works just fine but it is underusing N2 completely. It
>> seems to use N1 (note that both executors on N1 get used) for all the
>> analytics but when it comes to writing to Elasticsearch, it does divide the
>> data around into all 4 executors which then write to ES on a separate host.
>>
>> I am puzzled as to why the data is not being distributed evenly from the
>> get go into all 4 executors and why would it only do so in the final step
>> of the pipeline which seems counterproductive as well?
>>
>> CPU usage on N1 is near the peak while on N2 is < 10% of overall capacity.
>>
>> Any help in getting the resources more evenly utilized on N1 and N2 is
>> welcome.
>>
>> Thanks in advance,
>> Nikunj
>>
>>
>


Weird worker usage

2015-09-25 Thread N B
Hello all,

I have a Spark streaming application that reads from a Flume Stream, does
quite a few maps/filters in addition to a few reduceByKeyAndWindow and join
operations before writing the analyzed output to ElasticSearch inside a
foreachRDD()...

I recently started to run this on a 2 node cluster (Standalone) with the
driver program directly submitting to Spark master on the same host. The
way I have divided the resources is as follows:

N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores each
worker)
N2: 2 spark workers (16 gb + 8 cores each worker).

The application works just fine but it is underusing N2 completely. It
seems to use N1 (note that both executors on N1 get used) for all the
analytics but when it comes to writing to Elasticsearch, it does divide the
data around into all 4 executors which then write to ES on a separate host.

I am puzzled as to why the data is not being distributed evenly from the
get go into all 4 executors and why would it only do so in the final step
of the pipeline which seems counterproductive as well?

CPU usage on N1 is near the peak while on N2 is < 10% of overall capacity.

Any help in getting the resources more evenly utilized on N1 and N2 is
welcome.

Thanks in advance,
Nikunj


Dynamic lookup table

2015-08-28 Thread N B
Hi all,

I have the following use case that I wanted to get some insight on how to
go about doing in Spark Streaming.

Every batch is processed through the pipeline and, at the end, it has to
update some statistics information. This updated info should be reusable in
the next batch of this DStream, e.g. for looking up the relevant stat, which
in turn refines the stats further. It has to continue doing this for every
batch processed. The first batch in the DStream can work with an empty stats
lookup without issue. Essentially, we are trying to do a feedback loop.

What is a good pattern to apply for something like this? Some approaches
that I considered are:

1. Use updateStateByKey(). But this produces a new DStream that I cannot
refer back in the pipeline, so seems like a no-go but would be happy to be
proven wrong.

2. Use broadcast variables to maintain this state in a Map, for example, and
continue re-broadcasting it after every batch (see the sketch after this
list). I am not sure if this has performance implications or if it's even a
good idea.

3. IndexedRDD? Looked promising initially but I quickly realized that it
might have the same issue as the updateStateByKey() approach, i.e. it's not
available in the pipeline before it's created.

4. Any other ideas that are obvious and I am missing?
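
Purely as a sketch of option 2 above (the re-broadcast idea), and heavily hedged since whether it is a good idea is exactly the open question: Record, Stats, enrich() and Stats.combine() are hypothetical placeholders, and dstream is an assumed JavaDStream<Record>.

// Driver-side copy of the stats, refined after every batch and re-broadcast for the next one.
final Map<String, Stats> driverStats = new HashMap<>();
final AtomicReference<Broadcast<Map<String, Stats>>> statsBc = new AtomicReference<>();

Function<JavaRDD<Record>, Void> perBatch = rdd -> {
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(rdd.context());
    if (statsBc.get() == null) {
        // First batch works against an empty lookup, as noted above.
        statsBc.set(jsc.broadcast(new HashMap<>(driverStats)));
    }
    Broadcast<Map<String, Stats>> lookup = statsBc.get();

    // Process this batch using the previous batch's stats, then bring the refinements to the driver.
    List<Stats> refinements = rdd.map(rec -> enrich(rec, lookup.value())).collect();
    refinements.forEach(s -> driverStats.merge(s.key, s, Stats::combine));

    // Re-broadcast the refined stats so the next batch can look them up.
    lookup.unpersist();
    statsBc.set(jsc.broadcast(new HashMap<>(driverStats)));
    return null;
};
dstream.foreachRDD(perBatch);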

Thanks
Nikunj


Re: Dynamic lookup table

2015-08-28 Thread N B
Hi Jason,

Thanks for the response. I believe I can look into a Redis based solution
for storing this state externally. However, would it be possible to refresh
this from the store with every batch i.e. what code can be written inside
the pipeline to fetch this info from the external store? Also, seems like a
waste since all that info should be available from within spark already.
The number of keys and amount of data is going to be quite limited in this
case. It just needs to be updated periodically.

The Accumulator based solution only works for simple counting and we have a
larger stats object that includes things like Averages, Variance etc. The
Accumulable interface potentially can be implemented for this but then the
other restriction seems to be that the values of such an accumulator can
only be accessed on the driver/master. We want this info to be available in
the data processing for the next batch as a lookup for example. Do you know
of a way to make that possible with Accumulable?

Thanks
Nikunj



On Fri, Aug 28, 2015 at 3:10 PM, Jason ja...@jasonknight.us wrote:

 Hi Nikunj,

 Depending on what kind of stats you want to accumulate, you may want to
 look into the Accumulator/Accumulable API, or if you need more control, you
 can store these things in an external key-value store (HBase, redis, etc..)
 and do careful updates there. Though be careful and make sure your updates
 are atomic (transactions or CAS semantics) or you could run into race
 condition problems.

 Jason

 On Fri, Aug 28, 2015 at 11:39 AM N B nb.nos...@gmail.com wrote:

 Hi all,

 I have the following use case that I wanted to get some insight on how to
 go about doing in Spark Streaming.

 Every batch is processed through the pipeline and at the end, it has to
 update some statistics information. This updated info should be reusable in
 the next batch of this DStream e.g for looking up the relevant stat and it
 in turn refines the stats further. It has to continue doing this for every
 batch processed. First batch in the DStream can work with empty stats
 lookup without issue. Essentially, we are trying to do a feedback loop.

 What is a good pattern to apply for something like this? Some approaches
 that I considered are:

 1. Use updateStateByKey(). But this produces a new DStream that I cannot
 refer back in the pipeline, so seems like a no-go but would be happy to be
 proven wrong.

 2. Use broadcast variables to maintain this state in a Map for example
 and continue re-brodcasting it after every batch. I am not sure if this has
 performance implications or if its even a good idea.

 3. IndexedRDD? Looked promising initially but I quickly realized that it
 might have the same issue as the updateStateByKey() approach, i.e. its not
 available in the pipeline before its created.

 4. Any other ideas that are obvious and I am missing?

 Thanks
 Nikunj




Re: Neither previous window has value for key, nor new values found.

2016-06-16 Thread N B
That post from TD that you reference has a good explanation of the issue
you are encountering. The issue in my view here is that the reduce and the
inverseReduce function that you have specified are not perfect opposites of
each other. Consider the following strings:

"a"
"b"
"a"

forward reduce will form them into "aba".
When the first "a" falls off the window, your inverse reduce function will
produce "b" and not "ba" as would be required by the opposite of the
concatenation. Now when the second "a" falls off, the check for this being
inconsistent is triggered with the exception you see.

HTH
NB


On Fri, Jun 10, 2016 at 1:19 PM, Marco1982  wrote:

> Hi all,
>
> I'm running a Spark Streaming application that uses reduceByKeyAndWindow().
> The window interval is 2 hours, while the slide interval is 1 hour. I have
> a
> JavaPairRDD in which both keys and values are strings. Each time the
> reduceByKeyAndWindow() function is called, it uses appendString() and
> removeString() functions below to incrementally build a windowed stream of
> data:
>
> Function2<String, String, String> appendString = new Function2<String, String, String>() {
>   @Override
>   public String call(String s1, String s2) {
> return s1 + s2;
>   }
> };
>
> Function2<String, String, String> removeString = new Function2<String, String, String>() {
>   @Override
>   public String call(String s1, String s2) {
> return s1.replace(s2, "");
>   }
> };
>
> filterEmptyRecords() removes keys that eventually won't contain any value:
>
> Function<Tuple2<String, String>, Boolean> filterEmptyRecords =
> new Function<Tuple2<String, String>, Boolean>() {
>   @Override
>   public Boolean call(scala.Tuple2<String, String> t) {
> return (!t._2().isEmpty());
>   }
> };
>
> The windowed operation is then:
>
> JavaPairDStream<String, String> cdr_kv =
> cdr_filtered.reduceByKeyAndWindow(appendString, removeString,
> Durations.seconds(WINDOW_DURATION), Durations.seconds(SLIDE_DURATION),
> PARTITIONS, filterEmptyRecords);
>
> After a few hours of operation, this function raises the following
> exception:
> "Neither previous window has value for key, nor new values found. Are you
> sure your key class hashes consistently?"
>
> I've found this post from 2013:
> https://groups.google.com/forum/#!msg/spark-users/9OM1YvWzwgE/PhFgdSTP2OQJ
> which however doesn't solve my problem. I'm using String to represent keys,
> which I'm pretty sure hash consistently.
>
> Any clue why this happens and possible suggestions to fix it?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Neither-previous-window-has-value-for-key-nor-new-values-found-tp27140.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Kafka stream processing time increasing gradually

2016-06-16 Thread N B
We had this same issue with the reduceByKeyAndWindow API that you are
using. To fix it, you have to use a different flavor of that
API, specifically the two versions that allow you to give a 'filter function'
to them. Putting in the filter functions helped stabilize our application
too.

HTH
NB


On Sun, Jun 12, 2016 at 11:19 PM, Roshan Singh 
wrote:

> Hi all,
> I have a python streaming job which is supposed to run 24x7. I am unable
> to stabilize it. The job just counts no of links shared in a 30 minute
> sliding window. I am using reduceByKeyAndWindow operation with a batch of
> 30 seconds, slide interval of 60 seconds.
>
> The kafka queue has a rate of nearly 2200 messages/second which can
> increase to 3000 but the mean is 2200.
>
> I have played around with batch size, slide interval, and by increasing
> parallelism with no fruitful result. These just delay the destabilization.
>
> GC time is usually between 60-100 ms.
>
> I also noticed that the jobs were not distributed to other nodes in the
> spark UI, for which I have used configured spark.locality.wait as 100ms.
> After which I have noticed that the job is getting distributed properly.
>
> I have a cluster of 6 slaves and one master each with 16 cores and 15gb of
> ram.
>
> Code and configuration: http://pastebin.com/93DMSiji
>
> Streaming screenshot: http://imgur.com/psNfjwJ
>
> I need help in debugging the issue. Any help will be appreciated.
>
> --
> Roshan Singh
>
>


Re: Spark Kafka stream processing time increasing gradually

2016-06-20 Thread N B
It's actually necessary to retire keys that become "Zero" or "Empty", so to
speak. In your case, the key is "imageURL" and the values are a dictionary, one
of whose fields is the "count" that you are maintaining. For simplicity and
illustration's sake I will assume imageURL to be strings like "abc". Your
slide duration is 60 seconds and window duration is 1800 seconds.

Now consider the following chain of events in your stream.

batch 1 : "abc"
batch 2 : "xyz"
batch 3 : "abc"

and now for the rest of the stream, the keys "abc" or "xyz" never occur.

At the end of the third batch, the generated window rdd has
{ "abc" -> count = 2, "xyz" -> count = 1 }.
When the first batch falls off after 1800 seconds, it will become
{ "abc -> count = 1, "xyz" -> count = 1 }.
60 seconds later, it will become
{ "abc" -> count = 1, "xyz" -> count = 0 }
and a further 60 seconds later, the 3rd batch is removed from the window
and the new window rdd becomes
{ "abc" -> count = 0, "xyz" -> count = 0 }.

I hope you can see what is wrong with this. These keys will be perpetually
held in memory even though there is no need for them to be there. This
growth in the size of the generated window rdd is what's giving rise to the
deteriorating processing time in your case.

A filter function that's equivalent of "count != 0" will suffice to
remember only those keys that have not become "Zero".
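
In Java for illustration (the Python API's reduceByKeyAndWindow takes the same filterFunc argument, per the docs linked in the quoted reply below), a minimal sketch of such a filter; ImageStats, its count field and its add/subtract methods are placeholders for the per-key dictionary described above:

// ImageStats is a placeholder; add()/subtract() are assumed to return the merged/decremented stats.
Function<Tuple2<String, ImageStats>, Boolean> keepNonZero = t -> t._2().count != 0;

JavaPairDStream<String, ImageStats> windowedCounts = pairs.reduceByKeyAndWindow(
    (a, b) -> a.add(b),          // forward reduce
    (a, b) -> a.subtract(b),     // inverse reduce
    Durations.seconds(1800),     // window duration
    Durations.seconds(60),       // slide duration
    numPartitions,               // partition count (placeholder)
    keepNonZero);                // retire keys whose count has dropped to zero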

HTH,
NB



On Thu, Jun 16, 2016 at 8:12 PM, Roshan Singh <singh.rosha...@gmail.com>
wrote:

> Hi,
> According to the docs (
> https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.DStream.reduceByKeyAndWindow),
> filerFunc can be used to retain expiring keys. I do not want to retain any
> expiring key, so I do not understand how can this help me stabilize it.
> Please correct me if this is not the case.
>
> I am also specifying both reduceFunc and invReduceFunc. Can you can a
> sample code of what you are using.
>
> Thanks.
>
> On Fri, Jun 17, 2016 at 3:43 AM, N B <nb.nos...@gmail.com> wrote:
>
>> We had this same issue with the reduceByKeyAndWindow API that you are
>> using. For fixing this issue, you have to use  different flavor of that
>> API, specifically the 2 versions that allow you to give a 'Filter function'
>> to them. Putting in the filter functions helped stabilize our application
>> too.
>>
>> HTH
>> NB
>>
>>
>> On Sun, Jun 12, 2016 at 11:19 PM, Roshan Singh <singh.rosha...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>> I have a python streaming job which is supposed to run 24x7. I am unable
>>> to stabilize it. The job just counts no of links shared in a 30 minute
>>> sliding window. I am using reduceByKeyAndWindow operation with a batch of
>>> 30 seconds, slide interval of 60 seconds.
>>>
>>> The kafka queue has a rate of nearly 2200 messages/second which can
>>> increase to 3000 but the mean is 2200.
>>>
>>> I have played around with batch size, slide interval, and by increasing
>>> parallelism with no fruitful result. These just delay the destabilization.
>>>
>>> GC time is usually between 60-100 ms.
>>>
>>> I also noticed that the jobs were not distributed to other nodes in the
>>> spark UI, for which I have used configured spark.locality.wait as 100ms.
>>> After which I have noticed that the job is getting distributed properly.
>>>
>>> I have a cluster of 6 slaves and one master each with 16 cores and 15gb
>>> of ram.
>>>
>>> Code and configuration: http://pastebin.com/93DMSiji
>>>
>>> Streaming screenshot: http://imgur.com/psNfjwJ
>>>
>>> I need help in debugging the issue. Any help will be appreciated.
>>>
>>> --
>>> Roshan Singh
>>>
>>>
>>
>
>
> --
> Roshan Singh
> http://roshansingh.in
>


Overriding toString and hashCode with Spark streaming

2016-02-02 Thread N B
Hello,

In our Spark streaming application, we are forming DStreams made of objects
of a rather large composite class. I have discovered that in order to do some
operations like RDD.subtract(), they are only successful for complex
objects such as these when we override the toString() and hashCode() methods
of this class.

Are there any issues that we should be aware of in general for doing so in
a Spark program?

Thanks
NB


Re: Spark streaming and ThreadLocal

2016-01-29 Thread N B
Fixed a typo in the code to avoid any confusion. Please comment on the
code below...

dstream.map( p -> { ThreadLocal<SomeClass> d = new ThreadLocal<SomeClass>() {
        public SomeClass initialValue() { return new SomeClass(); }
    };
    somefunc(p, d.get());
    d.remove();
    return p;
} );

On Fri, Jan 29, 2016 at 4:32 PM, N B <nb.nos...@gmail.com> wrote:

> So this use of ThreadLocal will be inside the code of a function executing
> on the workers i.e. within a call from one of the lambdas. Would it just
> look like this then:
>
> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
>  public SomeClass initialValue() { return new SomeClass(); }
> };
> somefunc(p, d.get());
> d.remove();
> return p;
> }; );
>
> Will this make sure that all threads inside the worker clean up the
> ThreadLocal once they are done with processing this task?
>
> Thanks
> NB
>
>
> On Fri, Jan 29, 2016 at 1:00 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Spark Streaming uses threadpools so you need to remove ThreadLocal when
>> it's not used.
>>
>> On Fri, Jan 29, 2016 at 12:55 PM, N B <nb.nos...@gmail.com> wrote:
>>
>>> Thanks for the response Ryan. So I would say that it is in fact the
>>> purpose of a ThreadLocal i.e. to have a copy of the variable as long as the
>>> thread lives. I guess my concern is around usage of threadpools and whether
>>> Spark streaming will internally create many threads that rotate between
>>> tasks on purpose thereby holding onto ThreadLocals that may actually never
>>> be used again.
>>>
>>> Thanks
>>>
>>> On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
>>>> Of cause. If you use a ThreadLocal in a long living thread and forget
>>>> to remove it, it's definitely a memory leak.
>>>>
>>>> On Thu, Jan 28, 2016 at 9:31 PM, N B <nb.nos...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Does anyone know if there are any potential pitfalls associated with
>>>>> using ThreadLocal variables in a Spark streaming application? One things I
>>>>> have seen mentioned in the context of app servers that use thread pools is
>>>>> that ThreadLocals can leak memory. Could this happen in Spark streaming
>>>>> also?
>>>>>
>>>>> Thanks
>>>>> Nikunj
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Spark streaming and ThreadLocal

2016-01-29 Thread N B
So this use of ThreadLocal will be inside the code of a function executing
on the workers i.e. within a call from one of the lambdas. Would it just
look like this then:

dstream.map( p -> { ThreadLocal<SomeClass> d = new ThreadLocal<SomeClass>() {
        public SomeClass initialValue() { return new SomeClass(); }
    };
    somefunc(p, d.get());
    d.remove();
    return p;
} );

Will this make sure that all threads inside the worker clean up the
ThreadLocal once they are done with processing this task?

Thanks
NB


On Fri, Jan 29, 2016 at 1:00 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> Spark Streaming uses threadpools so you need to remove ThreadLocal when
> it's not used.
>
> On Fri, Jan 29, 2016 at 12:55 PM, N B <nb.nos...@gmail.com> wrote:
>
>> Thanks for the response Ryan. So I would say that it is in fact the
>> purpose of a ThreadLocal i.e. to have a copy of the variable as long as the
>> thread lives. I guess my concern is around usage of threadpools and whether
>> Spark streaming will internally create many threads that rotate between
>> tasks on purpose thereby holding onto ThreadLocals that may actually never
>> be used again.
>>
>> Thanks
>>
>> On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Of cause. If you use a ThreadLocal in a long living thread and forget to
>>> remove it, it's definitely a memory leak.
>>>
>>> On Thu, Jan 28, 2016 at 9:31 PM, N B <nb.nos...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Does anyone know if there are any potential pitfalls associated with
>>>> using ThreadLocal variables in a Spark streaming application? One things I
>>>> have seen mentioned in the context of app servers that use thread pools is
>>>> that ThreadLocals can leak memory. Could this happen in Spark streaming
>>>> also?
>>>>
>>>> Thanks
>>>> Nikunj
>>>>
>>>>
>>>
>>
>


Re: Spark streaming and ThreadLocal

2016-01-29 Thread N B
Well won't the code in lambda execute inside multiple threads in the worker
because it has to process many records? I would just want to have a single
copy of SomeClass instantiated per thread rather than once per each record
being processed. That was what triggered this thought anyways.

Thanks
NB


On Fri, Jan 29, 2016 at 5:09 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> It looks weird. Why don't you just pass "new SomeClass()" to "somefunc"?
> You don't need to use ThreadLocal if there are no multiple threads in your
> codes.
>
> On Fri, Jan 29, 2016 at 4:39 PM, N B <nb.nos...@gmail.com> wrote:
>
>> Fixed a typo in the code to avoid any confusion Please comment on the
>> code below...
>>
>> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
>>  public SomeClass initialValue() { return new SomeClass(); }
>> };
>> somefunc(p, d.get());
>> d.remove();
>> return p;
>> }; );
>>
>> On Fri, Jan 29, 2016 at 4:32 PM, N B <nb.nos...@gmail.com> wrote:
>>
>>> So this use of ThreadLocal will be inside the code of a function
>>> executing on the workers i.e. within a call from one of the lambdas. Would
>>> it just look like this then:
>>>
>>> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
>>>  public SomeClass initialValue() { return new SomeClass(); }
>>> };
>>> somefunc(p, d.get());
>>> d.remove();
>>> return p;
>>> }; );
>>>
>>> Will this make sure that all threads inside the worker clean up the
>>> ThreadLocal once they are done with processing this task?
>>>
>>> Thanks
>>> NB
>>>
>>>
>>> On Fri, Jan 29, 2016 at 1:00 PM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
>>>> Spark Streaming uses threadpools so you need to remove ThreadLocal when
>>>> it's not used.
>>>>
>>>> On Fri, Jan 29, 2016 at 12:55 PM, N B <nb.nos...@gmail.com> wrote:
>>>>
>>>>> Thanks for the response Ryan. So I would say that it is in fact the
>>>>> purpose of a ThreadLocal i.e. to have a copy of the variable as long as 
>>>>> the
>>>>> thread lives. I guess my concern is around usage of threadpools and 
>>>>> whether
>>>>> Spark streaming will internally create many threads that rotate between
>>>>> tasks on purpose thereby holding onto ThreadLocals that may actually never
>>>>> be used again.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
>>>>> shixi...@databricks.com> wrote:
>>>>>
>>>>>> Of cause. If you use a ThreadLocal in a long living thread and forget
>>>>>> to remove it, it's definitely a memory leak.
>>>>>>
>>>>>> On Thu, Jan 28, 2016 at 9:31 PM, N B <nb.nos...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Does anyone know if there are any potential pitfalls associated with
>>>>>>> using ThreadLocal variables in a Spark streaming application? One 
>>>>>>> things I
>>>>>>> have seen mentioned in the context of app servers that use thread pools 
>>>>>>> is
>>>>>>> that ThreadLocals can leak memory. Could this happen in Spark streaming
>>>>>>> also?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Nikunj
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Spark streaming and ThreadLocal

2016-02-01 Thread N B
Is each partition guaranteed to execute in a single thread in a worker?

Thanks
N B
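
For reference, a cleaned-up Java rendering of the mapPartitions pattern suggested in the quoted reply below; SomeClass and somefunc are the placeholders from this thread, and SomeType stands in for the stream's element type. Each task processes its partition in a single thread, so one instance per partition is effectively one instance per task thread.

JavaDStream<SomeType> mapped = dstream.mapPartitions(iter -> {
    SomeClass d = new SomeClass();      // one helper instance per partition
    List<SomeType> out = new ArrayList<>();
    while (iter.hasNext()) {
        SomeType p = iter.next();
        somefunc(p, d);
        out.add(p);
    }
    return out;                         // the Spark 1.x Java API expects an Iterable here
});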


On Fri, Jan 29, 2016 at 6:53 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> I see. Then you should use `mapPartitions` rather than using ThreadLocal.
> E.g.,
>
> dstream.mapPartitions( iter ->
> val d = new SomeClass();
> return iter.map { p =>
>somefunc(p, d.get())
> };
> }; );
>
>
> On Fri, Jan 29, 2016 at 5:29 PM, N B <nb.nos...@gmail.com> wrote:
>
>> Well won't the code in lambda execute inside multiple threads in the
>> worker because it has to process many records? I would just want to have a
>> single copy of SomeClass instantiated per thread rather than once per each
>> record being processed. That was what triggered this thought anyways.
>>
>> Thanks
>> NB
>>
>>
>> On Fri, Jan 29, 2016 at 5:09 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> It looks weird. Why don't you just pass "new SomeClass()" to
>>> "somefunc"? You don't need to use ThreadLocal if there are no multiple
>>> threads in your codes.
>>>
>>> On Fri, Jan 29, 2016 at 4:39 PM, N B <nb.nos...@gmail.com> wrote:
>>>
>>>> Fixed a typo in the code to avoid any confusion Please comment on
>>>> the code below...
>>>>
>>>> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
>>>>  public SomeClass initialValue() { return new SomeClass(); }
>>>> };
>>>> somefunc(p, d.get());
>>>> d.remove();
>>>> return p;
>>>> }; );
>>>>
>>>> On Fri, Jan 29, 2016 at 4:32 PM, N B <nb.nos...@gmail.com> wrote:
>>>>
>>>>> So this use of ThreadLocal will be inside the code of a function
>>>>> executing on the workers i.e. within a call from one of the lambdas. Would
>>>>> it just look like this then:
>>>>>
>>>>> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
>>>>>  public SomeClass initialValue() { return new SomeClass(); }
>>>>> };
>>>>>     somefunc(p, d.get());
>>>>> d.remove();
>>>>> return p;
>>>>> }; );
>>>>>
>>>>> Will this make sure that all threads inside the worker clean up the
>>>>> ThreadLocal once they are done with processing this task?
>>>>>
>>>>> Thanks
>>>>> NB
>>>>>
>>>>>
>>>>> On Fri, Jan 29, 2016 at 1:00 PM, Shixiong(Ryan) Zhu <
>>>>> shixi...@databricks.com> wrote:
>>>>>
>>>>>> Spark Streaming uses threadpools so you need to remove ThreadLocal
>>>>>> when it's not used.
>>>>>>
>>>>>> On Fri, Jan 29, 2016 at 12:55 PM, N B <nb.nos...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the response Ryan. So I would say that it is in fact the
>>>>>>> purpose of a ThreadLocal i.e. to have a copy of the variable as long as 
>>>>>>> the
>>>>>>> thread lives. I guess my concern is around usage of threadpools and 
>>>>>>> whether
>>>>>>> Spark streaming will internally create many threads that rotate between
>>>>>>> tasks on purpose thereby holding onto ThreadLocals that may actually 
>>>>>>> never
>>>>>>> be used again.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
>>>>>>> shixi...@databricks.com> wrote:
>>>>>>>
>>>>>>>> Of cause. If you use a ThreadLocal in a long living thread and
>>>>>>>> forget to remove it, it's definitely a memory leak.
>>>>>>>>
>>>>>>>> On Thu, Jan 28, 2016 at 9:31 PM, N B <nb.nos...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> Does anyone know if there are any potential pitfalls associated
>>>>>>>>> with using ThreadLocal variables in a Spark streaming application? One
>>>>>>>>> things I have seen mentioned in the context of app servers that use 
>>>>>>>>> thread
>>>>>>>>> pools is that ThreadLocals can leak memory. Could this happen in Spark
>>>>>>>>> streaming also?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Nikunj
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Spark streaming and ThreadLocal

2016-01-29 Thread N B
Thanks for the response Ryan. So I would say that it is in fact the purpose
of a ThreadLocal i.e. to have a copy of the variable as long as the thread
lives. I guess my concern is around usage of threadpools and whether Spark
streaming will internally create many threads that rotate between tasks on
purpose thereby holding onto ThreadLocals that may actually never be used
again.

Thanks

On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
shixi...@databricks.com> wrote:

> Of cause. If you use a ThreadLocal in a long living thread and forget to
> remove it, it's definitely a memory leak.
>
> On Thu, Jan 28, 2016 at 9:31 PM, N B <nb.nos...@gmail.com> wrote:
>
>> Hello,
>>
>> Does anyone know if there are any potential pitfalls associated with
>> using ThreadLocal variables in a Spark streaming application? One things I
>> have seen mentioned in the context of app servers that use thread pools is
>> that ThreadLocals can leak memory. Could this happen in Spark streaming
>> also?
>>
>> Thanks
>> Nikunj
>>
>>
>


Spark streaming and ThreadLocal

2016-01-28 Thread N B
Hello,

Does anyone know if there are any potential pitfalls associated with using
ThreadLocal variables in a Spark streaming application? One thing I have
seen mentioned in the context of app servers that use thread pools is that
ThreadLocals can leak memory. Could this happen in Spark streaming also?

Thanks
Nikunj


Kryo (with Spark 1.6.3) class registration slows down processing

2017-01-20 Thread N B
Hello,

Here is something I am unable to explain; it goes against Kryo's
documentation, numerous suggestions on the web and on this list, as well as
pure intuition.

Our Spark application runs in a single JVM (perhaps this is relevant, hence
mentioning it). We have been using Kryo serialization with Spark (setting
the spark.serializer property to
org.apache.spark.serializer.KryoSerializer) without explicitly registering
classes and everything seems to work well enough. Recently, I have been
looking into making some performance improvements and decided to register
classes.

I turned on the "spark.kryo.registrationRequired" property and started to
register all classes as they were reported by the resulting Exceptions.
Eventually I managed to register them all. BTW, there is a fairly large
number of internal Spark and Scala classes that I also had to register, but
that's beside the point here.
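
For reference, the registration was wired up roughly like this (a minimal sketch,
not the actual application code; MyEvent and MyAggregate are placeholder classes
standing in for the ones the exceptions reported):

import org.apache.spark.SparkConf;

public class KryoRegistrationSketch {
    // Placeholder domain classes; the real list came from the
    // "Class is not registered" exceptions mentioned above.
    static class MyEvent implements java.io.Serializable {}
    static class MyAggregate implements java.io.Serializable {}

    static SparkConf buildConf() {
        return new SparkConf()
                .setAppName("kryo-registration-sketch")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                // fail fast on any class that has not been registered explicitly
                .set("spark.kryo.registrationRequired", "true")
                .registerKryoClasses(new Class<?>[] { MyEvent.class, MyAggregate.class });
    }
}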

I was hoping to gain some performance improvement, as per the usual advice
about registering classes. However, what I saw was the exact opposite and
quite surprising: performance (throughput) actually deteriorated by at least
50%. I turned off the registrationRequired property but kept the
explicit registrations in place with the same result. Then I reduced the
number of registrations and performance started to get better again.
Eventually I got rid of all the explicit registrations (back to where I
started basically) and performance improved back to where it was.

I am unable to explain why I am observing this behavior, as it is
counter-intuitive: explicit registration is supposed to write a smaller
amount of data (integer class IDs instead of class names as Strings) and
hence improve performance. Is the fact that Spark is running in local mode
(single JVM) a factor here? Any insights will be appreciated.

Thanks
NB


Spark Streaming user function exceptions causing hangs

2016-08-24 Thread N B
Hello,

We have a Spark streaming application (running Spark 1.6.1) that consumes
data from a message queue. The application is running in local[*] mode so
driver and executors are in a single JVM.

The issue that we are dealing with these days is that if any of our lambda
functions throw any Exception, the entire processing hangs and the task
keeps getting retried ad infinitum. According to the documentation here :

http://spark.apache.org/docs/1.6.1/configuration.html

the default value of spark.task.maxFailures is set to 4 and we have not
changed that so I don't quite get why the tasks keep getting retried
forever.

How can we make sure that a single bad record causing a Runtime Exception
does not stop all the processing?

So far I have tried adding try/catch blocks around the lambda functions and
returning null when an exception does get thrown, but that causes
NullPointerExceptions within Spark, with the same net effect of processing of
the subsequent batches getting stopped.
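
One approach that avoids both throwing and returning null is to drop the bad
record inside a flatMap (a minimal sketch, not our actual code; Input, Output
and parse() are placeholders for the real record types and lambda, and this
uses the Spark 1.6 Java API where flatMap returns an Iterable; Spark 2.x
expects an Iterator instead):

import java.util.Collections;
import org.apache.spark.streaming.api.java.JavaDStream;

static JavaDStream<Output> parseSafely(JavaDStream<Input> input) {
    return input.flatMap(rec -> {
        try {
            // happy path: exactly one output element per input record
            return Collections.singletonList(parse(rec));
        } catch (RuntimeException e) {
            // log the offending record somewhere useful, then skip just this one
            return Collections.<Output>emptyList();
        }
    });
}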

Thanks in advance,
NB


Implementing Dynamic Sampling in a Spark Streaming Application

2017-07-12 Thread N B
Hi all,

Spark has had a backpressure implementation since 1.5 that helps to
stabilize a Spark Streaming application by keeping the per-batch processing
time under control and below the batch interval. This
implementation leaves excess records in the source (Kafka, Flume etc) and
they get picked up in subsequent batches.

However, there are use cases where it would be useful to pick up the whole
batch of records from the source and randomly sample it down to a
dynamically computed "desired" batch size. This would allow the application
not to lag behind in processing the latest traffic, with the trade-off that
some traffic could be lost. I believe such a random sampling strategy was
proposed in the original backpressure JIRA (SPARK-7398) design doc but has
not been natively implemented yet.

I have written a blog post about implementing such a technique in the
application itself, using the PIDEstimator from Spark's backpressure
implementation to compute a desired batch size and then randomly sampling
each batch down to it.

Implementing a Dynamic Sampling Strategy in a Spark Streaming Application
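
The core of the idea is roughly the following (a sketch, not the code from the
post; Event is a placeholder record type, and desiredBatchSize is assumed to be
updated elsewhere, e.g. from a batch-completion listener feeding a PID-style
estimator with the last batch's processing time):

import java.util.concurrent.atomic.AtomicLong;
import org.apache.spark.streaming.api.java.JavaDStream;

static JavaDStream<Event> sampleToDesiredSize(JavaDStream<Event> events,
                                              AtomicLong desiredBatchSize) {
    return events.transform(rdd -> {
        long count = rdd.count();  // note: this adds one extra job per batch
        double fraction = count == 0 ? 1.0
                : Math.min(1.0, (double) desiredBatchSize.get() / count);
        // randomly thin the batch down to roughly the desired size;
        // the records that are not picked are simply lost, by design
        return fraction >= 1.0 ? rdd : rdd.sample(false, fraction);
    });
}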


Hope that some people find it useful. Comments and discussion are welcome.

Thanks,
Nikunj


Re: Flume DStream produces 0 records after HDFS node killed

2017-06-20 Thread N B
Ok some more info about this issue to see if someone can shine a light on
what could be going on. I turned on debug logging for
org.apache.spark.streaming.scheduler in the driver process and this is what
gets thrown in the logs and keeps throwing it even after the downed HDFS
node is restarted. Using Spark 2.1.1 and HDFS 2.7.3 here.

2017-06-20 22:38:11,302 WARN JobGenerator ReceivedBlockTracker.logWarning -
Exception thrown while writing record: BatchCleanupEvent(ArrayBuffer()) to
the WriteAheadLog.
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
at
org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
at
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
at
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:171)
at
org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:233)
at
org.apache.spark.streaming.scheduler.JobGenerator.clearCheckpointData(JobGenerator.scala:287)
at org.apache.spark.streaming.scheduler.JobGenerator.org
$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:187)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException):
Missing storageIDs: It is likely that the HDFS client, who made this call,
is running in an older version of Hadoop which does not support storageIDs.
datanodeID.length=1,
src=/vm/spark-checkpoint/receivedBlockMetadata/log-1497997390799-1497997450799,
fileId=0, blk=BP-1450953312-10.0.0.9-1490120290209:blk_1081472520_7731872,
clientName=DFSClient_NONMAPREDUCE_-23097586_1, clientMachine=10.0.0.17
at
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:514)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalDatanode(FSNamesystem.java:3353)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getAdditionalDatanode(NameNodeRpcServer.java:759)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSideTranslatorPB.java:515)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

at org.apache.hadoop.ipc.Client.call(Client.java:1347)
at org.apache.hadoop.ipc.Client.call(Client.java:1300)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy10.getAdditionalDatanode(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy10.getAdditionalDatanode(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:352)
at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:919)
at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1031)
at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823)
at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)
polygraph-engine 2017-06-20 22:38:11,302 WARN JobGenerator
ReceivedBlockTracker.logWarning - Failed to acknowledge batch clean up in
the Write Ahead Log.

Thanks
N B


On Tue, Jun 20, 2017 at 10:24 AM, N B <nb.nos...@gmail.com> wrote:

> BTW, this is running on Spark 2.1.1.
>
> I have been trying to debug this issue and what I have found till now is
> that it is somehow relate

Flume DStream produces 0 records after HDFS node killed

2017-06-19 Thread N B
Hi all,

We are running a Standalone Spark Cluster for running a streaming
application. The application consumes data from Flume using a Flume Polling
stream created as such :

flumeStream = FlumeUtils.createPollingStream(streamingContext,
socketAddress.toArray(new InetSocketAddress[socketAddress.size()]),
StorageLevel.MEMORY_AND_DISK_SER(), 100, 5);


The checkpoint directory is configured to be on an HDFS cluster and Spark
workers have their SPARK_LOCAL_DIRS and SPARK_WORKER_DIR defined to be on
their respective local filesystems.

What we are seeing is some odd behavior that we are unable to explain. During
normal operation, everything runs as expected with flume delivering events
to Spark. However, while running, if I kill one of the HDFS nodes (does not
matter which one), the Flume Receiver in Spark stops producing any data to
the data processing.

I enabled debug logging for org.apache.spark.streaming.flume on Spark
worker nodes and looked at the logs for the one that gets to run the Flume
Receiver and it keeps chugging along receiving data from Flume as shown in
a sample of the log below, but the resulting batches in the Stream start
receiving 0 records as soon as the HDFS node is killed, with no errors being
produced to indicate any issue.

17/06/20 01:05:42 DEBUG FlumeBatchFetcher: Ack sent for sequence number: 09fa05f59050
17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Received batch of 100 events with sequence number: 09fa05f59052
17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Sending ack for sequence number: 09fa05f59052
17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Ack sent for sequence number: 09fa05f59052
17/06/20 01:05:47 DEBUG FlumeBatchFetcher: Received batch of 100 events with sequence number: 09fa05f59054
17/06/20 01:05:47 DEBUG FlumeBatchFetcher: Sending ack for sequence number: 09fa05f59054

The driver output for the application shows (printed via
Dstream.count().map().print()):

---
Time: 149792077 ms
---
Received 0 flume events.


Any insights about where to look in order to find the root cause will be
greatly appreciated.

Thanks
N B


Spark 2.1.1 and Hadoop version 2.2 or 2.7?

2017-06-20 Thread N B
I had downloaded the prebuilt package labeled "Spark 2.1.1 prebuilt with
Hadoop 2.7 or later" from the direct download link on spark.apache.org.

However, I am seeing compatibility errors running against a deployed HDFS
2.7.3 (see my earlier message about the Flume DStream producing 0 records
after an HDFS node restarted). I have been digging into this and have
started to suspect a version mismatch between the Hadoop server and client.
I decided to look at Spark 2.1.1's pom.xml, and it states hadoop.version as
2.2.0. There seems to be some mismatch here, and I am not sure if that's the
root cause of the issues I have been seeing.

Can someone please confirm if the package mentioned above was indeed
compiled with Hadoop 2.7? Or should I fall back on an HDFS Server 2.2
instead?

Thanks
N B


Re: Flume DStream produces 0 records after HDFS node killed

2017-06-21 Thread N B
Hadoop version 2.7.3

On Tue, Jun 20, 2017 at 11:12 PM, yohann jardin <yohannjar...@hotmail.com>
wrote:

> Which version of Hadoop are you running on?
>
> Yohann Jardin
> On 6/21/2017 at 1:06 AM, N B wrote:
>
> Ok some more info about this issue to see if someone can shine a light on
> what could be going on. I turned on debug logging for
> org.apache.spark.streaming.scheduler in the driver process and this is
> what gets thrown in the logs and keeps throwing it even after the downed
> HDFS node is restarted. Using Spark 2.1.1 and HDFS 2.7.3 here.
>
> 2017-06-20 22:38:11,302 WARN JobGenerator ReceivedBlockTracker.logWarning
> - Exception thrown while writing record: BatchCleanupEvent(ArrayBuffer())
> to the WriteAheadLog.
> org.apache.spark.SparkException: Exception thrown in awaitResult:
> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
> at org.apache.spark.streaming.util.BatchedWriteAheadLog.
> write(BatchedWriteAheadLog.scala:83)
> at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(
> ReceivedBlockTracker.scala:234)
> at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.
> cleanupOldBatches(ReceivedBlockTracker.scala:171)
> at org.apache.spark.streaming.scheduler.ReceiverTracker.
> cleanupOldBlocksAndBatches(ReceiverTracker.scala:233)
> at org.apache.spark.streaming.scheduler.JobGenerator.clearCheckpointData(
> JobGenerator.scala:287)
> at org.apache.spark.streaming.scheduler.JobGenerator.org$
> apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.
> scala:187)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$
> 1.onReceive(JobGenerator.scala:89)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$
> 1.onReceive(JobGenerator.scala:88)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.
> HadoopIllegalArgumentException): Missing storageIDs: It is likely that
> the HDFS client, who made this call, is running in an older version of
> Hadoop which does not support storageIDs. datanodeID.length=1,
> src=/vm/spark-checkpoint/receivedBlockMetadata/log-1497997390799-1497997450799,
> fileId=0, blk=BP-1450953312-10.0.0.9-1490120290209:blk_1081472520_7731872,
> clientName=DFSClient_NONMAPREDUCE_-23097586_1, clientMachine=10.0.0.17
> at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.
> getDatanodeStorageInfos(DatanodeManager.java:514)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> getAdditionalDatanode(FSNamesystem.java:3353)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> getAdditionalDatanode(NameNodeRpcServer.java:759)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> deTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSi
> deTranslatorPB.java:515)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.
> java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(
> ProtobufRpcEngine.java:616)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1698)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>
> at org.apache.hadoop.ipc.Client.call(Client.java:1347)
> at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.
> invoke(ProtobufRpcEngine.java:206)
> at com.sun.proxy.$Proxy10.getAdditionalDatanode(Unknown Source)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
> RetryInvocationHandler.java:186)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
> RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy10.getAdditionalDatanode(Unknown Source)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslat
> orPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:352)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.
> addDatanode2ExistingPipeline(DFSOutputStream.java:919)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataSt

Re: Flume DStream produces 0 records after HDFS node killed

2017-06-22 Thread N B
This issue got resolved.

I was able to trace it to the fact that the driver program's pom.xml was
pulling in Spark 2.1.1, which in turn was pulling in Hadoop 2.2.0.
Explicitly adding dependencies on the Hadoop 2.7.3 libraries resolves it.
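
For reference, the fix amounted to pinning the Hadoop client artifacts in the
pom to the cluster's version, roughly like this (a sketch; the exact artifact
list depends on what the build already pulls in transitively):

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.7.3</version>
</dependency>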

The following HDFS API, DatanodeManager.getDatanodeStorageInfos(), is
incompatible between a client using HDFS Client 2.2.0 and an HDFS server
running some later versions. I believe this issue should be raised with the
HDFS project, since the Hadoop 2.2.0 APIs were supposed to remain API- and
protocol-compatible according to the Hadoop 2.2.0 release blurb:

From http://hadoop.apache.org/releases.html :

"
15 October, 2013: Release 2.2.0 available

Apache Hadoop 2.2.0 is the *GA* release of Apache Hadoop 2.x.

Users are encouraged to immediately move to 2.2.0 since this release is
significantly more stable and is guaranteed to remain compatible in terms
of both APIs and protocols.
"

Thanks
N B


On Tue, Jun 20, 2017 at 11:36 PM, N B <nb.nos...@gmail.com> wrote:

> Hadoop version 2.7.3
>
> On Tue, Jun 20, 2017 at 11:12 PM, yohann jardin <yohannjar...@hotmail.com>
> wrote:
>
>> Which version of Hadoop are you running on?
>>
>> Yohann Jardin
>> On 6/21/2017 at 1:06 AM, N B wrote:
>>
>> Ok some more info about this issue to see if someone can shine a light on
>> what could be going on. I turned on debug logging for
>> org.apache.spark.streaming.scheduler in the driver process and this is
>> what gets thrown in the logs and keeps throwing it even after the downed
>> HDFS node is restarted. Using Spark 2.1.1 and HDFS 2.7.3 here.
>>
>> 2017-06-20 22:38:11,302 WARN JobGenerator ReceivedBlockTracker.logWarning
>> - Exception thrown while writing record: BatchCleanupEvent(ArrayBuffer())
>> to the WriteAheadLog.
>> org.apache.spark.SparkException: Exception thrown in awaitResult:
>> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
>> at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(
>> BatchedWriteAheadLog.scala:83)
>> at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.wr
>> iteToLog(ReceivedBlockTracker.scala:234)
>> at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cl
>> eanupOldBatches(ReceivedBlockTracker.scala:171)
>> at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanup
>> OldBlocksAndBatches(ReceiverTracker.scala:233)
>> at org.apache.spark.streaming.scheduler.JobGenerator.clearCheck
>> pointData(JobGenerator.scala:287)
>> at org.apache.spark.streaming.scheduler.JobGenerator.org$apache
>> $spark$streaming$scheduler$JobGenerator$$processEvent(
>> JobGenerator.scala:187)
>> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.
>> onReceive(JobGenerator.scala:89)
>> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.
>> onReceive(JobGenerator.scala:88)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> Caused by: 
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException):
>> Missing storageIDs: It is likely that the HDFS client, who made this call,
>> is running in an older version of Hadoop which does not support storageIDs.
>> datanodeID.length=1, src=/vm/spark-checkpoint/recei
>> vedBlockMetadata/log-1497997390799-1497997450799, fileId=0,
>> blk=BP-1450953312-10.0.0.9-1490120290209:blk_1081472520_7731872,
>> clientName=DFSClient_NONMAPREDUCE_-23097586_1, clientMachine=10.0.0.17
>> at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManage
>> r.getDatanodeStorageInfos(DatanodeManager.java:514)
>> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAddit
>> ionalDatanode(FSNamesystem.java:3353)
>> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.get
>> AdditionalDatanode(NameNodeRpcServer.java:759)
>> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServ
>> erSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProto
>> colServerSideTranslatorPB.java:515)
>> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocol
>> Protos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNam
>> enodeProtocolProtos.java)
>> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcIn
>> voker.call(ProtobufRpcEngine.java:616)
>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at org.apache.hadoop.s

Re: Exception which using ReduceByKeyAndWindow in Spark Streaming.

2017-06-26 Thread N B
Hi Swetha,

We have dealt with this issue a couple years ago and have solved it. The
key insight here was that adding to a HashSet and removing from a HashSet
are actually not inverse operations of each other.

For example, if you added a key K1 in batch1 and then added that same key K1
again during a later batch, let's say batch9, the inverse function needs to be
able to remove this key *twice* from the HashSet, which is exactly what a
HashSet cannot do. Once the key is removed due to batch1 falling off, the
resulting new HashSet has this key missing, and when the time comes to remove
batch9, it will barf with the error that you are experiencing.

The solution is to actually maintain a count of how many times you have
encountered that particular key and to decrement it in your inverse function.
Once the count reaches 0, your filter function should then remove that key
from consideration.

We achieved it using a HashMap that maintains counts instead of a Set.
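
A minimal Java sketch of the counting idea (not our production code): the value
kept per key becomes a map from member to count instead of a set, the inverse
function decrements, and entries disappear only once their count reaches zero.

import java.util.HashMap;
import java.util.Map;

// Reduce function: add the counts from the two windows' maps.
static Map<String, Integer> merge(Map<String, Integer> a, Map<String, Integer> b) {
    Map<String, Integer> out = new HashMap<>(a);
    b.forEach((k, v) -> out.merge(k, v, Integer::sum));
    return out;
}

// Inverse reduce function: subtract the counts of the batch that fell off the
// window, removing an entry only once its count drops to zero.
static Map<String, Integer> unmerge(Map<String, Integer> a, Map<String, Integer> b) {
    Map<String, Integer> out = new HashMap<>(a);
    b.forEach((k, v) -> out.computeIfPresent(k, (key, cur) -> cur - v > 0 ? cur - v : null));
    return out;
}

// Used with something like:
// pairStream.reduceByKeyAndWindow(Counts::merge, Counts::unmerge, windowLen, slide)
//           .filter(t -> !t._2().isEmpty());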

Hope this helps,
N B


On Thu, Jun 22, 2017 at 4:07 PM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> Hi TD,
>
> I am still seeing this issue with any immutable data structure. Any idea
> why this happens? I use scala.collection.immutable.List[String] and my
> reduce and inverse reduce does the following.
>
> visitorSet1 ++visitorSet2
>
>
>
> visitorSet1.filterNot(visitorSet2.contains(_)
>
>
>
> On Wed, Jun 7, 2017 at 8:43 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> I changed the datastructure to scala.collection.immutable.Set and I still
>> see the same issue. My key is a String.  I do the following in my reduce
>> and invReduce.
>>
>> visitorSet1 ++visitorSet2.toTraversable
>>
>>
>> visitorSet1 --visitorSet2.toTraversable
>>
>> On Tue, Jun 6, 2017 at 8:22 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Yes, and in general any mutable data structure. You have to use immutable
>>> data structures whose hashCode and equals are consistent enough for being
>>> put in a set.
>>>
>>> On Jun 6, 2017 4:50 PM, "swetha kasireddy" <swethakasire...@gmail.com>
>>> wrote:
>>>
>>>> Are you suggesting against the usage of HashSet?
>>>>
>>>> On Tue, Jun 6, 2017 at 3:36 PM, Tathagata Das <
>>>> tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> This may be because HashSet is a mutable data structure, and it
>>>>> seems you are actually mutating it in "set1 ++set2". I suggest creating a
>>>>> new HashMap in the function (and adding both maps into it), rather than
>>>>> mutating one of them.
>>>>>
>>>>> On Tue, Jun 6, 2017 at 11:30 AM, SRK <swethakasire...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I see the following error when I use ReduceByKeyAndWindow in my Spark
>>>>>> Streaming app. I use reduce, invReduce and filterFunction as shown
>>>>>> below.
>>>>>> Any idea as to why I get the error?
>>>>>>
>>>>>>  java.lang.Exception: Neither previous window has value for key, nor
>>>>>> new
>>>>>> values found. Are you sure your key class hashes consistently?
>>>>>>
>>>>>>
>>>>>>   def reduceWithHashSet: ((Long, HashSet[String]), (Long,
>>>>>> HashSet[String]))
>>>>>> => (Long, HashSet[String])= {
>>>>>> case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2:
>>>>>> Long,
>>>>>> set2: HashSet[String])) => (Math.max(timeStamp1, timeStamp2), set1
>>>>>> ++set2 )
>>>>>>
>>>>>>   }
>>>>>>
>>>>>>   def invReduceWithHashSet: ((Long, HashSet[String]), (Long,
>>>>>> HashSet[String])) => (Long, HashSet[String])= {
>>>>>> case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2:
>>>>>> Long,
>>>>>> set2: HashSet[String])) => (Math.max(timeStamp1, timeStamp2),
>>>>>> set1.diff(set2))
>>>>>>   }
>>>>>>
>>>>>>   def filterFuncForInvReduce: ((String, (Long, HashSet[String]))) =>
>>>>>> (Boolean)= {
>>>>>> case ((metricName:String, (timeStamp: Long, set:
>>>>>> HashSet[String]))) =>
>>>>>> set.size>0
>>>>>>   }
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context: http://apache-spark-user-list.
>>>>>> 1001560.n3.nabble.com/Exception-which-using-ReduceByKeyAndWi
>>>>>> ndow-in-Spark-Streaming-tp28748.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>> -
>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>
>>>>>>
>>>>>
>>>>
>>
>


Re: Flume DStream produces 0 records after HDFS node killed

2017-06-20 Thread N B
BTW, this is running on Spark 2.1.1.

I have been trying to debug this issue and what I have found till now is
that it is somehow related to the Spark WAL. The directory named
/receivedBlockMetadata seems to stop getting
written to after the point of an HDFS node being killed and restarted. I
have a couple questions around this :

1. Why is the flume receiver even writing to the WAL? I have not enabled
the 'spark.streaming.receiver.writeAheadLog.enable' property and even after
I set it explicitly to false in the driver, the WAL seems to be getting
written to.

2. Why would the receiver receive metadata but not write to the WAL after an
HDFS node is lost and restarted? HDFS replication factor is at its default
of 2.

Thanks
N B


On Mon, Jun 19, 2017 at 6:23 PM, N B <nb.nos...@gmail.com> wrote:

> Hi all,
>
> We are running a Standalone Spark Cluster for running a streaming
> application. The application consumes data from Flume using a Flume Polling
> stream created as such :
>
> flumeStream = FlumeUtils.createPollingStream(streamingContext,
> socketAddress.toArray(new InetSocketAddress[socketAddress.size()]),
> StorageLevel.MEMORY_AND_DISK_SER(), *100*, *5*);
>
>
> The checkpoint directory is configured to be on an HDFS cluster and Spark
> workers have their SPARK_LOCAL_DIRS and SPARK_WORKER_DIR defined to be on
> their respective local filesystems.
>
> What we are seeing is some odd behavior and unable to explain. During
> normal operation, everything runs as expected with flume delivering events
> to Spark. However, while running, if I kill one of the HDFS nodes (does not
> matter which one), the Flume Receiver in Spark stops producing any data to
> the data processing.
>
> I enabled debug logging for org.apache.spark.streaming.flume on Spark
> worker nodes and looked at the logs for the one that gets to run the Flume
> Receiver and it keeps chugging along receiving data from Flume as shown in
> a sample of the log below, but the resulting batches in the Stream start
> receiving 0 records soon as the HDFS node is killed, with no errors being
> produced to indicate any issue.
>
> *17/06/20 01:05:42 DEBUG FlumeBatchFetcher: Ack sent for sequence number:
> 09fa05f59050*
> *17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Received batch of 100 events
> with sequence number: 09fa05f59052*
> *17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Sending ack for sequence
> number: 09fa05f59052*
> *17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Ack sent for sequence number:
> 09fa05f59052*
> *17/06/20 01:05:47 DEBUG FlumeBatchFetcher: Received batch of 100 events
> with sequence number: 09fa05f59054*
> *17/06/20 01:05:47 DEBUG FlumeBatchFetcher: Sending ack for sequence
> number: 09fa05f59054*
>
> The driver output for the application shows (printed via
> Dstream.count().map().print()):
>
> ---
> Time: 149792077 ms
> ---
> Received 0 flume events.
>
>
> Any insights about where to look in order to find the root cause will be
> greatly appreciated.
>
> Thanks
> N B
>
>