Log4j files per spark job
Hey guys,

Looking for a bit of help on logging. I'm trying to get Spark to write log4j logs per job within a Spark cluster. So, for example, I'd like:

$SPARK_HOME/logs/job1.log.x
$SPARK_HOME/logs/job2.log.x

And I want this on both the driver and the executors. I'm trying to accomplish this by using a log4j.properties file in each job's resources, but it isn't logging properly. How can I get job-level logs on the executors and the driver?

Thanks in advance for taking the time to respond.

D

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Log4j-files-per-spark-job-tp22106.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
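One way this kind of per-job logging is commonly approached is sketched below. It is not a verified fix for the setup described above. The idea is to point each job at a log4j 1.x properties file whose output path is built from JVM system properties, since log4j 1.x substitutes `${...}` variables from system properties. The property names `job.name` and `job.log.dir` are hypothetical placeholders, not Spark settings.

```properties
# log4j.properties (log4j 1.x syntax)
# job.name and job.log.dir are hypothetical system properties supplied with -D
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.RollingFileAppender
# Rolled files are named <job>.log.1, <job>.log.2, ... up to MaxBackupIndex
log4j.appender.file.File=${job.log.dir}/${job.name}.log
log4j.appender.file.MaxFileSize=50MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```

The file and the two properties would then be passed to each JVM, for example with `-Dlog4j.configuration=file:/path/to/log4j.properties -Djob.name=job1 -Djob.log.dir=/path/to/logs` in `spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions` (the paths are placeholders). The properties file has to be readable at that path on every worker, and whether the options reach both the driver and the executors depends on the Spark version and deploy mode.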
Spark Port Configuration
Hi all,

I'm trying to lock down ALL Spark ports and have tried using both spark-defaults.conf and the SparkContext. (The example below was run in local[*] mode, but all attempts to run in local mode or via spark-submit.sh on a cluster with a jar produce the same results.)

My goal is to have all communication between the driver and workers use ports 50001-50006 and not use any random ports (all other ports, 7077, 4040, etc., are OK). However, I am still seeing random ports being generated by akka, and I would like to define all ports for the Spark application because of the strict security that is needed.

As part of my SparkContext, I've defined (snippet):

set("spark.driver.port", "50001").
set("spark.fileserver.port", "50002").
set("spark.broadcast.port", "50003").
set("spark.replClassServer.port", "50004").
set("spark.blockManager.port", "50005").
set("spark.executor.port", "50006").

Upon execution, I see these values being read into the UI correctly, but I am still seeing random port assignments:

. . .
This is valid and what I'm expecting:

Remoting started; listening on addresses :[akka.tcp://spark@10.x.x.x:50001]
Remoting: Remoting now listens on addresses: [akka.tcp://spark@10.x.x.x:50001]

. . .
This isn't being set:

ConnectionManager: Bound socket to port 54061 with id = ConnectionManagerId(10.x.x.x,54061)
BlockManagerMaster: Trying to register BlockManager
BlockManagerInfo: Registering block manager 10.x.x.x:54061 with 2.1 GB RAM
BlockManagerMaster: Registered BlockManager
HttpServer: Starting HTTP Server
HttpBroadcast: Broadcast server started at http://10.x.x.x:54062

I defined a block manager port, but it simply isn't getting set. What else can I try to get this resolved? I can also see in the UI that the following are not being set properly:

spark.fileserver.uri
spark.httpBroadcast.uri

Thanks in advance for your time in reviewing/answering my post.
Regards,
Dan

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Port-Configuration-tp20839.html
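For reference, the same six settings from the post could be written in spark-defaults.conf form as in the fragment below. This only restates the values given above. Whether each property is honoured depends on the Spark version in use, and the fragment does not by itself explain the random ports seen in the log.

```properties
# spark-defaults.conf: one "key value" pair per line (values taken from the post)
spark.driver.port            50001
spark.fileserver.port        50002
spark.broadcast.port         50003
spark.replClassServer.port   50004
spark.blockManager.port      50005
spark.executor.port          50006
```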
Spark Streaming Workflow Validation
I wanted to post for validation, to understand whether there is a more efficient way to achieve my goal. I'm currently performing this flow for two distinct calculations executing in parallel:

1) Sum key/value pairs by using a simple witnessed count (apply 1 in a mapToPair() and then groupByKey())
2) Sum the actual values in my key/value pairs, transforming the data so that it groups properly by groupByKey()

DataSource: RDDStream_in

Workflow 1:

Generate a DStream using flatMap() from the input RDDStream_in, which splits the data into:
    StringKey1, StringKey2, Value1_to_be_inspected

Next, I apply a filter() to keep only the values I want to see as witnessed, which creates a smaller DStream:
    StringKey1, StringKey2, Value1_inspected

I generate a PairDStream with mapToPair() from the previous step, providing a way to append a summable value, yielding:
    StringKey1, StringKey2, Value1_inspected, to_be_summed_value of 1

Next, I apply groupByKey() to the PairDStream to get:
    StringKey1, StringKey2, Value1, summed_value by key/values

Workflow 2:

Generate a DStream using flatMap() from the input RDDStream_in, which splits the data into:
    StringKey1, StringKey2, Value1_to_be_summed

Next, I apply mapToPair() to the previous DStream, providing a way to sum Value1 and remove Value1 from the original StringKey, yielding:
    StringKey1, StringKey2, Value1_to_be_summed

Next, I apply groupByKey() and I get:
    StringKey1, StringKey2, Value1_summed by keys

Are there more efficient approaches I should be considering, such as method chaining or another technique to increase workflow efficiency?

Thanks in advance for your feedback.

DH

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Workflow-Validation-tp11677.html
Re: Spark Streaming Workflow Validation
Yes, thanks, I did in fact mean reduceByKey(), which lets the convenience method process the summation by key.

Thanks for your feedback!

DH

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Workflow-Validation-tp11677p11706.html
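As a rough illustration of what the two workflows compute, here is a plain-Java sketch with no Spark dependency. It merges values per key as records arrive, which is the behaviour reduceByKey() provides, whereas groupByKey() first collects every value for a key and leaves the summing to a later step. The `Record` class, the comma-joined key format, and the sample values are assumptions made for the example and do not come from the thread.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;

// Plain-Java sketch (not the Spark API) of the per-key merge that reduceByKey performs.
class ReduceByKeySketch {

    // Hypothetical parsed record: two string keys plus one numeric value.
    static final class Record {
        final String key1;
        final String key2;
        final int value;

        Record(String key1, String key2, int value) {
            this.key1 = key1;
            this.key2 = key2;
            this.value = value;
        }
    }

    // Workflow 1: keep only "witnessed" values, pair each record with 1, merge the 1s per key.
    static Map<String, Integer> countWitnessed(List<Record> records, IntPredicate witnessed) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Record r : records) {
            if (witnessed.test(r.value)) {
                counts.merge(r.key1 + "," + r.key2 + "," + r.value, 1, Integer::sum);
            }
        }
        return counts;
    }

    // Workflow 2: move the value out of the key and merge the values themselves per key.
    static Map<String, Integer> sumValues(List<Record> records) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (Record r : records) {
            sums.merge(r.key1 + "," + r.key2, r.value, Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Record> batch = Arrays.asList(
                new Record("k1", "k2", 304),
                new Record("k1", "k2", 304),
                new Record("k1", "k2", 200),
                new Record("k3", "k4", 304));
        System.out.println(countWitnessed(batch, v -> v >= 300 && v < 400));
        System.out.println(sumValues(batch));
    }
}
```

Because each incoming value is folded into a running total, only one number per key is held at any time. That is the main reason reduceByKey() tends to be preferred over groupByKey() for sums and counts.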
Re: Spark Streaming and Storm
Xichen_tju,

I recently evaluated Storm for a period of months (using 3 2U servers with 2.4 GHz CPUs and 24 GB RAM) and was not able to achieve a realistic scale for my business domain needs. Storm is really only a framework that lets you plug in code to do whatever you need for a distributed system, so it’s completely flexible and distributable, but that comes at a price. In Storm, one of the biggest performance hits came down to how the “acks” work within the tuple trees. You can have the framework ack messages between spouts and/or bolts by default, but in the end you will most likely want to manage acks yourself, depending on how much reliability your system needs (to replay messages…). All this means is that if you don’t have massive amounts of data that you need to process within a few seconds (which I do), then Storm may work well for you, but your performance will diminish as you add more and more business rules (unless, of course, you add more servers for processing). If you need to ingest at least 1GBps+, then you may want to reevaluate, since your server scale may not mesh with your overall processing needs.

I recently started using Spark Streaming with Kafka and have been quite impressed by the performance level being achieved. I particularly like the fact that Spark isn’t just a framework; it also provides simple tools with API convenience methods. Some of those features are reduceByKey (mapReduce), sliding and aggregate sub-time windows, etc. Also, in my environment, I believe it’s going to be a great fit, since we already use Hadoop and Spark should fit into that environment well.

You should look into both Storm and Spark Streaming, but in the end it just depends on your needs. If you’re not looking for streaming aspects, then Spark on Hadoop is a great option, since Spark will cache the dataset in memory for all queries, which will be much faster than running Hive/Pig on top of Hadoop.
But I’m assuming you need some sort of streaming system for data flow. If it doesn’t need to be real-time or near real-time, you may want to simply look at Hadoop, on top of which you could always use Spark for real-time queries.

Hope this helps…

Dan

On Jul 8, 2014, at 7:25 PM, Shao, Saisai saisai.s...@intel.com wrote:

You may get the performance comparison results from the Spark Streaming paper and meetup ppt; just google it. Actually, performance comparison is case by case and relies on your workload design and your hardware and software configurations. There is no actual winner across all scenarios.

Thanks
Jerry

From: xichen_tju@126 [mailto:xichen_...@126.com]
Sent: Wednesday, July 09, 2014 9:17 AM
To: user@spark.apache.org
Subject: Spark Streaming and Storm

Hi all,

I am a newbie to Spark Streaming and used Storm before. Have you tested the performance of both of them, and which one is better?

xichen_tju@126
Re: reduceByKey Not Being Called by Spark Streaming
Hi All,

I was able to resolve this matter with a simple fix. It seems that in order to process the reduceByKey and the flatMap operations at the same time, the only way to resolve it was to increase the number of threads to more than 1. Since I'm developing on my personal machine for speed, I simply updated the sparkURL argument to:

private static String sparkURL = "local[2]"; // instead of "local"

which is then passed to the JavaStreamingContext as a parameter. After I made this change, I was able to see the reduceByKey values properly aggregated and counted.

Best Regards,
D

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/reduceByKey-Not-Being-Called-by-Spark-Streaming-tp8684p8739.html
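The effect described above matches the Spark Streaming guidance that a receiver-based input stream occupies one thread, so a single-threaded local master leaves nothing free to process the received data. The sketch below is only an analogy in plain Java and is not Spark's scheduler. A long-running "receiver" task and a short "batch" task share a fixed thread pool, and the batch task only runs when the pool has more than one thread. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Analogy only: a long-running task starving a short task in a fixed-size pool.
class ReceiverStarvationSketch {

    // Returns true if the "batch" task managed to run within half a second.
    static boolean processingRuns(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch processed = new CountDownLatch(1);
        CountDownLatch stopReceiver = new CountDownLatch(1);
        try {
            // "Receiver": holds its thread until it is told to stop.
            pool.execute(() -> {
                try {
                    stopReceiver.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // "Batch job": can only start if another thread is free.
            pool.execute(processed::countDown);
            return processed.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            stopReceiver.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread:  " + processingRuns(1));
        System.out.println("2 threads: " + processingRuns(2));
    }
}
```

With one thread the batch task stays queued behind the receiver. With two threads it runs immediately, which is the same reason "local[2]" works where "local" does not.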
reduceByKey Not Being Called by Spark Streaming
Hi all,

I recently picked up Spark and am trying to work through a coding issue that involves the reduceByKey method. After various debugging efforts, it seems that the reduceByKey method never gets called.

Here's my workflow, which is followed by my code and results:

My parsed data contains three fields (val1 val2 val3), separated by spaces, which I place into a JavaDStream using .flatMap.

From there, I inspect the 3rd value (val3) and place the entire string into this RDDStream bucket, since I need the correlated values for the record. (Later, I'll be expanding judiciously to filter all of the values I need into multiple buckets/filters.)

If I get the match I'm looking for, I add an Integer onto the values String, so I end up with (val1 val2 val3, 1) to be used by my reduceByKey method.

From there, I call the reduceByKey method, but it doesn't ever seem to get called (although I am calling the print action on the RDD). I can see the Spark print statements showing that data is being consumed through my Kafka implementation, but I never get a value to print.

Any ideas on what is going on here?

===
Here are the important snippets of the code I've implemented...
//Create a Kafka Dstream broker
JavaPairDStream<String, String> messages =
        KafkaUtils.createStream(jsc, zkQuorum, group, topicMap);

//get the data stream into an RDD
JavaDStream<String> lines = messages.map(
        new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

//Parse out the data and place into new RDD
JavaDStream<String> words = lines.flatMap(
        new FlatMapFunction<String, String>() {
            public Iterable<String> call(String x) {
                /*
                 * let's create singleton object for our Kafka Parser class
                 */
                KafkaParser kpObj = new KafkaParser();
                List<String> lastList = new ArrayList<String>();
                lastList = kpObj.getProcessingValues(
                        kpObj.myKeyToValue(
                                kpObj.myFillParser(x)));
                return kpObj.getProcessingValues(
                        kpObj.myKeyToValue(
                                kpObj.myFillParser(x)));
            }
        });

// Filter each response code into its own rdd
JavaDStream<String> responseCode3xxMap = words.flatMap(
        new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) {
                Pattern regex = Pattern.compile("\\s[3][0-9][0-9]");
                Matcher regexMatcher;
                List<String> properRec = new ArrayList<String>();
                regexMatcher = regex.matcher(s);
                //pull out all 3xx response codes
                while (regexMatcher.find()) {
                    if (regexMatcher.group() != null) {
                        properRec.add(s);
                    }
                }
                return properRec;
            }
        });

JavaPairDStream<String, Integer> responseCode3xxPairs = responseCode3xxMap.mapToPair(
        new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                Tuple2<String, Integer> myTuple = new Tuple2<String, Integer>(s, 1);
                return myTuple;
            }
        });

//THIS NEVER SEEMS TO GET CALLED??
JavaPairDStream<String, Integer> responseCode3xxCounts = responseCode3xxPairs.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) throws Exception {
                Integer myCount = a + b;
                return myCount;
            }
        });

responseCode3xxMap.print();
responseCode3xxPairs.print();
responseCode3xxCounts.print();

jsc.start();
jsc.awaitTermination();

===
And the basic results...
---
Time: 140434164 ms
---
12345 1.111.111.111 304
...

---
Time: 140434164 ms
---
(12345 1.111.111.111 304,1)
...

While this executes and I add new data, I can see the new data being received, but it never seems to get processed or printed...

14/07/02 16:54:00 INFO scheduler.ReceiverTracker: Stream 0 received 8 blocks
14/07/02 16:54:00 INFO scheduler.JobScheduler: Added jobs for time 140434164 ms
14/07/02 16:54:10 INFO scheduler.ReceiverTracker:
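Separately from the question itself, the 3xx filter step in the code above can be sketched in plain Java without Spark or Kafka. This variant checks only the third space-separated field, on the assumption (taken from the post's description) that each record is "val1 val2 val3" with the response code last. It also keeps each record at most once. The class name and the second and third sample records are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Plain-Java sketch of the 3xx filter step; no Spark or Kafka dependency.
class ResponseCodeFilterSketch {

    // Compiled once and reused; a three-digit code starting with 3.
    private static final Pattern THREE_XX = Pattern.compile("3[0-9][0-9]");

    // Assumes three whitespace-separated fields with the response code in the third.
    static boolean is3xx(String record) {
        String[] fields = record.trim().split("\\s+");
        return fields.length == 3 && THREE_XX.matcher(fields[2]).matches();
    }

    // Keeps each matching record exactly once, as a filter() step would.
    static List<String> keep3xx(List<String> records) {
        List<String> kept = new ArrayList<>();
        for (String r : records) {
            if (is3xx(r)) {
                kept.add(r);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
                "12345 1.111.111.111 304",  // sample record from the post
                "12345 1.111.111.111 200",  // hypothetical non-3xx record
                "12345 300.1.1.1 200");     // hypothetical: " 300" appears in the second field
        System.out.println(keep3xx(records));
    }
}
```

The design choice here is to match on the field rather than search the whole line. A search for a space followed by 3xx anywhere in the record would also accept the third sample, where the match falls inside the address field.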