Spark Zmq issue in cluster mode
I have a Spark Streaming ZeroMQ application that runs fine in non-cluster mode. When I run a local cluster and do spark-submit, the ZeroMQ Java client chokes:

    org.zeromq.ZMQException: No such file or directory
        at org.zeromq.ZMQ$Socket.raiseZMQException(ZMQ.java:480)
        at org.zeromq.ZMQ$Socket.recv(ZMQ

The classpath has the jzmq jar, and java.library.path points to /usr/local/lib, which has the zmq bindings. The compilation of jzmq et al. is fine, since the non-cluster application works; this seems to be some environment parameter not getting passed on to the executor correctly. Any clues on what I might be missing?
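Not a confirmed fix, but the symptom matches the executor JVMs not seeing the native bindings: java.library.path set on the driver is not automatically propagated to the executors, so it can be set explicitly in the executor configuration. A minimal sketch, assuming the zmq/jzmq native libraries live under /usr/local/lib on every worker node (the app name and batch interval here are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Duration, StreamingContext}

    // spark.executor.extraJavaOptions and spark.executor.extraLibraryPath are
    // standard Spark settings; the paths are assumptions for this sketch.
    val conf = new SparkConf()
      .setAppName("ZmqStreamingApp")
      // Point the executor JVMs at the directory holding the jzmq bindings.
      .set("spark.executor.extraJavaOptions", "-Djava.library.path=/usr/local/lib")
      // Also add it to the native library search path used to launch executors.
      .set("spark.executor.extraLibraryPath", "/usr/local/lib")
    val ssc = new StreamingContext(conf, new Duration(100))

The same keys can also be put in conf/spark-defaults.conf so they apply to every spark-submit run.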
Re: Issue while trying to aggregate with a sliding window
Ok, that patch does fix the key lookup exception. However, I'm curious about the time validity check, isValidTime (https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L264). Why does (time - zeroTime) have to be a multiple of the slide duration? Shouldn't reduceByKeyAndWindow aggregate every record in a given window (zeroTime to zeroTime + windowDuration)?

On Tue, Jun 17, 2014 at 10:55 PM, Hatch M hatchman1...@gmail.com wrote:
> Thanks! Will try to get the fix and retest.
>
> On Tue, Jun 17, 2014 at 5:30 PM, onpoq l onpo...@gmail.com wrote:
>> There is a bug: https://github.com/apache/spark/pull/961#issuecomment-45125185
>>
>> On Tue, Jun 17, 2014 at 8:19 PM, Hatch M hatchman1...@gmail.com wrote:
>>> Trying to aggregate over a sliding window, playing with the slide duration. Playing around with the slide interval, I can see the aggregation works but mostly fails with the error below. The stream has records coming in every 100 ms.
>>>
>>> JavaPairDStream<String, AggregateObject> aggregatedDStream = pairsDStream.reduceByKeyAndWindow(aggFunc, new Duration(6), new Duration(60));
>>>
>>> 14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms is invalid as zeroTime is 1403050485800 ms and slideDuration is 6 ms and difference is 1100 ms
>>> 14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found: 1403050486900 ms
>>> java.util.NoSuchElementException: key not found: 1403050486900 ms
>>>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>>>
>>> Any hints on what's going on here? Thanks!
>>> Hatch
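For what it's worth, a simplified restatement of the check asked about above (not the actual Spark code): a DStream only generates an RDD at times aligned to its slide duration counted from zeroTime, and records arriving between those boundaries are not dropped; they are simply aggregated into the next aligned window.

    // Simplified restatement of the alignment rule in isValidTime, using plain
    // milliseconds instead of Spark's Time/Duration classes.
    def isValidTime(timeMs: Long, zeroTimeMs: Long, slideDurationMs: Long): Boolean =
      (timeMs - zeroTimeMs) % slideDurationMs == 0

    // Hypothetical numbers: with zeroTime = 0 and a 6000 ms slide, only
    // t = 6000, 12000, 18000, ... produce a window RDD.
    isValidTime(6000L, 0L, 6000L)   // true
    isValidTime(1100L, 0L, 6000L)   // false: 1100 is not a multiple of the slide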
Issue while trying to aggregate with a sliding window
Trying to aggregate over a sliding window, playing with the slide duration. Playing around with the slide interval, I can see the aggregation works but mostly fails with the error below. The stream has records coming in every 100 ms.

    JavaPairDStream<String, AggregateObject> aggregatedDStream = pairsDStream.reduceByKeyAndWindow(aggFunc, new Duration(6), new Duration(60));

    14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms is invalid as zeroTime is 1403050485800 ms and slideDuration is 6 ms and difference is 1100 ms
    14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found: 1403050486900 ms
    java.util.NoSuchElementException: key not found: 1403050486900 ms
        at scala.collection.MapLike$class.default(MapLike.scala:228)

Any hints on what's going on here? Thanks!
Hatch
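For anyone trying to reproduce this: both the window duration and the slide duration passed to reduceByKeyAndWindow have to be multiples of the source DStream's batch interval. A minimal Scala sketch with made-up durations and a made-up socket source (the original job uses a Java pair DStream of String and AggregateObject):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Duration, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    val conf = new SparkConf().setAppName("WindowedAggregation").setMaster("local[2]")
    // 100 ms batch interval, matching the incoming record rate described above.
    val ssc = new StreamingContext(conf, new Duration(100))

    // Placeholder source producing (key, count) pairs.
    val pairs = ssc.socketTextStream("localhost", 9999).map(line => (line, 1L))

    // 60 s window sliding every 6 s; both are multiples of the 100 ms batch
    // interval, so every generated batch time passes the validity check.
    val aggregated = pairs.reduceByKeyAndWindow(
      (a: Long, b: Long) => a + b,
      new Duration(60000),  // window duration
      new Duration(6000))   // slide duration

    aggregated.print()
    ssc.start()
    ssc.awaitTermination()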
Re: Issue while trying to aggregate with a sliding window
Thanks! Will try to get the fix and retest.

On Tue, Jun 17, 2014 at 5:30 PM, onpoq l onpo...@gmail.com wrote:
> There is a bug: https://github.com/apache/spark/pull/961#issuecomment-45125185
>
> On Tue, Jun 17, 2014 at 8:19 PM, Hatch M hatchman1...@gmail.com wrote:
>> Trying to aggregate over a sliding window, playing with the slide duration. Playing around with the slide interval, I can see the aggregation works but mostly fails with the error below. The stream has records coming in every 100 ms.
>>
>> JavaPairDStream<String, AggregateObject> aggregatedDStream = pairsDStream.reduceByKeyAndWindow(aggFunc, new Duration(6), new Duration(60));
>>
>> 14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms is invalid as zeroTime is 1403050485800 ms and slideDuration is 6 ms and difference is 1100 ms
>> 14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found: 1403050486900 ms
>> java.util.NoSuchElementException: key not found: 1403050486900 ms
>>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>>
>> Any hints on what's going on here? Thanks!
>> Hatch