[
https://issues.apache.org/jira/browse/SPARK-26167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698676#comment-16698676
]
Jungtaek Lim commented on SPARK-26167:
--------------------------------------
[[email protected]]
Please close the issue if you can't reproduce it with Spark 2.4.0. Thanks in
advance!
> No output created for aggregation query in append mode
> ------------------------------------------------------
>
> Key: SPARK-26167
> URL: https://issues.apache.org/jira/browse/SPARK-26167
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.2
> Reporter: dejan miljkovic
> Priority: Blocker
>
> For an aggregation query in append mode, not all outputs are produced for
> inputs with an expired watermark. I have data in Kafka that needs to be
> reprocessed and the results stored in S3. S3 works only with append mode.
> The problem is that only part of the data is written to S3. The code below
> illustrates my approach:
> String windowDuration = "24 hours";
> String slideDuration = "15 minutes";
> Dataset<Row> sliding24h = rowData
>     .withWatermark(eventTimeCol, slideDuration)
>     .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
>         col(nameCol))
>     .count();
>
> sliding24h.writeStream()
>     .format("console")
>     .option("truncate", false)
>     .option("numRows", 1000)
>     .outputMode(OutputMode.Append())
>     .start()
>     .awaitTermination();
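>
> For reference, Structured Streaming's append mode is documented to emit a
> windowed aggregate only after the watermark (max event time seen minus the
> configured delay) passes the window's end. A back-of-the-envelope check with
> the numbers from the full example below (plain Java arithmetic, not a Spark
> API; all variable names here are illustrative):
>
> long slide = 15 * 60;                        // 15-minute slide, in seconds
> long firstEvent = 1512164314L;               // first event in the test data
> long lastEvent = firstEvent + 999 * 5 * 60;  // 1000 events, 5 minutes apart
> long watermark = lastEvent - slide;          // watermark delay is "15 minutes"
> // Window ends land on 15-minute boundaries, so the earliest window that
> // contains the first event ends at the next boundary after it.
> long earliestEnd = (firstEvent / slide) * slide + slide;
> // Append mode can emit a window once the watermark passes its end:
> System.out.println("finalized: " + (watermark >= earliestEnd)); // true here
>
> By that rule, nearly every window in the example should eventually be emitted.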
>
> Below is an example that shows the behavior. The code produces only an empty
> Batch 0 in Append mode. Data is aggregated in 24-hour windows with a
> 15-minute slide, and the input data covers 84 hours. I think the code should
> produce all aggregated results except those for the last 15-minute interval.
>
> public static void main(String[] args) throws StreamingQueryException {
>     SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
>
>     // 1000 events, 5 minutes apart, all with the same name "qwer"
>     ArrayList<String> rl = new ArrayList<>();
>     for (int i = 0; i < 1000; ++i) {
>         long t = 1512164314L + i * 5 * 60;
>         rl.add(t + ",qwer");
>     }
>
>     String nameCol = "name";
>     String eventTimeCol = "eventTime";
>     String eventTimestampCol = "eventTimestamp";
>
>     MemoryStream<String> input =
>         new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING());
>     input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
>
>     Dataset<Row> stream = input.toDF().selectExpr(
>         "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol,
>         "cast(split(value,'[,]')[1] as String) as " + nameCol);
>
>     System.out.println("isStreaming: " + stream.isStreaming());
>
>     Column eventTime = functions.to_timestamp(col(eventTimestampCol));
>     Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);
>
>     String windowDuration = "24 hours";
>     String slideDuration = "15 minutes";
>
>     Dataset<Row> sliding24h = rowData
>         .withWatermark(eventTimeCol, slideDuration)
>         .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
>             col(nameCol))
>         .count();
>
>     sliding24h
>         .writeStream()
>         .format("console")
>         .option("truncate", false)
>         .option("numRows", 1000)
>         .outputMode(OutputMode.Append())
>         //.outputMode(OutputMode.Complete())
>         .start()
>         .awaitTermination();
> }
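>
> One thing that may explain the empty Batch 0: the watermark is only advanced
> after a micro-batch completes, so when all 1000 rows arrive in a single batch
> there is no later batch in which the finalized windows can be flushed. A
> hedged variation of the code above that splits the input across two batches
> (StreamingQuery and processAllAvailable() are standard Structured Streaming
> APIs; this replaces the single input.addData(...) call and reuses the rl,
> input, and sliding24h definitions from the example):
>
> StreamingQuery query = sliding24h
>     .writeStream()
>     .format("console")
>     .option("truncate", false)
>     .option("numRows", 1000)
>     .outputMode(OutputMode.Append())
>     .start();
> // Batch 0: the watermark is still at its initial value, so nothing is emitted.
> input.addData(JavaConversions.asScalaBuffer(rl.subList(0, 500)).toSeq());
> query.processAllAvailable();
> // Batch 1: runs with the watermark advanced by batch 0, so windows that
> // ended before it should now appear in the console output.
> input.addData(JavaConversions.asScalaBuffer(rl.subList(500, 1000)).toSeq());
> query.processAllAvailable();
> query.stop();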