Responses inline.

On Mon, Apr 20, 2015 at 3:27 PM, Ankit Patel <patel7...@hotmail.com> wrote:

> What you said is correct and I am expecting the printlns to be in my
> console or in the Spark UI. I do not see them in either place.
>
Can you log in to the machine whose executor runs the receiver, and then
check the executor logs on that machine to see whether the expected onStart
messages are there?


> However, if you run the program then the printlns do print for the
> constructor of the receiver and for the foreach statements, with a total
> count of 0.
>
That's expected. In all cases, with or without a master, the receiver object
is constructed in the driver. With a master, the receiver gets serialized and
sent to a worker machine, and the rest of the prints go to the executor logs.
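This constructor-vs-onStart split can be demonstrated with plain JDK serialization, independent of Spark. The hypothetical `Probe` class below stands in for the custom receiver: Java deserialization does not re-run the constructor, which is why the constructor println appears once where the object was built (the driver), while onStart's output appears only in the process that deserialized it (the executor).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {

    // Stand-in for the custom receiver (hypothetical, not Spark code).
    static class Probe implements Serializable {
        static int constructorCalls = 0;

        Probe() {
            constructorCalls++;
            System.out.println("constructed (happens in the driver)");
        }

        void onStart() {
            System.out.println("onStart (happens on the executor)");
        }
    }

    // Serialize and deserialize, mimicking how Spark ships the receiver
    // object from the driver to a worker. No constructor runs on the
    // deserialization side.
    static Probe shipToExecutor(Probe p) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(p);
        ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()));
        return (Probe) in.readObject();
    }

    public static void main(String[] args) throws Exception {
        Probe local = new Probe();            // constructor runs here, once
        Probe remote = shipToExecutor(local); // no constructor call here
        remote.onStart();                     // in Spark, this print would
                                              // land in the executor's logs
    }
}
```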


> When you run it in regular mode with no master attached then you will see
> the counts being printed out in the console as well. Please compile my
> program and try it out; I have spent significant time debugging where it
> could go wrong and could not find an answer.
>

As I said, check out the executor logs on the worker machine running the
receiver. You can identify that machine from the Streaming tab in the Spark
UI.


> I also see the "starting receiver" logs from Spark when no master is
> defined, but do not see them when one is. Also, I am running some other
> simple code with spark-submit with printlns and I do see them in the
> Spark UI, but not for Spark Streaming.
>
That's expected, for the same reason. Without a master, the receiver logs
appear in the same driver process. With a master, they go to the executor logs.

As such, it's not clear what you are trying to debug. If you just want to
see printlns, then this is the expected behavior; nothing is wrong. Run
without a master for convenient debugging (you can see all logs and prints),
and then run in distributed mode for actual deployment (where it is harder
to see those logs and prints).
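For the local-debugging route, one convenient option (an illustrative fragment; note that setMaster in code overrides the --master flag passed to spark-submit) is to hardcode a local master while developing, so all receiver prints land in the single driver process. A receiver permanently occupies one core, so local mode needs at least two threads:

```java
// Local debugging: "local[2]" gives one thread to the receiver and at
// least one to process the received batches; "local[1]" would starve
// the processing side and nothing would be printed.
SparkConf conf = new SparkConf()
        .setAppName("TestCustomReceiver")
        .setMaster("local[2]");
```

Remove the setMaster call (and pass --master on the command line instead) when deploying to the cluster.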

>
> Thanks,
> Ankit
>
> ------------------------------
> From: t...@databricks.com
> Date: Mon, 20 Apr 2015 13:29:31 -0700
> Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver
> attached to master with multiple workers
> To: patel7...@hotmail.com
> CC: ak...@sigmoidanalytics.com; user@spark.apache.org
>
>
> Well, the receiver always runs as part of an executor. When running
> locally (that is, spark-submit without --master), the executor is in the
> same process as the driver, so you see the printlns. If you are running
> with --master spark://cluster, then the executors are running in different
> processes and possibly on different nodes. Hence you don't see the
> printlns in the output of the driver process. If you look at the output of
> the executors in the Spark UI, you may find those prints.
>
> TD
>
> On Mon, Apr 20, 2015 at 5:16 AM, Ankit Patel <patel7...@hotmail.com>
> wrote:
>
> The code I've written is simple as it just invokes a thread and calls a
> store method on the Receiver class.
>
>  I see this code with printlns working fine when I try spark-submit --jars
> <jar> --class test.TestCustomReceiver <jar>
>
> However, it does not work when I try the same command above with --master
> spark://masterURL:
> spark-submit --master spark://masterURL --jars <jar> --class
> test.TestCustomReceiver <jar>
>
> I also tried setting the master in the conf that I created, but that does
> not work either. I do not see the onStart println being printed when I use
> the --master option. Please advise.
>
> Also, the master I am attaching to has multiple workers across hosts with
> many threads available to it.
>
> The code is pasted below (Classes: TestReceiver, TestCustomReceiver):
>
>
>
> package test;
>
> import org.apache.spark.storage.StorageLevel;
> import org.apache.spark.streaming.receiver.Receiver;
>
> public class TestReceiver extends Receiver<String> {
>
>     public TestReceiver() {
>         super(StorageLevel.MEMORY_ONLY());
>         System.out.println("Ankit: Created TestReceiver");
>     }
>
>     @Override
>     public void onStart() {
>         System.out.println("Start TestReceiver");
>         new TestThread().start();
>     }
>
>     @Override
>     public void onStop() {}
>
>     private class TestThread extends Thread {
>         @Override
>         public void run() {
>             while (true) {
>                 try {
>                     sleep((long) (Math.random() * 3000));
>                 } catch (Exception e) {
>                     e.printStackTrace();
>                 }
>                 store("Time: " + System.currentTimeMillis());
>             }
>         }
>     }
> }
>
>
>
> package test;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> public class TestCustomReceiver {
>     public static void main(String[] args) {
>         SparkConf conf = new SparkConf();
>         JavaStreamingContext ssc =
>                 new JavaStreamingContext(conf, new Duration(1000));
>         TestReceiver receiver = new TestReceiver();
>         JavaReceiverInputDStream<String> stream =
>                 ssc.receiverStream(receiver);
>         stream.map(new Function<String, String>() {
>             @Override
>             public String call(String line) throws Exception {
>                 System.out.println("Received: " + line);
>                 return line;
>             }
>         }).foreachRDD(new Function<JavaRDD<String>, Void>() {
>             @Override
>             public Void call(JavaRDD<String> rdd) throws Exception {
>                 System.out.println("Total Count: " + rdd.count());
>                 return null;
>             }
>         });
>         ssc.start();
>         ssc.awaitTermination();
>     }
> }
>
>
> Thanks,
> Ankit
>
> ------------------------------
> Date: Mon, 20 Apr 2015 12:22:03 +0530
> Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver
> attached to master with multiple workers
> From: ak...@sigmoidanalytics.com
> To: patel7...@hotmail.com
> CC: user@spark.apache.org
>
> Would be good, if you can paste your custom receiver code and the code
> that you used to invoke it.
>
> Thanks
> Best Regards
>
> On Mon, Apr 20, 2015 at 9:43 AM, Ankit Patel <patel7...@hotmail.com>
> wrote:
>
>
> I am experiencing a problem with Spark Streaming (Spark 1.2.0): the onStart
> method is never called on my CustomReceiver when calling spark-submit
> against a master node with multiple workers. However, Spark Streaming works
> fine with no master node set. Has anyone noticed this issue?
>
