*Yes, my code is shown below*

/**
 * input
 */
val logs = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", BROKER_SERVER)
  .option("subscribe", TOPIC)
  // note: the option name is "startingOffsets" (plural); the original had "startingOffset"
  .option("startingOffsets", "latest")
  .load()
/**
 * process
 */
val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]
val events = logValues
  .map(parseFunction)
  .select(
    $"_1".alias("date").cast("timestamp"),
    $"_2".alias("uuid").cast("string")
  )
val results = events
  .withWatermark("date", "1 day")
  .dropDuplicates("uuid", "date")
  .groupBy($"date")
  .count()
  .withColumn("window", window(current_timestamp(), "15 minutes"))

/**
 * output
 */
val query = results
  .writeStream
  .outputMode("update")
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .start()

query.awaitTermination()

*and I use Play JSON to parse the input logs from Kafka; the parse function is:*

def parseFunction(str: String): (Long, String) = {
  val json = Json.parse(str)
  val timestamp = (json \ "time").as[Long]   // .as[Long] instead of .get.toString().toLong
  // floor the timestamp to the start of its UTC day, then shift by -8 hours (UTC+8 offset)
  val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
  val uuid = (json \ "uuid").as[String]      // .as[String] avoids keeping the JSON quotes in the value
  (date, uuid)
}

Michael Armbrust <mich...@databricks.com> wrote on Wed, Sep 13, 2017 at 2:36 AM:

> Can you show all the code? This works for me.
>
> On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>
>> The spark version is 2.2.0
>>
>> Michael Armbrust <mich...@databricks.com> wrote on Tue, Sep 12, 2017 at 12:32 PM:
>>
>>> Which version of spark?
>>>
>>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>
>>>> Thanks for the reply, but using this method I got an exception:
>>>>
>>>> "Exception in thread "main"
>>>> org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic
>>>> expressions are only allowed in
>>>> Project, Filter, Aggregate or Window"
>>>>
>>>> Can you give more advice?
>>>>
>>>> Michael Armbrust <mich...@databricks.com> wrote on Tue, Sep 12, 2017 at 4:48 AM:
>>>>
>>>>> import org.apache.spark.sql.functions._
>>>>>
>>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>>
>>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> In structured streaming, how can I add a column to a dataset with the
>>>>>> current system time aligned to 15-minute boundaries?
>>>>>>
>>>>>> Thanks.
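For readers finding this thread later: the bucketing that `window(current_timestamp(), "15 minutes")` produces can be illustrated with plain Scala arithmetic, no Spark required. This is only a sketch of the window-start calculation (flooring an epoch timestamp to its 15-minute interval); the function name `alignTo15Minutes` is illustrative, not part of any Spark or Scala API.

```scala
// Floor an epoch timestamp (in seconds) to the start of its 15-minute
// bucket, mirroring the window start that window(..., "15 minutes") assigns.
def alignTo15Minutes(epochSeconds: Long): Long = {
  val bucket = 15 * 60                 // 15 minutes expressed in seconds
  (epochSeconds / bucket) * bucket     // integer division drops the remainder
}

// Example: 2017-09-11 03:07:42 UTC falls in the [03:00:00, 03:15:00) bucket
val aligned = alignTo15Minutes(1505099262L)
println(aligned)  // 1505098800, i.e. 2017-09-11 03:00:00 UTC
```

In the actual streaming query, Spark evaluates `current_timestamp()` per micro-batch and `window` wraps the result in a struct with `start` and `end` fields, so the console output shows an interval rather than a single aligned timestamp.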