Thanks for following up!  I've linked the relevant tickets to
SPARK-18057 <https://issues.apache.org/jira/browse/SPARK-18057> and I
targeted it for Spark 2.2.
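
For anyone who needs a workaround before then, here is a minimal sketch of pinning the client in an sbt build. The Spark version (2.1.0) and the "provided" scope are assumptions for illustration and are not stated in this thread; kafka-clients 0.10.0.1 is the version reported below as working.

```scala
// build.sbt (sketch; the Spark version and scopes are assumptions)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0"
)

// Force the client version reported as working in this thread, so that a
// newer kafka-clients pulled in by another dependency does not win.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.0.1"
```

An override only changes which version is resolved; it is worth checking the resolved classpath to confirm which kafka-clients jar actually ends up there.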

On Sat, Jan 28, 2017 at 10:15 AM, Koert Kuipers <ko...@tresata.com> wrote:

> there was also already an existing spark ticket for this:
> SPARK-18779 <https://issues.apache.org/jira/browse/SPARK-18779>
>
> On Sat, Jan 28, 2017 at 1:13 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> it seems the bug is:
>> https://issues.apache.org/jira/browse/KAFKA-4547
>>
>> i would advise everyone not to use kafka-clients 0.10.0.2, 0.10.1.0 or
>> 0.10.1.1
>>
>> On Fri, Jan 27, 2017 at 3:56 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> in case anyone else runs into this:
>>>
>>> the issue is that i was using kafka-clients 0.10.1.1
>>>
>>> it works when i use kafka-clients 0.10.0.1 with spark structured
>>> streaming
>>>
>>> my kafka server is 0.10.1.1
>>>
>>> On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers <ko...@tresata.com>
>>> wrote:
>>>
>>>> i checked my topic. it has 5 partitions but all the data is written to
>>>> a single partition: wikipedia-2
>>>> i turned on debug logging and i see this:
>>>>
>>>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
>>>> consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
>>>> wikipedia-1]. Seeking to the end.
>>>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>>>> partition wikipedia-0
>>>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>>>> partition wikipedia-4
>>>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>>>> partition wikipedia-3
>>>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>>>> partition wikipedia-2
>>>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>>>> partition wikipedia-1
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>>> partition wikipedia-0 to latest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>>> offset=0} for partition wikipedia-0
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>>> partition wikipedia-0 to earliest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>>> offset=0} for partition wikipedia-0
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>>> partition wikipedia-4 to latest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>>> offset=0} for partition wikipedia-4
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>>> partition wikipedia-4 to earliest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>>> offset=0} for partition wikipedia-4
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>>> partition wikipedia-3 to latest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
>>>> successful heartbeat response for group
>>>> spark-kafka-source-fac4f749-fd56-4a32-82c7-e687aadf520b-1923704552-driver-0
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>>> offset=0} for partition wikipedia-3
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>>> partition wikipedia-3 to earliest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>>> offset=0} for partition wikipedia-3
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>>> partition wikipedia-2 to latest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>>> offset=152908} for partition wikipedia-2
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>>> partition wikipedia-2 to earliest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>>> offset=0} for partition wikipedia-2
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>>> partition wikipedia-1 to latest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>>> offset=0} for partition wikipedia-1
>>>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
>>>> partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
>>>> wikipedia-3 -> 0, wikipedia-0 -> 0)
>>>>
>>>> what is confusing to me is this:
>>>> Resetting offset for partition wikipedia-2 to latest offset.
>>>> Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
>>>> Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 ->
>>>> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>>>>
>>>> why does it find latest offset 152908 for wikipedia-2 but then sets
>>>> latest offset to 0 for that partition? or am i misunderstanding?
>>>>
>>>> On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers <ko...@tresata.com>
>>>> wrote:
>>>>
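>>>>> code:
>>>>>       // imports needed by the snippet below
>>>>>       import org.apache.spark.sql.functions.col
>>>>>       import org.apache.spark.sql.streaming.OutputMode
>>>>>       import org.apache.spark.sql.types.StringType
>>>>>
>>>>>       // read the "wikipedia" topic and print each value as a string
>>>>>       val query = spark.readStream
>>>>>         .format("kafka")
>>>>>         .option("kafka.bootstrap.servers", "somenode:9092")
>>>>>         .option("subscribe", "wikipedia")
>>>>>         .load()
>>>>>         .select(col("value").cast(StringType))
>>>>>         .writeStream
>>>>>         .format("console")
>>>>>         .outputMode(OutputMode.Append)
>>>>>         .start()
>>>>>
>>>>>       // print the latest progress report every 10 seconds
>>>>>       while (true) {
>>>>>         Thread.sleep(10000)
>>>>>         println(query.lastProgress)
>>>>>       }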
>>>>>
>>>>> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <
>>>>> alons...@gmail.com> wrote:
>>>>>
>>>>>> lets see the code...
>>>>>>
>>>>>> Alonso Isidoro Roman
>>>>>>
>>>>>> 2017-01-27 5:56 GMT+01:00 Koert Kuipers <ko...@tresata.com>:
>>>>>>
>>>>>>> my little program prints out query.lastProgress every 10 seconds,
>>>>>>> and this is what it shows:
>>>>>>>
>>>>>>> {
>>>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>>>   "name" : "wiki",
>>>>>>>   "timestamp" : "2017-01-26T22:54:45.732Z",
>>>>>>>   "numInputRows" : 0,
>>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>>   "durationMs" : {
>>>>>>>     "getOffset" : 9,
>>>>>>>     "triggerExecution" : 10
>>>>>>>   },
>>>>>>>   "stateOperators" : [ ],
>>>>>>>   "sources" : [ {
>>>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>>>     "startOffset" : {
>>>>>>>       "wikipedia" : {
>>>>>>>         "2" : 0,
>>>>>>>         "4" : 0,
>>>>>>>         "1" : 0,
>>>>>>>         "3" : 0,
>>>>>>>         "0" : 0
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "endOffset" : {
>>>>>>>       "wikipedia" : {
>>>>>>>         "2" : 0,
>>>>>>>         "4" : 0,
>>>>>>>         "1" : 0,
>>>>>>>         "3" : 0,
>>>>>>>         "0" : 0
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "numInputRows" : 0,
>>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>>   } ],
>>>>>>>   "sink" : {
>>>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>>>   }
>>>>>>> }
>>>>>>> {
>>>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>>>   "name" : "wiki",
>>>>>>>   "timestamp" : "2017-01-26T22:54:55.745Z",
>>>>>>>   "numInputRows" : 0,
>>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>>   "durationMs" : {
>>>>>>>     "getOffset" : 5,
>>>>>>>     "triggerExecution" : 5
>>>>>>>   },
>>>>>>>   "stateOperators" : [ ],
>>>>>>>   "sources" : [ {
>>>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>>>     "startOffset" : {
>>>>>>>       "wikipedia" : {
>>>>>>>         "2" : 0,
>>>>>>>         "4" : 0,
>>>>>>>         "1" : 0,
>>>>>>>         "3" : 0,
>>>>>>>         "0" : 0
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "endOffset" : {
>>>>>>>       "wikipedia" : {
>>>>>>>         "2" : 0,
>>>>>>>         "4" : 0,
>>>>>>>         "1" : 0,
>>>>>>>         "3" : 0,
>>>>>>>         "0" : 0
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "numInputRows" : 0,
>>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>>   } ],
>>>>>>>   "sink" : {
>>>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>>>   }
>>>>>>> }
>>>>>>> {
>>>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>>>   "name" : "wiki",
>>>>>>>   "timestamp" : "2017-01-26T22:55:05.748Z",
>>>>>>>   "numInputRows" : 0,
>>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>>   "durationMs" : {
>>>>>>>     "getOffset" : 5,
>>>>>>>     "triggerExecution" : 5
>>>>>>>   },
>>>>>>>   "stateOperators" : [ ],
>>>>>>>   "sources" : [ {
>>>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>>>     "startOffset" : {
>>>>>>>       "wikipedia" : {
>>>>>>>         "2" : 0,
>>>>>>>         "4" : 0,
>>>>>>>         "1" : 0,
>>>>>>>         "3" : 0,
>>>>>>>         "0" : 0
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "endOffset" : {
>>>>>>>       "wikipedia" : {
>>>>>>>         "2" : 0,
>>>>>>>         "4" : 0,
>>>>>>>         "1" : 0,
>>>>>>>         "3" : 0,
>>>>>>>         "0" : 0
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "numInputRows" : 0,
>>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>>   } ],
>>>>>>>   "sink" : {
>>>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>>>   }
>>>>>>> }
>>>>>>> {
>>>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>>>   "name" : "wiki",
>>>>>>>   "timestamp" : "2017-01-26T22:55:15.758Z",
>>>>>>>   "numInputRows" : 0,
>>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>>   "durationMs" : {
>>>>>>>     "getOffset" : 4,
>>>>>>>     "triggerExecution" : 4
>>>>>>>   },
>>>>>>>   "stateOperators" : [ ],
>>>>>>>   "sources" : [ {
>>>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>>>     "startOffset" : {
>>>>>>>       "wikipedia" : {
>>>>>>>         "2" : 0,
>>>>>>>         "4" : 0,
>>>>>>>         "1" : 0,
>>>>>>>         "3" : 0,
>>>>>>>         "0" : 0
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "endOffset" : {
>>>>>>>       "wikipedia" : {
>>>>>>>         "2" : 0,
>>>>>>>         "4" : 0,
>>>>>>>         "1" : 0,
>>>>>>>         "3" : 0,
>>>>>>>         "0" : 0
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "numInputRows" : 0,
>>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>>   } ],
>>>>>>>   "sink" : {
>>>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>>>   }
>>>>>>> }
>>>>>>> {
>>>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>>>   "name" : "wiki",
>>>>>>>   "timestamp" : "2017-01-26T22:55:25.760Z",
>>>>>>>   "numInputRows" : 0,
>>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>>   "durationMs" : {
>>>>>>>     "getOffset" : 4,
>>>>>>>     "triggerExecution" : 4
>>>>>>>   },
>>>>>>>   "stateOperators" : [ ],
>>>>>>>   "sources" : [ {
>>>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>>>     "startOffset" : {
>>>>>>>       "wikipedia" : {
>>>>>>>         "2" : 0,
>>>>>>>         "4" : 0,
>>>>>>>         "1" : 0,
>>>>>>>         "3" : 0,
>>>>>>>         "0" : 0
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "endOffset" : {
>>>>>>>       "wikipedia" : {
>>>>>>>         "2" : 0,
>>>>>>>         "4" : 0,
>>>>>>>         "1" : 0,
>>>>>>>         "3" : 0,
>>>>>>>         "0" : 0
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "numInputRows" : 0,
>>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>>   } ],
>>>>>>>   "sink" : {
>>>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>>>   }
>>>>>>> }
>>>>>>> {
>>>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>>>   "name" : "wiki",
>>>>>>>   "timestamp" : "2017-01-26T22:55:35.766Z",
>>>>>>>   "numInputRows" : 0,
>>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>>   "durationMs" : {
>>>>>>>     "getOffset" : 4,
>>>>>>>     "triggerExecution" : 4
>>>>>>>   },
>>>>>>>   "stateOperators" : [ ],
>>>>>>>   "sources" : [ {
>>>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>>>     "startOffset" : {
>>>>>>>       "wikipedia" : {
>>>>>>>         "2" : 0,
>>>>>>>         "4" : 0,
>>>>>>>         "1" : 0,
>>>>>>>         "3" : 0,
>>>>>>>         "0" : 0
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "endOffset" : {
>>>>>>>       "wikipedia" : {
>>>>>>>         "2" : 0,
>>>>>>>         "4" : 0,
>>>>>>>         "1" : 0,
>>>>>>>         "3" : 0,
>>>>>>>         "0" : 0
>>>>>>>       }
>>>>>>>     },
>>>>>>>     "numInputRows" : 0,
>>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>>   } ],
>>>>>>>   "sink" : {
>>>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 26, 2017 at 10:33 PM, Koert Kuipers <ko...@tresata.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> hey,
>>>>>>>> i am just getting started with kafka + spark structured streaming.
>>>>>>>> so this is probably a pretty dumb mistake.
>>>>>>>>
>>>>>>>> i wrote a little program in spark to read messages from a kafka
>>>>>>>> topic and display them in the console, using the kafka source and
>>>>>>>> console sink. i run it in spark local mode.
>>>>>>>>
>>>>>>>> i hooked it up to a test topic that i send messages to using the
>>>>>>>> kafka console producer, and everything works great. i type a message
>>>>>>>> in the console producer, and it pops up in my spark program. very neat!
>>>>>>>>
>>>>>>>> next i point it to another topic instead on which a kafka-connect
>>>>>>>> program is writing lots of irc messages. i can see kafka connect to the
>>>>>>>> topic successfully, the partitions are discovered etc., and then...
>>>>>>>> nothing. it just keeps stuck at offsets 0 for all partitions. at the
>>>>>>>> same time in another terminal i can see messages coming in just fine
>>>>>>>> using the kafka console consumer.
>>>>>>>>
>>>>>>>> i dont get it. why doesnt kafka want to consume from this topic in
>>>>>>>> spark structured streaming?
>>>>>>>>
>>>>>>>> thanks! koert
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
