Yi, thank you for your detailed and informative replies. -Jef
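
Yi's pseudocode in the quoted thread below keeps each input stream in its own key-value store and runs the join when a condition fires. What follows is a minimal, self-contained Java sketch of that pattern. It is not Samza code. Plain `HashMap`s stand in for the changelog-backed `KeyValueStore`s storeA and storeB. The join condition is a hypothetical choice: a key is joined as soon as both streams have delivered a record for it, and both entries are then deleted. `BufferedJoin`, `process(stream, key, value)` and `buffered()` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BufferedJoin {
    // Stand-ins for storeA/storeB. In a Samza job these would be
    // changelog-backed KeyValueStore instances, so the buffer survives restarts.
    private final Map<String, String> storeA = new HashMap<>();
    private final Map<String, String> storeB = new HashMap<>();
    // Collects joined output; a real task would send it via a MessageCollector.
    private final List<String> output = new ArrayList<>();

    /** Buffers one record, then joins its key if both sides are now present. */
    public void process(String stream, String key, String value) {
        if ("streamA".equals(stream)) {
            storeA.put(key, value);
        } else {
            // Mirrors the pseudocode's else branch: anything not from streamA goes to storeB.
            storeB.put(key, value);
        }
        // Hypothetical join condition: both streams have delivered this key.
        if (storeA.containsKey(key) && storeB.containsKey(key)) {
            // remove() returns the buffered value and deletes it from the store.
            output.add(key + ":" + storeA.remove(key) + "+" + storeB.remove(key));
        }
    }

    public List<String> output() {
        return output;
    }

    /** Number of records still waiting for their partner. */
    public int buffered() {
        return storeA.size() + storeB.size();
    }

    public static void main(String[] args) {
        BufferedJoin join = new BufferedJoin();
        join.process("streamA", "k1", "a1");
        join.process("streamB", "k2", "b2");
        join.process("streamB", "k1", "b1");
        System.out.println(join.output());   // [k1:a1+b1]
        System.out.println(join.buffered()); // 1 (k2 is still waiting for streamA)
    }
}
```

In this sketch a record that never finds its partner simply stays buffered. With a RocksDB store, Yi's suggestion of a per-store TTL would be one way to expire such leftovers instead of deleting every entry explicitly.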
On Thu, Oct 12, 2017 at 5:49 PM, Yi Pan <nickpa...@gmail.com> wrote:
> Oh, I just want to follow up on the AsyncStreamTask question. The main
> consideration when setting a large task.max.concurrency value is the
> memory required.
>
> And assuming your join function works like micro-batches (i.e. buffer all
> input messages from all streams for a while, apply the join function, and
> discard all buffered messages), you can make it work without a local KV
> store. If any of your buffered messages are not needed for the current join
> but are needed for a future join, you may have to use a local KV store to
> ensure correctness and no data loss across container restarts.
>
> -Yi
>
> On Thu, Oct 12, 2017 at 11:40 AM, Yi Pan <nickpa...@gmail.com> wrote:
>
> > Hi, Jef,
> >
> > What I am suggesting is exactly an in-process, in-memory KV store. Samza
> > has two types of such built-in KV stores: in-memory and RocksDB. Both can
> > be backed by a changelog topic in Kafka as the failure recovery mechanism
> > (i.e. if a container fails, it can reseed the whole store from the
> > changelog topic in Kafka). Since they are built-in KV stores in Samza, no
> > additional external system is introduced in the solution. Here is the doc
> > link for state management in Samza:
> > http://samza.apache.org/learn/documentation/0.13/container/state-management.html
> > Please read the "Local State" section.
> >
> > The Samza high-level API is not based on the KStream API. It currently
> > supports binary (two-way) joins, but as long as the join keys are in the
> > same partition, the result can be kept in memory and does not need to go
> > through another persistent topic.
> >
> > Best!
> >
> > -Yi
> >
> > On Thu, Oct 12, 2017 at 10:00 AM, Jef G <j...@dataminr.com> wrote:
> >
> >> Yi, thanks for your detailed reply!
> >>
> >> I believe what you are suggesting regarding a KV store is to set up a
> >> remote durable system to maintain the join state.
> >> That way, if a node dies and Samza restarts the task on another node,
> >> the join state is still available. Is that correct?
> >>
> >> This approach is certainly an option. However, we were hoping to use an
> >> in-process, in-memory KV store, as a remote store would introduce a lot
> >> of latency for us. In some cases we would have to make more than 100,000
> >> round trips per second to the KV store for a single stream, and we would
> >> want to be able to scale beyond that. It also introduces some complexity
> >> and another point of failure.
> >>
> >> Regarding using AsyncStreamTask with a very large (100,000)
> >> task.max.concurrency, is that a bad idea?
> >>
> >> The high-level API is based on the KStream API, right? Our jobs will
> >> sometimes need to join as many as 20 input streams. I believe Samza (and
> >> KStream) currently supports only binary joins; if that is the case, we
> >> would need 19 binary joins. The KStream doc suggests that all
> >> intermediate results are persisted, so many chained joins might be very
> >> inefficient. If so, we would prefer to use the "classic" API.
> >>
> >> -Jef
> >>
> >> On Wed, Oct 11, 2017 at 7:37 PM, Yi Pan <nickpa...@gmail.com> wrote:
> >>
> >> > Hi, Jef,
> >> >
> >> > I would recommend that you use a KV store to buffer the messages for
> >> > the join. The logic would be more predictable, and the state is also
> >> > durable. In StreamTask.process(), you can do something like the
> >> > following pseudocode:
> >> > {code}
> >> > public void process(IncomingMessageEnvelope msg, MessageCollector collector,
> >> >                     TaskCoordinator coordinator) {
> >> >   if (msg is from streamA) {
> >> >     storeA.put(msg.key, msg);
> >> >   } else {
> >> >     storeB.put(msg.key, msg);
> >> >   }
> >> >   if (joinCondition is triggered) {
> >> >     doJoin(storeA, storeB);
> >> >   }
> >> > }
> >> > {code}
> >> >
> >> > Make sure that you configure storeA and storeB with a changelog so that
> >> > they can be recovered.
> >> > Then you don't need to worry about data loss, since before the
> >> > auto-checkpoint, your buffered messages are flushed to disk and to the
> >> > changelog via storeA and storeB. If you do not want to delete each and
> >> > every buffered message after the join, you can set a TTL on each store
> >> > if you are using the RocksDB store.
> >> >
> >> > We are also actively working on a built-in join operator in the Samza
> >> > high-level APIs. The new high-level APIs are already released in Samza
> >> > 0.13.1, with the feature preview here:
> >> > http://samza.apache.org/startup/preview/#high-level-api. Feel free to
> >> > take a look and try it. We would love to hear your feedback. The current
> >> > version does not support durable state in joins yet; we are actively
> >> > working on durable state support in the next release. Note that the
> >> > high-level API is still in early evolution and might change in the next
> >> > two releases.
> >> >
> >> > Best!
> >> >
> >> > -Yi
> >> >
> >> > On Wed, Oct 11, 2017 at 1:56 PM, Jef G <j...@dataminr.com> wrote:
> >> >
> >> > > Hello. My team is looking into Samza for doing real-time processing.
> >> > > We would like to run a directed graph of jobs, where the records in
> >> > > each job's input streams are joined on a common key. We have
> >> > > functionality to perform the join by buffering records from the input
> >> > > streams until certain conditions are met and then passing them on.
> >> > >
> >> > > We are wondering about the best way to integrate this functionality
> >> > > into a Samza job. After looking over the API, we see two possibilities:
> >> > >
> >> > > 1. Use a StreamTask that adds records to a buffer. This is the method
> >> > > that the "ad event" example uses. But we are concerned that the
> >> > > framework commits a StreamTask's offset after process() completes, so
> >> > > if the job fails, records in the buffer are permanently lost.
> >> > >
> >> > > 2. Use an AsyncStreamTask that adds records to a buffer, and also add
> >> > > the TaskCallbacks to the buffer. When records are eventually joined
> >> > > and processed, complete their callbacks. This method seems promising,
> >> > > but it requires setting task.max.concurrency very high, possibly in
> >> > > the tens of thousands in our case. Are we likely to run into any
> >> > > issues doing that?
> >> > >
> >> > > Are there any other options that we overlooked? What is the best
> >> > > approach?
> >> > >
> >> > > -Jef G
> >> > >
> >> >
> >>
> >> --
> >> Jef G
> >> Senior Data Scientist | Dataminr | dataminr.com
> >> 99 Madison Ave, 3rd Floor | New York, NY 10016
> >> j...@dataminr.com
> >>
> >
>

--
Jef G
Senior Data Scientist | Dataminr | dataminr.com
99 Madison Ave, 3rd Floor | New York, NY 10016
j...@dataminr.com
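
Option 2 from the original question (buffer the TaskCallbacks and complete them only once the join has produced output) can be sketched without Samza on the classpath. In this minimal sketch, `Callback` is a hypothetical, simplified stand-in for Samza's `TaskCallback` (only `complete()` is modeled). The join fires after a fixed number of records, a micro-batch as in Yi's follow-up, and that batch size is an assumption. `maxConcurrency` models `task.max.concurrency`. The sketch assumes no more than `maxConcurrency` callbacks may be outstanding at once and throws when that would be exceeded. As we understand it, the real run loop would instead stop delivering messages to the task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CallbackBufferedJoin {
    /** Hypothetical, simplified stand-in for Samza's TaskCallback. */
    public interface Callback {
        void complete();
    }

    /** A buffered record paired with the callback that acknowledges it. */
    private static final class Pending {
        final String value;
        final Callback callback;

        Pending(String value, Callback callback) {
            this.value = value;
            this.callback = callback;
        }
    }

    private final int batchSize;      // hypothetical join trigger: records per micro-batch
    private final int maxConcurrency; // models task.max.concurrency
    private final List<Pending> buffer = new ArrayList<>();
    private final List<List<String>> joined = new ArrayList<>();

    public CallbackBufferedJoin(int batchSize, int maxConcurrency) {
        this.batchSize = batchSize;
        this.maxConcurrency = maxConcurrency;
    }

    /** Buffers a record and defers its callback until its batch is joined. */
    public void processAsync(String value, Callback callback) {
        // Assumption: at most maxConcurrency callbacks may be outstanding at once.
        if (buffer.size() >= maxConcurrency) {
            throw new IllegalStateException("in-flight records would exceed task.max.concurrency");
        }
        buffer.add(new Pending(value, callback));
        if (buffer.size() == batchSize) {
            List<String> batch = new ArrayList<>();
            for (Pending p : buffer) {
                batch.add(p.value);
            }
            joined.add(batch); // stand-in for sending the joined result downstream
            // Acknowledge the inputs only after the joined result exists.
            for (Pending p : buffer) {
                p.callback.complete();
            }
            buffer.clear();
        }
    }

    public List<List<String>> joined() {
        return joined;
    }

    /** Records whose callbacks have not been completed yet. */
    public int inFlight() {
        return buffer.size();
    }

    public static void main(String[] args) {
        AtomicInteger completed = new AtomicInteger();
        CallbackBufferedJoin join = new CallbackBufferedJoin(3, 10);
        join.processAsync("r1", completed::incrementAndGet);
        join.processAsync("r2", completed::incrementAndGet);
        System.out.println(completed.get() + " completed, " + join.inFlight() + " in flight"); // 0 completed, 2 in flight
        join.processAsync("r3", completed::incrementAndGet);
        System.out.println(completed.get() + " completed, " + join.inFlight() + " in flight"); // 3 completed, 0 in flight
        System.out.println(join.joined()); // [[r1, r2, r3]]
    }
}
```

The sketch makes the trade-off discussed in the thread concrete. Every buffered record pins an uncompleted callback in memory, so the concurrency limit has to be at least the largest number of records buffered at once, and memory use grows with it. In the sketch, a limit below the batch size makes the call that would exceed it fail. In a real job that situation would presumably stall the task, since the join would never receive the records it is waiting for.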