What is your end goal?  Right now, `ForeachWriter` is the way to do
arbitrary processing on the data produced by the various output modes.
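
For example, a minimal sketch (assuming a Spark 2.x Scala session; `source`
is the Kafka stream from the quoted message below, `writer` and `deltaQuery`
are just illustrative names, and the println stands in for whatever
processing you actually need):

import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.streaming.OutputMode

// Sketch of a ForeachWriter: each trigger's newly appended rows are
// passed to process(), one record at a time.
val writer = new ForeachWriter[Row] {
  // Called once per partition per trigger; return true to process it.
  override def open(partitionId: Long, version: Long): Boolean = true

  // Called for every newly appended row in the current trigger.
  override def process(record: Row): Unit = {
    println(record) // illustrative only; send the delta wherever you need
  }

  // Called when the partition finishes; errorCause is null on success.
  override def close(errorCause: Throwable): Unit = ()
}

val deltaQuery = source.writeStream
  .outputMode(OutputMode.Append)
  .foreach(writer)
  .start()

Each open/process/close cycle covers a single trigger and partition, so
process only ever sees that trigger's delta.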

On Sun, Aug 20, 2017 at 12:23 PM, Yanpeng Lin <ypeng...@gmail.com> wrote:

> Hello,
>
> I am new to Spark.
> I would appreciate any help understanding how to get the appended data
> from Structured Streaming. According to the documentation
> <https://spark.apache.org/docs/2.1.0/structured-streaming-programming-guide.html#basic-concepts>,
> a data stream can be treated as new rows appended to an unbounded table.
> Besides writing the data out to external storage at every trigger, is
> there any other way to get only the newly appended data, e.g. directly
> from memory?
>
> Here is my case. I have a Kafka source that keeps publishing data to
> Spark on the `test` topic.
>
> val source = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "broker:9092")
>   .option("subscribe", "test")
>   .load()
>
> I tried writing the stream with format `memory`, like the following:
>
> val query = source.writeStream
>   .format("memory")
>   .trigger(ProcessingTime("3 seconds"))
>   .queryName("tests")
>   .outputMode(OutputMode.Append)
>   .start()
> spark.sql("select topic, value from tests")
> The result table `tests` contains all data from the beginning of the
> stream, like:
>
> Trigger Time   Topic   Value
> t1             test    1
> t1             test    2
> t2             test    3
> t3             test    4
>
> By appended data I mean only the delta after each trigger. For example,
> after trigger time t1, the rows with values 1 and 2 are newly appended;
> after trigger time t2, the row with value 3 is newly appended; and after
> t3, the row with value 4 is newly appended.
> I understand each batch of appended data can be processed using
> `ForeachWriter`, but if I want to fetch all of the newly appended data
> after a given trigger, is there any way to do that directly as a
> DataFrame?
>
> Thanks!
> Yanpeng
>
