Neither previous window has value for key, nor new values found.

2016-06-10 Thread Marco1982
Hi all,

I'm running a Spark Streaming application that uses reduceByKeyAndWindow().
The window interval is 2 hours and the slide interval is 1 hour. I have a
JavaPairDStream in which both keys and values are strings. Each time
reduceByKeyAndWindow() is invoked, it uses the appendString() and
removeString() functions below to incrementally build a windowed stream of
data:

Function2<String, String, String> appendString = new Function2<String, String, String>() {
  @Override
  public String call(String s1, String s2) {
    return s1 + s2;
  }
};

Function2<String, String, String> removeString = new Function2<String, String, String>() {
  @Override
  public String call(String s1, String s2) {
    return s1.replace(s2, "");
  }
};

filterEmptyRecords() removes keys whose windowed value ends up empty:

Function<scala.Tuple2<String, String>, Boolean> filterEmptyRecords =
    new Function<scala.Tuple2<String, String>, Boolean>() {
      @Override
      public Boolean call(scala.Tuple2<String, String> t) {
        return !t._2().isEmpty();
      }
    };

The windowed operation is then:

JavaPairDStream<String, String> cdr_kv =
    cdr_filtered.reduceByKeyAndWindow(appendString, removeString,
        Durations.seconds(WINDOW_DURATION), Durations.seconds(SLIDE_DURATION),
        PARTITIONS, filterEmptyRecords);
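
For completeness, the duration constants above correspond to the 2-hour
window and 1-hour slide I mentioned; the partition count here is just a
placeholder:

private static final long WINDOW_DURATION = 2 * 60 * 60;  // 2-hour window, in seconds
private static final long SLIDE_DURATION  = 1 * 60 * 60;  // 1-hour slide, in seconds
private static final int  PARTITIONS      = 56;           // placeholder partition count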

After a few hours of operation, this function raises the following
exception:
"Neither previous window has value for key, nor new values found. Are you
sure your key class hashes consistently?"

I've found this post from 2013:
https://groups.google.com/forum/#!msg/spark-users/9OM1YvWzwgE/PhFgdSTP2OQJ
but it doesn't solve my problem. I'm using String keys, which I'm pretty sure
hash consistently.
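
As a sanity check on that last point, String hashing in Java is content-based
and deterministic; a trivial sketch (the key is made up):

String key = "example-key";  // hypothetical key
boolean consistent = key.hashCode() == new String("example-key").hashCode();
System.out.println(consistent);  // prints true: equal strings always hash the same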

Any clue why this happens and possible suggestions to fix it?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Neither-previous-window-has-value-for-key-nor-new-values-found-tp27140.html



Spark Streaming - long garbage collection time

2016-06-03 Thread Marco1982
Hi all,

I'm running a Spark Streaming application with 1-hour batches to join two
data feeds and write the output to disk. One data feed is about 40 GB per
hour (split across multiple files), while the second is about 600-800 MB per
hour (also split across multiple files). Due to application constraints, I
may not be able to run smaller batches. Currently, it takes about 20 minutes
to produce the output on a cluster with 140 cores and 700 GB of RAM. I'm
running 7 workers and 28 executors, each with 5 cores and 22 GB of RAM.
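
For reference, the executor layout above corresponds roughly to settings like
these (the application name is a placeholder):

SparkConf conf = new SparkConf()
    .setAppName("hourly-feed-join")           // placeholder name
    .set("spark.executor.instances", "28")    // 4 executors on each of the 7 workers
    .set("spark.executor.cores", "5")
    .set("spark.executor.memory", "22g");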

I execute mapToPair(), filter(), and reduceByKeyAndWindow() (1-hour window) on
the 40 GB data feed. Most of the computation time is spent on these
operations. What worries me is the garbage collection (GC) time per executor,
which ranges from 25 seconds to 9.2 minutes. I attach two screenshots below:
one lists the GC time per executor and one shows the GC log of a single
executor. I expect that the executor spending 9.2 minutes on garbage
collection will eventually be killed by the Spark driver.

I think these numbers are too high. Do you have any suggestions for keeping
GC time low? I'm already using the Kryo serializer, -XX:+UseConcMarkSweepGC,
and spark.rdd.compress=true.
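
Concretely, those settings amount to something like this sketch (only the
options named above; anything else would be an addition):

SparkConf conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.rdd.compress", "true")
    .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC");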

Is there anything else that would help?

Thanks


--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-long-garbage-collection-time-tp27087.html



Symbolic links in Spark

2016-06-01 Thread Marco1982
Hi all,

It seems to me that Spark Streaming doesn't read symbolic links. Can you
confirm that?
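
For context, this is the kind of minimal test I have in mind (the path is a
placeholder): point textFileStream() at a directory, drop a symlink to a new
file into it, and check whether any batch picks it up.

JavaDStream<String> lines = jssc.textFileStream("/data/landing");  // placeholder dir
lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
  @Override
  public void call(JavaRDD<String> rdd) {
    System.out.println("records in this batch: " + rdd.count());
  }
});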

Marco



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Symbolic-links-in-Spark-tp27062.html



Spark Streaming - Is window() caching DStreams?

2016-05-27 Thread Marco1982
Dear all,

Can someone please explain how Spark Streaming executes the window()
operation? From the Spark 1.6.1 documentation, it seems that windowed
batches are automatically cached in memory, but looking at the web UI it
seems that operations already executed in previous batches are executed
again. For your convenience, I attach a screenshot of my running application
below.

Looking at the web UI, it seems that the flatMapValues() RDDs are cached
(green spot - this is the last operation executed before I call window() on
the DStream), but, at the same time, it also seems that all the
transformations that led to flatMapValues() in previous batches are executed
again. If this is the case, the window() operation may incur a huge
performance penalty, especially with a window duration of 1 or 2 hours (as
I expect for my application). Do you think that checkpointing the DStream at
that point could help? Note that the expected slide interval is about 5
minutes.
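
What I have in mind is something like this minimal sketch (variable names,
the checkpoint directory, and the checkpoint interval are all placeholders):
persist and checkpoint the DStream produced by flatMapValues() right before
calling window(), so that earlier transformations are not recomputed for
every window.

jssc.checkpoint("hdfs:///tmp/streaming-checkpoints");     // placeholder checkpoint dir
flatMappedStream.cache();                                 // keep the pre-window RDDs
flatMappedStream.checkpoint(Durations.minutes(10));       // placeholder interval
JavaPairDStream<String, String> windowed =
    flatMappedStream.window(Durations.minutes(120),       // 1-2 hour window
                            Durations.minutes(5));        // ~5-minute slide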

Hope someone can clarify this point.

Thanks,
Marco



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Is-window-caching-DStreams-tp27041.html



How to carry data streams over multiple batch intervals in Spark Streaming

2016-05-21 Thread Marco1982
Hi experts,
I'm using Apache Spark Streaming 1.6.1 to write a Java application that
joins two key/value data streams and writes the output to HDFS. The two data
streams contain K/V strings and are periodically ingested into Spark from
HDFS using textFileStream().
The two data streams aren't synchronized, which means that some keys that
are in stream1 at time t0 may appear in stream2 at time t1, or vice versa.
Hence, my goal is to join the two streams and compute "leftover" keys, which
should be carried over to the join operation in the next batch intervals.
To better clarify this, look at the following algorithm:

variables:
stream1 = K/V input stream at time t1
stream2 = K/V input stream at time t1
left_keys_s1 = K/V records of stream1 that didn't appear in the join at time t0
left_keys_s2 = K/V records of stream2 that didn't appear in the join at time t0

operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (to be used at time t2)
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (to be used at time t2)
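
In plain batch terms, one iteration of the algorithm above would look roughly
like this sketch on JavaPairRDDs (all variable names are hypothetical; keys
and values are Strings):

JavaPairRDD<String, String> s1 = stream1Batch.union(leftKeysS1);
JavaPairRDD<String, String> s2 = stream2Batch.union(leftKeysS2);
JavaPairRDD<String, scala.Tuple2<String, String>> outStream = s1.join(s2);  // matched keys only
leftKeysS1 = s1.subtractByKey(outStream);  // stream1 records to retry at t2
leftKeysS2 = s2.subtractByKey(outStream);  // stream2 records to retry at t2
// outStream is then written to HDFS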

I've tried to implement this algorithm with Spark Streaming, without success.
Initially, I create two empty streams for the leftover keys in this way (this
is only one stream; the code to generate the second stream is similar):

JavaRDD<String> empty_rdd = sc.emptyRDD();  // sc = JavaSparkContext
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>();
q.add(empty_rdd);
JavaDStream<String> empty_dstream = jssc.queueStream(q);
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(
    new PairFunction<String, String, String>() {
      @Override
      public scala.Tuple2<String, String> call(String s) {
        return new scala.Tuple2<String, String>(s, s);
      }
    });

Later on, this empty stream is unified (via union()) with stream1 and
finally, after the join, I add the leftover keys from stream1 and call
window(). The same happens with stream2.
The problem is that the operations that generate left_keys_s1 and
left_keys_s2 are transformations without actions, which means that Spark
never builds an RDD lineage for them and, hence, they are never executed.
What I get right now is a join that outputs only the records whose keys are
in both stream1 and stream2 within the same batch interval.
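
One thing I've been wondering about (just a sketch, with hypothetical names)
is whether forcing an action on the leftover streams in each batch, e.g. via
foreachRDD(), would make the transformations that produce them actually run:

left_keys_s1.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
  @Override
  public void call(JavaPairRDD<String, String> rdd) {
    rdd.cache();
    rdd.count();  // an action, so the lineage that builds the leftovers is executed
  }
});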
Do you have any suggestions on how to implement this correctly with Spark?

Thanks, 
Marco



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-carry-data-streams-over-multiple-batch-intervals-in-Spark-Streaming-tp26994.html