[
https://issues.apache.org/jira/browse/SPARK-26167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
dejan miljkovic updated SPARK-26167:
------------------------------------
Description:
For aggregation query in append mode not all outputs are produced for inputs
with expired watermark. I have data in kafka that need to be reprocessed and
results stored in S3. S3 works only with append mode. Problem is that only part
of the data is written to S3. Code below illustrates the my approach.
{{String windowDuration = "24 hours"; }}
{{String slideDuration = "15 minutes"; }}
{{Dataset<Row> sliding24h = rowData }}
{{.withWatermark(eventTimeCol, slideDuration)
.groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
col(nameCol))}}
{{.count(); }}
{{sliding24h .writeStream() }}
{{.format("console") }}
{{.option("truncate", false) }}
{{.option("numRows", 1000) }}
{{.outputMode(OutputMode.Append()) }}
{{ .start() }}
{{.awaitTermination();}}
Below is the example that shows the behavior. Code produces only empty Batch 0
in Append mode. Data is aggregated in 24 hour windows with 15 minute slide.
Input data covers 84 hours. I think that code should produce all aggregated
results expect for the last 15 minute interval.
{color:#000080}public static void {color}main(String [] args)
{color:#000080}throws {color}StreamingQueryException {
SparkSession spark =
SparkSession.builder().master({color:#008000}"local[*]"{color}).getOrCreate();
ArrayList<String> rl = {color:#000080}new {color}ArrayList<>();
{color:#000080}for {color}({color:#000080}int {color}i =
{color:#0000ff}0{color}; i < {color:#0000ff}1000{color}; ++i) {
{color:#000080}long {color}t = {color:#0000ff}1512164314L {color}+ i *
{color:#0000ff}5 {color}* {color:#0000ff}60{color};
rl.add(t + {color:#008000}",qwer"{color});
}
String nameCol = {color:#008000}"name"{color};
String eventTimeCol = {color:#008000}"eventTime"{color};
String eventTimestampCol = {color:#008000}"eventTimestamp"{color};
MemoryStream<String> input = {color:#000080}new
{color}MemoryStream<>({color:#0000ff}42{color}, spark.sqlContext(),
Encoders.STRING());
input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
Dataset<Row> stream = input.toDF().selectExpr(
{color:#008000}"cast(split(value,'[,]')[0] as long) as " {color}+
eventTimestampCol,
{color:#008000}"cast(split(value,'[,]')[1] as String) as " {color}+ nameCol);
System.{color:#660e7a}out{color}.println({color:#008000}"isStreaming: "
{color}+ stream.isStreaming());
Column eventTime = functions.to_timestamp(col(eventTimestampCol));
Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);
String windowDuration = {color:#008000}"24 hours"{color};
String slideDuration = {color:#008000}"15 minutes"{color};
Dataset<Row> sliding24h = rowData
.withWatermark(eventTimeCol, slideDuration)
.groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
col(nameCol)).count();
sliding24h
.writeStream()
.format({color:#008000}"console"{color})
.option({color:#008000}"truncate"{color}, {color:#000080}false{color})
.option({color:#008000}"numRows"{color}, {color:#0000ff}1000{color})
.outputMode(OutputMode.Append())
{color:#808080}//.outputMode(OutputMode.Complete()){color} .start()
.awaitTermination();
}
was:
For aggregation query in append mode not all outputs are produced for inputs
with expired watermark. I have data in kafka that need to be reprocessed and
results stored in S3. S3 works only with append mode. Problem is that only part
of the data is written to S3.
Below is the example that illustrate the behavior. Code produces only empty
Batch 0 in Append mode. Data is aggregated in 24 hour windows with 15 minute
slide. Input data covers 84 hours. I think that code should produce all
aggregated results expect for the last 15 minute interval.
{color:#000080}public static void {color}main(String [] args)
{color:#000080}throws {color}StreamingQueryException {
SparkSession spark =
SparkSession.builder().master({color:#008000}"local[*]"{color}).getOrCreate();
ArrayList<String> rl = {color:#000080}new {color}ArrayList<>();
{color:#000080}for {color}({color:#000080}int {color}i =
{color:#0000ff}0{color}; i < {color:#0000ff}1000{color}; ++i) {
{color:#000080}long {color}t = {color:#0000ff}1512164314L {color}+ i *
{color:#0000ff}5 {color}* {color:#0000ff}60{color};
rl.add(t + {color:#008000}",qwer"{color});
}
String nameCol = {color:#008000}"name"{color};
String eventTimeCol = {color:#008000}"eventTime"{color};
String eventTimestampCol = {color:#008000}"eventTimestamp"{color};
MemoryStream<String> input = {color:#000080}new
{color}MemoryStream<>({color:#0000ff}42{color}, spark.sqlContext(),
Encoders.STRING());
input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
Dataset<Row> stream = input.toDF().selectExpr(
{color:#008000}"cast(split(value,'[,]')[0] as long) as " {color}+
eventTimestampCol,
{color:#008000}"cast(split(value,'[,]')[1] as String) as " {color}+ nameCol);
System.{color:#660e7a}out{color}.println({color:#008000}"isStreaming: "
{color}+ stream.isStreaming());
Column eventTime = functions.to_timestamp(col(eventTimestampCol));
Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);
String windowDuration = {color:#008000}"24 hours"{color};
String slideDuration = {color:#008000}"15 minutes"{color};
Dataset<Row> sliding24h = rowData
.withWatermark(eventTimeCol, slideDuration)
.groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
col(nameCol)).count();
sliding24h
.writeStream()
.format({color:#008000}"console"{color})
.option({color:#008000}"truncate"{color}, {color:#000080}false{color})
.option({color:#008000}"numRows"{color}, {color:#0000ff}1000{color})
.outputMode(OutputMode.Append())
{color:#808080}//.outputMode(OutputMode.Complete())
{color} .start()
.awaitTermination();
}
> No output created for aggregation query in append mode
> ------------------------------------------------------
>
> Key: SPARK-26167
> URL: https://issues.apache.org/jira/browse/SPARK-26167
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.2
> Reporter: dejan miljkovic
> Priority: Blocker
>
> For aggregation query in append mode not all outputs are produced for inputs
> with expired watermark. I have data in kafka that need to be reprocessed and
> results stored in S3. S3 works only with append mode. Problem is that only
> part of the data is written to S3. Code below illustrates the my approach.
> {{String windowDuration = "24 hours"; }}
> {{String slideDuration = "15 minutes"; }}
> {{Dataset<Row> sliding24h = rowData }}
> {{.withWatermark(eventTimeCol, slideDuration)
> .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
> col(nameCol))}}
> {{.count(); }}
>
> {{sliding24h .writeStream() }}
> {{.format("console") }}
> {{.option("truncate", false) }}
> {{.option("numRows", 1000) }}
> {{.outputMode(OutputMode.Append()) }}
> {{ .start() }}
> {{.awaitTermination();}}
>
> Below is the example that shows the behavior. Code produces only empty Batch
> 0 in Append mode. Data is aggregated in 24 hour windows with 15 minute slide.
> Input data covers 84 hours. I think that code should produce all aggregated
> results expect for the last 15 minute interval.
>
> {color:#000080}public static void {color}main(String [] args)
> {color:#000080}throws {color}StreamingQueryException {
> SparkSession spark =
> SparkSession.builder().master({color:#008000}"local[*]"{color}).getOrCreate();
> ArrayList<String> rl = {color:#000080}new {color}ArrayList<>();
> {color:#000080}for {color}({color:#000080}int {color}i =
> {color:#0000ff}0{color}; i < {color:#0000ff}1000{color}; ++i) {
> {color:#000080}long {color}t = {color:#0000ff}1512164314L {color}+ i *
> {color:#0000ff}5 {color}* {color:#0000ff}60{color};
> rl.add(t + {color:#008000}",qwer"{color});
> }
> String nameCol = {color:#008000}"name"{color};
> String eventTimeCol = {color:#008000}"eventTime"{color};
> String eventTimestampCol = {color:#008000}"eventTimestamp"{color};
> MemoryStream<String> input = {color:#000080}new
> {color}MemoryStream<>({color:#0000ff}42{color}, spark.sqlContext(),
> Encoders.STRING());
> input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
> Dataset<Row> stream = input.toDF().selectExpr(
> {color:#008000}"cast(split(value,'[,]')[0] as long) as " {color}+
> eventTimestampCol,
> {color:#008000}"cast(split(value,'[,]')[1] as String) as " {color}+ nameCol);
> System.{color:#660e7a}out{color}.println({color:#008000}"isStreaming: "
> {color}+ stream.isStreaming());
> Column eventTime = functions.to_timestamp(col(eventTimestampCol));
> Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);
> String windowDuration = {color:#008000}"24 hours"{color};
> String slideDuration = {color:#008000}"15 minutes"{color};
> Dataset<Row> sliding24h = rowData
> .withWatermark(eventTimeCol, slideDuration)
> .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
> col(nameCol)).count();
> sliding24h
> .writeStream()
> .format({color:#008000}"console"{color})
> .option({color:#008000}"truncate"{color}, {color:#000080}false{color})
> .option({color:#008000}"numRows"{color}, {color:#0000ff}1000{color})
> .outputMode(OutputMode.Append())
> {color:#808080}//.outputMode(OutputMode.Complete()){color} .start()
> .awaitTermination();
> }
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]