Log4j files per Spark job

2015-03-17 Thread Dan H.
Hey guys, Looking for a bit of help on logging.

I'm trying to get Spark to write log4j logs per job within a Spark cluster.
So, for example, I'd like:

$SPARK_HOME/logs/job1.log.x
$SPARK_HOME/logs/job2.log.x

And I want this on the driver and on the executor.

I'm trying to accomplish this by using a log4j.properties file in each job's
resources, but it isn't logging properly.
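For concreteness, this is roughly the kind of per-job log4j.properties I have
in mind (just a sketch; the path, appender, and layout settings are
illustrative, not a working configuration):

  log4j.rootLogger=INFO, jobFile
  log4j.appender.jobFile=org.apache.log4j.RollingFileAppender
  # hypothetical path standing in for $SPARK_HOME/logs/job1.log
  log4j.appender.jobFile.File=/path/to/spark/logs/job1.log
  log4j.appender.jobFile.MaxFileSize=10MB
  log4j.appender.jobFile.MaxBackupIndex=5
  log4j.appender.jobFile.layout=org.apache.log4j.PatternLayout
  log4j.appender.jobFile.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n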

How can I get job-level logging on both the executors and the driver?

Thanks in advance for taking the time to respond.

D






Spark Port Configuration

2014-12-23 Thread Dan H.
Hi all,

I'm trying to lock down ALL Spark ports and have tried doing so via both
spark-defaults.conf and the SparkContext.  (The example below was run in
local[*] mode, but all attempts to run in local mode or via spark-submit.sh on
the cluster with a jar produce the same results.)

My goal is for all communication between the driver and workers to use ports
50001-50006 and not any random ports (all other ports, such as 7077, 4040,
etc., are fine).

But I am still seeing random ports being generated by Akka, and I would like
to pin down all ports for the Spark application due to the strict security that
is required.

As part of my SparkContext, I've defined (snippet):

  .set("spark.driver.port", "50001")
  .set("spark.fileserver.port", "50002")
  .set("spark.broadcast.port", "50003")
  .set("spark.replClassServer.port", "50004")
  .set("spark.blockManager.port", "50005")
  .set("spark.executor.port", "50006")
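For reference, the equivalent spark-defaults.conf entries would look roughly
like this (a sketch, assuming the same port choices as above):

  spark.driver.port           50001
  spark.fileserver.port       50002
  spark.broadcast.port        50003
  spark.replClassServer.port  50004
  spark.blockManager.port     50005
  spark.executor.port         50006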



And upon execution, I see the following being picked up correctly (it shows in
the UI), but I am still seeing random port assignments:
.
.
.
This is valid and what I'm expecting:
Remoting started; listening on addresses :[akka.tcp://spark@10.x.x.x:50001]
Remoting: Remoting now listens on addresses:
[akka.tcp://spark@10.x.x.x:50001]
.
.
.
This isn't being set:
ConnectionManager: Bound socket to port 54061 with id =
ConnectionManagerId(10.x.x.x,54061)
BlockManagerMaster: Trying to register BlockManager
BlockManagerInfo: Registering block manager 10.x.x.x:54061 with 2.1 GB RAM
BlockManagerMaster: Registered BlockManager
HttpServer: Starting HTTP Server
HttpBroadcast: Broadcast server started at http://10.x.x.x:54062

I defined a block manager port, but it simply isn't being applied.  What else
can I try to get this resolved?

I can also see in the UI that the following are not being set properly:
 spark.fileserver.uri 
 spark.httpBroadcast.uri

Thanks in advance for your time in reviewing and answering my post.

Regards,

Dan







Spark Streaming Workflow Validation

2014-08-07 Thread Dan H.
I wanted to post for validation, to understand whether there is a more
efficient way to achieve my goal.  I'm currently performing this flow for two
distinct calculations executing in parallel:
  1) Sum key/value pairs using a simple witnessed count (apply a 1 in a
mapToPair() and then groupByKey())
  2) Sum the actual values in my key/value pairs, transforming the data so it
groups properly via groupByKey()

DataSource: RDDStream_in

Workflow 1:
Generate a DStream using flatMap() from the input RDDStream_in, which splits
the data into:
StringKey1, StringKey2, Value1_to_be_inspected

Next I apply a filter() to pull only the values I want to see as witnessed,
which creates a smaller DStream:
StringKey1, StringKey2, Value1_inspected

I then generate a PairDStream with mapToPair() from the previous step,
providing a way to append a summable value, yielding:
StringKey1, StringKey2, Value1_inspected, to_be_summed_value of 1

Next I apply groupByKey() to the PairDStream to get:
StringKey1, StringKey2, Value1, summed_value by key/values


Workflow 2:
Generate a DStream using flatMap() from the input RDDStream_in, which splits
the data into:
StringKey1, StringKey2, Value1_to_be_summed

Next, I apply mapToPair() to the previous DStream, providing a way to sum
Value1 and remove it from the original StringKey, yielding:
StringKey1, StringKey2, Value1_to_be_summed

Next I apply groupByKey() and get:
StringKey1, StringKey2, Value1_summed by keys
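To make the shape of this concrete, here is a rough Java sketch of the two
flows (Java 8 lambdas for brevity; the record splitting, the isWitnessed()
predicate, and the field layout are illustrative placeholders, not my actual
parsing code):

  import java.util.Arrays;
  import org.apache.spark.streaming.api.java.JavaDStream;
  import org.apache.spark.streaming.api.java.JavaPairDStream;
  import scala.Tuple2;

  // Split the incoming stream into "StringKey1 StringKey2 Value1" records
  JavaDStream<String> records = RDDStream_in.flatMap(
      raw -> Arrays.asList(raw.split("\n")));

  // Workflow 1: keep only witnessed values, append a summable 1, then group
  JavaDStream<String> inspected = records.filter(rec -> isWitnessed(rec));
  JavaPairDStream<String, Integer> withOnes =
      inspected.mapToPair(rec -> new Tuple2<String, Integer>(rec, 1));
  JavaPairDStream<String, Iterable<Integer>> groupedOnes =
      withOnes.groupByKey();    // the grouped 1s are then summed per key

  // Workflow 2: key by (StringKey1, StringKey2), value = Value1, then group
  JavaPairDStream<String, Double> keyedValues = records.mapToPair(rec -> {
      String[] f = rec.split("\\s+");
      return new Tuple2<String, Double>(f[0] + " " + f[1],
                                        Double.parseDouble(f[2]));
  });
  JavaPairDStream<String, Iterable<Double>> groupedValues =
      keyedValues.groupByKey(); // the grouped values are then summed per key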
 
Are there more efficient approaches I should be considering, such as method
chaining or another technique, to increase workflow efficiency?

Thanks for your feedback in advance.

DH








Re: Spark Streaming Workflow Validation

2014-08-07 Thread Dan H.
Yes, thanks, I did in fact mean reduceByKey(), thus letting the convenience
method handle the summation by key.
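In terms of the sketch above, that just means replacing the groupByKey() steps
with something like (illustrative):

  JavaPairDStream<String, Integer> counts =
      withOnes.reduceByKey((a, b) -> a + b);    // sums the appended 1s, i.e. a per-key count

  JavaPairDStream<String, Double> sums =
      keyedValues.reduceByKey((a, b) -> a + b); // sums Value1 per key pair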

Thanks for your feedback!

DH






Re: Spark Streaming and Storm

2014-07-09 Thread Dan H.
Xichen_tju,

I recently evaluated Storm over a period of months (using three 2U servers, each 
with a 2.4 GHz CPU and 24 GB RAM) and was not able to achieve a realistic scale for my 
business domain needs.  Storm is really only a framework, which allows you to 
plug in code to do whatever it is you need for a distributed system…so it’s 
completely flexible and distributable, but it comes at a price.  In Storm, one 
of the biggest performance hits came down to how the “acks” work within 
the tuple trees.  You can have the framework ack messages between spouts and/or 
bolts by default, but in the end you will most likely want to manage acks 
yourself, depending on how much reliability your system will need (to replay 
messages…).  All this means is that if you don’t have massive amounts of data 
that you need to process within a few seconds (which I do), then Storm may work 
well for you, but your performance will diminish as you add more and more 
business rules (unless, of course, you add more servers for processing).  If 
you need to ingest at least 1 GBps or more, then you may want to reevaluate, since 
your server scale may not mesh with your overall processing needs.

I recently started using Spark Streaming with Kafka and have been quite 
impressed with the performance level that’s being achieved.  I particularly like 
the fact that Spark isn’t just a framework; it provides you with simple 
tools and convenient API methods.  Some of those features are reduceByKey 
(mapReduce), sliding and aggregated sub-time windows, etc.  Also, in my 
environment, I believe it’s going to be a great fit, since we already use Hadoop 
and Spark should fit into that environment well.

You should look into both Storm and Spark Streaming, but in the end it just 
depends on your needs.  If you’re not looking for streaming aspects, then Spark on 
Hadoop is a great option, since Spark will cache the dataset in memory for all 
queries, which will be much faster than running Hive/Pig on top of Hadoop.  But 
I’m assuming you need some sort of streaming system for data flow; if it 
doesn’t need to be real-time or near real-time, you may want to simply look at 
Hadoop, which you could always use Spark on top of for real-time queries.

Hope this helps…

Dan

 
On Jul 8, 2014, at 7:25 PM, Shao, Saisai saisai.s...@intel.com wrote:

 You may get the performance comparison results from the Spark Streaming paper and 
 meetup presentations; just google it.
 Actually, performance comparison is case by case and relies on your workload 
 design and your hardware and software configurations. There is no single winner 
 across all scenarios.
  
 Thanks
 Jerry
  
 From: xichen_tju@126 [mailto:xichen_...@126.com] 
 Sent: Wednesday, July 09, 2014 9:17 AM
 To: user@spark.apache.org
 Subject: Spark Streaming and Storm
  
 hi all
 I am a newbie to Spark Streaming and used Storm before. Have you tested the 
 performance of both of them, and which one is better?
  
 xichen_tju@126



Re: reduceByKey Not Being Called by Spark Streaming

2014-07-03 Thread Dan H.
Hi All,

I was able to resolve this matter with a simple fix.  It seems that in order
to process the reduceByKey and flatMap operations at the same time, the only
way to resolve it was to increase the number of local threads to more than 1.

Since I'm developing on my personal machine for speed, I simply updated the
sparkURL argument to:

   private static String sparkURL = "local[2]";  // instead of "local"

which is then passed to the JavaStreamingContext constructor as a parameter.
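For reference, a minimal sketch of that context setup (the app name and batch
interval here are illustrative):

   import org.apache.spark.streaming.Duration;
   import org.apache.spark.streaming.api.java.JavaStreamingContext;

   // "local[2]" leaves one thread for the Kafka receiver and one for processing
   JavaStreamingContext jsc =
       new JavaStreamingContext(sparkURL, "KafkaStreamApp", new Duration(10000));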

After I made this change, I was able to see the reduceByKey values properly
aggregated and counted.

Best Regards,

D





reduceByKey Not Being Called by Spark Streaming

2014-07-02 Thread Dan H.
Hi all, I recently picked up Spark and am trying to work through a
coding issue that involves the reduceByKey method.  After various debugging
efforts, it seems that the reduceByKey method never gets called.

Here's my workflow, which is followed by my code and results:

My parsed data contains three fields (val1 val2 val3), separated by spaces,
which I place into a JavaDStream using .flatMap().

From there, I inspect the third value (val3) and place the entire
string into this DStream bucket, since I need the correlated values for
the record.  (Later, I'll expand judiciously to filter all of the
values I need into multiple buckets/filters.)  If I get the match I'm
looking for, I append an Integer to the value String, so I end up with (val1 val2
val3, 1) to be used by my reduceByKey method.

From there, I call the reduceByKey method, but it doesn't ever seem to
get called (although I am calling the print action on the DStream).  However,
I can see the Spark print statements showing that data is being consumed through
my Kafka implementation, but I never get a value to print.

Any ideas on what is going on here?

===
Here's the important snippets of the code I've implemented...

//Create a Kafka DStream broker
JavaPairDStream<String, String> messages =
    KafkaUtils.createStream(jsc, zkQuorum, group, topicMap);

//Get the data stream into a DStream of raw lines
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

//Parse out the data and place into a new DStream
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String x) {

        /*
         * let's create a singleton object for our Kafka parser class
         */
        KafkaParser kpObj = new KafkaParser();

        List<String> lastList = new ArrayList<String>();

        lastList = kpObj.getProcessingValues(
                       kpObj.myKeyToValue(
                           kpObj.myFillParser(x)));

        return kpObj.getProcessingValues(
                   kpObj.myKeyToValue(
                       kpObj.myFillParser(x)));
    }
});

// Filter each response code into its own DStream
JavaDStream<String> responseCode3xxMap = words.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {

        Pattern regex = Pattern.compile("\\s[3][0-9][0-9]");
        Matcher regexMatcher;
        List<String> properRec = new ArrayList<String>();
        regexMatcher = regex.matcher(s);

        //pull out all 3xx response codes
        while (regexMatcher.find()) {
            if (regexMatcher.group() != null) {
                properRec.add(s);
            }
        }
        return properRec;
    }
});

JavaPairDStream<String, Integer> responseCode3xxPairs =
    responseCode3xxMap.mapToPair(
        new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {

                Tuple2<String, Integer> myTuple = new Tuple2<String, Integer>(s, 1);
                return myTuple;
            }
        });

//THIS NEVER SEEMS TO GET CALLED??
JavaPairDStream<String, Integer> responseCode3xxCounts =
    responseCode3xxPairs.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) throws Exception {

                Integer myCount = a + b;
                return myCount;
            }
        });


responseCode3xxMap.print();
responseCode3xxPairs.print();
responseCode3xxCounts.print();
jsc.start();
jsc.awaitTermination();

===
And the basic results...

---
Time: 140434164 ms
---
12345 1.111.111.111 304
...

---
Time: 140434164 ms
---
(12345 1.111.111.111 304,1)
...


While this executes and I add in new data, I'm able to see new data being
received, but it never seems to get processed/printed...

14/07/02 16:54:00 INFO scheduler.ReceiverTracker: Stream 0 received 8 blocks
14/07/02 16:54:00 INFO scheduler.JobScheduler: Added jobs for time
140434164 ms
14/07/02 16:54:10 INFO scheduler.ReceiverTracker: