What's the value of "hdfsCheckPointDir"? Could you list this directory on
HDFS and report the files there?

On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> -dev
>
> Have you tried clearing out the checkpoint directory?  Can you also give
> the full stack trace?
>
> On Wed, May 24, 2017 at 3:45 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Even if I do simple count aggregation like below I get the same error as
>> https://issues.apache.org/jira/browse/SPARK-19268
>>
>> Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 
>> hours", "24 hours"), df1.col("AppName")).count();
>>
>>
>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am using Spark 2.1.1 and running in a Standalone mode using HDFS and
>>> Kafka
>>>
>>> I am running into the same problem as https://issues.apache.org/jira
>>> /browse/SPARK-19268 with my app(not KafkaWordCount).
>>>
>>> Here is my sample code
>>>
>>> *Here is how I create ReadStream*
>>>
>>> sparkSession.readStream()
>>>                 .format("kafka")
>>>                 .option("kafka.bootstrap.servers", 
>>> config.getString("kafka.consumer.settings.bootstrapServers"))
>>>                 .option("subscribe", 
>>> config.getString("kafka.consumer.settings.topicName"))
>>>                 .option("startingOffsets", "earliest")
>>>                 .option("failOnDataLoss", "false")
>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>                 .load();
>>>
>>>
>>> *The core logic*
>>>
>>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), 
>>> client.getSchema()).as("payload"));
>>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", 
>>> "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>> StreamingQuery query = df1.writeStream().foreach(new 
>>> KafkaSink()).outputMode("update").start();
>>> query.awaitTermination();
>>>
>>>
>>> I can also provide any other information you may need.
>>>
>>> Thanks!
>>>
>>
>>
>

Reply via email to