The input dataset has multiple days' worth of data, so I thought the
watermark should have been crossed. To debug, I changed the query to the
code below. My expectation was that since I am doing 1-day windows with
late arrivals permitted for 1 second, when the query sees records for the
next day it would output a row for the previous day. When I run the code
below with 'complete' output mode, I see the table and then the
lastProgress output about 10 seconds later. When I run it with 'append'
mode, the table has no rows, but the lastProgress output is the same.

Is withWatermark ignored in 'complete' and 'update' modes?

For append mode, I'm not able to understand why the watermark entry is
still at 1970. "For a specific window starting at time T, the engine will
maintain state and allow late data to update the state until (max event
time seen by the engine - late threshold > T)". The table shows that we are
seeing events with timestamp 2017-05-18. At that point the 2017-05-17
window can be closed and its row output, right?
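
To check my reading of that condition, here is a back-of-the-envelope
calculation with values from the table below (my own arithmetic, not
engine output):

from datetime import datetime, timedelta

max_event_time = datetime(2017, 5, 21, 23, 40, 8)  # largest timestamp in the table
threshold = timedelta(seconds=1)                   # the withWatermark delay
expected_watermark = max_event_time - threshold    # 2017-05-21 23:40:07

# End of the [2017-05-17, 2017-05-18) window:
window_end = datetime(2017, 5, 18)
print(expected_watermark > window_end)  # True, so I'd expect that window to be final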

import time
from pyspark.sql import functions as F

grouped_df = frame \
    .withWatermark("timestamp", "1 second") \
    .groupBy(F.window("timestamp", "1 day")) \
    .agg(F.max("timestamp"))

query = grouped_df.writeStream \
    .format("console") \
    .option("truncate", False) \
    .option("checkpointLocation", CKPT_LOC) \
    .outputMode("complete") \
    .start()

# Poll progress every 10 seconds for ~100 seconds, then block.
for _ in range(10):
    time.sleep(10)
    print(query.lastProgress)
query.awaitTermination()
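
If it helps to watch just the watermark rather than the whole progress
dict, a small helper along these lines (poll_watermark is my own name, not
a Spark API) could replace the loop above:

import time

def poll_watermark(query, n=10, interval_s=10):
    """Print only the event-time watermark from each progress report."""
    for _ in range(n):
        time.sleep(interval_s)
        progress = query.lastProgress  # dict, or None before the first trigger
        if progress is not None:
            print(progress["eventTime"].get("watermark"))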

The output:

+---------------------------------------------+-------------------+
|window                                       |max(timestamp)     |
+---------------------------------------------+-------------------+
|[2017-05-17 00:00:00.0,2017-05-18 00:00:00.0]|2017-05-17 23:59:59|
|[2017-05-15 00:00:00.0,2017-05-16 00:00:00.0]|2017-05-15 23:59:59|
|[2017-05-14 00:00:00.0,2017-05-15 00:00:00.0]|2017-05-14 23:59:59|
|[2017-05-16 00:00:00.0,2017-05-17 00:00:00.0]|2017-05-16 23:59:59|
|[2017-05-07 00:00:00.0,2017-05-08 00:00:00.0]|2017-05-07 23:59:59|
|[2017-05-19 00:00:00.0,2017-05-20 00:00:00.0]|2017-05-19 23:59:59|
|[2017-05-18 00:00:00.0,2017-05-19 00:00:00.0]|2017-05-18 23:59:59|
|[2017-05-20 00:00:00.0,2017-05-21 00:00:00.0]|2017-05-20 23:59:59|
|[2017-05-08 00:00:00.0,2017-05-09 00:00:00.0]|2017-05-08 23:59:59|
|[2017-05-10 00:00:00.0,2017-05-11 00:00:00.0]|2017-05-10 23:59:59|
|[2017-05-13 00:00:00.0,2017-05-14 00:00:00.0]|2017-05-13 23:59:57|
|[2017-05-21 00:00:00.0,2017-05-22 00:00:00.0]|2017-05-21 23:40:08|
|[2017-05-09 00:00:00.0,2017-05-10 00:00:00.0]|2017-05-09 23:59:59|
|[2017-05-12 00:00:00.0,2017-05-13 00:00:00.0]|2017-05-12 23:40:11|
|[2017-05-11 00:00:00.0,2017-05-12 00:00:00.0]|2017-05-11 23:59:59|
+---------------------------------------------+-------------------+

{u'stateOperators': [{u'numRowsTotal': 15, u'numRowsUpdated': 0}],
 u'eventTime': {u'watermark': u'1970-01-01T00:00:00.000Z'},
 u'name': None, u'timestamp': u'2017-08-15T18:13:54.381Z',
 u'processedRowsPerSecond': 0.0, u'inputRowsPerSecond': 0.0,
 u'numInputRows': 0,
 u'sources': [{u'description': u'FileStreamSource[hdfs://some_ip/some_path]',
               u'endOffset': {u'logOffset': 0}, u'processedRowsPerSecond': 0.0,
               u'inputRowsPerSecond': 0.0, u'numInputRows': 0,
               u'startOffset': {u'logOffset': 0}}],
 u'durationMs': {u'getOffset': 57, u'triggerExecution': 57},
 u'runId': u'a5b75404-c774-49db-aac5-2592211417ca',
 u'id': u'35ad86ec-f608-40b5-a48b-9507c82a87c8',
 u'sink': {u'description': u'org.apache.spark.sql.execution.streaming.ConsoleSink@7e4050cd'}}

On Mon, Aug 14, 2017 at 4:55 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> In append mode, the aggregation outputs a row only when the watermark has
> been crossed and the corresponding aggregate is *final*, that is, will not
> be updated any more.
> See http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking
>
> On Mon, Aug 14, 2017 at 4:09 PM, Ashwin Raju <ther...@gmail.com> wrote:
>
>> Hi,
>>
>> I am running Spark 2.2 and trying out structured streaming. I have the
>> following code:
>>
>> from pyspark.sql import functions as F
>>
>> df = frame \
>>     .withWatermark("timestamp", "1 minute") \
>>     .groupBy(F.window("timestamp", "1 day"), *groupby_cols) \
>>     .agg(F.sum("bytes"))
>>
>> query = df.writeStream \
>>     .format("console") \
>>     .option("checkpointLocation", '\some\chkpoint') \
>>     .outputMode("complete") \
>>     .start()
>>
>> query.awaitTermination()
>>
>>
>>
>> It prints out a bunch of aggregated rows to console. When I run the same
>> query with outputMode("append") however, the output only has the column
>> names, no rows. I was originally trying to output to parquet, which only
>> supports append mode. I was seeing no data in my parquet files, so I
>> switched to console output to debug, then noticed this issue. Am I
>> misunderstanding something about how append mode works?
>>
>>
>> Thanks,
>>
>> Ashwin
>>
>>
>
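
For reference, a minimal self-contained sketch of the append-mode behavior
described in the quoted reply (it uses the built-in rate source; the
10-second windows and 5-second watermark are arbitrary choices of mine).
Each window's row should appear on the console only after the watermark
passes that window's end:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("append-mode-sketch").getOrCreate()

# The rate source emits (timestamp, value) rows at a fixed rate.
events = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

counts = events \
    .withWatermark("timestamp", "5 seconds") \
    .groupBy(F.window("timestamp", "10 seconds")) \
    .count()

query = counts.writeStream \
    .format("console") \
    .option("truncate", False) \
    .outputMode("append") \
    .start()

query.awaitTermination(60)  # run for about a minute, then return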
