Re: Spark 2.3.0 Structured Streaming Kafka Timestamp
Hmm yeah that does look wrong. Would be great if someone opened a PR to correct the docs :)

On Thu, May 10, 2018 at 5:13 PM Yuta Morisawa wrote:
> The problem is solved.
> The actual schema of Kafka message is different from documentation.
>
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>
> The documentation says the format of "timestamp" column is Long type,
> but the actual format is timestamp.
Re: Spark 2.3.0 Structured Streaming Kafka Timestamp
The problem is solved. The actual schema of the Kafka messages differs from the documentation.

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

The documentation says the "timestamp" column is of Long type, but the actual type is timestamp.

Below are the code and the output I used to check the schema.

-code
    spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .printSchema()

-result
    root
     |-- key: binary (nullable = true)
     |-- value: binary (nullable = true)
     |-- topic: string (nullable = true)
     |-- partition: integer (nullable = true)
     |-- offset: long (nullable = true)
     |-- timestamp: timestamp (nullable = true)
     |-- timestampType: integer (nullable = true)

Regards,
Yuta

On 2018/05/09 16:14, Yuta Morisawa wrote:
> How can I get milli-second timestamp from Kafka topics?
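
Since the "timestamp" column is a TimestampType, casting it to LONG yields whole epoch seconds, which would explain the missing milliseconds in the original query. The following is a minimal sketch of one way to recover a millisecond value, not necessarily how the poster solved it: cast to DOUBLE (seconds with a fractional part), scale by 1000, round, then cast to LONG. The helper name `withEpochMillis` and the output column names `epoch_seconds` and `epoch_millis` are hypothetical, and a small static DataFrame stands in for the Kafka source so the example runs without a broker.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, round}

object KafkaTimestampMillis {

  // Hypothetical helper: expects a TimestampType column named "timestamp"
  // (as in the Kafka source schema) and adds two epoch-based columns.
  def withEpochMillis(df: DataFrame): DataFrame =
    df.select(
      // Casting a timestamp to long gives whole seconds since the epoch.
      col("timestamp").cast("long").as("epoch_seconds"),
      // Casting to double keeps the fractional seconds; scale to milliseconds
      // and round before the final cast to avoid floating-point truncation.
      round(col("timestamp").cast("double") * 1000).cast("long").as("epoch_millis"),
      col("value")
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("kafka-timestamp-millis")
      .getOrCreate()
    import spark.implicits._

    // Assumption: a static DataFrame standing in for the Kafka source.
    val sample = Seq((new Timestamp(1525939980123L), "payload"))
      .toDF("timestamp", "value")

    withEpochMillis(sample).show(false)

    spark.stop()
  }
}
```

Against the real source, the same select would be applied to the DataFrame returned by `.load()`, with `value` cast to a string first if needed.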
Spark 2.3.0 Structured Streaming Kafka Timestamp
Hi All,

I'm trying to extract the Kafka timestamp from Kafka topics.

The timestamp I get does not contain millisecond information, but it should, because the ConsumerRecord class of Kafka 0.10 supports millisecond timestamps.

How can I get a millisecond timestamp from Kafka topics?

These are the pages I referred to.

https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html
https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/processor/TimestampExtractor.html

And this is my code.

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1,topic2")
      .load()
      .selectExpr("CAST(timestamp AS LONG)", "CAST(value AS STRING)")
      .as[(Long, String)]

Regards,
Yuta

- To unsubscribe e-mail: user-unsubscr...@spark.apache.org