I am currently trying to implement some online algorithms on top of
Structured Streaming.
My requirement is to fetch only the delta data at each trigger from memory
and to calculate and update global variables at the same time. There are
two points I found difficult:
1. With a ForeachWriter I can get the appended data on each worker, but
there seems to be no way to pass in and update driver-side global
variables from it (a rough sketch of what I am doing is below this list).
2. A ForeachWriter handles only one row at a time. Is there any plan to
support processing a whole mini-batch at a time?
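
To make point 1 concrete, here is roughly what I am doing, using the same
Kafka `source` stream as in my earlier message below. This is only a
sketch; the variable name `runningCount` and the row-counting logic are
placeholders for my real update step:

import org.apache.spark.sql.{ForeachWriter, Row}

// Driver-side "global" state I would like to update after every trigger.
var runningCount: Long = 0L   // placeholder for my real global variables

val counting = source.writeStream
  .outputMode("append")
  .foreach(new ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = true
    def process(row: Row): Unit = {
      // Runs on an executor, once per appended row (point 2: no way to
      // see the whole mini-batch here). Incrementing runningCount only
      // changes a serialized copy on the executor, so the driver never
      // sees the update (point 1).
      runningCount += 1
    }
    def close(errorOrNull: Throwable): Unit = {}
  })
  .start()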

I don't know whether I am using it in the right way, or whether there is a
best practice for implementing online algorithms with Spark.

Thanks.
Yanpeng

On Mon, Aug 21, 2017 at 10:05 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> What is your end goal?  Right now the foreach writer is the way to do
> arbitrary processing on the data produced by various output modes.
>
>
> On Sun, Aug 20, 2017 at 12:23 PM, Yanpeng Lin <ypeng...@gmail.com> wrote:
>
>> Hello,
>>
>> I am new to Spark.
>> I would appreciate any help understanding how to get the appended data
>> from Structured Streaming. According to the documentation
>> <https://spark.apache.org/docs/2.1.0/structured-streaming-programming-guide.html#basic-concepts>,
>> a data stream can be treated as new rows appended to an unbounded table.
>> Besides writing the data out to external storage, is there any other way
>> to get only the newly appended data after each trigger, e.g. directly
>> from memory?
>>
>> Here is my case. I have a Kafka source that keeps publishing data to
>> Spark on the `test` topic.
>>
>> val source = spark.readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "broker:9092")
>>   .option("subscribe", "test")
>>   .load()
>>
>> I tried writing the stream out with the `memory` format, like the
>> following:
>>
>> import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
>>
>> val query = source.writeStream
>>   .format("memory")
>>   .trigger(ProcessingTime("3 seconds"))
>>   .queryName("tests")
>>   .outputMode(OutputMode.Append)
>>   .start()
>> spark.sql("select topic, value from tests")
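>>
>> Since the Kafka `value` column is binary, what I actually run to inspect
>> the table is roughly the following (the cast is just so the values print
>> as plain numbers):
>>
>> spark.sql("select topic, CAST(value AS STRING) AS value from tests").show()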
>> The result table `tests` contains all of the data from the beginning of
>> the stream, like:
>>
>> Trigger Time,         Topic, Value
>> t1                     test,   1
>> t1                     test,   2
>> t2                     test,   3
>> t3                     test,   4
>>
>> By appended data I mean only the delta after each trigger. For example,
>> after trigger t1 the rows with values 1 and 2 are newly appended; after
>> t2 the row with value 3 is the newly appended data; and after t3 the row
>> with value 4 could be fetched as newly appended.
>> I understand that each appended row can be processed with a
>> `ForeachWriter`, but if I want to fetch all of the newly appended data
>> after a given trigger, is there any way to do that directly as a
>> DataFrame?
>>
>> Thanks!
>> Yanpeng
>>
>
>
