Hey Gian,

Thanks for the feedback and taking the time to read through the proposal.
Yeah, your suggestion is much less complex and I think it makes more sense!
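For anyone following along, here's roughly what I'm picturing: a small Kafka Streams job that re-keys the raw topic onto a dimension that aids roll-up and writes it to a second topic that the Kafka Indexing Service then consumes. This is just a sketch, the topic names and the extractRollupKey helper are placeholders for whatever we'd actually partition on:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class RepartitionJob {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "druid-repartitioner");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

        StreamsBuilder builder = new StreamsBuilder();
        // Read the raw topic, re-key each record on a dimension that helps roll-up,
        // and write to a second topic for the Kafka Indexing Service to consume.
        builder.stream("raw-events", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
               .selectKey((key, value) -> extractRollupKey(value))
               .to("druid-events", Produced.with(Serdes.String(), Serdes.ByteArray()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

    // Placeholder: pull whatever field we want to partition on out of the Avro payload.
    private static String extractRollupKey(byte[] value) {
        return "some-dimension-value";
    }
}

The key choice obviously matters a lot here; it needs to line up with the dimensions we keep in the datasource so that rows which roll up together land in the same segment.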
Cheers,
Dylan

On 8 May 2018 at 22:06, Gian Merlino <g...@apache.org> wrote:

> Hi Dylan,
>
> My feeling is that it is going to be challenging to add a layer of indirection to the Kafka indexing stuff while maintaining its exactly-onceness. The exactly-onceness is based around tracking the specific Kafka offsets that are read by each task, and is tightly linked to which partitions are assigned to which task. I think what you describe would be doable, but it would add a lot of complexity to a code base that is already pretty complex. If I were in your position I'd try repartitioning the original Kafka topic into a second Kafka topic using a good key. It doubles the traffic at Kafka but at least it's simple! It follows a general rule of making Kafka do as much of the work as possible.
>
> What do you think?
>
> On Thu, May 3, 2018 at 1:53 PM, Dylan Wylie <dylanwy...@gmail.com> wrote:
>
> > Hey Gian,
> >
> > Thanks for your response!
> >
> > Automatic compaction looks great and it'll definitely help out with general efficiency. I'm also excited for the second PR, we've some use cases it'll be helpful for.
> >
> > Even with those I still think there may be value in something along the lines of my suggestion.
> >
> > To present a real-life example, we have a Kafka topic which emits ~200k messages/second, each of these messages has around 300 fields (in a nested Avro structure), and it's partitioned on some key which is not ingested into Druid. We produce a datasource from that message with around 40-50 fields.
> >
> > Using the Kafka Indexing Service, we measured that a single indexing task extracting the fields for that datasource can consume 7k messages/second, meaning in order to keep up we have to run around 30 indexing tasks. This results in around 30 segments at around 300mb each for each hour of data, compared to 6 segments at around 650mb each when batch ingested, so the cluster's holding up to 3x as much data as it might do if the data was ingested into a smaller number of segments.
> > (I realise that Tranquility & Batch Ingestion partition data by a hash of all the fields, which the KIS can't, so it will always have less optimal segments unless the Kafka topic is partitioned using a good key.)
> >
> > Profiling the indexing peon shows that the largest chunk of time is being spent deserialising the large Avro message. So we think that if we could split the current ingestion process into the two job types as described, we could scale one up to handle consuming and parsing the message and another could manage appending the rows to a (smaller) set of segments.
> >
> > From some initial playing around we've noticed that https://github.com/druid-io/druid/pull/5261 introduced a stand-alone realtime task using the newer appenderator API that could potentially be built on top of.
> >
> > It might be simpler for us to introduce a stream processor which parses and extracts the parts of the messages that are needed and emits only those fields to a topic partitioned on something that aids roll-up. However, separating parsing data from indexing it feels like it might be more generally useful and avoid the extra work in maintaining a stream processor. (And it seems like a fun way to get hacking with the Druid codebase!)
> >
> > Sorry for the long email, thoughts or comments appreciated!
> >
> > Best regards,
> > Dylan
> >
> > On 3 May 2018 at 02:32, Gian Merlino <g...@apache.org> wrote:
> >
> > > Hey Dylan,
> > >
> > > Great to hear that your experience has generally been positive!
> > >
> > > What do you think about using compaction for this? (The feature added in https://github.com/druid-io/druid/pull/5102.) The idea with compaction was that it would enable a background process that goes through freshly inserted segments and re-partitions them optimally.
> > >
> > > For creating multiple datasources out of one topic, there is a PR wending its way through review right now that is relevant: https://github.com/druid-io/druid/pull/5556.
> > >
> > > On Wed, May 2, 2018 at 12:46 PM, Dylan Wylie <dylanwy...@gmail.com> wrote:
> > >
> > > > Hey there,
> > > >
> > > > With the recent improvements to the Kafka Indexing Service we've been migrating over from Tranquility and have had a very positive experience.
> > > >
> > > > However, one of the downsides to using the KIS is that the number of segments generated for each period can't be smaller than the number of tasks required to consume the queue. So if you have a use case involving ingesting from a topic with a high rate of large messages, but your spec only extracts a small proportion of fields, you may be forced to run a large number of tasks that generate very small segments.
> > > >
> > > > This email is to check in for people's thoughts on separating consuming and parsing messages from indexing and segment management, in a similar fashion to how Tranquility operates.
> > > >
> > > > Potentially, we could have the supervisor spawn two types of task that can be configured independently: a consumer and an appender. The consumer would parse the message based on the spec and then pass the results to the appropriate appender task, which builds the segment. Another advantage to this approach is that it would allow creating multiple datasources from a single consumer group rather than ingesting the same topic multiple times.
> > > >
> > > > I'm quite new to the codebase so all thoughts and comments are welcome!
> > > >
> > > > Best regards,
> > > > Dylan