Re: Spark 2.3.0 Structured Streaming Kafka Timestamp

2018-05-11 Thread Michael Armbrust
Hmm yeah that does look wrong.  Would be great if someone opened a PR to
correct the docs :)

On Thu, May 10, 2018 at 5:13 PM Yuta Morisawa 
wrote:

> The problem is solved.
> The actual schema of Kafka message is different from documentation.
>
>
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>
> The documentation says the format of "timestamp" column is Long type,
> but the actual format is timestamp.
>
>
> The followings are my code and result to check schema.
>
> -code
> val df = spark
>.read
>.format("kafka")
>.option("kafka.bootstrap.servers", bootstrapServers)
>.option(subscribeType, topics)
>.load()
>.printSchema()
>
> -result
> root
>   |-- key: binary (nullable = true)
>   |-- value: binary (nullable = true)
>   |-- topic: string (nullable = true)
>   |-- partition: integer (nullable = true)
>   |-- offset: long (nullable = true)
>   |-- timestamp: timestamp (nullable = true)
>   |-- timestampType: integer (nullable = true)
>
>
> Regards,
> Yuta
>
> On 2018/05/09 16:14, Yuta Morisawa wrote:
> > Hi All
> >
> > I'm trying to extract Kafka-timestamp from Kafka topics.
> >
> > The timestamp does not contain milli-seconds information,
> > but it should contain because ConsumerRecord class of Kafka 0.10
> > supports milli-second timestamp.
> >
> > How can I get milli-second timestamp from Kafka topics?
> >
> >
> > These are websites I refer to.
> >
> >
> https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html
> >
> >
> >
> https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/processor/TimestampExtractor.html
> >
> >
> >
> > And this is my code.
> > 
> > val df = spark
> >.readStream
> >.format("kafka")
> >.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
> >.option("subscribe", "topic1,topic2")
> >.load()
> >.selectExpr("CAST(timestamp AS LONG)", "CAST(value AS STRING)")
> >.as[(Long, String)]
> > 
> >
> > Regards,
> > Yuta
> >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark 2.3.0 Structured Streaming Kafka Timestamp

2018-05-10 Thread Yuta Morisawa

The problem is solved.
The actual schema of Kafka message is different from documentation.

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

The documentation says the format of "timestamp" column is Long type, 
but the actual format is timestamp.



The followings are my code and result to check schema.

-code
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()
  .printSchema()

-result
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)


Regards,
Yuta

On 2018/05/09 16:14, Yuta Morisawa wrote:

Hi All

I'm trying to extract Kafka-timestamp from Kafka topics.

The timestamp does not contain milli-seconds information,
but it should contain because ConsumerRecord class of Kafka 0.10 
supports milli-second timestamp.


How can I get milli-second timestamp from Kafka topics?


These are websites I refer to.

https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html 



https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/processor/TimestampExtractor.html 




And this is my code.

val df = spark
   .readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1,topic2")
   .load()
   .selectExpr("CAST(timestamp AS LONG)", "CAST(value AS STRING)")
   .as[(Long, String)]


Regards,
Yuta


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark 2.3.0 Structured Streaming Kafka Timestamp

2018-05-09 Thread Yuta Morisawa

Hi All

I'm trying to extract Kafka-timestamp from Kafka topics.

The timestamp does not contain milli-seconds information,
but it should contain because ConsumerRecord class of Kafka 0.10 
supports milli-second timestamp.


How can I get milli-second timestamp from Kafka topics?


These are websites I refer to.

https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html

https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/processor/TimestampExtractor.html


And this is my code.

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
  .selectExpr("CAST(timestamp AS LONG)", "CAST(value AS STRING)")
  .as[(Long, String)]


Regards,
Yuta


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org