The implementation of the input-stream-to-iterator function in #2 is
incorrect. When hasNext is called on the iterator, it should try to read
from the buffered reader. If an object (that is, a line) can be read, the
iterator should return it; otherwise it should block and wait for data to
become available. See the function used for socketTextStream:
<https://github.com/apache/spark/blob/095b5182536a43e2ae738be93294ee5215d86581/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala>

Instead, your code tries to read as much data as it can from the input
stream in one shot (and there may be very little data available at that
moment), puts it into the list, and then returns the iterator of the list.
As soon as the list is consumed (that is, iterator.hasNext returns false),
the socket receiver assumes that there is no more data and stops on its own.
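
To make the difference concrete, here is a minimal sketch of an iterator
that reads lazily and blocks in hasNext, as described above. The class
name BlockingLines is hypothetical and not part of Spark, the UTF-8
charset is an assumption, and this is only an illustration of the idea,
not the code Spark itself uses.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical helper (not part of Spark): exposes an InputStream as lazily read lines. */
final class BlockingLines implements Iterable<String> {
    private final BufferedReader reader;

    BlockingLines(InputStream in) {
        // Assumption: the peer sends UTF-8 text terminated by newlines.
        this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    }

    @Override
    public Iterator<String> iterator() {
        return new Iterator<String>() {
            private String nextLine;   // line read ahead by hasNext(), if any
            private boolean finished;  // true once readLine() has returned null

            @Override
            public boolean hasNext() {
                if (nextLine == null && !finished) {
                    try {
                        // readLine() blocks until a full line arrives or the stream ends.
                        nextLine = reader.readLine();
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                    finished = (nextLine == null);
                }
                return !finished;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String line = nextLine;
                nextLine = null;
                return line;
            }
        };
    }
}
```

Under these assumptions, the call method in #2 could simply return
new BlockingLines(arg0). hasNext would then report false only when the
remote side closes the connection, not whenever the socket happens to be
idle.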

Note to self - document this better.

TD


On Thu, Jul 10, 2014 at 2:47 AM, kytay <kaiyang....@gmail.com> wrote:

> Hi
>
> I am trying out a simple piece of code by writing my own JavaNetworkCount
> app to test out Spark Streaming
>
> Here are the two versions of the code.
>
>
> // #1
> JavaReceiverInputDStream<String> lines = sctx.socketTextStream("127.0.0.1",
> 9999);
>
>
> // #2
> JavaReceiverInputDStream<String> lines = sctx.socketStream(
>         "127.0.0.1",
>         9999,
>         new Function<InputStream, Iterable<String>>() {
>
>             @Override
>             public Iterable<String> call(InputStream arg0) throws Exception {
>                 // TODO Auto-generated method stub
>                 if (arg0 != null)
>                     System.out.println("CALL is called...");
>
>                 BufferedReader reader =
>                         new BufferedReader(new InputStreamReader(arg0));
>                 ArrayList<String> list = new ArrayList<String>();
>                 while (reader.ready()) {
>                     String linetext = reader.readLine();
>                     System.out.println(linetext);
>                     list.add(linetext);
>                 }
>
>                 if (list.size() > 0)
>                     System.out.println("ArrayList is not empty.");
>
>                 return list;
>             }
>         },
>         StorageLevel.MEMORY_AND_DISK_SER_2()
> );
>
> I wrote #2 to test some other issues that I am facing, where
> the text stream from the TCP host is not received, but that is not my main
> concern.
>
> What I am concerned about is this:
>
> Using .socketTextStream(), the code manages to keep a persistent connection
> to the TCP host, while the #2 code using .socketStream() is unable to
> maintain a persistent connection.
>
> The following is the log printed when I run #2
>
> 14/07/10 01:55:42 INFO ReceiverSupervisorImpl: Receiver started again
> 14/07/10 01:55:43 INFO SocketReceiver: Connected to 127.0.0.1:9999
> CALL is called...
> 14/07/10 01:55:43 INFO SocketReceiver: Stopped receiving
> 14/07/10 01:55:43 WARN ReceiverSupervisorImpl: Restarting receiver with
> delay 2000 ms: Retrying connecting to 127.0.0.1:9999
> 14/07/10 01:55:43 INFO SocketReceiver: Closed socket to 127.0.0.1:9999
> 14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Stopping receiver with
> message: Restarting receiver with delay 2000ms: Retrying connecting to
> 127.0.0.1:9999:
> 14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Called receiver onStop
> 14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Deregistering receiver 0
> 14/07/10 01:55:43 ERROR ReceiverTracker: Deregistered receiver for stream
> 0:
> Restarting receiver with delay 2000ms: Retrying connecting to
> 127.0.0.1:9999
> 14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Stopped receiver 0
> 14/07/10 01:55:45 INFO ReceiverTracker: Stream 0 received 0 blocks
> 14/07/10 01:55:45 INFO JobScheduler: Added jobs for time 1404984705000 ms
> 14/07/10 01:55:45 INFO ReceiverSupervisorImpl: Starting receiver again
>
> I am very new to clustered computing, hadoop, spark, even streaming. So I
> may not get the entire concept right.
>
> So may I clarify: is there something wrong in my #2 code? Am I able to
> achieve the same thing that #1 does?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Getting-Persistent-Connection-using-socketStream-tp9285.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
