What's the value of "hdfsCheckPointDir"? Could you list this directory on HDFS and report the files there?
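In case it helps, a checkpoint directory can be listed recursively with the Hadoop filesystem shell; the path below is a placeholder for whatever `hdfsCheckPointDir` resolves to in your app:

```shell
# Substitute the actual value of hdfsCheckPointDir here (placeholder path).
CHECKPOINT_DIR=/path/to/your/checkpoint/dir

# Recursively list the checkpoint directory. For a Structured Streaming
# query you would expect subdirectories such as offsets/, commits/,
# sources/, and (for stateful queries) state/.
hdfs dfs -ls -R "$CHECKPOINT_DIR"
```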
On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <mich...@databricks.com> wrote:

> -dev
>
> Have you tried clearing out the checkpoint directory? Can you also give
> the full stack trace?
>
> On Wed, May 24, 2017 at 3:45 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Even if I do a simple count aggregation like the one below, I get the
>> same error as https://issues.apache.org/jira/browse/SPARK-19268
>>
>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"),
>>     "24 hours", "24 hours"), df1.col("AppName")).count();
>>
>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am using Spark 2.1.1, running in standalone mode with HDFS and Kafka.
>>>
>>> I am running into the same problem as
>>> https://issues.apache.org/jira/browse/SPARK-19268 with my app (not
>>> KafkaWordCount).
>>>
>>> Here is my sample code.
>>>
>>> Here is how I create the read stream:
>>>
>>> sparkSession.readStream()
>>>     .format("kafka")
>>>     .option("kafka.bootstrap.servers",
>>>         config.getString("kafka.consumer.settings.bootstrapServers"))
>>>     .option("subscribe",
>>>         config.getString("kafka.consumer.settings.topicName"))
>>>     .option("startingOffsets", "earliest")
>>>     .option("failOnDataLoss", "false")
>>>     .option("checkpointLocation", hdfsCheckPointDir)
>>>     .load();
>>>
>>> The core logic:
>>>
>>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"),
>>>     client.getSchema()).as("payload"));
>>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours",
>>>     "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>> StreamingQuery query = df1.writeStream().foreach(new KafkaSink())
>>>     .outputMode("update").start();
>>> query.awaitTermination();
>>>
>>> I can also provide any other information you may need.
>>>
>>> Thanks!
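Two things in the quoted code look worth double-checking, independently of the checkpoint contents: the "checkpointLocation" option is set on the readStream side, where Structured Streaming ignores it (it belongs on the DataStreamWriter), and the started query writes df1 rather than the aggregated df2, so the groupBy/agg is never executed. A sketch of the write side with both adjusted (hdfsCheckPointDir, KafkaSink, and df2 are the names from the quoted code, assumed to exist in the app):

```java
// Sketch only -- assumes df2, hdfsCheckPointDir, and KafkaSink from the
// quoted code are in scope.
import org.apache.spark.sql.streaming.StreamingQuery;

StreamingQuery query = df2.writeStream()                     // df2, not df1, so the aggregation runs
    .option("checkpointLocation", hdfsCheckPointDir)          // checkpoint goes on the writer, not the reader
    .foreach(new KafkaSink())
    .outputMode("update")
    .start();
query.awaitTermination();
```

If the old checkpoint was written by a query with a different plan (e.g. count() vs. agg(sum(...))), pointing the corrected query at a fresh checkpoint directory may also be necessary, which would line up with Michael's suggestion above.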