1/2: Sounds good, let's remove the joins within GlobalKTable for now.

3. I see, makes sense.
Unfortunately, since TopologyBuilder is a public class, we cannot separate its internal-only functions (build, buildWithGlobalTables, etc.) from user-facing functions (stream, table, etc.). We need to consider refactoring this interface sooner rather than later.

4/6. OK.

Guozhang

On Tue, Dec 20, 2016 at 2:16 PM, Damian Guy wrote:

Hi Guozhang,

Thanks for your input. Answers below, but I'm thinking we should remove joins from GlobalKTables for the time being and revisit if necessary in the future.

1. With a global table the joins are never really materialized (at least how I see it); rather, they are just views on the existing global tables. I've deliberately taken this approach so we don't have to create yet another State Store and changelog topic, etc. These all consume resources that I believe are unnecessary. So I don't really see the point of having a materialize method. Further, one of the major benefits of joining two global tables is being able to query them via Interactive Queries. For this you need the name, so I think it makes sense to provide it with the join.

2. This has been discussed already in this thread (with Michael), and outerJoin is deliberately not part of the KIP. To be able to join both ways, as you suggest, requires that both inputs are able to map to the same key. This is not always going to be possible, as relationships can be one way, so for that reason I felt it was best not to go down that path: we'd not be able to resolve it at the time that globalTable.join(otherGlobalTable, ...) was called, and this would result in possible confusion. Also, to support this we'd need to physically materialize a StateStore that represents the join (which I think is a waste of resources), or we'd need to provide another interface where we can map from the key of the resulting global table to the keys of both of the joined tables.

3.
The intention is that the GlobalKTables are in a single topology that is owned and updated by a single thread. So yes, it is necessary that they can be created separately.

4. Bootstrapping and maintaining the state of GlobalKTables is done on a single thread. This thread will run simultaneously with the current StreamThreads. It doesn't make sense to move the bootstrapping of the StandbyTasks to this thread, as they are logically part of a StreamThread; they are 'assigned' to the StreamThread. With GlobalKTables there is no assignment as such; the thread just maintains all of them.

5. Yes, I'll update the KIP. The state directory will be under the same path as StreamsConfig.STATE_DIR_CONFIG, but it will be a specific directory, i.e., global_state, rather than a task directory.

6. The whole point of GlobalKTables is to have a copy of ALL of the data on each node. I don't think it makes sense to be able to reset the starting position.

Thanks,
Damian

On Tue, 20 Dec 2016 at 20:00 Guozhang Wang wrote:

One more thing to add:

6. A GlobalKTable is always bootstrapped from the beginning, while for other KTables we are enabling users to override their reset position, as in https://github.com/apache/kafka/pull/2007

Should we consider doing the same for GlobalKTable as well?

Guozhang

On Tue, Dec 20, 2016 at 11:39 AM, Guozhang Wang wrote:

Thanks for the very well-written proposal, and sorry for the very late review. I have a few comments here:

1. We are introducing a "queryableViewName" in the GlobalTable join results, while I'm wondering if we should just add a more general function like "materialize" to KTable and GlobalKTable, with the name to be used in queries?

2.
For GlobalKTable's own "join" and "leftJoin": since we are only passing the KeyValueMapper<K, V, K1> keyMapper, it seems that in either case only the left-hand side will logically "trigger" the join, which is different from KTable's join semantics. I'm wondering if it would be more consistent to have them as:

    <K1, V1, R> GlobalKTable<K, R> join(final GlobalKTable<K1, V1> other,
                                        final KeyValueMapper<K, V, K1> leftKeyMapper,
                                        final KeyValueMapper<K1, V1, K> rightKeyMapper,
                                        final ValueJoiner<V, V1, R> joiner,
                                        final String queryableViewName);

    <K1, V1, R> GlobalKTable<K, R> outerJoin(final GlobalKTable<K1, V1> other,
                                             final KeyValueMapper<K, V, K1> leftKeyMapper,
                                             final KeyValueMapper<K1, V1, K> rightKeyMapper,
                                             final ValueJoiner<V, V1, R> joiner,
                                             final String queryableViewName);

    <K1, V1, R> GlobalKTable<K, R> leftJoin(final GlobalKTable<K1, V1> other,
                                            final KeyValueMapper<K, V, K1> keyMapper,
                                            final ValueJoiner<V, V1, R> joiner,
                                            final String queryableViewName);

I.e., add another directional key mapper to join and also to outerJoin.

3. For "TopologyBuilder.buildGlobalStateTopology", is it necessary to have a separate function from "TopologyBuilder.build" itself? With global tables, are there any scenarios where we want to build the topology without the embedded global tables (i.e., still calling "build")?

4. As for implementation, you mentioned that global table bootstrapping will be done in another dedicated thread. Could we also consider moving the logic of bootstrapping the standby-replica state stores into this thread as well, which can then leverage the existing "restoreConsumer" that does not participate in the consumer group protocol?
By doing this I think we can still avoid thread synchronization while making the logic clearer (ideally the standby restoration does not really need to be part of the stream thread's main loop).

5. Also, for the global table's state directory, I'm assuming it will not be under the per-task directory, as it is per instance. But could you elaborate a bit in the wiki about its directory as well? Also, could we consider adding https://issues.apache.org/jira/browse/KAFKA-3522 along with this feature, since we may need to change the directory path / storage schema formats for these different types of stores moving forward?

Guozhang

On Fri, Dec 9, 2016 at 4:21 AM, Damian Guy wrote:

Thanks for the update Michael.

I just wanted to add that there is one crucial piece of information that I've failed to add (I apologise).

To me, the join between 2 Global Tables just produces a view on top of the underlying tables (this is the same as how it works for KTables today). So that means there is no physical StateStore that backs the join result; it is just a virtual StateStore that knows how to resolve the join when it is required. I've deliberately taken this path so that we don't end up having yet another copy of the data, stored on local disk, and sent to another changelog topic. This also reduces the memory overhead from creating RocksDBStores and reduces load on the thread-based caches we have. So it is a resource optimization.
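Damian's point that a join between two global tables is only a virtual view, resolved when it is read, can be illustrated with a minimal sketch. It assumes plain `java.util.Map`s stand in for the two global tables and `BiFunction`s stand in for the KIP's `KeyValueMapper` and `ValueJoiner`; the `JoinedView` class is hypothetical and is not the Kafka Streams implementation. Nothing from either table is copied: every `get` looks up the left row, maps it to the right-hand key, and joins on the spot.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, read-only join "view" over two in-memory tables (not Kafka Streams code).
// No joined data is stored: each get() resolves the join against the current table
// contents, so no extra state store or changelog topic is needed.
final class JoinedView<K, V, K1, V1, R> {
    private final Map<K, V> left;                  // stands in for the left global table
    private final Map<K1, V1> right;               // stands in for the right global table
    private final BiFunction<K, V, K1> keyMapper;  // like KeyValueMapper<K, V, K1>: left record -> right key
    private final BiFunction<V, V1, R> joiner;     // like ValueJoiner<V, V1, R>

    JoinedView(Map<K, V> left, Map<K1, V1> right,
               BiFunction<K, V, K1> keyMapper, BiFunction<V, V1, R> joiner) {
        this.left = left;
        this.right = right;
        this.keyMapper = keyMapper;
        this.joiner = joiner;
    }

    // Inner-join lookup by the left table's key; returns null if either side is missing.
    R get(K key) {
        V leftValue = left.get(key);
        if (leftValue == null) {
            return null;
        }
        V1 rightValue = right.get(keyMapper.apply(key, leftValue));
        return rightValue == null ? null : joiner.apply(leftValue, rightValue);
    }

    public static void main(String[] args) {
        Map<Integer, String> t1 = new HashMap<>(); // key -> t2_id
        Map<String, String> t2 = new HashMap<>();  // key -> name
        t1.put(1, "a");
        t2.put("a", "first");

        JoinedView<Integer, String, String, String, String> view =
                new JoinedView<>(t1, t2, (key, t2Id) -> t2Id, (t2Id, name) -> t2Id + ":" + name);
        System.out.println(view.get(1)); // a:first

        t2.put("a", "second");           // update only the underlying table
        System.out.println(view.get(1)); // a:second (no copy to refresh)
    }
}
```

The trade-off Damian describes shows up directly: the view costs a lookup in each table per read, but no storage, changelog, or cache of its own.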

So while it is technically possible to support outer joins, we would need to physically materialize the StateStore (and create a changelog topic for it), or we'd need to provide another interface where the user could map from the outerJoin key to both of the other table keys. This is because the key of the outerJoin table could be either the key of the lhs table, or of the rhs table, or something completely different.

With this and what you have mentioned above in mind, I think we should park outerJoin support for this KIP and revisit if and when we need it in the future.

I'll update the KIP with this.

Thanks,
Damian

On Fri, 9 Dec 2016 at 09:53 Michael Noll wrote:

Damian and I briefly chatted offline (thanks, Damian!), and here's the summary of my thoughts and conclusion.

TL;DR: Let's skip outer join support for global tables.

In more detail:

- We agreed that, technically, we can add OUTER JOIN support. However, outer joins only work if certain preconditions are met. The preconditions are IMHO similar/the same as we have for the normal, partitioned KTables (e.g. having matching keys and co-partitioned data for the tables), but in the case of global tables the user would need to meet all these preconditions in one big swing when specifying the params for the outer join call. Even so, you'd only know at run-time whether the preconditions were actually met properly.

- Hence it's quite likely that users will be confused about these preconditions and how to meet them, and, from what we can tell, use cases / user demand for outer joins have been rare.

- So, long story short, even though we could add outer join support, we'd suggest skipping it for global tables. If we subsequently learn that there is a lot of user interest in that functionality, we still have the option to add it in the future.

Best,
Michael

On Thu, Dec 8, 2016 at 6:31 PM, Damian Guy wrote:

Hi Michael,

I don't see how that helps.

Let's say we have tables Person(id, device_id, name, ...) and Device(id, person_id, type, ...), both keyed with the same type. And we have a stream that, for the sake of simplicity, has both person_id and device_id (I know this is a bit contrived!). So our join:

    person = builder.globalTable(...);
    device = builder.globalTable(...);
    personDevice = person.outerJoin(device, ...);

    someStream = builder.stream(...);
    // which id do I use to join with? person.id? device.id?
    someStream.leftJoin(personDevice, ...);

    // Interactive Query on the view generated by the join of person and device
    personDeviceStore = streams.store("personDevice", ...);
    // person.id? device.id?
    personDeviceStore.get(someId);

We get records:

    person id=1, device_id=2, ...
    device id=2, person_id=1, ...
    stream person_id=1, device_id=2

We could do the join between the GlobalTables both ways, as each side could map to the other side's key, but when I'm accessing the resulting table, personDevice, what is the key? The person.id? The device.id? It can't be both of them.
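Damian's Person/Device question can be made concrete with a minimal sketch, assuming plain `HashMap`s stand in for the two global tables (the `NaiveOuterJoin` class and its method are hypothetical, not part of the KIP). The generic signature only compiles when both tables share one key type `K`, which is Michael's condition; but even with a shared type, matching rows purely by key equality pairs values from two different id spaces. With the records from the example (person 1 owns device 2), person 1 and device 2 are never joined, and each ends up paired with `null`.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

// Hypothetical outer join that matches rows by key equality (not Kafka Streams code).
// The signature forces both tables to share the key type K: a Map<Integer, ...> and a
// Map<String, ...> (the t1/t2 case from earlier in the thread) would not type-check.
final class NaiveOuterJoin {
    static <K, V1, V2, R> Map<K, R> outerJoin(Map<K, V1> left, Map<K, V2> right,
                                              BiFunction<V1, V2, R> joiner) {
        Set<K> keys = new LinkedHashSet<>(left.keySet()); // union of both key sets
        keys.addAll(right.keySet());
        Map<K, R> result = new HashMap<>();
        for (K key : keys) {
            // Either side may be absent, so the joiner must accept nulls.
            result.put(key, joiner.apply(left.get(key), right.get(key)));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> person = new HashMap<>(); // keyed by person.id
        person.put(1, "person 1 (device_id=2)");
        Map<Integer, String> device = new HashMap<>(); // keyed by device.id
        device.put(2, "device 2 (person_id=1)");

        Map<Integer, String> personDevice =
                outerJoin(person, device, (p, d) -> p + " | " + d);
        // Key 1 is a person id and key 2 is a device id: the related pair is never joined.
        System.out.println(personDevice.get(1)); // person 1 (device_id=2) | null
        System.out.println(personDevice.get(2)); // null | device 2 (person_id=1)
    }
}
```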

Thanks,
Damian

On Thu, 8 Dec 2016 at 15:51 Michael Noll wrote:

The key type returned by both KeyValueMappers (in the current trunk version, that type is named `R`) would need to be the same for this to work.

On Wed, Dec 7, 2016 at 4:46 PM, Damian Guy wrote:

Michael,

We can only support outerJoin if both tables are keyed the same way. Let's say, for example, you can map both ways; however, the key for each table is of a different type. So t1 is long and t2 is string: what is the key type of the resulting GlobalKTable? So when you subsequently join to this table, and do a lookup on it, which key are you using?

Thanks,
Damian

On Wed, 7 Dec 2016 at 14:31 Michael Noll wrote:

Damian,

yes, that makes sense.

But I am still wondering: in your example, there's no prior knowledge ("can I map from t1->t2?") that Streams can leverage for joining t1 and t2, other than blindly relying on the user to provide an appropriate KeyValueMapper for K1/V1 of t1 -> K2/V2 of t2.
In other words, if we allow the user to provide a KeyValueMapper from t1->t2 (Streams does not know at compile time whether this mapping will actually work), then we can also allow the user to provide a corresponding "reverse" mapper from t2->t1. That is, we could say that an outer join between two global tables IS supported, but if and only if the user provides two KeyValueMappers, one for t1->t2 and one for t2->t1.

The left join t1->t2 (which is supported in the KIP), in general, works only because of the existence of the user-provided KeyValueMapper from t1->t2. The outer join, as you point out, cannot be satisfied as easily, because Streams must know two mappers, t1->t2 plus t2->t1; otherwise the outer join won't work.

On Wed, Dec 7, 2016 at 3:04 PM, Damian Guy wrote:

Hi Michael,

Sure. Say we have 2 input topics, t1 and t2, below:

    t1 {
        int key;
        string t2_id;
        ...
    }

    t2 {
        string key;
        ...
    }

If we create global tables out of these, we'd get:

    GlobalKTable<Integer, ?> t1;
    GlobalKTable<String, ?> t2;

So the join can only go in one direction, i.e., from t1 -> t2, as in order to perform the join we need to use a KeyValueMapper to extract the t2 key from the t1 value.

Does that make sense?

Thanks,
Damian

On Wed, 7 Dec 2016 at 10:44 Michael Noll wrote:

> There is no outer-join for GlobalKTables as the tables may be keyed differently. So you need to use the key from the left side of the join along with the KeyValueMapper to resolve the right side of the join. This won't work the other way around.

Care to elaborate why it won't work the other way around? If, for example, we swapped the call from leftTable.join(rightTable) to rightTable.join(leftTable), that join would work, too. Perhaps I am missing something though. :-)

On Wed, Dec 7, 2016 at 10:39 AM, Damian Guy wrote:

Hi Matthias,

Thanks for the feedback.
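The t1/t2 example from Damian's Dec 7, 3:04 PM message, and the explanation that follows here, can be sketched with plain `Map`s standing in for the global tables and a `BiFunction` standing in for the `KeyValueMapper` (the `JoinDirection` helper is hypothetical, not Kafka Streams code). Going from t1 to t2 is a single point lookup, because the mapper extracts t2's key from a t1 record. Going from t2 back to t1 has no such mapper, so in this sketch the only option is to scan every t1 record, and the answer can be several rows rather than one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical helper (not Kafka Streams code) contrasting the two join directions.
final class JoinDirection {
    // t1 -> t2: the key mapper derives t2's key from a t1 record, so this is one point lookup.
    static <K1, V1, K2, V2> V2 lookupRight(Map<K1, V1> t1, Map<K2, V2> t2,
                                           BiFunction<K1, V1, K2> keyMapper, K1 t1Key) {
        V1 t1Value = t1.get(t1Key);
        return t1Value == null ? null : t2.get(keyMapper.apply(t1Key, t1Value));
    }

    // t2 -> t1: nothing maps a t2 key to a t1 key, so every t1 record has to be scanned,
    // and several t1 records may point at the same t2 key.
    static <K1, V1, K2> List<K1> lookupLeft(Map<K1, V1> t1, BiFunction<K1, V1, K2> keyMapper,
                                            K2 t2Key) {
        List<K1> matches = new ArrayList<>();
        for (Map.Entry<K1, V1> entry : t1.entrySet()) {
            if (t2Key.equals(keyMapper.apply(entry.getKey(), entry.getValue()))) {
                matches.add(entry.getKey());
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        Map<Integer, String> t1 = new HashMap<>(); // int key -> t2_id
        t1.put(10, "a");
        t1.put(11, "a");
        t1.put(12, "b");
        Map<String, String> t2 = new HashMap<>();  // string key -> payload
        t2.put("a", "A");
        t2.put("b", "B");

        BiFunction<Integer, String, String> t2IdOf = (key, t2Id) -> t2Id;
        System.out.println(lookupRight(t1, t2, t2IdOf, 10));    // A
        System.out.println(lookupLeft(t1, t2IdOf, "a").size()); // 2 (t1 keys 10 and 11)
    }
}
```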

There is no outer-join for GlobalKTables, as the tables may be keyed differently. So you need to use the key from the left side of the join along with the KeyValueMapper to resolve the right side of the join. This won't work the other way around.

On the bootstrapping concern: if the application is failing before bootstrapping finishes, the problem is likely to be related to a terminal exception, e.g., running out of disk space, corrupt state stores, etc. In these cases, we wouldn't want the application to continue. So I think this is OK.

Thanks,
Damian

On Tue, 6 Dec 2016 at 21:56 Matthias J. Sax wrote:

Thanks for the KIP Damian. Very nice motivating example!

A few comments:

- Why is there no outer-join for GlobalKTables?
- On bootstrapping a GlobalKTable, could it happen that this never finishes if the application fails before bootstrapping finishes and new data gets written at the same time?
  Do we need to guard against this (seems to be a very rare corner case, so maybe not required)?

-Matthias

On 12/6/16 2:09 AM, Damian Guy wrote:

Hi all,

I would like to start the discussion on KIP-99:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649

Looking forward to your feedback.

Thanks,
Damian

--
Michael G. Noll
Product Manager | Confluent
+1 650 453 5860 | @miguno <https://twitter.com/miguno>
Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog <http://www.confluent.io/blog>

--
Guozhang
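The dedicated global-state thread that Damian describes in point 4 of his Dec 20 reply (one thread that bootstraps every global table from the beginning and then keeps it up to date alongside the StreamThreads) can be sketched as follows. Everything here is an assumption for illustration: a `List` stands in for the input topic, a `ConcurrentHashMap` for the global store, and a `CountDownLatch` for the signal that bootstrapping is done; the class name is hypothetical, and the code needs Java 9+ for `List.of` and `Map.entry`. It also assumes, as Matthias's question implies, that processing waits for bootstrapping to finish. The sketch replays only up to the end offset captured when the thread starts. That is one way to keep bootstrapping bounded while new data is still being written, but the thread does not say the KIP takes this approach.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of a dedicated global-state thread (not Kafka Streams code).
final class GlobalStateBootstrap {
    // Replays the "topic" from offset 0 on a separate thread and blocks the caller
    // (standing in for the StreamThreads) until the store has caught up.
    static Map<String, String> bootstrap(List<Map.Entry<String, String>> topic) {
        Map<String, String> store = new ConcurrentHashMap<>();
        CountDownLatch bootstrapped = new CountDownLatch(1);

        Thread globalThread = new Thread(() -> {
            int endOffset = topic.size(); // end offset captured once, at startup
            for (int offset = 0; offset < endOffset; offset++) { // always start from the beginning
                Map.Entry<String, String> record = topic.get(offset);
                store.put(record.getKey(), record.getValue()); // later values overwrite earlier ones
            }
            bootstrapped.countDown();
            // A real implementation would keep polling for new records here.
        }, "global-state-thread");
        globalThread.start();

        try {
            bootstrapped.await(); // assumed: processing starts only after bootstrapping completes
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while bootstrapping", e);
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> topic = List.of(
                Map.entry("k1", "v1"), Map.entry("k2", "v2"), Map.entry("k1", "v3"));
        System.out.println(bootstrap(topic).get("k1")); // v3
    }
}
```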
