Hi, Jef,

What I am suggesting is exactly an in-process, in-memory KV store. Samza has two types of such built-in KV stores: in-memory and RocksDB. Both can be backed by a changelog topic in Kafka as the failure-recovery mechanism (i.e. if a container fails, it can reseed the whole store from the changelog topic in Kafka). Since these are built-in KV stores in Samza, no additional external system is introduced into the solution. Here is the doc link for state management in Samza: http://samza.apache.org/learn/documentation/0.13/container/state-management.html. Please read the "Local State" section.
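To make that concrete, here is a minimal sketch of what wiring up a changelog-backed store looks like. The store name, topic name, serdes, and class name below are examples only, so please double-check the exact config keys against the doc above:

{code}
// Example job config (illustrative names; see the state-management doc for exact keys):
//   stores.store-a.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
//   stores.store-a.changelog=kafka.store-a-changelog
//   stores.store-a.key.serde=string
//   stores.store-a.msg.serde=string
//   serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
// (An in-memory storage engine factory can be swapped in for the purely in-memory flavor.)

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class StoreBackedBufferTask implements StreamTask, InitableTask {
  private KeyValueStore<String, String> storeA;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    // If the container fails, Samza reseeds this store from its Kafka
    // changelog topic before processing resumes.
    storeA = (KeyValueStore<String, String>) context.getStore("store-a");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    // Each put goes to the local store and is also logged to the changelog.
    storeA.put((String) envelope.getKey(), (String) envelope.getMessage());
  }
}
{code}

RocksDB keeps the store on local disk with an in-memory cache, so reads stay in-process without the round-trip latency of a remote store.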
The Samza high-level API is not based on the KStream API. It currently supports two-way (binary) joins, but as long as the join keys are in the same partition, the intermediate results can stay in memory and do not need to go through additional persistent topics, so chaining binary joins over many input streams does not force each intermediate result to be persisted.
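For illustration, here is a rough sketch of what a two-way join looks like in the high-level API. The stream names and record types are invented for the example, and since the API is still evolving, the exact signatures may differ from the released preview:

{code}
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.JoinFunction;

public class OrderShipmentJoinApp implements StreamApplication {
  // Hypothetical record types, defined inline to keep the sketch self-contained.
  public static class Order { public String orderId; }
  public static class Shipment { public String orderId; }
  public static class FulfilledOrder {
    public String orderId;
    public FulfilledOrder(Order o, Shipment s) { this.orderId = o.orderId; }
  }

  @Override
  public void init(StreamGraph graph, Config config) {
    MessageStream<Order> orders = graph.getInputStream("orders", (k, m) -> (Order) m);
    MessageStream<Shipment> shipments = graph.getInputStream("shipments", (k, m) -> (Shipment) m);
    OutputStream<String, FulfilledOrder, FulfilledOrder> fulfilled =
        graph.getOutputStream("fulfilledOrders", m -> m.orderId, m -> m);

    // As long as both inputs are partitioned by orderId, the join state stays
    // local to the task; no intermediate persistent topic is required.
    orders.join(shipments, new JoinFunction<String, Order, Shipment, FulfilledOrder>() {
      @Override
      public FulfilledOrder apply(Order o, Shipment s) { return new FulfilledOrder(o, s); }

      @Override
      public String getFirstKey(Order o) { return o.orderId; }

      @Override
      public String getSecondKey(Shipment s) { return s.orderId; }
    }).sendTo(fulfilled);
  }
}
{code}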
Best!

-Yi

On Thu, Oct 12, 2017 at 10:00 AM, Jef G <j...@dataminr.com> wrote:

> Yi, thanks for your detailed reply!
>
> I believe what you are suggesting regarding a KV store is to set up a remote durable system to maintain the join state. That way, if a node dies and Samza restarts the task on another node, the join state is still available. Is that correct?
>
> This approach is certainly an option. However, we were hoping to use an in-process in-memory KV store, as a remote store would introduce a lot of latency for us. In some cases we would have to make more than 100,000 round trips per second to the KV store for a single stream, and we would want to be able to scale beyond that. It also introduces some complexity and another point of failure.
>
> Regarding using AsyncStreamTask with a very large (100,000) task.max.concurrency: is that a bad idea?
>
> The high-level API is based on the KStream API, right? Our jobs will sometimes need to join as many as 20 input streams. I believe Samza (and KStream) currently only support binary joins, and if that is the case we would need 19 binary joins. The KStream doc suggests that all intermediate results are persisted, so many chained joins might be very inefficient. If so, we would prefer to use the "classic" API.
>
> -Jef
>
> On Wed, Oct 11, 2017 at 7:37 PM, Yi Pan <nickpa...@gmail.com> wrote:
>
> > Hi, Jef,
> >
> > I would recommend that you use a KV store to buffer the messages for the join. The logic would be more predictable and the state is also durable. In StreamTask.process(), you can do something like the pseudocode below:
> >
> > {code}
> > // storeA and storeB are changelog-backed KeyValueStores obtained from
> > // TaskContext.getStore(...) in init(); doJoin() is the job-specific
> > // merge-and-emit helper.
> > public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
> >     TaskCoordinator coordinator) {
> >   String key = (String) envelope.getKey();
> >   if ("streamA".equals(envelope.getSystemStreamPartition().getStream())) {
> >     storeA.put(key, envelope.getMessage());
> >   } else {
> >     storeB.put(key, envelope.getMessage());
> >   }
> >   // Example trigger: join once both sides have arrived for this key.
> >   if (storeA.get(key) != null && storeB.get(key) != null) {
> >     doJoin(key, storeA, storeB, collector);
> >   }
> > }
> > {code}
> >
> > Make sure that you configure storeA and storeB with changelogs so that they can be recovered. Then you don't need to worry about data loss, since before the auto-checkpoint your buffered messages are flushed to disk and to the changelog via storeA and storeB. If you do not want to delete each and every buffered message after the join, you can set a TTL for each store if you are using the RocksDB store.
> >
> > We are also actively working on a built-in join operator in the Samza high-level API. The new high-level API was already released in Samza 0.13.1, with a feature preview here: http://samza.apache.org/startup/preview/#high-level-api. Feel free to take a look and try it; we would love to hear your feedback. The current version does not support durable state in joins yet; we are actively working on durable-state support for the next release. Note that the high-level API is still in early evolution and might change in the next two releases.
> >
> > Best!
> >
> > -Yi
> >
> > On Wed, Oct 11, 2017 at 1:56 PM, Jef G <j...@dataminr.com> wrote:
> >
> > > Hello. My team is looking into Samza for doing real-time processing. We would like to run a directed graph of jobs, where the records in each job's input streams are joined on a common key. We have functionality to perform the join by buffering records from the input streams until certain conditions are met and then passing them on.
> > >
> > > We are wondering about the best way to integrate this functionality into a Samza job. After looking over the API, we see two possibilities:
> > >
> > > 1. Use a StreamTask that adds records to a buffer. This is the method that the "ad event" example uses. But we are concerned that the framework commits a StreamTask's offset after process() completes, so if the job fails, records in the buffer are permanently lost.
> > >
> > > 2. Use an AsyncStreamTask that adds records to a buffer, and also add TaskCallbacks to the buffer. When records are eventually joined and processed, complete their callbacks. This method seems promising, but it requires setting task.max.concurrency very high - possibly in the tens of thousands in our case. Are we likely to run into any issues doing that?
> > >
> > > Are there any other options that we overlooked? What is the best approach?
> > >
> > > -Jef G
>
> --
> Jef G
> Senior Data Scientist | Dataminr | dataminr.com
> 99 Madison Ave, 3rd Floor | New York, NY 10016
> j...@dataminr.com