>> >> The point I want to make is that avoiding the binary search on the
>> >> index file and avoiding reading the log segments during fetch has
>> >> some additional benefits. So if the solution works for the current
>> >> KIP, it might be a better choice.
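As a rough illustration of the quoted point (the names below are made up for
this sketch and are not part of the KIP): the leader could remember, per
partition, the byte position at which the previous fetch ended, and only fall
back to the offset-index binary search when the requested offset does not
match what it cached.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch only: remember where the previous fetch for a partition ended,
    // so the next fetch can seek straight to that file position instead of
    // binary-searching the offset index again.
    public class FetchPositionCache {
        // Hypothetical value type: the next expected fetch offset and the
        // byte position in the log segment where that offset starts.
        public static final class CachedPosition {
            final long nextOffset;
            final long filePosition;
            CachedPosition(long nextOffset, long filePosition) {
                this.nextOffset = nextOffset;
                this.filePosition = filePosition;
            }
        }

        private final Map<String, CachedPosition> cache = new ConcurrentHashMap<>();

        // Returns the cached file position if the fetcher asks for exactly
        // the offset we expect, otherwise -1 to signal "use the index".
        public long lookup(String topicPartition, long fetchOffset) {
            CachedPosition pos = cache.get(topicPartition);
            return (pos != null && pos.nextOffset == fetchOffset) ? pos.filePosition : -1L;
        }

        // Called after serving a fetch, with the offset and file position
        // the next request is expected to start from.
        public void update(String topicPartition, long nextOffset, long filePosition) {
            cache.put(topicPartition, new CachedPosition(nextOffset, filePosition));
        }
    }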
>Let's discuss this in a follow-on KIP. If the discussion will potentially change the protocol in the current proposal. Would it be better to discuss it now instead of in a follow-up KIP so we don't have some protocol that immediately requires a change. On Tue, Dec 19, 2017 at 9:26 AM, Colin McCabe <co...@cmccabe.xyz> wrote: > On Tue, Dec 19, 2017, at 02:16, Jan Filipiak wrote: > > Sorry for coming back at this so late. > > > > > > > > On 11.12.2017 07:12, Colin McCabe wrote: > > > On Sun, Dec 10, 2017, at 22:10, Colin McCabe wrote: > > >> On Fri, Dec 8, 2017, at 01:16, Jan Filipiak wrote: > > >>> Hi, > > >>> > > >>> sorry for the late reply, busy times :-/ > > >>> > > >>> I would ask you one thing maybe. Since the timeout > > >>> argument seems to be settled I have no further argument > > >>> form your side except the "i don't want to". > > >>> > > >>> Can you see that connection.max.idle.max is the exact time > > >>> that expresses "We expect the client to be away for this long, > > >>> and come back and continue"? > > >> Hi Jan, > > >> > > >> Sure, connection.max.idle.max is the exact time that we want to keep > > >> around a TCP session. TCP sessions are relatively cheap, so we can > > >> afford to keep them around for 10 minutes by default. Incremental > fetch > > >> state is less cheap, so we want to set a shorter timeout for it. We > > >> also want new TCP sessions to be able to reuse an existing incremental > > >> fetch session rather than creating a new one and waiting for the old > one > > >> to time out. > > >> > > >>> also clarified some stuff inline > > >>> > > >>> Best Jan > > >>> > > >>> > > >>> > > >>> > > >>> On 05.12.2017 23:14, Colin McCabe wrote: > > >>>> On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote: > > >>>>> Hi Colin > > >>>>> > > >>>>> Addressing the topic of how to manage slots from the other thread. > > >>>>> With tcp connections all this comes for free essentially. > > >>>> Hi Jan, > > >>>> > > >>>> I don't think that it's accurate to say that cache management > "comes for > > >>>> free" by coupling the incremental fetch session with the TCP > session. > > >>>> When a new TCP session is started by a fetch request, you still > have to > > >>>> decide whether to grant that request an incremental fetch session or > > >>>> not. If your answer is that you always grant the request, I would > argue > > >>>> that you do not have cache management. > > >>> First I would say, the client has a big say in this. If the client > > >>> is not going to issue incremental he shouldn't ask for a cache > > >>> when the client ask for the cache we still have all options to deny. > > >> To put it simply, we have to have some cache management above and > beyond > > >> just giving out an incremental fetch session to anyone who has a TCP > > >> session. Therefore, caching does not become simpler if you couple the > > >> fetch session to the TCP session. > > Simply giving out an fetch session for everyone with a connection is too > > simple, > > but I think it plays well into the idea of consumers choosing to use the > > feature > > therefore only enabling where it brings maximum gains > > (replicas,MirrorMakers) > > >> > > >>>> I guess you could argue that timeouts are cache management, but I > don't > > >>>> find that argument persuasive. Anyone could just create a lot of > TCP > > >>>> sessions and use a lot of resources, in that case. So there is > > >>>> essentially no limit on memory use. In any case, TCP sessions don't > > >>>> help us implement fetch session timeouts. 
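As a minimal sketch of the cache management Colin describes above (the class
name, constructor arguments, and eviction policy here are my own assumptions,
not something the KIP specifies): the broker hands out a limited number of
session slots and expires idle sessions on its own, much shorter, timeout,
independently of connections.max.idle.ms or the state of any TCP connection.
A session id of 0 means no incremental session was created, which matches the
behavior described later in the thread.

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Sketch only: a bounded cache of incremental fetch sessions with an
    // expiry time that is decoupled from the TCP session lifetime.
    public class FetchSessionCache {
        private final int maxSlots;    // e.g. a broker config for cache slots
        private final long expiryMs;   // e.g. one or two minutes, per the discussion
        private final Map<Long, Long> lastUsedMs = new LinkedHashMap<>(16, 0.75f, true);

        public FetchSessionCache(int maxSlots, long expiryMs) {
            this.maxSlots = maxSlots;
            this.expiryMs = expiryMs;
        }

        // Try to grant a new incremental fetch session. Returns the session
        // id, or 0 if no slot is free (the request is then served as a
        // stateless full fetch and no session state is kept).
        public synchronized long maybeCreateSession(long nowMs, long newSessionId) {
            // Expire sessions idle longer than expiryMs, regardless of
            // whether their TCP connection is still open.
            lastUsedMs.values().removeIf(last -> nowMs - last > expiryMs);
            if (lastUsedMs.size() >= maxSlots) {
                return 0L;
            }
            lastUsedMs.put(newSessionId, nowMs);
            return newSessionId;
        }

        // Mark a session as used; returns false if it has already expired.
        public synchronized boolean touch(long sessionId, long nowMs) {
            return lastUsedMs.computeIfPresent(sessionId, (id, last) -> nowMs) != null;
        }
    }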
> > >>> We still have all the options denying the request to keep the state. > > >>> What you want seems like a max connections / ip safeguard. > > >>> I can currently take down a broker with to many connections easily. > > >>> > > >>> > > >>>>> I still would argue we disable it by default and make a flag in the > > >>>>> broker to ask the leader to maintain the cache while replicating > and also only > > >>>>> have it optional in consumers (default to off) so one can turn it > on > > >>>>> where it really hurts. MirrorMaker and audit consumers > prominently. > > >>>> I agree with Jason's point from earlier in the thread. Adding extra > > >>>> configuration knobs that aren't really necessary can harm usability. > > >>>> Certainly asking people to manually turn on a feature "where it > really > > >>>> hurts" seems to fall in that category, when we could easily enable > it > > >>>> automatically for them. > > >>> This doesn't make much sense to me. > > >> There are no tradeoffs to think about from the client's point of view: > > >> it always wants an incremental fetch session. So there is no benefit > to > > >> making the clients configure an extra setting. Updating and managing > > >> client configurations is also more difficult than managing broker > > >> configurations for most users. > > >> > > >>> You also wanted to implement > > >>> a "turn of in case of bug"-knob. Having the client indicate if the > > >>> feauture will be used seems reasonable to me., > > >> True. However, if there is a bug, we could also roll back the client, > > >> so having this configuration knob is not strictly required. > > >> > > >>>>> Otherwise I left a few remarks in-line, which should help to > understand > > >>>>> my view of the situation better > > >>>>> > > >>>>> Best Jan > > >>>>> > > >>>>> > > >>>>> On 05.12.2017 08:06, Colin McCabe wrote: > > >>>>>> On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote: > > >>>>>>> On 03.12.2017 21:55, Colin McCabe wrote: > > >>>>>>>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote: > > >>>>>>>>> Thanks for the explanation, Colin. A few more questions. > > >>>>>>>>> > > >>>>>>>>>> The session epoch is not complex. It's just a number which > increments > > >>>>>>>>>> on each incremental fetch. The session epoch is also useful > for > > >>>>>>>>>> debugging-- it allows you to match up requests and responses > when > > >>>>>>>>>> looking at log files. > > >>>>>>>>> Currently each request in Kafka has a correlation id to help > match the > > >>>>>>>>> requests and responses. Is epoch doing something differently? > > >>>>>>>> Hi Becket, > > >>>>>>>> > > >>>>>>>> The correlation ID is used within a single TCP session, to > uniquely > > >>>>>>>> associate a request with a response. The correlation ID is not > unique > > >>>>>>>> (and has no meaning) outside the context of that single TCP > session. > > >>>>>>>> > > >>>>>>>> Keep in mind, NetworkClient is in charge of TCP sessions, and > generally > > >>>>>>>> tries to hide that information from the upper layers of the > code. So > > >>>>>>>> when you submit a request to NetworkClient, you don't know if > that > > >>>>>>>> request creates a TCP session, or reuses an existing one. > > >>>>>>>>>> Unfortunately, this doesn't work. Imagine the client misses > an > > >>>>>>>>>> increment fetch response about a partition. And then the > partition is > > >>>>>>>>>> never updated after that. 
The client has no way to know > about the > > >>>>>>>>>> partition, since it won't be included in any future > incremental fetch > > >>>>>>>>>> responses. And there are no offsets to compare, since the > partition is > > >>>>>>>>>> simply omitted from the response. > > >>>>>>>>> I am curious about in which situation would the follower miss > a response > > >>>>>>>>> of a partition. If the entire FetchResponse is lost (e.g. > timeout), the > > >>>>>>>>> follower would disconnect and retry. That will result in > sending a full > > >>>>>>>>> FetchRequest. > > >>>>>>>> Basically, you are proposing that we rely on TCP for reliable > delivery > > >>>>>>>> in a distributed system. That isn't a good idea for a bunch of > > >>>>>>>> different reasons. First of all, TCP timeouts tend to be very > long. So > > >>>>>>>> if the TCP session timing out is your error detection > mechanism, you > > >>>>>>>> have to wait minutes for messages to timeout. Of course, we > add a > > >>>>>>>> timeout on top of that after which we declare the connection > bad and > > >>>>>>>> manually close it. But just because the session is closed on > one end > > >>>>>>>> doesn't mean that the other end knows that it is closed. So > the leader > > >>>>>>>> may have to wait quite a long time before TCP decides that yes, > > >>>>>>>> connection X from the follower is dead and not coming back, > even though > > >>>>>>>> gremlins ate the FIN packet which the follower attempted to > translate. > > >>>>>>>> If the cache state is tied to that TCP session, we have to keep > that > > >>>>>>>> cache around for a much longer time than we should. > > >>>>>>> Hi, > > >>>>>>> > > >>>>>>> I see this from a different perspective. The cache expiry time > > >>>>>>> has the same semantic as idle connection time in this scenario. > > >>>>>>> It is the time range we expect the client to come back an reuse > > >>>>>>> its broker side state. I would argue that on close we would get > an > > >>>>>>> extra shot at cleaning up the session state early. As opposed to > > >>>>>>> always wait for that duration for expiry to happen. > > >>>>>> Hi Jan, > > >>>>>> > > >>>>>> The idea here is that the incremental fetch cache expiry time can > be > > >>>>>> much shorter than the TCP session timeout. In general the TCP > session > > >>>>>> timeout is common to all TCP connections, and very long. To make > these > > >>>>>> numbers a little more concrete, the TCP session timeout is often > > >>>>>> configured to be 2 hours on Linux. (See > > >>>>>> https://www.cyberciti.biz/tips/linux-increasing-or- > decreasing-tcp-sockets-timeouts.html > > >>>>>> ) The timeout I was proposing for incremental fetch sessions was > one or > > >>>>>> two minutes at most. > > >>>>> Currently this is taken care of by > > >>>>> connections.max.idle.ms on the broker and defaults to something > of few > > >>>>> minutes. > > >>>> It is 10 minutes by default, which is longer than what we want the > > >>>> incremental fetch session timeout to be. There's no reason to > couple > > >>>> these two things. > > >>>> > > >>>>> Also something we could let the client change if we really wanted > to. > > >>>>> So there is no need to worry about coupling our implementation to > some > > >>>>> timeouts given by the OS, with TCP one always has full control > over the worst > > >>>>> times + one gets the extra shot cleaning up early when the close > comes through. > > >>>>> Which is the majority of the cases. > > >>>> In the majority of cases, the TCP session will be re-established. 
> In > > >>>> that case, we have to send a full fetch request rather than an > > >>>> incremental fetch request. > > >>> I actually have a hard time believing this. Do you have any numbers > of > > >>> any existing production system? Is it the virtualisation layer > cutting > > >>> all the connections? > > >>> We see this only on application crashes and restarts where the app > needs > > >>> todo the full anyways > > >>> as it probably continues with stores offsets. > > >> Yes, TCP connections get dropped. It happens very often in production > > >> clusters, actually. When I was working on Hadoop, one of the most > > >> common questions I heard from newcomers was "why do I see so many > > >> EOFException messages in the logs"? The other thing that happens a > lot > > >> is DNS outages or slowness. Public clouds seem to have even more > > >> unstable networks than the on-premise clusters. I am not sure why > that > > >> is. > > Hadoop has a wiki page on exactly this > > https://wiki.apache.org/hadoop/EOFException > > > > besides user errors they have servers crashing and actually loss of > > connection high on their list. > > In the case of "server goes away" the cache goes with it. So nothing to > > argue about the cache beeing reused by > > a new connection. > > > > Can you make an argument at which point the epoch would be updated > > broker side to maximise re-usage of the cache on > > lost connections. In many cases the epoch would go out of sync and we > > would need a full fetch anyways. Am I mistaken here? > > The current proposal is that the server can accept multiple requests in a > row with the same sequence number. > > Colin > > > > > > > > > > > >> > > >>>>>>>> Secondly, from a software engineering perspective, it's not a > good idea > > >>>>>>>> to try to tightly tie together TCP and our code. We would have > to > > >>>>>>>> rework how we interact with NetworkClient so that we are aware > of things > > >>>>>>>> like TCP sessions closing or opening. We would have to be > careful > > >>>>>>>> preserve the ordering of incoming messages when doing things > like > > >>>>>>>> putting incoming requests on to a queue to be processed by > multiple > > >>>>>>>> threads. It's just a lot of complexity to add, and there's no > upside. > > >>>>>>> I see the point here. And I had a small chat with Dong Lin > already > > >>>>>>> making me aware of this. I tried out the approaches and propose > the > > >>>>>>> following: > > >>>>>>> > > >>>>>>> The client start and does a full fetch. It then does incremental > fetches. > > >>>>>>> The connection to the broker dies and is re-established by > NetworkClient > > >>>>>>> under the hood. > > >>>>>>> The broker sees an incremental fetch without having state => > returns > > >>>>>>> error: > > >>>>>>> Client sees the error, does a full fetch and goes back to > incrementally > > >>>>>>> fetching. > > >>>>>>> > > >>>>>>> having this 1 additional error round trip is essentially the > same as > > >>>>>>> when something > > >>>>>>> with the sessions or epoch changed unexpectedly to the client > (say > > >>>>>>> expiry). > > >>>>>>> > > >>>>>>> So its nothing extra added but the conditions are easier to > evaluate. > > >>>>>>> Especially since we do everything with NetworkClient. Other > implementers > > >>>>>>> on the > > >>>>>>> protocol are free to optimizes this and do not do the errornours > > >>>>>>> roundtrip on the > > >>>>>>> new connection. > > >>>>>>> Its a great plus that the client can know when the error is gonna > > >>>>>>> happen. 
instead of > > >>>>>>> the server to always have to report back if something changes > > >>>>>>> unexpectedly for the client > > >>>>>> You are assuming that the leader and the follower agree that the > TCP > > >>>>>> session drops at the same time. When there are network problems, > this > > >>>>>> may not be true. The leader may still think the previous TCP > session is > > >>>>>> active. In that case, we have to keep the incremental fetch > session > > >>>>>> state around until we learn otherwise (which could be up to that > 2 hour > > >>>>>> timeout I mentioned). And if we get a new incoming incremental > fetch > > >>>>>> request, we can't assume that it replaces the previous one, > because the > > >>>>>> IDs will be different (the new one starts a new session). > > >>>>> As mentioned, no reason to fear some time-outs out of our control > > >>>>>>>> Imagine that I made an argument that client IDs are "complex" > and should > > >>>>>>>> be removed from our APIs. After all, we can just look at the > remote IP > > >>>>>>>> address and TCP port of each connection. Would you think that > was a > > >>>>>>>> good idea? The client ID is useful when looking at logs. For > example, > > >>>>>>>> if a rebalance is having problems, you want to know what > clients were > > >>>>>>>> having a problem. So having the client ID field to guide you is > > >>>>>>>> actually much less "complex" in practice than not having an ID. > > >>>>>>> I still cant follow why the correlation idea will not help here. > > >>>>>>> Correlating logs with it usually works great. Even with > primitive tools > > >>>>>>> like grep > > >>>>>> The correlation ID does help somewhat, but certainly not as much > as a > > >>>>>> unique 64-bit ID. The correlation ID is not unique in the > broker, just > > >>>>>> unique to a single NetworkClient. Simiarly, the correlation ID > is not > > >>>>>> unique on the client side, if there are multiple Consumers, etc. > > >>>>> Can always bump entropy in correlation IDs, never had a problem > > >>>>> of finding to many duplicates. Would be a different KIP though. > > >>>>>>>> Similarly, if metadata responses had epoch numbers (simple > incrementing > > >>>>>>>> numbers), we would not have to debug problems like clients > accidentally > > >>>>>>>> getting old metadata from servers that had been partitioned off > from the > > >>>>>>>> network for a while. Clients would know the difference between > old and > > >>>>>>>> new metadata. So putting epochs in to the metadata request is > much less > > >>>>>>>> "complex" operationally, even though it's an extra field in the > request. > > >>>>>>>> This has been discussed before on the mailing list. > > >>>>>>>> > > >>>>>>>> So I think the bottom line for me is that having the session ID > and > > >>>>>>>> session epoch, while it adds two extra fields, reduces > operational > > >>>>>>>> complexity and increases debuggability. It avoids tightly > coupling us > > >>>>>>>> to assumptions about reliable ordered delivery which tend to be > violated > > >>>>>>>> in practice in multiple layers of the stack. Finally, it > avoids the > > >>>>>>>> necessity of refactoring NetworkClient. > > >>>>>>> So there is stacks out there that violate TCP guarantees? And > software > > >>>>>>> still works? How can this be? Can you elaborate a little where > this > > >>>>>>> can be violated? I am not very familiar with virtualized > environments > > >>>>>>> but they can't really violate TCP contracts. 
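To illustrate the role of the session id and epoch being argued for above,
here is a sketch of the check the leader could perform. The names are made
up, and the exact rule for a repeated epoch (accepting it as a resend of a
request whose response was lost) is precisely what the thread is still
discussing, so treat this as an illustration rather than the KIP's
definition:

    // Sketch only: detect lost responses, reordering, and duplicated
    // requests by comparing the request's epoch with the last one served.
    public class FetchSessionEpoch {
        private final long sessionId;
        private int lastEpoch;   // epoch of the last request we responded to

        public FetchSessionEpoch(long sessionId) {
            this.sessionId = sessionId;
            this.lastEpoch = 0;  // epoch 0 is the full fetch that created the session
        }

        public enum Result { ACCEPT_NEW, ACCEPT_RESEND, REJECT }

        public synchronized Result validate(long requestSessionId, int requestEpoch) {
            if (requestSessionId != sessionId) {
                return Result.REJECT;          // unknown or expired session
            }
            if (requestEpoch == lastEpoch + 1) {
                lastEpoch = requestEpoch;      // normal case: the next incremental fetch
                return Result.ACCEPT_NEW;
            }
            if (requestEpoch == lastEpoch) {
                return Result.ACCEPT_RESEND;   // client may not have seen our last response
            }
            return Result.REJECT;              // out of order: client must do a full fetch
        }
    }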
> > >>>>>> TCP's guarantees of reliable, in-order transmission certainly can > be > > >>>>>> violated. For example, I once had to debug a cluster where a > certain > > >>>>>> node had a network card which corrupted its transmissions > occasionally. > > >>>>>> With all the layers of checksums, you would think that this was > not > > >>>>>> possible, but it happened. We occasionally got corrupted data > written > > >>>>>> to disk on the other end because of it. Even more frustrating, > the data > > >>>>>> was not corrupted on disk on the sending node-- it was a bug in > the > > >>>>>> network card driver that was injecting the errors. > > >>>>> true, but your broker might aswell read a corrupted 600GB as size > from > > >>>>> the network and die with OOM instantly. > > >>>> If you read 600 GB as the size from the network, you will not "die > with > > >>>> OOM instantly." That would be a bug. Instead, you will notice > that 600 > > >>>> GB is greater than max.message.bytes, and close the connection. > > >>> We only check max.message.bytes to late to guard against consumer > > >>> stalling. > > >>> we dont have a notion of max.networkpacket.size before we allocate > the > > >>> bytebuffer to read it into. > > >> "network packets" are not the same thing as "kafka RPCs." One Kafka > RPC > > >> could take up mutiple ethernet packets. > > >> > > >> Also, max.message.bytes has nothing to do with "consumer stalling" -- > > >> you are probably thinking about some of the fetch request > > >> configurations. max.message.bytes is used by the RPC system to figure > > >> out whether to read the full incoming RP > > > Whoops, this is incorrect. I was thinking about > > > "socket.request.max.bytes" rather than "max.message.bytes." Sorry > about > > > that. See Ismael's email as well. > > > > > > best, > > > Colin > > > > > >> best, > > >> Colin > > >> > > >>>>> Optimizing for still having functional > > >>>>> software under this circumstances is not reasonable. > > >>>>> You want to get rid of such a > > >>>>> node ASAP and pray that zookeepers ticks get corrupted often enough > > >>>>> that it finally drops out of the cluster. > > >>>>> > > >>>>> There is a good reason that these kinda things > > >>>>> https://issues.apache.org/jira/browse/MESOS-4105 > > >>>>> don't end up as kafka Jiras. In the end you can't run any software > in > > >>>>> these containers anymore. Application layer checksums are a neat > thing to > > >>>>> fail fast but trying to cope with this probably causes more bad > than > > >>>>> good. So I would argue that we shouldn't try this for the fetch > requests. > > >>>> One of the goals of Apache Kafka is to be "a streaming platform... > > >>>> [that] lets you store streams of records in a fault-tolerant way." > For > > >>>> more information, see https://kafka.apache.org/intro . > Fault-tolerance > > >>>> is explicitly part of the goal of Kafka. Prayer should be > optional, not > > >>>> required, when running the software. > > >>> Yes, we need to fail ASAP when we read corrupted packages. It seemed > > >>> to me like you tried to make the case for pray and try to stay alive. > > >>> Fault > > >>> tolerance here means. I am a fishy box i am going to let a good box > > >>> handle > > >>> it and be silent until i get fixed up. > > >>>> Crashing because someone sent you a bad packet is not reasonable > > >>>> behavior. It is a bug. Similarly, bringing down the whole cluster, > > >>>> which could a hundred nodes, because someone had a bad network > adapter > > >>>> is not reasonable behavior. 
It is perhaps reasonable for the > cluster to > > >>>> perform worse when hardware is having problems. But that's a > different > > >>>> discussion. > > >>> See above. > > >>>> best, > > >>>> Colin > > >>>> > > >>>>>> However, my point was not about TCP's guarantees being violated. > My > > >>>>>> point is that TCP's guarantees are only one small building block > to > > >>>>>> build a robust distributed system. TCP basically just says that > if you > > >>>>>> get any bytes from the stream, you will get the ones that were > sent by > > >>>>>> the sender, in the order they were sent. TCP does not guarantee > that > > >>>>>> the bytes you send will get there. It does not guarantee that if > you > > >>>>>> close the connection, the other end will know about it in a timely > > >>>>>> fashion. > > >>>>> These are very powerful grantees and since we use TCP we should > > >>>>> piggy pack everything that is reasonable on to it. IMO there is no > > >>>>> need to reimplement correct sequencing again if you get that from > > >>>>> your transport layer. It saves you the complexity, it makes > > >>>>> you application behave way more naturally and your api easier to > > >>>>> understand. > > >>>>> > > >>>>> There is literally nothing the Kernel wont let you decide > > >>>>> especially not any timings. Only noticeable exception being > TIME_WAIT > > >>>>> of usually 240 seconds but that already has little todo with the > broker > > >>>>> itself and > > >>>>> if we are running out of usable ports because of this then expiring > > >>>>> fetch requests > > >>>>> wont help much anyways. > > >>>>> > > >>>>> I hope I could strengthen the trust you have in userland TCP > connection > > >>>>> management. It is really powerful and can be exploited for maximum > gains > > >>>>> without much risk in my opinion. > > >>>>> > > >>>>> > > >>>>> > > >>>>>> It does not guarantee that the bytes will be received in a > > >>>>>> certain timeframe, and certainly doesn't guarantee that if you > send a > > >>>>>> byte on connection X and then on connection Y, that the remote > end will > > >>>>>> read a byte on X before reading a byte on Y. > > >>>>> Noone expects this from two independent paths of any kind. > > >>>>> > > >>>>>> best, > > >>>>>> Colin > > >>>>>> > > >>>>>>> Hope this made my view clearer, especially the first part. > > >>>>>>> > > >>>>>>> Best Jan > > >>>>>>> > > >>>>>>> > > >>>>>>>> best, > > >>>>>>>> Colin > > >>>>>>>> > > >>>>>>>> > > >>>>>>>>> If there is an error such as NotLeaderForPartition is > > >>>>>>>>> returned for some partitions, the follower can always send a > full > > >>>>>>>>> FetchRequest. Is there a scenario that only some of the > partitions in a > > >>>>>>>>> FetchResponse is lost? > > >>>>>>>>> > > >>>>>>>>> Thanks, > > >>>>>>>>> > > >>>>>>>>> Jiangjie (Becket) Qin > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe< > cmcc...@apache.org> wrote: > > >>>>>>>>> > > >>>>>>>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote: > > >>>>>>>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe< > cmcc...@apache.org> > > >>>>>>>>>> wrote: > > >>>>>>>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote: > > >>>>>>>>>>>>> Hey Colin, > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Thanks much for the update. I have a few questions below: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 1. I am not very sure that we need Fetch Session Epoch. 
It > seems that > > >>>>>>>>>>>>> Fetch > > >>>>>>>>>>>>> Session Epoch is only needed to help leader distinguish > between "a > > >>>>>>>>>> full > > >>>>>>>>>>>>> fetch request" and "a full fetch request and request a new > > >>>>>>>>>> incremental > > >>>>>>>>>>>>> fetch session". Alternatively, follower can also indicate > "a full > > >>>>>>>>>> fetch > > >>>>>>>>>>>>> request and request a new incremental fetch session" by > setting Fetch > > >>>>>>>>>>>>> Session ID to -1 without using Fetch Session Epoch. Does > this make > > >>>>>>>>>> sense? > > >>>>>>>>>>>> Hi Dong, > > >>>>>>>>>>>> > > >>>>>>>>>>>> The fetch session epoch is very important for ensuring > correctness. It > > >>>>>>>>>>>> prevents corrupted or incomplete fetch data due to network > reordering > > >>>>>>>>>> or > > >>>>>>>>>>>> loss. > > >>>>>>>>>>>> > > >>>>>>>>>>>> For example, consider a scenario where the follower sends a > fetch > > >>>>>>>>>>>> request to the leader. The leader responds, but the > response is lost > > >>>>>>>>>>>> because of network problems which affected the TCP > session. In that > > >>>>>>>>>>>> case, the follower must establish a new TCP session and > re-send the > > >>>>>>>>>>>> incremental fetch request. But the leader does not know > that the > > >>>>>>>>>>>> follower didn't receive the previous incremental fetch > response. It is > > >>>>>>>>>>>> only the incremental fetch epoch which lets the leader know > that it > > >>>>>>>>>>>> needs to resend that data, and not data which comes > afterwards. > > >>>>>>>>>>>> > > >>>>>>>>>>>> You could construct similar scenarios with message > reordering, > > >>>>>>>>>>>> duplication, etc. Basically, this is a stateful protocol > on an > > >>>>>>>>>>>> unreliable network, and you need to know whether the > follower got the > > >>>>>>>>>>>> previous data you sent before you move on. And you need to > handle > > >>>>>>>>>>>> issues like duplicated or delayed requests. These issues > do not affect > > >>>>>>>>>>>> the full fetch request, because it is not stateful-- any > full fetch > > >>>>>>>>>>>> request can be understood and properly responded to in > isolation. > > >>>>>>>>>>>> > > >>>>>>>>>>> Thanks for the explanation. This makes sense. On the other > hand I would > > >>>>>>>>>>> be interested in learning more about whether Becket's > solution can help > > >>>>>>>>>>> simplify the protocol by not having the echo field and > whether that is > > >>>>>>>>>>> worth doing. > > >>>>>>>>>> Hi Dong, > > >>>>>>>>>> > > >>>>>>>>>> I commented about this in the other thread. A solution which > doesn't > > >>>>>>>>>> maintain session information doesn't work here. > > >>>>>>>>>> > > >>>>>>>>>>>>> 2. It is said that Incremental FetchRequest will include > partitions > > >>>>>>>>>> whose > > >>>>>>>>>>>>> fetch offset or maximum number of fetch bytes has been > changed. If > > >>>>>>>>>>>>> follower's logStartOffet of a partition has changed, > should this > > >>>>>>>>>>>>> partition also be included in the next FetchRequest to the > leader? > > >>>>>>>>>>>> Otherwise, it > > >>>>>>>>>>>>> may affect the handling of DeleteRecordsRequest because > leader may > > >>>>>>>>>> not > > >>>>>>>>>>>> know > > >>>>>>>>>>>>> the corresponding data has been deleted on the follower. > > >>>>>>>>>>>> Yeah, the follower should include the partition if the > logStartOffset > > >>>>>>>>>>>> has changed. That should be spelled out on the KIP. Fixed. > > >>>>>>>>>>>> > > >>>>>>>>>>>>> 3. 
In the section "Per-Partition Data", a partition is not > considered > > >>>>>>>>>>>>> dirty if its log start offset has changed. Later in the > section > > >>>>>>>>>>>> "FetchRequest > > >>>>>>>>>>>>> Changes", it is said that incremental fetch responses will > include a > > >>>>>>>>>>>>> partition if its logStartOffset has changed. It seems > inconsistent. > > >>>>>>>>>> Can > > >>>>>>>>>>>>> you update the KIP to clarify it? > > >>>>>>>>>>>>> > > >>>>>>>>>>>> In the "Per-Partition Data" section, it does say that > logStartOffset > > >>>>>>>>>>>> changes make a partition dirty, though, right? The first > bullet point > > >>>>>>>>>>>> is: > > >>>>>>>>>>>> > > >>>>>>>>>>>>> * The LogCleaner deletes messages, and this changes the > log start > > >>>>>>>>>> offset > > >>>>>>>>>>>> of the partition on the leader., or > > >>>>>>>>>>>> > > >>>>>>>>>>> Ah I see. I think I didn't notice this because statement > assumes that the > > >>>>>>>>>>> LogStartOffset in the leader only changes due to LogCleaner. > In fact the > > >>>>>>>>>>> LogStartOffset can change on the leader due to either log > retention and > > >>>>>>>>>>> DeleteRecordsRequest. I haven't verified whether LogCleaner > can change > > >>>>>>>>>>> LogStartOffset though. It may be a bit better to just say > that a > > >>>>>>>>>>> partition is considered dirty if LogStartOffset changes. > > >>>>>>>>>> I agree. It should be straightforward to just resend the > partition if > > >>>>>>>>>> logStartOffset changes. > > >>>>>>>>>> > > >>>>>>>>>>>>> 4. In "Fetch Session Caching" section, it is said that > each broker > > >>>>>>>>>> has a > > >>>>>>>>>>>>> limited number of slots. How is this number determined? > Does this > > >>>>>>>>>> require > > >>>>>>>>>>>>> a new broker config for this number? > > >>>>>>>>>>>> Good point. I added two broker configuration parameters to > control > > >>>>>>>>>> this > > >>>>>>>>>>>> number. > > >>>>>>>>>>>> > > >>>>>>>>>>> I am curious to see whether we can avoid some of these new > configs. For > > >>>>>>>>>>> example, incremental.fetch.session.cache.slots.per.broker > is probably > > >>>>>>>>>> not > > >>>>>>>>>>> necessary because if a leader knows that a FetchRequest > comes from a > > >>>>>>>>>>> follower, we probably want the leader to always cache the > information > > >>>>>>>>>>> from that follower. Does this make sense? > > >>>>>>>>>> Yeah, maybe we can avoid having > > >>>>>>>>>> incremental.fetch.session.cache.slots.per.broker. > > >>>>>>>>>> > > >>>>>>>>>>> Maybe we can discuss the config later after there is > agreement on how the > > >>>>>>>>>>> protocol would look like. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>>>> What is the error code if broker does > > >>>>>>>>>>>>> not have new log for the incoming FetchRequest? > > >>>>>>>>>>>> Hmm, is there a typo in this question? Maybe you meant to > ask what > > >>>>>>>>>>>> happens if there is no new cache slot for the incoming > FetchRequest? > > >>>>>>>>>>>> That's not an error-- the incremental fetch session ID just > gets set to > > >>>>>>>>>>>> 0, indicating no incremental fetch session was created. > > >>>>>>>>>>>> > > >>>>>>>>>>> Yeah there is a typo. You have answered my question. > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>>>> 5. Can you clarify what happens if follower adds a > partition to the > > >>>>>>>>>>>>> ReplicaFetcherThread after receiving LeaderAndIsrRequest? 
> Does leader > > >>>>>>>>>>>>> needs to generate a new session for this > ReplicaFetcherThread or > > >>>>>>>>>> does it > > >>>>>>>>>>>> re-use > > >>>>>>>>>>>>> the existing session? If it uses a new session, is the > old session > > >>>>>>>>>>>>> actively deleted from the slot? > > >>>>>>>>>>>> The basic idea is that you can't make changes, except by > sending a full > > >>>>>>>>>>>> fetch request. However, perhaps we can allow the client to > re-use its > > >>>>>>>>>>>> existing session ID. If the client sets sessionId = id, > epoch = 0, it > > >>>>>>>>>>>> could re-initialize the session. > > >>>>>>>>>>>> > > >>>>>>>>>>> Yeah I agree with the basic idea. We probably want to > understand more > > >>>>>>>>>>> detail about how this works later. > > >>>>>>>>>> Sounds good. I updated the KIP with this information. A > > >>>>>>>>>> re-initialization should be exactly the same as an > initialization, > > >>>>>>>>>> except that it reuses an existing ID. > > >>>>>>>>>> > > >>>>>>>>>> best, > > >>>>>>>>>> Colin > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>>>>> BTW, I think it may be useful if the KIP can include the > example > > >>>>>>>>>> workflow > > >>>>>>>>>>>>> of how this feature will be used in case of partition > change and so > > >>>>>>>>>> on. > > >>>>>>>>>>>> Yeah, that might help. > > >>>>>>>>>>>> > > >>>>>>>>>>>> best, > > >>>>>>>>>>>> Colin > > >>>>>>>>>>>> > > >>>>>>>>>>>>> Thanks, > > >>>>>>>>>>>>> Dong > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe< > cmcc...@apache.org> > > >>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>>> I updated the KIP with the ideas we've been discussing. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> best, > > >>>>>>>>>>>>>> Colin > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote: > > >>>>>>>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote: > > >>>>>>>>>>>>>>>> Hi Colin, thank you for this KIP, it can become a > really > > >>>>>>>>>> useful > > >>>>>>>>>>>> thing. > > >>>>>>>>>>>>>>>> I just scanned through the discussion so far and wanted > to > > >>>>>>>>>> start a > > >>>>>>>>>>>>>>>> thread to make as decision about keeping the > > >>>>>>>>>>>>>>>> cache with the Connection / Session or having some sort > of UUID > > >>>>>>>>>>>> indN > > >>>>>>>>>>>>>> exed > > >>>>>>>>>>>>>>>> global Map. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Sorry if that has been settled already and I missed it. > In this > > >>>>>>>>>>>> case > > >>>>>>>>>>>>>>>> could anyone point me to the discussion? > > >>>>>>>>>>>>>>> Hi Jan, > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> I don't think anyone has discussed the idea of tying the > cache > > >>>>>>>>>> to an > > >>>>>>>>>>>>>>> individual TCP session yet. I agree that since the > cache is > > >>>>>>>>>>>> intended to > > >>>>>>>>>>>>>>> be used only by a single follower or client, it's an > interesting > > >>>>>>>>>>>> thing > > >>>>>>>>>>>>>>> to think about. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> I guess the obvious disadvantage is that whenever your > TCP > > >>>>>>>>>> session > > >>>>>>>>>>>>>>> drops, you have to make a full fetch request rather than > an > > >>>>>>>>>>>> incremental > > >>>>>>>>>>>>>>> one. It's not clear to me how often this happens in > practice -- > > >>>>>>>>>> it > > >>>>>>>>>>>>>>> probably depends a lot on the quality of the network. 
> From a > > >>>>>>>>>> code > > >>>>>>>>>>>>>>> perspective, it might also be a bit difficult to access > data > > >>>>>>>>>>>> associated > > >>>>>>>>>>>>>>> with the Session from classes like KafkaApis (although > we could > > >>>>>>>>>>>> refactor > > >>>>>>>>>>>>>>> it to make this easier). > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> It's also clear that even if we tie the cache to the > session, we > > >>>>>>>>>>>> still > > >>>>>>>>>>>>>>> have to have limits on the number of caches we're > willing to > > >>>>>>>>>> create. > > >>>>>>>>>>>>>>> And probably we should reserve some cache slots for each > > >>>>>>>>>> follower, so > > >>>>>>>>>>>>>>> that clients don't take all of them. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Id rather see a protocol in which the client is hinting > the > > >>>>>>>>>> broker > > >>>>>>>>>>>>>> that, > > >>>>>>>>>>>>>>>> he is going to use the feature instead of a client > > >>>>>>>>>>>>>>>> realizing that the broker just offered the feature > (regardless > > >>>>>>>>>> of > > >>>>>>>>>>>>>>>> protocol version which should only indicate that the > feature > > >>>>>>>>>>>>>>>> would be usable). > > >>>>>>>>>>>>>>> Hmm. I'm not sure what you mean by "hinting." I do > think that > > >>>>>>>>>> the > > >>>>>>>>>>>>>>> server should have the option of not accepting > incremental > > >>>>>>>>>> requests > > >>>>>>>>>>>> from > > >>>>>>>>>>>>>>> specific clients, in order to save memory space. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> This seems to work better with a per > > >>>>>>>>>>>>>>>> connection/session attached Metadata than with a Map > and could > > >>>>>>>>>>>> allow > > >>>>>>>>>>>>>> for > > >>>>>>>>>>>>>>>> easier client implementations. > > >>>>>>>>>>>>>>>> It would also make Client-side code easier as there > wouldn't > > >>>>>>>>>> be any > > >>>>>>>>>>>>>>>> Cache-miss error Messages to handle. > > >>>>>>>>>>>>>>> It is nice not to have to handle cache-miss responses, I > agree. > > >>>>>>>>>>>>>>> However, TCP sessions aren't exposed to most of our > client-side > > >>>>>>>>>> code. > > >>>>>>>>>>>>>>> For example, when the Producer creates a message and > hands it > > >>>>>>>>>> off to > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>> NetworkClient, the NC will transparently re-connect and > re-send a > > >>>>>>>>>>>>>>> message if the first send failed. The higher-level code > will > > >>>>>>>>>> not be > > >>>>>>>>>>>>>>> informed about whether the TCP session was > re-established, > > >>>>>>>>>> whether an > > >>>>>>>>>>>>>>> existing TCP session was used, and so on. So overall I > would > > >>>>>>>>>> still > > >>>>>>>>>>>> lean > > >>>>>>>>>>>>>>> towards not coupling this to the TCP session... > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> best, > > >>>>>>>>>>>>>>> Colin > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Thank you again for the KIP. And again, if this > was clarified > > >>>>>>>>>>>> already > > >>>>>>>>>>>>>>>> please drop me a hint where I could read about it. 
> > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Best Jan > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote: > > >>>>>>>>>>>>>>>>> Hi all, > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> I created a KIP to improve the scalability and latency > of > > >>>>>>>>>>>>>> FetchRequest: > > >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > >>>>>>>>>>>>>> 227%3A+Introduce+Incremental+FetchRequests+to+Increase+ > > >>>>>>>>>>>>>> Partition+Scalability > > >>>>>>>>>>>>>>>>> Please take a look. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> cheers, > > >>>>>>>>>>>>>>>>> Colin > > > >
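For anyone skimming the thread, the client-side flow that emerges from the
discussion above can be sketched roughly as follows. The FetchTransport
interface and the class names are illustrative only, not the KIP's actual
API: the first request is a full fetch that may establish a session, later
requests are incremental fetches with an incrementing epoch, and an
unknown-session error costs one extra round trip before the client resumes
incremental fetching.

    // Sketch only: how a client might switch between full and incremental
    // fetches, falling back to a full fetch when the broker has lost (or
    // never created) the session state.
    public class IncrementalFetcher {

        public static class UnknownSessionException extends Exception {}

        public interface FetchTransport {
            // Sends a full fetch; returns the session id the broker granted,
            // or 0 if the broker chose not to create a session.
            long fullFetch() throws Exception;

            // Sends an incremental fetch for an existing session.
            void incrementalFetch(long sessionId, int epoch) throws Exception;
        }

        private final FetchTransport transport;
        private long sessionId = 0;
        private int epoch = 0;

        public IncrementalFetcher(FetchTransport transport) {
            this.transport = transport;
        }

        public void fetchOnce() throws Exception {
            if (sessionId == 0) {
                // No session yet: do a full fetch, which may give us one.
                sessionId = transport.fullFetch();
                epoch = 0;
                return;
            }
            try {
                transport.incrementalFetch(sessionId, ++epoch);
            } catch (UnknownSessionException e) {
                // The one extra error round trip discussed in the thread:
                // forget the session and start over with a full fetch next time.
                sessionId = 0;
                epoch = 0;
            }
        }
    }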