Hi, Colin,

Thanks for the reply.
60. Yes, I think you are right. We probably need the controller id when a broker starts up. A broker only stores the Raft leader id in the metadata file; to do the initial fetch to the Raft leader, it needs to know the host/port associated with that leader id.

62. It seems there are two parts to this: (1) which listener a client should use to initiate a connection to the controller, and (2) which listeners a controller should use to accept client requests. For (1), at any point in time a client only needs to use one listener. I think controller.listener.name is meant for the client, so a single value seems to make more sense. Currently, we don't have a configuration for (2). We could add a new one for that and support a list, but I am wondering how useful it would be. One example that I can think of is that we could reject non-controller-related requests that are accepted on controller-only listeners. However, we already support separate authentication for the controller listener, so I am not sure how useful that is.

63. (a) I think most users won't know that controller.id defaults to broker.id + 3000, so it can be confusing for them to set up controller.connect. If this is truly needed, it seems less confusing to make controller.id required. (b) I am still trying to understand whether we truly need to expose a controller.id. What if we only expose broker.id and let controller.connect just use broker.id? What would be missing?

Thanks,

Jun

On Thu, Sep 24, 2020 at 10:55 PM Colin McCabe <cmcc...@apache.org> wrote:

> On Thu, Sep 24, 2020, at 16:24, Jun Rao wrote:
> > Hi, Colin,
> >
> > Thanks for the reply and the updated KIP. A few more comments below.
>
> Hi Jun,
>
> > 53. It seems that you already incorporated the changes in KIP-516. With topic ids, we don't need to wait for the topic's data to be deleted before removing the topic metadata. If the topic is recreated, we can still delete the data properly based on the topic id. So, it seems that we can remove TopicRecord.Deleting.
>
> Thanks for the reply. What I was thinking of doing here was using topic IDs internally, but still using names externally. So the topic UUIDs are only for the purpose of associating topics with partitions -- from the user's point of view, topics are still identified by names.
>
> You're right that KIP-516 will simplify things, but I'm not sure when that will land, so I wanted to avoid blocking the initial implementation of this KIP on it.
>
> > 55. It seems to me that the current behavior, where we favor the current broker registration, is better. This is because uncontrolled broker shutdown is rare, and its impact is less severe since one just needs to wait for the session timeout before restarting the broker. If a mis-configured broker replaces an existing broker, the consequence is more severe, since it can cause the leader to be unavailable, a replica to fall out of ISR, add more load on the leaders, etc.
>
> Hmm, that's a good point. Let me check this a bit more before I change it, though.
>
> > 60. controller.connect=0@controller0.example.com:9093,1@controller1.example.com:9093,2@controller2.example.com:9093: Do we need to include the controller id before the @? It seems that the host/port is enough for establishing the connection. It would also be useful to make it clear that controller.connect replaces quorum.voters in KIP-595.
>
> I discussed this with Jason earlier, and he felt that the controller IDs were needed in this configuration key. They are certainly needed when configuring the controllers themselves, since they need to know each others' IDs.
>
> > 61. I am not sure that we need both controller.listeners and controller.connect.security.protocol, since the former implies the security protocol. The reason that we have both inter.broker.listener.name and inter.broker.security.protocol is because we had the latter first and later realized that the former is more general.
>
> That's a good point. I've removed this from the KIP.
>
> > 62. I am still not sure that you need controller.listeners to be a list. All listeners are already defined in listeners. To migrate from plaintext to SSL, one can configure listeners to have both plaintext and SSL. After that, one can just change controller.listeners from plaintext to SSL. This is similar to how one changes the listener for inter-broker connections. Also, controller.listener.name may be a more accurate name?
>
> The issue that I see here is that if you are running with the controller and broker in the same JVM, any listeners you define in "listeners" will get used as regular broker listeners unless you put them in controller.listeners. Therefore, controller.listeners needs to be a list.
>
> controller.listener.names does sound like a better name, though... I've updated it to that.
>
> > 63. Regarding controller.id, I am trying to understand whether it's a required configuration or an optional one. From the KIP, it sounds like controller.id is optional. Then, if it is unset, it's not clear how the user will obtain the controller id for setting controller.connect.
>
> If you specify broker.id but not controller.id, then controller.id will just be set to broker.id + 3000. This was intended to make some configurations a little bit more concise to write. You still do have to know the controller IDs when configuring the brokers, though. If this seems confusing, then I can just make it mandatory.
>
> > 64. KIP-516 adds a flag in LeaderAndIsrRequest to indicate whether it includes all partitions or not. This will be used to clean up stray logs. I was thinking about how we will do the same thing once the controller moves to being Raft-based. One way to do that is, on broker startup, to get the HWM of the metadata log from the Raft leader and wait until the broker's metadata catches up to the HWM. At that point, any local log not seen in the metadata can be removed. Since the Fetch response returns the HWM, there seem to be enough APIs to achieve this.
>
> That's a very good point. I added a note about this under Broker Startup.
>
> best,
> Colin
>
> > Jun
> >
> > On Thu, Sep 24, 2020 at 1:07 PM Colin McCabe <co...@cmccabe.xyz> wrote:
> >
> > > On Mon, Sep 21, 2020, at 18:13, Jun Rao wrote:
> > > > Hi, Colin,
> > > >
> > > > Sorry for the late reply. A few more comments below.
> > >
> > > Hi Jun,
> > >
> > > Thanks for taking another look.
> > >
> > > > 50. Configurations
> > > > 50.1 controller.listeners: It seems that a controller just needs one listener. Why do we need to have a list here? Also, could you provide an example of how this is set, and what is its relationship with existing configs such as "security.inter.broker.protocol" and "inter.broker.listener.name"?
> > >
> > > I agree that most administrators will want to run with only one controller listener. However, just as with brokers, it is nice to have the option to expose multiple ports.
> > >
> > > One reason why you might want multiple ports is if you were doing a migration from plaintext to SSL. You could add new SSL listeners but continue to expose the PLAINTEXT listeners. Then you could add the new SSL controller config to each broker and roll each broker. Then you could migrate from PLAINTEXT to SSL with no downtime.
> > >
> > > Here's an example configuration for the controller:
> > > controller.connect=0@controller0.example.com:9093,1@controller1.example.com:9093,2@controller2.example.com:9093
> > > controller.listeners=CONTROLLER
> > > listeners=CONTROLLER://controller0.example.com:9093
> > > listener.security.protocol.map=CONTROLLER:SSL
> > >
> > > Here's an example configuration for the broker:
> > > controller.connect=0@controller0.example.com:9093,1@controller1.example.com:9093,2@controller2.example.com:9093
> > > controller.connect.security.protocol=SSL
> > >
> > > security.inter.broker.protocol and inter.broker.listener.name do not affect how the broker communicates with the controller. Those configurations relate to how brokers communicate with each other, but the controller is not a broker.
> > >
> > > (Note that I just added controller.connect.security.protocol to the KIP -- I had forgotten to put it in earlier.)
> > >
> > > > 50.2 registration.heartbeat.interval.ms and registration.lease.timeout.ms: Should we match their default values with the corresponding defaults for ZK?
> > >
> > > Fair enough. I'll set them to the values of the zookeeper.sync.time.ms and zookeeper.connection.timeout.ms configurations. I do think we should experiment later on to see what works well here, but the ZK values are at least a starting point.
> > >
> > > > 50.3 controller.connect: Could you provide an example? I am wondering how it differs from quorum.voters=1@kafka-1:9092,2@kafka-2:9092,3@kafka-3:9092.
> > >
> > > controller.connect is intended to be the new name for quorum.voters. During the vote for KIP-595, we sort of agreed to defer the discussion about what this configuration should be called. I proposed this new name because it makes it clear what the configuration is for (how to connect to the controllers).
> > >
> > > > 50.4 controller.id: I am still not sure how this is being used. Could you explain this in more detail?
> > >
> > > The controller ID needs to be set on each controller. It also appears in controller.connect as the thing before the "at" sign. Its function is pretty similar to broker.id.
> > >
> > > Broker IDs and controller IDs exist in the same ID space, so you can't have a broker and a controller that use the same ID.
> > >
> > > > 51. BrokerHeartbeat: It seems a bit wasteful to include Listeners in every heartbeat request, since they typically don't change. Could we make that an optional field?
> > >
> > > Ok.
> > >
> > > > 52. KIP-584 adds a new ZK node, /features. Should we add a corresponding metadata record?
> > >
> > > Good point. I added FeatureLevelRecord.
> > > > 53. TopicRecord and DeleteTopic: Both DeleteTopic and TopicRecord.Deleting indicate topic deletion. Could we outline the flow for when each will be set? In particular, which one indicates the intention to delete, and which one indicates the completion of the deletion?
> > >
> > > TopicRecord.Deleting is set when we intend to delete the topic.
> > >
> > > DeleteTopic removes the topic completely. I will rename DeleteTopic to RemoveTopic to make this clearer.
> > >
> > > > 54. "The controller can generate a new broker epoch by using the latest log offset." Which offset is that? Is it the offset in the metadata topic of the corresponding BrokerRecord? Is it guaranteed to be different on each broker restart?
> > >
> > > Yes... a new broker epoch implies that there has been a new record created in the metadata log. Therefore, the last committed offset must be different.
> > >
> > > > 55. "Thereafter, it may lose subsequent conflicts if its broker epoch is stale. (See KIP-380 for some background on broker epoch.) The reason for favoring new processes is to accommodate the common case where a process is killed with kill -9 and then restarted." Are you saying that if there is an active broker registered in the controller, a new broker heartbeat request with the INITIAL state will fence the current broker session? I am not sure about this. For example, this would allow a broker with an incorrectly configured broker id to kill an existing valid broker.
> > >
> > > Yes, a new broker with an incorrectly configured broker id could fence an existing valid broker.
> > >
> > > The goal of the conflict resolution process described here is to avoid having two brokers go back and forth in claiming the broker id. Choosing the newest was just an easy way to do that.
> > >
> > > Choosing the oldest is another possibility, and more in line with what ZooKeeper does today. The problem with that is that it would not be possible to re-create the broker if the old instance died suddenly, except by waiting for the full lease timeout.
> > >
> > > It might be possible to do a little better here by breaking the lease if the TCP connection from the broker drops. But that requires crossing some layers of abstraction in the Kafka networking stack. There's also the possibility of false negatives or positives. For example, TCP connections sometimes just drop even if the node is still there. Or sometimes the networking stack keeps a TCP connection alive for a while when the other end is gone.
> > >
> > > > 56. kafka-storage.sh:
> > > > 56.1 In the info mode, what other information does it show in addition to "kip.500.mode=enabled"?
> > >
> > > I think it should show a list of all the storage directories, and whether each one is formatted. In addition, it should show whether kip.500.mode is enabled, and what the cluster id is.
> > >
> > > > 56.2 Should the format mode take the config file as an input too, like the info mode?
> > >
> > > Yes, that's a good idea.
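> > >
> > > To make the epoch generation and conflict resolution above (54 and 55) concrete, here is a rough sketch in Java. The names are made up for illustration; this is not the actual implementation:
> > >
> > >     import java.util.HashMap;
> > >     import java.util.Map;
> > >
> > >     // Sketch only: a new INITIAL registration wins, and later heartbeats
> > >     // lose if their broker epoch is stale.
> > >     final class BrokerRegistrations {
> > >         private final Map<Integer, Long> currentEpochs = new HashMap<>();
> > >         // Simplified stand-in for the metadata log's last committed offset.
> > >         private long lastCommittedOffset = 0;
> > >
> > >         /** Returns the new epoch, or -1 if the heartbeat is rejected as stale. */
> > >         long handleHeartbeat(int brokerId, long claimedEpoch, boolean initial) {
> > >             Long current = currentEpochs.get(brokerId);
> > >             if (current != null && !initial && claimedEpoch != current) {
> > >                 return -1; // stale epoch: the existing registration wins
> > >             }
> > >             // An INITIAL registration (e.g. after kill -9) displaces the old one.
> > >             // Registering appends a record, so the last committed offset advances,
> > >             // which makes it usable as a strictly increasing broker epoch.
> > >             long newEpoch = ++lastCommittedOffset;
> > >             currentEpochs.put(brokerId, newEpoch);
> > >             return newEpoch;
> > >         }
> > >     }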
> > >
> > > best,
> > > Colin
> > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Sep 17, 2020 at 4:12 PM Colin McCabe <cmcc...@apache.org> wrote:
> > > >
> > > > > Hi Unmesh,
> > > > >
> > > > > That's a fair point. I have moved the lease duration to the broker heartbeat response. That way, lease durations can be changed just by reconfiguring the controllers.
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > > On Wed, Sep 16, 2020, at 07:40, Unmesh Joshi wrote:
> > > > > > Thanks Colin, the changes look good to me. One small thing: registration.lease.timeout.ms is a configuration on the controller side. It would be good to comment on how brokers know about it, to be able to send LeaseDurationMs in the heartbeat request; or else it can be added to the heartbeat response for brokers to know about it.
> > > > > >
> > > > > > Thanks,
> > > > > > Unmesh
> > > > > >
> > > > > > On Fri, Sep 11, 2020 at 10:32 PM Colin McCabe <cmcc...@apache.org> wrote:
> > > > > >
> > > > > > > Hi Unmesh,
> > > > > > >
> > > > > > > I think you're right that we should use a duration here rather than a time. As you said, the clock on the controller will probably not match the one on the broker. I have updated the KIP.
> > > > > > >
> > > > > > > > > It's important to keep in mind that messages may be delayed in the network, or arrive out of order. When this happens, we will use the start time specified in the request to determine if the request is stale.
> > > > > > > >
> > > > > > > > I am assuming that there will be a single TCP connection maintained between the broker and the active controller. So, there won't be any out-of-order requests? There will be a scenario of a broker GC pause, which might cause a connection timeout, and the broker might need to reestablish the connection. If the pause is too long, the lease will expire, and the heartbeat sent after the pause will be treated as a new registration (similar to the restart case), and a new epoch number will be assigned to the broker.
> > > > > > >
> > > > > > > I agree with the end of this paragraph, but not with the start :)
> > > > > > >
> > > > > > > There can be out-of-order requests, since the broker will simply use a new TCP connection if the old one has problems. This can happen for a variety of reasons. I don't think GC pauses are the most common reason for this to happen. It's more common to see issues in the network itself that result in connections getting dropped from time to time.
> > > > > > >
> > > > > > > So we have to assume that messages may arrive out of order, and possibly be delayed. I added a note that heartbeat requests should be ignored if the metadata log offset they contain is smaller than that of a previous heartbeat.
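> > > > > > >
> > > > > > > As a rough illustration (hypothetical names, not the real broker code), the staleness check could be as simple as tracking the highest metadata offset seen per broker:
> > > > > > >
> > > > > > >     import java.util.concurrent.ConcurrentHashMap;
> > > > > > >
> > > > > > >     final class HeartbeatStalenessFilter {
> > > > > > >         private final ConcurrentHashMap<Integer, Long> lastSeenOffset =
> > > > > > >             new ConcurrentHashMap<>();
> > > > > > >
> > > > > > >         /** Returns false if the heartbeat is older than one already processed. */
> > > > > > >         boolean accept(int brokerId, long metadataOffsetInRequest) {
> > > > > > >             // merge() atomically keeps the maximum offset seen so far; a request
> > > > > > >             // carrying a smaller offset than a previous heartbeat must have been
> > > > > > >             // delayed or reordered in the network, so it is ignored.
> > > > > > >             long max = lastSeenOffset.merge(
> > > > > > >                 brokerId, metadataOffsetInRequest, Math::max);
> > > > > > >             return max == metadataOffsetInRequest;
> > > > > > >         }
> > > > > > >     }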
> > > > > > > >
> > > > > > > > When the active controller fails, the new active controller needs to be sure that it considers all the known brokers as alive until the lease expiration interval. Because registration.lease.timeout.ms is configured on the controller, the new active controller will extend all the leases by registration.lease.timeout.ms. I see that it won't need the last heartbeat time.
> > > > > > >
> > > > > > > Agreed.
> > > > > > >
> > > > > > > best,
> > > > > > > Colin
> > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Unmesh
> > > > > > > >
> > > > > > > > On Sat, Sep 5, 2020 at 1:28 AM Colin McCabe <cmcc...@apache.org> wrote:
> > > > > > > >
> > > > > > > > > Colin wrote:
> > > > > > > > > > The reason for including LeaseStartTimeMs in the request is to ensure that the time required to communicate with the controller gets included in the lease time. Since requests can potentially be delayed in the network for a long time, this is important.
> > > > > > > > >
> > > > > > > > > On Mon, Aug 31, 2020, at 05:58, Unmesh Joshi wrote:
> > > > > > > > > > The network time will be added anyway, because the lease timer on the active controller will start only after the heartbeat request reaches the server.
> > > > > > > > >
> > > > > > > > > Hi Unmesh,
> > > > > > > > >
> > > > > > > > > If the start time is not specified in the request, then the network time is excluded from the heartbeat time.
> > > > > > > > >
> > > > > > > > > Here's an example:
> > > > > > > > > Let's say broker A sends a heartbeat at time 100, it arrives on the controller at time 200, and the lease duration is 1000.
> > > > > > > > >
> > > > > > > > > The controller looks at the start time in the request, which is 100, and adds 1000 to it, getting 1100. On the other hand, if the start time is not specified in the request, then the expiration will be at time 1200. That is what I mean by "the network time is included in the expiration time."
> > > > > > > > >
> > > > > > > > > > And I think some assumption about network round trip time is needed anyway to decide on the frequency of the heartbeat (registration.heartbeat.interval.ms) and the lease timeout (registration.lease.timeout.ms). So I think just having a leaseTTL in the request is easier to understand and implement.
> > > > > > > > >
> > > > > > > > > It's important to keep in mind that messages may be delayed in the network, or arrive out of order. When this happens, we will use the start time specified in the request to determine if the request is stale.
> > > > > > > > >
> > > > > > > > > > > Yes, I agree that the lease timeout on the controller side should be reset in the case of controller failover.
> > > > > > > > > > > The alternative would be to track the lease as hard state rather than soft state, but I think that is not really needed, and would result in more log entries.
> > > > > > > > >
> > > > > > > > > > My interpretation of the mention of BrokerRecord in the KIP was that this record exists in the Raft log.
> > > > > > > > >
> > > > > > > > > BrokerRecord does exist in the Raft log, but does not include the last heartbeat time.
> > > > > > > > >
> > > > > > > > > > By soft state, do you mean that the broker records exist only on the active leader and will not be replicated in the raft log? If the live brokers list is maintained only on the active controller (raft leader), then, in case of leader failure, there will be a window where the new leader does not know about the live brokers, until the brokers establish the leases again. I think it will be safer to have leases as hard state managed by standard Raft replication.
> > > > > > > > >
> > > > > > > > > Leases are short, so the need to re-establish them after a controller failover doesn't seem like a big problem. But this is something we can tweak if it becomes an issue. One option would be to have a separate log which is only used by the controller nodes for this (since, after all, brokers don't care about registration renewals).
> > > > > > > > >
> > > > > > > > > > Or am I misunderstanding something? (I assume that with soft state, you mean something like zookeeper local sessions, https://issues.apache.org/jira/browse/ZOOKEEPER-1147.)
> > > > > > > > >
> > > > > > > > > > > Our code is single threaded as well. I think it makes sense for the controller, since otherwise locking becomes very messy. I'm not sure I understand your question about duplicate broker ID detection, though. There's a section in the KIP about this -- is there a detail we should add there?
> > > > > > > > >
> > > > > > > > > This is an implementation detail that doesn't need to be in the KIP.
> > > > > > > > >
> > > > > > > > > best,
> > > > > > > > > Colin
> > > > > > > > >
> > > > > > > > > > I assumed broker leases are implemented as hard state. In that case, to check for a broker id conflict, we need to check the broker ids in two places:
> > > > > > > > > > 1. Pending broker registrations (which are yet to be committed)
> > > > > > > > > > 2. Already committed broker registrations
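> > > > > > > > > >
> > > > > > > > > > For illustration, roughly what I mean by checking in both places (the names are just made up for the sketch):
> > > > > > > > > >
> > > > > > > > > >     import java.util.Set;
> > > > > > > > > >
> > > > > > > > > >     // Sketch: the controller consults both proposed-but-uncommitted
> > > > > > > > > >     // registrations and registrations already applied from the Raft
> > > > > > > > > >     // log before proposing a new BrokerRecord.
> > > > > > > > > >     final class BrokerIdConflictChecker {
> > > > > > > > > >         private final Set<Integer> pendingRegistrations;
> > > > > > > > > >         private final Set<Integer> committedRegistrations;
> > > > > > > > > >
> > > > > > > > > >         BrokerIdConflictChecker(Set<Integer> pending, Set<Integer> committed) {
> > > > > > > > > >             this.pendingRegistrations = pending;
> > > > > > > > > >             this.committedRegistrations = committed;
> > > > > > > > > >         }
> > > > > > > > > >
> > > > > > > > > >         boolean isDuplicate(int brokerId) {
> > > > > > > > > >             return pendingRegistrations.contains(brokerId)
> > > > > > > > > >                 || committedRegistrations.contains(brokerId);
> > > > > > > > > >         }
> > > > > > > > > >     }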
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Unmesh
> > > > > > > > > >
> > > > > > > > > > On Mon, Aug 31, 2020 at 5:42 PM Colin McCabe <cmcc...@apache.org> wrote:
> > > > > > > > > >
> > > > > > > > > > > On Sat, Aug 29, 2020, at 01:12, Unmesh Joshi wrote:
> > > > > > > > > > > > >>>Can you repeat your questions about broker leases?
> > > > > > > > > > > >
> > > > > > > > > > > > >>>>The LeaseStartTimeMs is expected to be the broker's 'System.currentTimeMillis()' at the point of the request. The active controller will add its lease period to this in order to compute the LeaseEndTimeMs.
> > > > > > > > > > > >
> > > > > > > > > > > > I think the use of LeaseStartTimeMs and LeaseEndTimeMs in the KIP is a bit confusing. A monotonic clock (System.nanoTime) on the active controller should be used to track leases. (For example,
> > > > > > > > > > > > https://issues.apache.org/jira/browse/ZOOKEEPER-1616 and
> > > > > > > > > > > > https://github.com/etcd-io/etcd/pull/6888/commits/e7f4010ccaf28b6ce64fe514d25a4b2fa459d114)
> > > > > > > > > > > >
> > > > > > > > > > > > Then we will not need LeaseStartTimeMs?
> > > > > > > > > > > > Instead of LeaseStartTimeMs, can we call it LeaseTTL? The active controller can then calculate LeaseEndTime = System.nanoTime() + LeaseTTL. In this case we might just drop LeaseEndTimeMs from the response, as the broker already knows about the TTL and can send heartbeats at some fraction of the TTL, say every TTL/4 milliseconds (elapsed time on the broker measured by System.nanoTime).
> > > > > > > > > > >
> > > > > > > > > > > Hi Unmesh,
> > > > > > > > > > >
> > > > > > > > > > > I agree that the monotonic clock is probably a better idea here. It is good to be robust against wall clock changes, although I think a cluster which had them might suffer other issues. I will change it to specify a monotonic clock.
> > > > > > > > > > >
> > > > > > > > > > > The reason for including LeaseStartTimeMs in the request is to ensure that the time required to communicate with the controller gets included in the lease time. Since requests can potentially be delayed in the network for a long time, this is important.
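> > > > > > > > > > >
> > > > > > > > > > > To spell out the arithmetic (a sketch only, with illustrative numbers):
> > > > > > > > > > >
> > > > > > > > > > >     final class LeaseMath {
> > > > > > > > > > >         // Broker sent the heartbeat at 100 with a lease duration of 1000:
> > > > > > > > > > >         // the lease expires at 1100 no matter how long the request spent
> > > > > > > > > > >         // in the network, so the network delay counts against the lease.
> > > > > > > > > > >         static long expirationWithStartTime(long leaseStartTimeMs, long durationMs) {
> > > > > > > > > > >             return leaseStartTimeMs + durationMs; // 100 + 1000 = 1100
> > > > > > > > > > >         }
> > > > > > > > > > >
> > > > > > > > > > >         // Without a start time, the controller can only use the arrival
> > > > > > > > > > >         // time (say 200), so the same heartbeat would expire at 1200 and
> > > > > > > > > > >         // the network time would silently extend the lease.
> > > > > > > > > > >         static long expirationWithoutStartTime(long arrivalTimeMs, long durationMs) {
> > > > > > > > > > >             return arrivalTimeMs + durationMs; // 200 + 1000 = 1200
> > > > > > > > > > >         }
> > > > > > > > > > >     }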
> > > > > > > > > > > >
> > > > > > > > > > > > I have a prototype built to demonstrate this, as follows:
> > > > > > > > > > > > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
> > > > > > > > > > > >
> > > > > > > > > > > > The Kip631Controller itself depends on a Consensus module, to demonstrate what possible interactions with the consensus module will look like. (The Consensus can really be pluggable, with an API to allow reading the replicated log up to the HighWaterMark.)
> > > > > > > > > > > >
> > > > > > > > > > > > It has an implementation of a LeaseTracker,
> > > > > > > > > > > > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/LeaderLeaseTracker.scala,
> > > > > > > > > > > > to demonstrate the LeaseTracker's interaction with the consensus module.
> > > > > > > > > > > >
> > > > > > > > > > > > The implementation has the following aspects:
> > > > > > > > > > > > 1. The lease tracking happens only on the active controller (raft leader).
> > > > > > > > > > > > 2. Once the lease expires, it needs to propose and commit a FenceBroker record for that lease.
> > > > > > > > > > > > 3. In case of active controller failure, the lease will be tracked by the newly elected raft leader. The new raft leader starts the lease timer again (as implemented in the onBecomingLeader method of
> > > > > > > > > > > > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala),
> > > > > > > > > > > > in effect extending the lease by the time spent in the leader election and whatever time elapsed on the old leader.
> > > > > > > > > > >
> > > > > > > > > > > Yes, I agree that the lease timeout on the controller side should be reset in the case of controller failover. The alternative would be to track the lease as hard state rather than soft state, but I think that is not really needed, and would result in more log entries.
> > > > > > > > > > >
> > > > > > > > > > > > There are working tests for this implementation here:
> > > > > > > > > > > > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/kip500/Kip631ControllerTest.scala
> > > > > > > > > > > > and an end to end test here:
> > > > > > > > > > > > https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/ProducerConsumerKIP500Test.scala
> > > > > > > > > > > >
> > > > > > > > > > > > >>I'm not sure what you mean by "de-duplication of the broker." Can you give a little more context?
> > > > > > > > > > > >
> > > > > > > > > > > > Apologies for using the confusing term deduplication. I meant broker id conflict. As you can see in the prototype's handleRequest of Kip631Controller
> > > > > > > > > > > > (https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala),
> > > > > > > > > > > > the duplicate broker id needs to be detected before the BrokerRecord is submitted to the raft module.
> > > > > > > > > > > > Also, as implemented in the prototype, the Kip631Controller is single threaded, handling requests one at a time. (An example of
> > > > > > > > > > > > https://martinfowler.com/articles/patterns-of-distributed-systems/singular-update-queue.html)
> > > > > > > > > > >
> > > > > > > > > > > Our code is single threaded as well. I think it makes sense for the controller, since otherwise locking becomes very messy. I'm not sure I understand your question about duplicate broker ID detection, though. There's a section in the KIP about this -- is there a detail we should add there?
> > > > > > > > > > >
> > > > > > > > > > > best,
> > > > > > > > > > > Colin
> > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Unmesh
> > > > > > > > > > > >
> > > > > > > > > > > > On Sat, Aug 29, 2020 at 10:49 AM Colin McCabe <co...@cmccabe.xyz> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Aug 28, 2020, at 19:36, Unmesh Joshi wrote:
> > > > > > > > > > > > > > Hi Colin,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > There were a few questions I had...
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi Unmesh,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the response.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > 1. Were my comments on the broker lease implementation (and the corresponding prototype) appropriate, and do we need to change the KIP description accordingly?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Can you repeat your questions about broker leases?
> > > > > > > > > > > > >
> > > > > > > > > > > > > > 2. How will broker epochs be generated? I am assuming it can be the committed log offset (like zxid?).
> > > > > > > > > > > > >
> > > > > > > > > > > > > There isn't any need to use a log offset. We can just look at an in-memory hash table, see what the latest number is, and add one to generate a new broker epoch.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > 3. How will producer registration happen? I am assuming it should be similar to broker registration, with a similar way to generate a producer id.
> > > > > > > > > > > > >
> > > > > > > > > > > > > For the EOS stuff, we will need a few new RPCs to the controller. I think we should do that in a follow-on KIP, though, since this one is already pretty big.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > 4. Because we expose the Raft log to all the brokers, any de-duplication of the broker needs to happen before the requests are proposed to Raft. For this, the controller needs to be single threaded, and should do validation against the in-process or pending requests and the final state. I read a mention of this in the responses in this thread. Would it be useful to mention this in the KIP?
> > > > > > > > > > > > >
> > > > > > > > > > > > > I'm not sure what you mean by "de-duplication of the broker." Can you give a little more context?
> > > > > > > > > > > > >
> > > > > > > > > > > > > best,
> > > > > > > > > > > > > Colin
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Unmesh
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Sat, Aug 29, 2020 at 4:50 AM Colin McCabe <cmcc...@apache.org> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I'm thinking of calling a vote on KIP-631 on Monday.
> > > > > > > > > > > > > > > Let me know if there are any more comments I should address before I start the vote.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > cheers,
> > > > > > > > > > > > > > > Colin
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Tue, Aug 11, 2020, at 05:39, Unmesh Joshi wrote:
> > > > > > > > > > > > > > > > >>Hi Unmesh,
> > > > > > > > > > > > > > > > >>Thanks, I'll take a look.
> > > > > > > > > > > > > > > > Thanks. I will be adding more to the prototype and will be happy to help and collaborate.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > Unmesh
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Tue, Aug 11, 2020 at 12:28 AM Colin McCabe <cmcc...@apache.org> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi Jose,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > That's a good point that I hadn't considered. It's probably worth having a separate leader change message, as you mentioned.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi Unmesh,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks, I'll take a look.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > best,
> > > > > > > > > > > > > > > > > Colin
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Fri, Aug 7, 2020, at 11:56, Jose Garcia Sancio wrote:
> > > > > > > > > > > > > > > > > > Hi Unmesh,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Very cool prototype!
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi Colin,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > The KIP proposes a record called IsrChange which includes the partition, topic, isr, leader, and leader epoch. During normal operation, ISR changes do not result in leader changes. Similarly, leader changes do not necessarily involve ISR changes. The controller implementation that uses ZK modeled them together because:
> > > > > > > > > > > > > > > > > > 1. All of this information is stored in one znode.
> > > > > > > > > > > > > > > > > > 2. ZK's optimistic lock requires that you specify the new value completely.
> > > > > > > > > > > > > > > > > > 3. The change to that znode was being performed by both the controller and the leader.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > None of these reasons are true in KIP-500.
> > > > > > > > > > > > > > > > > > Have we considered having two different records? For example:
> > > > > > > > > > > > > > > > > > 1. An IsrChange record which includes topic, partition, and isr.
> > > > > > > > > > > > > > > > > > 2. A LeaderChange record which includes topic, partition, leader, and leader epoch.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I suspect that making this change will also require changing the AlterIsrRequest message introduced in KIP-497: Add inter-broker API to alter ISR.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > -Jose
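> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > P.S. For concreteness, a rough sketch of what the two record types could carry, mirroring the fields listed above (illustrative Java stand-ins, not schemas from the KIP):
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >     // Sketch only: field names mirror the proposal above.
> > > > > > > > > > > > > > > > > >     final class IsrChangeRecord {
> > > > > > > > > > > > > > > > > >         String topic;
> > > > > > > > > > > > > > > > > >         int partition;
> > > > > > > > > > > > > > > > > >         int[] isr;        // the new in-sync replica set
> > > > > > > > > > > > > > > > > >     }
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >     final class LeaderChangeRecord {
> > > > > > > > > > > > > > > > > >         String topic;
> > > > > > > > > > > > > > > > > >         int partition;
> > > > > > > > > > > > > > > > > >         int leader;       // broker id of the new leader
> > > > > > > > > > > > > > > > > >         int leaderEpoch;  // bumped on every leader change
> > > > > > > > > > > > > > > > > >     }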