Yeah, kafka server/client compatibility can be pretty confusing, and
mismatches do not produce good errors.  This should be addressed
in the next release of kafka (they are adding an API to query the server's
capabilities).
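
Until then, the practical workaround is what Koert found below: let the
spark kafka source dictate the kafka-clients version rather than forcing a
newer one onto the classpath. A minimal sketch of what that looks like in
sbt (the exact versions here are assumptions; check what your spark version
actually depends on):

  // build.sbt: take the kafka-clients that spark-sql-kafka was built with
  libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0"
  // if kafka-clients must be declared explicitly, pin it to the version the
  // spark artifact expects (0.10.0.1 for spark 2.1), not the broker version
  libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.0.1"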

On Fri, Jan 27, 2017 at 12:56 PM, Koert Kuipers <ko...@tresata.com> wrote:

> in case anyone else runs into this:
>
> the issue is that i was using kafka-clients 0.10.1.1
>
> it works when i use kafka-clients 0.10.0.1 with spark structured streaming
>
> my kafka server is 0.10.1.1
>
> On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> i checked my topic. it has 5 partitions but all the data is written to a
>> single partition: wikipedia-2
>> i turned on debug logging and i see this:
>>
>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
>> consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
>> wikipedia-1]. Seeking to the end.
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-1
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-0 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-0 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-4 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-4 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-3 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
>> successful heartbeat response for group spark-kafka-source-fac4f749-fd56-4a32-82c7-e687aadf520b-1923704552-driver-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-3 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-2 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=152908} for partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-2 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-1 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-1
>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
>> partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
>> wikipedia-3 -> 0, wikipedia-0 -> 0)
>>
>> what is confusing to me is this:
>> Resetting offset for partition wikipedia-2 to latest offset.
>> Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
>> Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 ->
>> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>>
>> why does it find latest offset 152908 for wikipedia-2, but then report the
>> latest offset as 0 for that partition? or am i misunderstanding?
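>>
>> (for cross-checking, the end offsets can also be read with a bare
>> KafkaConsumer outside spark -- a rough sketch, assuming kafka-clients
>> 0.10.x on the classpath and the same broker as above:)
>>
>>   import java.util.Properties
>>   import scala.collection.JavaConverters._
>>   import org.apache.kafka.clients.consumer.KafkaConsumer
>>   import org.apache.kafka.common.TopicPartition
>>
>>   val props = new Properties()
>>   props.put("bootstrap.servers", "somenode:9092")
>>   props.put("key.deserializer",
>>     "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>   props.put("value.deserializer",
>>     "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>   val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
>>   // resolve all partitions of the topic, then seek to the log end of each
>>   val parts = consumer.partitionsFor("wikipedia").asScala
>>     .map(p => new TopicPartition(p.topic, p.partition))
>>   consumer.assign(parts.asJava)
>>   consumer.seekToEnd(parts.asJava)
>>   // position() forces the seek and returns the actual end offset
>>   parts.foreach(p => println(s"$p -> ${consumer.position(p)}"))
>>   consumer.close()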
>>
>> On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> code:
>>>
>>>       import org.apache.spark.sql.functions.col
>>>       import org.apache.spark.sql.streaming.OutputMode
>>>       import org.apache.spark.sql.types.StringType
>>>
>>>       val query = spark.readStream
>>>         .format("kafka")
>>>         .option("kafka.bootstrap.servers", "somenode:9092")
>>>         .option("subscribe", "wikipedia")
>>>         .load
>>>         .select(col("value") cast StringType)
>>>         .writeStream
>>>         .format("console")
>>>         .outputMode(OutputMode.Append)
>>>         .start()
>>>
>>>       while (true) {
>>>         Thread.sleep(10000)
>>>         println(query.lastProgress)
>>>       }
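>>>
>>> (note: this uses the source's default starting position, which i
>>> understand to be the latest offsets; to replay data already in the
>>> topic, the spark 2.1 kafka source takes the option below -- option name
>>> per the 2.1 docs, so verify for other versions:)
>>>
>>>         .option("startingOffsets", "earliest")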
>>>
>>> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <
>>> alons...@gmail.com> wrote:
>>>
>>>> let's see the code...
>>>>
>>>> Alonso Isidoro Roman
>>>> https://about.me/alonso.isidoro.roman
>>>>
>>>> 2017-01-27 5:56 GMT+01:00 Koert Kuipers <ko...@tresata.com>:
>>>>
>>>>> my little program prints out query.lastProgress every 10 seconds, and
>>>>> this is what it shows:
>>>>>
>>>>> {
>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>   "name" : "wiki",
>>>>>   "timestamp" : "2017-01-26T22:54:45.732Z",
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : {
>>>>>     "getOffset" : 9,
>>>>>     "triggerExecution" : 10
>>>>>   },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>     "startOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "endOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : {
>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>   }
>>>>> }
>>>>> {
>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>   "name" : "wiki",
>>>>>   "timestamp" : "2017-01-26T22:54:55.745Z",
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : {
>>>>>     "getOffset" : 5,
>>>>>     "triggerExecution" : 5
>>>>>   },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>     "startOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "endOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : {
>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>   }
>>>>> }
>>>>> {
>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>   "name" : "wiki",
>>>>>   "timestamp" : "2017-01-26T22:55:05.748Z",
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : {
>>>>>     "getOffset" : 5,
>>>>>     "triggerExecution" : 5
>>>>>   },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>     "startOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "endOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : {
>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>   }
>>>>> }
>>>>> {
>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>   "name" : "wiki",
>>>>>   "timestamp" : "2017-01-26T22:55:15.758Z",
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : {
>>>>>     "getOffset" : 4,
>>>>>     "triggerExecution" : 4
>>>>>   },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>     "startOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "endOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : {
>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>   }
>>>>> }
>>>>> {
>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>   "name" : "wiki",
>>>>>   "timestamp" : "2017-01-26T22:55:25.760Z",
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : {
>>>>>     "getOffset" : 4,
>>>>>     "triggerExecution" : 4
>>>>>   },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>     "startOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "endOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : {
>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>   }
>>>>> }
>>>>> {
>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>   "name" : "wiki",
>>>>>   "timestamp" : "2017-01-26T22:55:35.766Z",
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : {
>>>>>     "getOffset" : 4,
>>>>>     "triggerExecution" : 4
>>>>>   },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>     "startOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "endOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : {
>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> On Thu, Jan 26, 2017 at 10:33 PM, Koert Kuipers <ko...@tresata.com>
>>>>> wrote:
>>>>>
>>>>>> hey,
>>>>>> i am just getting started with kafka + spark structured streaming. so
>>>>>> this is probably a pretty dumb mistake.
>>>>>>
>>>>>> i wrote a little program in spark to read messages from a kafka topic
>>>>>> and display them in the console, using the kafka source and console
>>>>>> sink. i run it in spark local mode.
>>>>>>
>>>>>> i hooked it up to a test topic that i send messages to using the
>>>>>> kafka console producer, and everything works great. i type a message
>>>>>> in the console producer, and it pops up in my spark program. very neat!
>>>>>>
>>>>>> next i point it at another topic instead, one that a kafka-connect
>>>>>> program is writing lots of irc messages to. i can see it connect to the
>>>>>> topic successfully, the partitions are discovered etc., and then...
>>>>>> nothing. it just stays stuck at offset 0 for all partitions. at the same
>>>>>> time, in another terminal, i can see messages coming in just fine using
>>>>>> the kafka console consumer.
>>>>>>
>>>>>> i don't get it. why doesn't kafka want to consume from this topic in
>>>>>> spark structured streaming?
>>>>>>
>>>>>> thanks! koert
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
