[
https://issues.apache.org/jira/browse/SPARK-30553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
bettermouse updated SPARK-30553:
--------------------------------
Description:
[http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking]
I wrote code following this guide in both Java and Scala.
java
{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public static void main(String[] args) throws StreamingQueryException {
    SparkSession spark = SparkSession.builder()
            .appName("test")
            .master("local[*]")
            .config("spark.sql.shuffle.partitions", 1)
            .getOrCreate();
    Dataset<Row> lines = spark.readStream().format("socket")
            .option("host", "skynet")
            .option("includeTimestamp", true)
            .option("port", 8888)
            .load();
    Dataset<Row> words = lines.select("timestamp", "value");
    // Grouping by words.col("timestamp"), exactly as the programming guide's Java example does
    Dataset<Row> count = words.withWatermark("timestamp", "10 seconds")
            .groupBy(
                functions.window(words.col("timestamp"), "10 seconds", "10 seconds"),
                words.col("value"))
            .count();
    StreamingQuery start = count.writeStream()
            .outputMode("update")
            .format("console")
            .start();
    start.awaitTermination();
}
{code}
scala
{code:scala}
def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder
    .appName("test")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", 1)
    .getOrCreate()
  import spark.implicits._
  import org.apache.spark.sql.functions.window

  val lines = spark.readStream.format("socket")
    .option("host", "skynet")
    .option("includeTimestamp", true)
    .option("port", 8888)
    .load()
  val words = lines.select("timestamp", "value")
  // $"timestamp" is an unresolved column, resolved against the watermarked Dataset
  val count = words.withWatermark("timestamp", "10 seconds")
    .groupBy(window($"timestamp", "10 seconds", "10 seconds"), $"value")
    .count()
  val start = count.writeStream.outputMode("update").format("console").start()
  start.awaitTermination()
}
{code}
Both programs follow the official documentation, but with the Java version I found that the metric "stateOnCurrentVersionSizeBytes" always increases; the Scala version is fine.
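A minimal sketch for watching that metric while the query runs (assumptions: "start" is the query from the Java example above, and the metric is exposed under customMetrics, as with the Spark 2.4 HDFS-backed state store):
{code:java}
import org.apache.spark.sql.streaming.StreamingQueryProgress;

// Poll the last progress and print the state-store size; with correct
// watermark eviction this should level off, but with the Java program it
// keeps growing. (Handle InterruptedException as needed.)
while (start.isActive()) {
    StreamingQueryProgress p = start.lastProgress(); // null until the first batch finishes
    if (p != null && p.stateOperators().length > 0) {
        System.out.println(p.stateOperators()[0]
                .customMetrics().get("stateOnCurrentVersionSizeBytes"));
    }
    Thread.sleep(10000L);
}
{code}
The physical plans (printable with start.explain(true) once at least one batch has run) show the difference:
java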
{code:java}
== Physical Plan ==
WriteToDataSourceV2
org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4176a001
+- *(4) HashAggregate(keys=[window#11, value#0], functions=[count(1)],
output=[window#11, value#0, count#10L])
+- StateStoreSave [window#11, value#0], state info [ checkpoint =
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-63acf9b1-9249-40db-ab33-9dcadf5736aa/state,
runId = d38b8fee-6cd0-441c-87da-a4e3660856a3, opId = 0, ver = 5, numPartitions
= 1], Update, 1579274372624, 2
+- *(3) HashAggregate(keys=[window#11, value#0],
functions=[merge_count(1)], output=[window#11, value#0, count#21L])
+- StateStoreRestore [window#11, value#0], state info [ checkpoint =
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-63acf9b1-9249-40db-ab33-9dcadf5736aa/state,
runId = d38b8fee-6cd0-441c-87da-a4e3660856a3, opId = 0, ver = 5, numPartitions
= 1], 2
+- *(2) HashAggregate(keys=[window#11, value#0],
functions=[merge_count(1)], output=[window#11, value#0, count#21L])
+- Exchange hashpartitioning(window#11, value#0, 1)
+- *(1) HashAggregate(keys=[window#11, value#0],
functions=[partial_count(1)], output=[window#11, value#0, count#21L])
+- *(1) Project [named_struct(start,
precisetimestampconversion(((((CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType,
LongType) - 0) as double) / 1.0E7)) as double) =
(cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as
double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(timestamp#1,
TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE
CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) -
0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType,
TimestampType), end, precisetimestampconversion(((((CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType,
LongType) - 0) as double) / 1.0E7)) as double) =
(cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as
double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(timestamp#1,
TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE
CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) -
0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType,
TimestampType)) AS window#11, value#0]
+- *(1) Filter isnotnull(timestamp#1)
+- EventTimeWatermark timestamp#1: timestamp,
interval 10 seconds
+- LocalTableScan <empty>, [timestamp#1, value#0]
{code}
scala
{code:java}
WriteToDataSourceV2
org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4149892c
+- *(4) HashAggregate(keys=[window#11-T10000ms, value#0], functions=[count(1)],
output=[window#6-T10000ms, value#0, count#10L])
+- StateStoreSave [window#11-T10000ms, value#0], state info [ checkpoint =
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-8b17f74b-0963-4fee-82cd-2c1e63a75a98/state,
runId = dac4413d-5a82-4d61-b134-c81bfab704d8, opId = 0, ver = 7, numPartitions
= 1], Update, 1579275214256, 2
+- *(3) HashAggregate(keys=[window#11-T10000ms, value#0],
functions=[merge_count(1)], output=[window#11-T10000ms, value#0, count#21L])
+- StateStoreRestore [window#11-T10000ms, value#0], state info [
checkpoint =
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-8b17f74b-0963-4fee-82cd-2c1e63a75a98/state,
runId = dac4413d-5a82-4d61-b134-c81bfab704d8, opId = 0, ver = 7, numPartitions
= 1], 2
+- *(2) HashAggregate(keys=[window#11-T10000ms, value#0],
functions=[merge_count(1)], output=[window#11-T10000ms, value#0, count#21L])
+- Exchange hashpartitioning(window#11-T10000ms, value#0, 1)
+- *(1) HashAggregate(keys=[window#11-T10000ms, value#0],
functions=[partial_count(1)], output=[window#11-T10000ms, value#0, count#21L])
+- *(1) Project [named_struct(start,
precisetimestampconversion(((((CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms,
TimestampType, LongType) - 0) as double) / 1.0E7)) as double) =
(cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType,
LongType) - 0) as double) / 1.0E7)) THEN
(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType,
LongType) - 0) as double) / 1.0E7)) + 1) ELSE
CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType,
LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType,
TimestampType), end, precisetimestampconversion(((((CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms,
TimestampType, LongType) - 0) as double) / 1.0E7)) as double) =
(cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType,
LongType) - 0) as double) / 1.0E7)) THEN
(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType,
LongType) - 0) as double) / 1.0E7)) + 1) ELSE
CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType,
LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000),
LongType, TimestampType)) AS window#11-T10000ms, value#0]
+- *(1) Filter isnotnull(timestamp#1-T10000ms)
+- EventTimeWatermark timestamp#1: timestamp,
interval 10 seconds
+- LocalTableScan <empty>, [timestamp#1, value#0]
{code}
Note that in the Java plan the aggregation keys are window#11, value#0 with no -T10000ms watermark tag, while the Scala plan's keys carry it (window#11-T10000ms, value#0). You can also debug removeKeysOlderThanWatermark in statefulOperators.scala:
{code:scala}
protected def removeKeysOlderThanWatermark(
    storeManager: StreamingAggregationStateManager,
    store: StateStore): Unit = {
  if (watermarkPredicateForKeys.nonEmpty) {
    storeManager.keys(store).foreach { keyRow =>
      if (watermarkPredicateForKeys.get.eval(keyRow)) {
        storeManager.remove(store, keyRow) // this line is never reached with the Java program
      }
    }
  }
}
{code}
You will find that the Java version never removes old state: because the grouping column was taken from words (the Dataset before withWatermark), the window key does not carry the watermark metadata, so watermarkPredicateForKeys is empty and the eviction above is skipped. I think the Java example should be written like this, grouping by the column of the watermarked Dataset:
{code:java}
SparkSession spark = SparkSession.builder()
        .appName("test")
        .master("local[*]")
        .config("spark.sql.shuffle.partitions", 1)
        .getOrCreate();
Dataset<Row> lines = spark.readStream().format("socket")
        .option("host", "skynet")
        .option("includeTimestamp", true)
        .option("port", 8888)
        .load();
Dataset<Row> words = lines.select("timestamp", "value");
Dataset<Row> wordsWatermark = words.withWatermark("timestamp", "10 seconds");
// Group by the watermarked Dataset's column so the key keeps the watermark metadata
Dataset<Row> count = wordsWatermark
        .groupBy(
            functions.window(wordsWatermark.col("timestamp"), "10 seconds", "10 seconds"),
            wordsWatermark.col("value"))
        .count();
StreamingQuery start = count.writeStream()
        .outputMode("update")
        .format("console")
        .start();
start.awaitTermination();
{code}
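One way to see the difference (a sketch; "spark.watermarkDelayMs" is Spark's internal metadata key for the watermark delay, so treat it as an implementation detail that may change between versions):
{code:java}
// withWatermark attaches delay metadata to the event-time column.
// words.col("timestamp") was captured before withWatermark, so a window built
// from it loses the metadata; wordsWatermark.col("timestamp") keeps it.
Dataset<Row> wordsWatermark = words.withWatermark("timestamp", "10 seconds");
System.out.println(words.schema().apply("timestamp").metadata());          // no watermark key
System.out.println(wordsWatermark.schema().apply("timestamp").metadata()); // contains spark.watermarkDelayMs
{code}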
> structured-streaming documentation java watermark group by
> -------------------------------------------------------------
>
> Key: SPARK-30553
> URL: https://issues.apache.org/jira/browse/SPARK-30553
> Project: Spark
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 2.4.4
> Reporter: bettermouse
> Priority: Trivial
>