This is very insightful, thank you. I'm going to share it around with a few
other key folks to see how viable a queryable indexing process would be
using idiomatic Beam stuff.

Cheers,
Charles Allen

On Thu, Feb 8, 2018 at 6:13 PM Lukasz Cwik <lc...@google.com> wrote:

> Actually missed one step. For each bundle:
> 1) If not the owner of the key:
>     a) Read all prior state into memory and cache it.
>     b) De-register any prior druid sink that owned that key.
>     c) Register itself as the owner of that key.
> 2) Garbage collect anything in state/memory that is now indexed in druid.
> 3) Add the new data part of the bundle to state.
> 4) Output the new data part of the bundle to druid.
>
> The idea is that if writing to druid fails, nothing gets added to state.
> If the write to druid succeeds, then either the data was also committed to
> state, or the runner failed in some way and will retry the bundle, writing
> to druid again (a duplicate) until the data eventually shows up in state.
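>
> A minimal sketch of that loop as a Beam StatefulDoFn. Event, EventCoder,
> and the DruidClient (with its registerOwner/deregisterOwner/isIndexed/write
> calls) are hypothetical stand-ins; the state annotations and specs are the
> real Beam APIs:
>
>   import java.util.*;
>   import org.apache.beam.sdk.coders.BooleanCoder;
>   import org.apache.beam.sdk.state.*;
>   import org.apache.beam.sdk.transforms.DoFn;
>   import org.apache.beam.sdk.values.KV;
>
>   class DruidSinkFn extends DoFn<KV<Integer, Event>, Void> {
>     @StateId("owned")
>     private final StateSpec<ValueState<Boolean>> ownedSpec =
>         StateSpecs.value(BooleanCoder.of());
>
>     @StateId("pending")
>     private final StateSpec<BagState<Event>> pendingSpec =
>         StateSpecs.bag(EventCoder.of());  // hypothetical coder
>
>     private transient DruidClient druid;                // hypothetical client
>     private transient Map<Integer, List<Event>> cache;  // in-memory copy of state
>
>     @Setup
>     public void setup() {
>       druid = DruidClient.create();
>       cache = new HashMap<>();
>     }
>
>     @ProcessElement
>     public void process(
>         ProcessContext c,
>         @StateId("owned") ValueState<Boolean> owned,
>         @StateId("pending") BagState<Event> pending) {
>       int key = c.element().getKey();
>       Event event = c.element().getValue();
>       List<Event> local = cache.computeIfAbsent(key, k -> new ArrayList<>());
>       // 1) Claim ownership of the key if this instance doesn't hold it.
>       if (!Boolean.TRUE.equals(owned.read())) {
>         pending.read().forEach(local::add);  // a) prior state into memory
>         druid.deregisterOwner(key);          // b) drop the previous sink
>         druid.registerOwner(key);            // c) claim the key
>         owned.write(true);
>       }
>       // 2) Garbage collect anything druid has already indexed; BagState has
>       //    no selective delete, so rewrite it without the indexed events.
>       local.removeIf(druid::isIndexed);
>       pending.clear();
>       local.forEach(pending::add);
>       // 3) Add the new data to state, then 4) output it to druid. State only
>       //    commits when the bundle commits, so a failed druid write rolls the
>       //    state addition back, while a retry after a successful write sends
>       //    a duplicate to druid before the data lands in state.
>       local.add(event);
>       pending.add(event);
>       druid.write(key, event);
>     }
>   }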
>
> The fixed key space is important: if a machine goes down, you want some
> other machine to process a bundle containing that key "soon" so that it
> becomes the owner of that key space; otherwise you'll have an extended
> outage for queries. You also want a large enough fixed key space, since
> that controls the parallelism during writing, so that no single machine
> becomes the bottleneck.
>
> The key space doesn't have to be fixed, since you can technically figure
> out the flow rate of messages and compute an "optimal" key space size
> dynamically in the pipeline. Scaling the key space up is really easy; the
> complexity comes from scaling it down, but I wouldn't think about that
> until it is needed.
>
>
> On Thu, Feb 8, 2018 at 5:46 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> Based on this description, it seems like druid sinks have to be fault
>> tolerant. I was hoping that they didn't need to be: as soon as a sink
>> wrote some information to druid it could crash, with druid only using the
>> sinks as an optimization for unindexed data.
>>
>> In your case it seems like you'll want to have a StatefulDoFn which
>> writes information out to state and druid at the same time. Future calls
>> to the StatefulDoFn would be responsible for garbage collecting
>> information from state based upon whether that data has been indexed.
>> You could map all the data you want to write onto a fixed key space like
>> [0, 100) (sketched at the end of this message). Each druid sink would
>> cache its writes in memory and in state (to handle a machine going down
>> or being migrated) and write to druid. Whenever it gets a piece of work
>> in the range [0, 100) it would need to:
>> 1) If not the owner of the key:
>>     a) De-register any prior druid sink that owned that key.
>>     b) Register itself as the owner of that key.
>> 2) Garbage collect anything in state that is now indexed in druid.
>> 3) Add the new data to state.
>> 4) Output the data to druid.
>>
>> Not being the owner of a key should be rare. Also, it would be worthwhile
>> to read https://beam.apache.org/blog/2017/02/13/stateful-processing.html
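>>
>> A sketch of the keying step that maps everything onto that fixed [0, 100)
>> key space ahead of the stateful ParDo (Event is a stand-in element type
>> and DruidSinkFn a stand-in for the StatefulDoFn described above):
>>
>>   PCollection<KV<Integer, Event>> keyed = events.apply(
>>       WithKeys.of((Event e) -> Math.floorMod(e.hashCode(), 100))
>>           .withKeyType(TypeDescriptors.integers()));
>>   keyed.apply(ParDo.of(new DruidSinkFn()));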
>>
>> On Wed, Feb 7, 2018 at 8:44 PM, Charles Allen <charles.al...@snap.com>
>> wrote:
>>
>>> Thanks for the insightful response.
>>>
>>> > Can druid handle sinks being created and removed dynamically and at
>>> > what rate?
>>>
>>> From the external data access side (the things that need to access Beam
>>> runners), announcement is handled by zookeeper, so changes that happen on
>>> a scale longer than the typical zookeeper timeout (maybe on the order of
>>> tens of seconds) are fine. There are probably ways to do reverse proxies
>>> if things go up and down faster. But any launch/terminate speeds
>>> comparable to network hiccups or unlucky JVM GC timings are going to be
>>> problematic regardless.
>>>
>>> > What are the expectations around this partial data that has been
>>> > collected by a sink?
>>>
>>> That is a core area I'm trying to figure out how to handle. Druid
>>> indexing right now works by making "small" batches of events (usually
>>> high tens of thousands to low hundreds of thousands of events) and doing
>>> incremental indexing to get the data into a read-optimized form, then
>>> doing a final big merge of a bunch of these incremental indices. The key
>>> thing is this: once data is considered queryable, it needs a chain of
>>> custody until it is loaded up on "historical" nodes (or some other new
>>> type of node) in Druid. What this means is that a runner that has
>>> queryable state needs to stay up until that state is acknowledged as
>>> available by something else. If a runner fails and one (or more) other
>>> runners start to re-accumulate that state, that is fine. What is
>>> forbidden is for the runner to have queryable state, then just exit
>>> under "normal" conditions without waiting for the state to be accessible
>>> somewhere else. This can cause backpressure in the runners whenever
>>> handoff is delayed.
>>>
>>> I *THINK* such a handoff should be able to be handled in a reasonable
>>> commit workflow. Eventually something needs to merge a bunch of these
>>> incremental things together into larger chunks, and it is not clear
>>> whether such a data-optimization pass would work on the same framework
>>> or would have to be a different one.
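>>>
>>> As a rough illustration of what that commit workflow might look like on
>>> the Beam side, a sink could block bundle completion until handoff is
>>> acknowledged (queryableSegments, coordinator, SegmentId, and
>>> isSegmentLoaded are all hypothetical stand-ins for however the
>>> Coordinator exposes load status):
>>>
>>>   @FinishBundle
>>>   public void finishBundle() throws InterruptedException {
>>>     // The runner can't commit this bundle (or tear the sink down under
>>>     // "normal" conditions) while we wait here, which produces exactly
>>>     // the backpressure described above whenever handoff is delayed.
>>>     for (SegmentId segment : queryableSegments) {
>>>       while (!coordinator.isSegmentLoaded(segment)) {
>>>         Thread.sleep(5_000);
>>>       }
>>>     }
>>>   }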
>>>
>>> Thoughts?
>>> Charles Allen
>>>
>>>
>>> On Wed, Feb 7, 2018 at 9:37 AM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> There are no existing sinks which are accessible from outside of Apache
>>>> Beam.
>>>>
>>>> Apache Beam works by processing "bundles" (a unit of work). Each of
>>>> these is executed and its results are committed back into a Runner (such
>>>> as Flink/Spark/...). The lifetime of an individual instance of a sink is
>>>> bound to the bundle being processed and can be as short as a few
>>>> milliseconds. There are some sinks which cache stuff within the JVM
>>>> (like connections), but the caching is best effort: if the machine goes
>>>> down (crash, autoscaling reduces the number of workers, ...), the cached
>>>> state must be either unimportant or easily recoverable if it ever comes
>>>> back.
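>>>>
>>>> As a concrete illustration of that per-JVM, best-effort caching (Element
>>>> and Connection are hypothetical stand-ins; the lifecycle annotations are
>>>> the real Beam ones):
>>>>
>>>>   class CachingSinkFn extends DoFn<Element, Void> {
>>>>     // Static, so the connection outlives any single short-lived bundle
>>>>     // and is shared by every DoFn instance in this JVM. Best effort
>>>>     // only: if the worker goes away, a replacement simply re-opens it.
>>>>     private static Connection connection;
>>>>
>>>>     @Setup
>>>>     public void setup() {
>>>>       synchronized (CachingSinkFn.class) {
>>>>         if (connection == null) {
>>>>           connection = Connection.open();
>>>>         }
>>>>       }
>>>>     }
>>>>
>>>>     @ProcessElement
>>>>     public void process(ProcessContext c) {
>>>>       connection.write(c.element());
>>>>     }
>>>>   }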
>>>>
>>>> Can druid handle sinks being created and removed dynamically and at
>>>> what rate?
>>>> What are the expectations around this partial data that has been
>>>> collected by a sink?
>>>>
>>>>
>>>> On Wed, Feb 7, 2018 at 7:31 AM, Charles Allen <charles.al...@snap.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I work closely with druid.io
>>>>> and one of the main pain points for any Druid deployment is handling
>>>>> the real-time streaming component. What I, personally, would *like* is
>>>>> to have the streaming orchestration and streaming state handled by a
>>>>> runner which specializes in such things, allowing Druid to focus on the
>>>>> lightning-fast ad-hoc query side.
>>>>>
>>>>> A natural contender for such a setup would be a Beam-based solution
>>>>> with a Druid segment Sink. The trouble we are having pursuing such a
>>>>> setup is twofold:
>>>>>
>>>>>    1. Many Druid setups run lambda-style pipes to backfill late or
>>>>>    wrong data, so the sink needs to call out to the Druid cluster for
>>>>>    data version locking and orchestration. Access must be available
>>>>>    from the sink to the Druid Overlord and/or Coordinator, or
>>>>>    potentially some other task-specific JVM in the cluster.
>>>>>    2. Druid segments are queryable while they are being built. This
>>>>>    means that the Druid cluster (the Broker specifically) must be able
>>>>>    to discover and issue *RPC queries* against the Sink on the partial
>>>>>    data it has accumulated. This puts an extra dynamic load on the Sink
>>>>>    JVM, but in my own experience that extra load is small compared to
>>>>>    the load of indexing the incoming data.
>>>>>
>>>>> The key motivation for using a stream-native framework is that the
>>>>> Druid MiddleManager/Peon setup (the streaming task infrastructure for
>>>>> Druid) has problems recovering from failures, upgrading easily,
>>>>> handling late data well, and scaling horizontally. The Kafka Indexing
>>>>> Extension
>>>>> <http://druid.io/docs/latest/development/extensions-core/kafka-ingestion.html>
>>>>> handles some of these things, but still doesn't quite operate like a
>>>>> stream-native solution, and is only available for Kafka.
>>>>>
>>>>> What is not clear to me is whether such a feature set would be in the
>>>>> intended scope of Beam, as opposed to having some other streaming data
>>>>> catcher service that Beam simply feeds into (aka a Tranquility
>>>>> <https://github.com/druid-io/tranquility> sink). Having a
>>>>> customizable, accessible and stateful sink with good commit semantics
>>>>> seems like something Beam could support natively, but the "accessible"
>>>>> part is where I need some guidance on whether that is in scope for
>>>>> Beam.
>>>>>
>>>>> Can the dev list provide some insight into whether there are any other
>>>>> Sinks that strive to be accessible at run-time from non-Beam
>>>>> components?
>>>>>
>>>>> Also, is such a use case something that Beam is intended to support,
>>>>> or would it be best kept outside of Beam?
>>>>>
>>>>> Thank you,
>>>>> Charles Allen
>>>>>
>>>>
>>>>
>>
>
