Oh, just wanted to follow up on the question about AsyncStreamTask. The main consideration when setting a big task.max.concurrency value is the memory required.
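To make that concrete, the buffering pattern you described as option 2 in your original mail could look roughly like the untested sketch below; the output stream name, the join trigger, and doJoin() are just placeholders:

{code}
import java.util.ArrayList;
import java.util.List;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskCoordinator;

public class BufferedJoinTask implements AsyncStreamTask {
  // Placeholder output stream; substitute your real system/stream names.
  private static final SystemStream OUTPUT = new SystemStream("kafka", "joined-output");

  // Every in-flight message and its callback is held here until the join fires,
  // so heap usage grows roughly with task.max.concurrency * average message size.
  private final List<IncomingMessageEnvelope> buffered = new ArrayList<>();
  private final List<TaskCallback> pending = new ArrayList<>();

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, TaskCallback callback) {
    buffered.add(envelope);
    pending.add(callback);

    if (joinConditionMet(buffered)) {
      collector.send(new OutgoingMessageEnvelope(OUTPUT, doJoin(buffered)));
      // Complete the callbacks only after the join, so the framework cannot
      // checkpoint past offsets whose messages are still sitting in the buffer.
      pending.forEach(TaskCallback::complete);
      buffered.clear();
      pending.clear();
    }
  }

  // Placeholder trigger, e.g. "at least one message from every input stream has arrived".
  private boolean joinConditionMet(List<IncomingMessageEnvelope> batch) {
    return false;
  }

  // Placeholder join over the buffered batch.
  private Object doJoin(List<IncomingMessageEnvelope> batch) {
    return batch.size();
  }
}
{code}

Since checkpoints only advance past messages whose callbacks have completed, the number of messages you can safely buffer this way is bounded by task.max.concurrency, and every one of them lives on the heap until the join triggers.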
And assuming your join function works like micro-batches (i.e. buffer all input messages from all streams for a while, apply the join function, and discard all buffered messages, which is essentially what the sketch above does), you can make it work w/o a local KV store. If any of your buffered messages are not for the current join but are needed for a future join, you may have to use a local KV store to ensure correctness and no data loss across container restarts (see the sketch at the very bottom of this mail for roughly what that could look like).

-Yi

On Thu, Oct 12, 2017 at 11:40 AM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Jef,
>
> What I suggest is exactly an in-process, in-memory KV store. Samza has two types of such built-in KV stores: in-memory and RocksDB. Both can be backed by a changelog topic in Kafka as the failure-recovery mechanism (i.e. if a container fails, it can reseed the whole store from the changelog topic in Kafka). Since they are built-in KV stores in Samza, no additional external systems are introduced in the solution. Here is the doc link for state management in Samza: http://samza.apache.org/learn/documentation/0.13/container/state-management.html. Please read the "Local State" section.
>
> The Samza high-level API is not based on the KStream API. It currently supports two-way joins, but as long as the join keys are in the same partition, the result can stay in memory and does not need to go through another persistent topic.
>
> Best!
>
> -Yi
>
> On Thu, Oct 12, 2017 at 10:00 AM, Jef G <j...@dataminr.com> wrote:
>
>> Yi, thanks for your detailed reply!
>>
>> I believe what you are suggesting regarding a KV store is to set up a remote durable system to maintain the join state. That way, if a node dies and Samza restarts the task on another node, the join state is still available. Is that correct?
>>
>> This approach is certainly an option. However, we were hoping to use an in-process, in-memory KV store, as a remote store would introduce a lot of latency for us. In some cases we would have to make more than 100,000 round trips per second to the KV store for a single stream, and we would want to be able to scale beyond that. It also introduces some complexity and another point of failure.
>>
>> Regarding using AsyncStreamTask with a very large (100,000) task.max.concurrency, is that a bad idea?
>>
>> The high-level API is based on the KStream API, right? Our jobs will sometimes need to join as many as 20 input streams. I believe Samza (and KStream) currently only supports binary joins, and if that is the case, we would need 19 binary joins. The KStream doc suggests that all intermediate results are persisted, so many chained joins might be very inefficient. If so, we would prefer to use the "classic" API.
>>
>> -Jef
>>
>> On Wed, Oct 11, 2017 at 7:37 PM, Yi Pan <nickpa...@gmail.com> wrote:
>>
>> > Hi, Jef,
>> >
>> > I would recommend that you use a KV store to buffer the messages for the join. The logic would be more predictable and the state is also durable. In StreamTask.process(), you can do some pseudo code like below:
>> > {code}
>> > public void process(IncomingMessageEnvelope msg, MessageCollector collector, TaskCoordinator coordinator) {
>> >   if (msg is from streamA) {
>> >     storeA.put(msg.key, msg);
>> >   } else {
>> >     storeB.put(msg.key, msg);
>> >   }
>> >   if (joinCondition is triggered) {
>> >     doJoin(storeA, storeB);
>> >   }
>> > }
>> > {code}
>> >
>> > Make sure that you configure storeA and storeB w/ changelog s.t. they can be recovered.
>> > Then, you don't need to worry about data loss, since before the auto-checkpoint your buffered messages are flushed to disk and to the changelog via storeA and storeB. If you do not want to delete each and every buffered message after the join, you can set a TTL for each store if you are using the RocksDB store.
>> >
>> > We are also actively working on a built-in join operator in the Samza high-level API. The new high-level API was already released in Samza 0.13.1, with the feature preview here: http://samza.apache.org/startup/preview/#high-level-api. Feel free to take a look and try it; we would love to hear feedback. The current version does not support durable state in joins yet; we are actively working on durable-state support for the next release. Note that the high-level API is still in early evolution and might change in the next two releases.
>> >
>> > Best!
>> >
>> > -Yi
>> >
>> > On Wed, Oct 11, 2017 at 1:56 PM, Jef G <j...@dataminr.com> wrote:
>> >
>> > > Hello. My team is looking into Samza for doing real-time processing. We would like to run a directed graph of jobs, where the records in each job's input streams are joined on a common key. We have functionality to perform the join by buffering records from the input streams until certain conditions are met and then passing them on.
>> > >
>> > > We are wondering about the best way to integrate this functionality into a Samza job. After looking over the API, we see two possibilities:
>> > >
>> > > 1. Use a StreamTask that adds records to a buffer. This is the method that the "ad event" example uses. But we are concerned that the framework commits a StreamTask's offset after process() completes, so if the job fails, records in the buffer are permanently lost.
>> > >
>> > > 2. Use an AsyncStreamTask that adds records to a buffer. Also add TaskCallbacks to the buffer. When records are eventually joined and processed, complete their callbacks. This method seems promising, but it requires setting task.max.concurrency very high, possibly in the tens of thousands in our case. Are we likely to run into any issues doing that?
>> > >
>> > > Are there any other options that we overlooked? What is the best approach?
>> > >
>> > > -Jef G
>> >
>>
>> --
>> Jef G
>> Senior Data Scientist | Dataminr | dataminr.com
>> 99 Madison Ave, 3rd Floor | New York, NY 10016
>> j...@dataminr.com
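P.S. To make the pseudo code from my Oct 11 mail a bit more concrete, here is an untested sketch of the store-backed variant. The store and stream names, the serdes (raw byte[] values here), and the simple both-sides-present trigger are all placeholders; the point is just the shape of the task:

{code}
import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class StoreBackedJoinTask implements StreamTask, InitableTask {
  // Placeholder names; both stores must be declared in the job config with a
  // changelog topic so they can be reseeded from Kafka after a container restart.
  private static final SystemStream OUTPUT = new SystemStream("kafka", "joined-output");
  private static final String STREAM_A = "stream-a";

  private KeyValueStore<String, byte[]> storeA;
  private KeyValueStore<String, byte[]> storeB;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    storeA = (KeyValueStore<String, byte[]>) context.getStore("store-a");
    storeB = (KeyValueStore<String, byte[]>) context.getStore("store-b");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    String key = (String) envelope.getKey();
    byte[] value = (byte[]) envelope.getMessage();

    // Buffer the message in the store for its side of the join.
    if (STREAM_A.equals(envelope.getSystemStreamPartition().getStream())) {
      storeA.put(key, value);
    } else {
      storeB.put(key, value);
    }

    // Simple trigger: emit as soon as both sides for this key have arrived.
    byte[] a = storeA.get(key);
    byte[] b = storeB.get(key);
    if (a != null && b != null) {
      collector.send(new OutgoingMessageEnvelope(OUTPUT, key, join(a, b)));
      storeA.delete(key);
      storeB.delete(key);
    }
  }

  // Placeholder join of the two sides.
  private Object join(byte[] a, byte[] b) {
    return new byte[][] { a, b };
  }
}
{code}

The changelog (and, if you use RocksDB and prefer TTL-based cleanup over explicit deletes, the TTL) is configured per store in the job properties; the state-management doc linked above covers the details.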