This is very insightful, thank you. I'm going to share it around with a few other key folks to see how viable a queryable indexing process would be using idiomatic Beam stuff.
Cheers,
Charles Allen

On Thu, Feb 8, 2018 at 6:13 PM Lukasz Cwik <lc...@google.com> wrote:

> Actually missed one step. For each bundle:
> 1) If not the owner of the key:
>    a) Read all prior state into memory and cache it.
>    b) De-register any prior druid sink that owned that key.
>    c) Register itself as the owner of that key.
> 2) Garbage collect anything in state/memory that is now indexed in druid.
> 3) Add the new data part of the bundle to state.
> 4) Output the new data part of the bundle to druid.
>
> The idea is that if writing to druid fails, then nothing gets added to
> state. But if we do write to druid, then either we also added it to state,
> or the runner failed in some way and will retry the bundle, in which case
> we will write it to druid again and it will eventually show up in state.
>
> The fixed key space is important because, if a machine goes down, you want
> some other machine to process a bundle containing that key "soon", so that
> it becomes the owner of that key space quickly; otherwise you'll have an
> extended outage for queries. You also want a large enough fixed key space,
> since that controls the parallelism during writing, so that no single
> machine becomes the bottleneck.
>
> The key space doesn't have to be fixed, since you can technically figure
> out the flow rate of messages and compute an "optimal" key space size
> dynamically in the pipeline. Scaling the key space up is really easy; the
> complexity comes from scaling the key space down, but I wouldn't think
> about that until it is needed.
>
> On Thu, Feb 8, 2018 at 5:46 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> Based on this description, it seems like druid sinks have to be fault
>> tolerant. I was hoping that they didn't need to be, and that as soon as
>> they wrote some information to druid they would be able to crash, with
>> druid using the sinks only as an optimization for unindexed data.
>>
>> In your case it seems like you'll want a StatefulDoFn which writes
>> information out to state and to druid at the same time. Future calls to
>> the StatefulDoFn would be responsible for garbage collecting information
>> from state based upon whether that data has been indexed.
>> You could map all the data you want to write onto a fixed key space like
>> [0, 100). Each druid sink would cache its writes in memory and in state
>> (to handle a machine going down or being migrated) and write to druid.
>> Whenever it gets a piece of work in the range [0, 100) it would need to:
>> 1) If not the owner of the key:
>>    a) De-register any prior druid sink that owned that key.
>>    b) Register itself as the owner of that key.
>> 2) Garbage collect anything in state that is now indexed in druid.
>> 3) Add the new data to state.
>> 4) Output the data to druid.
>>
>> Not being the owner of a key should be rare. Also, it would be worthwhile
>> to read https://beam.apache.org/blog/2017/02/13/stateful-processing.html
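A minimal sketch of the StatefulDoFn pattern described above, assuming the
Beam Java SDK state API. DruidClient and all of its methods are hypothetical
placeholders for whatever druid-facing client would actually be used, not an
existing API:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical druid-facing client; stands in for real registration,
// local-cache, and indexing-status APIs.
interface DruidClient {
  static DruidClient create() { throw new UnsupportedOperationException("stub"); }
  String sinkId();                              // identity of this sink instance
  void cacheUnindexed(int key, String datum);   // cache prior state locally
  void deregisterSink(int key);                 // drop the previous owner
  void registerSink(int key, String sinkId);    // announce ourselves as owner
  boolean isIndexed(String datum);              // has druid indexed this yet?
  void write(int key, String datum);            // hand a datum to druid
}

class DruidWriteFn extends DoFn<KV<Integer, String>, Void> {

  // Data handed to druid but not yet indexed; survives a machine loss.
  @StateId("pending")
  private final StateSpec<BagState<String>> pendingSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  // Which sink instance currently owns this key.
  @StateId("owner")
  private final StateSpec<ValueState<String>> ownerSpec =
      StateSpecs.value(StringUtf8Coder.of());

  private transient DruidClient druid;

  @Setup
  public void setup() {
    druid = DruidClient.create();
  }

  @ProcessElement
  public void process(
      @Element KV<Integer, String> element,
      @StateId("pending") BagState<String> pending,
      @StateId("owner") ValueState<String> owner) {
    String me = druid.sinkId();
    if (!me.equals(owner.read())) {
      // 1) Not the owner: cache prior state locally and take over the key.
      for (String datum : pending.read()) {
        druid.cacheUnindexed(element.getKey(), datum);
      }
      druid.deregisterSink(element.getKey());
      druid.registerSink(element.getKey(), me);
      owner.write(me);
    }
    // 2) Garbage collect anything druid has already indexed. BagState has no
    // selective remove, so rewrite the bag with the survivors.
    List<String> stillPending = new ArrayList<>();
    for (String datum : pending.read()) {
      if (!druid.isIndexed(datum)) {
        stillPending.add(datum);
      }
    }
    pending.clear();
    stillPending.forEach(pending::add);
    // 3) Add the new datum to state, then 4) output it to druid. State only
    // commits if the bundle succeeds, so a failed druid write adds nothing to
    // state, while a retried bundle simply writes to druid again.
    pending.add(element.getValue());
    druid.write(element.getKey(), element.getValue());
  }
}

Upstream, mapping onto the fixed key space [0, 100) could be as simple as
keying each element with Math.floorMod(element.hashCode(), 100) before
applying the stateful step.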
>>
>> On Wed, Feb 7, 2018 at 8:44 PM, Charles Allen <charles.al...@snap.com>
>> wrote:
>>
>>> Thanks for the insightful response.
>>>
>>> > Can druid handle sinks being created and removed dynamically and at
>>> > what rate?
>>>
>>> From the external data access side (the things that need to access Beam
>>> runners), announcement is handled by zookeeper, so changes that happen
>>> on a scale larger than the typical zookeeper timeout (maybe on the order
>>> of tens of seconds) are expected. There are probably ways to do reverse
>>> proxies if things go up and down faster. But any launch/terminate speeds
>>> that are comparable to network hiccups or unlucky JVM GC timings are
>>> going to be problematic regardless.
>>>
>>> > What are the expectations around this partial data that has been
>>> > collected by a sink?
>>>
>>> That is a core area I'm trying to figure out how to handle. Druid
>>> indexing right now works by making "small" batches of events (usually
>>> high tens of thousands to low hundreds of thousands of events), doing
>>> incremental indexing to get the data into a read-optimized form, and
>>> then doing a final big merge of a bunch of these incremental indices.
>>> The key thing is this: once data is considered queryable, it needs a
>>> chain of custody until it is loaded up on "historical" nodes (or some
>>> other new type of node) in Druid. What this means is that a runner that
>>> has queryable state needs to stay up until that state is acknowledged
>>> as available by something else. If a runner fails and one (or more)
>>> other runners start to re-accumulate that state, that is fine. What is
>>> forbidden is for the runner to have queryable state and then just exit
>>> under "normal" conditions without waiting for the state to be
>>> accessible somewhere else. This can cause backpressure in the runners
>>> whenever handoff is delayed.
>>>
>>> I *THINK* such a handoff could be handled in a reasonable commit
>>> workflow. Eventually something needs to merge a bunch of these
>>> incremental indices together into larger chunks, and it is not clear
>>> whether such a data-optimization pass would work on the same framework
>>> or would have to be a different one.
>>>
>>> Thoughts?
>>> Charles Allen
>>>
>>> On Wed, Feb 7, 2018 at 9:37 AM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> There are no existing sinks which are accessible from outside of
>>>> Apache Beam.
>>>>
>>>> Apache Beam does its work by processing "bundles" (a unit of work).
>>>> Each of these is executed and its results are committed back into a
>>>> runner (such as Flink/Spark/...). The lifetime of an individual
>>>> instance of a sink is bound to the bundle being processed and can be
>>>> as small as a few milliseconds. There are some sinks which cache
>>>> things within the JVM (like connections), but the caching is best
>>>> effort: if the machine were to go down (crash, autoscaling reducing
>>>> the number of workers, ...) and ever come back, the cached state was
>>>> either unimportant or easily recoverable.
>>>>
>>>> Can druid handle sinks being created and removed dynamically, and at
>>>> what rate?
>>>> What are the expectations around this partial data that has been
>>>> collected by a sink?
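To make that bundle-scoped lifetime concrete, here is a sketch of how it
maps onto the DoFn lifecycle methods, again assuming the Beam Java SDK;
Connection and its methods are hypothetical placeholders:

import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical connection type; stands in for whatever a sink caches.
interface Connection {
  static Connection open() { throw new UnsupportedOperationException("stub"); }
  void write(String element);
  void flush();
  void close();
}

class BundleScopedSinkFn extends DoFn<String, Void> {
  // Cached across bundles as a best-effort optimization; lost on a crash.
  private transient Connection conn;

  @Setup
  public void setup() {
    conn = Connection.open();  // once per DoFn instance, not per bundle
  }

  @StartBundle
  public void startBundle() {
    // A sink instance, in the sense above, lives from here...
  }

  @ProcessElement
  public void process(@Element String element) {
    conn.write(element);
  }

  @FinishBundle
  public void finishBundle() {
    conn.flush();  // ...to here; the runner commits results afterwards.
  }

  @Teardown
  public void teardown() {
    conn.close();  // best effort: not called if the machine goes down
  }
}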
>>>>
>>>> On Wed, Feb 7, 2018 at 7:31 AM, Charles Allen <charles.al...@snap.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I work closely with druid.io <http://druid.io/>, and one of the main
>>>>> pain points for any Druid deployment is handling the real-time
>>>>> streaming component. What I, personally, would *like* is to have the
>>>>> streaming orchestration and streaming state handled by a runner that
>>>>> specializes in such things, allowing Druid to focus on the
>>>>> lightning-fast ad-hoc query side.
>>>>>
>>>>> A natural contender for such a setup would be a Beam-based solution
>>>>> with a Druid segment Sink. The trouble we are having pursuing such a
>>>>> setup is twofold:
>>>>>
>>>>> 1. Many Druid setups run lambda-style pipes to backfill late or wrong
>>>>> data, so the sink needs to call out to the Druid cluster for data
>>>>> version locking and orchestration. Access must be available from the
>>>>> sink to the Druid Overlord and/or Coordinator, or potentially some
>>>>> other task-specific jvm in the cluster.
>>>>> 2. Druid segments are queryable while they are being built. This
>>>>> means that the Druid cluster (the Broker specifically) must be able
>>>>> to discover and issue *RPC queries* against the Sink for the partial
>>>>> data it has accumulated. This puts an extra dynamic load on the Sink
>>>>> jvm, but in my own experience that extra load is small compared to
>>>>> the load of indexing the incoming data.
>>>>>
>>>>> The key reason to want a stream-native framework is that the Druid
>>>>> MiddleManager/Peon setup (the streaming task infrastructure for
>>>>> Druid) has problems recovering from failure, recovering easily from
>>>>> upgrades, handling late data well, and scaling horizontally and
>>>>> dynamically. The Kafka Indexing Extension
>>>>> <http://druid.io/docs/latest/development/extensions-core/kafka-ingestion.html>
>>>>> handles some of these things, but still doesn't quite operate like a
>>>>> stream-native solution, and is only available for Kafka.
>>>>>
>>>>> What is not clear to me is whether such a feature set would be within
>>>>> the intended scope of Beam, as opposed to having some other streaming
>>>>> data catcher service that Beam simply feeds into (i.e., a Tranquility
>>>>> <https://github.com/druid-io/tranquility> sink). Having a
>>>>> customizable, accessible, and stateful sink with good commit
>>>>> semantics seems like something Beam could support natively, but the
>>>>> "accessible" part is where I need some guidance on whether that is in
>>>>> scope for Beam.
>>>>>
>>>>> Can the dev list provide some insight into whether there are any
>>>>> other Sinks that strive to be accessible at run-time from non-Beam
>>>>> components?
>>>>>
>>>>> Also, is such a use case something that should be part of what Beam
>>>>> does, or would it be best kept outside of Beam?
>>>>>
>>>>> Thank you,
>>>>> Charles Allen