Randall, I updated the KIP to add your suggestion above and better explain the few points that were outstanding since last week.
Almog, I agree that 'timestamp' was too generic. I'm ok with 'discoverTimestamp'. I updated the KIP with your naming suggestion.

Best,
Konstantine

On Tue, Jan 21, 2020 at 10:43 AM Almog Gavra <[email protected]> wrote:

> Thanks again Konstantine - really excited about this KIP!
>
> I'm about ready to +1 (non-binding) it with just one comment left: What do
> you think about changing the timestamp field to "discoverTimestamp" or
> something like that to indicate that it is the timestamp of the _first_
> time we recorded/discovered this topic. This is important if we later want
> to add a "mostRecentTimestamp" field in the future to distinguish between
> the two.
>
> Almog
>
> On Tue, Jan 21, 2020 at 8:01 AM Randall Hauch <[email protected]> wrote:
>
> > Thanks, Konstantine.
> >
> > One minor request to clarify the following sentence:
> >
> > As soon as a worker detects the addition of a topic to a connector's set
> > of active topics, the worker will cease to post update messages to the
> > status.storage.topic for that connector.
> >
> > As it stands, it sounds like the worker will not write *any more active
> > topic records for this or any connectors* to the topic specified by the
> > `status.storage.topic` worker configuration once the worker detects (by
> > reading) a new active topic. I suspect that this is not the intention,
> > and that instead it is trying to say that no more messages *for this
> > topic and connector*. IOW, something more like:
> >
> > As soon as a worker detects the addition of a topic to a connector's set
> > of active topics, the worker will not post to the status.storage.topic
> > additional update records for the connector and this newly-detected
> > active topic.
> >
> > Otherwise, this KIP looks great!
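
The reworded sentence above can be read as a per-(connector, topic) check. The following is only a toy sketch of that reading, not code from the KIP or from Connect: the `Worker` class, its method names, and the list standing in for `status.storage.topic` are all hypothetical.

```python
class Worker:
    """Toy worker tracking active topics per connector (hypothetical names)."""

    def __init__(self, status_topic):
        # A shared Python list stands in for the status.storage.topic.
        self.status_topic = status_topic
        # connector name -> set of topics this worker has read back so far
        self.active = {}

    def read_status(self):
        """Consume the status topic and refresh the in-memory view."""
        for connector, topic in self.status_topic:
            self.active.setdefault(connector, set()).add(topic)

    def on_record(self, connector, topic):
        """Post an active-topic record unless this pair is already known.

        Returns True if a record was posted, False if it was skipped.
        """
        if topic in self.active.get(connector, set()):
            return False  # already detected for this connector and topic
        self.status_topic.append((connector, topic))
        return True
```

Under this reading, a worker stops posting only for the pair it has already read back; other topics of the same connector, and the same topic for other connectors, are still recorded. Until the worker reads its own record back it may post duplicates, which matches the "at least one record" behavior described later in the thread.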
> > > > Best regards, > > > > Randall > > > > On Fri, Jan 17, 2020 at 8:04 PM Konstantine Karantasis < > > [email protected]> wrote: > > > > > Hi all, > > > > > > I've updated KIP-558 with the following based on our previous > discussion: > > > > > > * Added timestamp to the metadata (the record value). > > > * The KIP now mentions a metric-based implementation in the Rejected > > > Alternatives section. > > > * The record key format is now using the single character ':' as a > > > separator between topic-${topic name} and connector-${connector name} > > > * Added a bullet point to mention that the topic storing the new > > > information can be a partitioned topic. > > > * The KIP mentions that the feature does not require rebuilding > > connectors > > > (no changes in public interfaces/classes). > > > * Added a security section. > > > * KIP preserves symmetry with respect to reset between both types of > > > connectors and keeps reset and config as separate, unrelated endpoints. > > > > > > Given than we made significant progress these past few days and only a > > few > > > minor improvements in the KIPs text are remaining, I'd like to start > the > > > vote today, so that we give this KIP the necessary time (72 hours) to > > have > > > a chance to be voted by the KIP deadline next Wednesday, Jan 22nd. > > > Let's return here, or the main vote thread for any comments (either > minor > > > to major). > > > > > > Best, > > > Konstantine > > > > > > > > > > > > On Fri, Jan 17, 2020 at 3:51 PM Konstantine Karantasis < > > > [email protected]> wrote: > > > > > > > > > > > Thanks for the follow up Chris. Replies below: > > > > > > > > On Fri, Jan 17, 2020 at 3:07 PM Christopher Egerton < > > [email protected] > > > > > > > > wrote: > > > > > > > >> Thanks, Konstantine. Just a few more questions: > > > >> > > > >> > > 2. What is the motivation for the `topic.tracking.allow.reset` > > > config? 
> > > >> Is > > > >> > > there any anticipated case where it would be useful to have > topic > > > >> tracking > > > >> > > enabled but with resets disabled? We could easily add this > > > >> configuration > > > >> > > later if a use case arises, but if we add it now it'll be > > difficult > > > to > > > >> > > remove. > > > >> > > > > > >> > > > >> > The motivation is for operators of a Connect cluster to be able to > > > >> disable > > > >> > resetting the history of active topics altogether, while allowing > at > > > the > > > >> > same time to view the active sets. > > > >> > > > >> What I was trying to ask was, is there a use case for enabling the > > > latter > > > >> but not the former? We should be careful about adding extra worker > > > configs > > > >> and unless we can anticipate a reasonable scenario in which this > would > > > >> happen, we should err on the side of caution and avoid adding a > config > > > >> that > > > >> would be difficult to remove later but, comparably, much easier to > > add. > > > >> > > > > > > > > The application use case is the ability to have immutable histories > of > > > > topic usage or control when resets are allowed and how they are > > performed > > > > (e.g. resets could be allowed briefly during a maintenance phase and > > get > > > > disabled again). > > > > I'm also never thrilled when I add an extra configuration parameter. > > > > However namespacing here will help with the extra cognitive burden. > > > > Similarly the defaults should cover most use cases too. > > > > > > > > > > 5. As far as automatic resets for sink connectors go, I agree > with > > > your > > > >> > > reasoning about the inherent asymmetry between sinks and > sources, > > > and > > > >> with > > > >> > > the motivation to avoid confusing users by listing > > > no-longer-consumed > > > >> > > topics in the active topics for a sink connector. 
I think that this asymmetry is worth avoiding a scenario where a connector
> > > >> > > is reconfigured to only consume from topic "foo" but, from a prior
> > > >> > > configuration, topic "bar" is still listed in its active topics.
> > > >> > > I do want to request clarification on the meaning of the phrase "any
> > > >> > > topics no longer consumed" as used under the header "Restarting,
> > > >> > > reconfiguring or deleting a connector". Does this mean that the
> > > >> > > current set of active topics for the connector will be filtered and
> > > >> > > any that are no longer contained in the sink connector's "topics"
> > > >> > > config or matched by its "topics.regex" config will be removed, or
> > > >> > > does it mean that all topics will be removed and then the active
> > > >> > > topics list will be repopulated as records are consumed from new
> > > >> > > topics?
> > > >> > >
> > > >> >
> > > >> > The intention was to imply the former. But based on Randall's comment
> > > >> > above, I'm changing the KIP to include a reset parameter in the PUT
> > > >> > /connectors/{name}/config endpoint.
> > > >> > In this case, the reset will be a complete reset for both source and
> > > >> > sink connectors. This will help keeping the behavior symmetric between
> > > >> > the two connector types.
> > > >> >
> > > >> I did see Randall's suggestion, but I was hoping we could retain some more
> > > >> intelligent behavior.
Two things I'd like for us to avoid if possible:
> > > >>
> > > >> - Sinks consuming from infrequently-written topics unnecessarily dropping
> > > >> those topics, either as part of an explicit reset or an implicit one
> > > >> - Sinks listing a topic in their active topics list that they are, in
> > > >> reality, no longer consuming from
> > > >>
> > > >> Intelligently filtering out no-longer-consumed topics from a sink
> > > >> connector's active topics list (instead of blanket resetting, or not
> > > >> resetting at all) would prevent both of those from happening. We could
> > > >> expand the proposed reset parameter from a boolean to a three-option
> > > >> parameter with "none" (don't reset the topics list), "all" (reset the
> > > >> entire topics list), and "infer" (reset all topics if for a source, or
> > > >> intelligently filter out no-longer-consumed topics if for a sink).
> > > >>
> > > >
> > > > I don't see significant advantages in the complexity that the three-value
> > > > query parameter would introduce.
> > > > Overall resetting is included for convenience and is not essential to the
> > > > main objective of this KIP which is to track the topics used by a
> > > > connector during its lifetime.
> > > > I think it's desirable to strike a good balance between the objective of
> > > > tracking the topics used by connectors and keeping things simple.
> > > > Given that the general programming model that Kafka Connect supports is
> > > > that of continuous streams of events, representing active topics as either
> > > > the topics that a connector has used since it was first created or as the
> > > > topics that have been actively used since the latest reset is sufficient
> > > > to cover a large majority of use cases.
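
For reference, the "infer" option discussed above (and set aside in favor of a boolean) amounts to filtering a sink connector's active topics against its current "topics" or "topics.regex" setting. The sketch below is illustrative only: the function name and signature are hypothetical, and Python's `re` is used as a stand-in for the Java regex matching that Connect itself would apply.

```python
import re


def filter_active_topics(active_topics, topics=None, topics_regex=None):
    """Keep only the active topics a sink connector could still consume.

    active_topics: iterable of topic names currently recorded as active
    topics:        list from the sink's "topics" config, if set
    topics_regex:  pattern from the sink's "topics.regex" config, if set
    """
    listed = set(topics or [])
    pattern = re.compile(topics_regex) if topics_regex else None
    kept = set()
    for topic in active_topics:
        if topic in listed:
            kept.add(topic)
        elif pattern is not None and pattern.fullmatch(topic):
            kept.add(topic)
    return kept
```

With a connector reconfigured to consume only "foo", `filter_active_topics({"foo", "bar"}, topics=["foo"])` drops "bar" while keeping "foo" even if it is written infrequently, which is the pair of outcomes the three-option proposal was after.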
> > > > > > > > Furthermore, I see future KIPs that would add features to topic > > tracking > > > > preferable in comparison to future KIPs that would try to remedy this > > > first > > > > KIP with adjustments, simplifications and deprecation of features. > > > > Therefore, the query parameter, which I'll add to the KIP shortly, > can > > > > indeed be represented as a boolean. Absence retains all topics and > > > presence > > > > resets all the topics. > > > > Based on the discussion so far, it seems that keeping things simple > and > > > > symmetric is preferred. I think I agree, even if in the initial draft > > of > > > > the KIP I described how we could inspect the configuration of a sink > > > > connector for changes. > > > > > > > > Best, > > > > Konstantine > > > > > > > > > > > > Independent of your thoughts on the above, what will the default > value > > > for > > > >> the newly-proposed parameter be? If resets are performed by default, > > the > > > >> first scenario I outlined would become possible; if not, then the > > second > > > >> would become possible. I'd lean towards performing them by default > but > > > >> would be interested in others' thoughts. (If the proposed "infer" > > value > > > >> were the default, neither scenario would be an issue). > > > >> > > > >> Cheers, > > > >> > > > >> Chris > > > >> > > > >> On Fri, Jan 17, 2020 at 2:01 PM Konstantine Karantasis < > > > >> [email protected]> wrote: > > > >> > > > >> > Hey Chris! Thanks for the comments. Answers inline below: > > > >> > > > > >> > On Fri, Jan 17, 2020 at 11:47 AM Christopher Egerton < > > > >> [email protected]> > > > >> > wrote: > > > >> > > > > >> > > Hi Konstantine, > > > >> > > > > > >> > > Thanks for the KIP! There's been a lot of productive discussion > so > > > >> far so > > > >> > > I'll try to keep my remarks brief. > > > >> > > > > > >> > > 1. 
As far as resetting the active topics for a connector goes, > > it's > > > >> noted > > > >> > > in the KIP that this can be done for a deleted connector. Can > this > > > >> also > > > >> > be > > > >> > > done for connectors that were never created to begin with? What > > > would > > > >> the > > > >> > > behavior be in this case? (Can this be clarified in the KIP?) > > > >> > > > > > >> > > > > >> > Indeed, the intention is to keep reset as an independent and > > > idempotent > > > >> > method. > > > >> > Keep in mind that a tombstone will be written to the topic if the > > > >> in-memory > > > >> > view (of active topics) of the worker that serves the request > > contains > > > >> this > > > >> > connector. > > > >> > This should at least prevent fake reset requests from filling up > the > > > >> topic > > > >> > with tombstone messages. > > > >> > > > > >> > > > > >> > > > > >> > > 2. What is the motivation for the `topic.tracking.allow.reset` > > > >> config? Is > > > >> > > there any anticipated case where it would be useful to have > topic > > > >> > tracking > > > >> > > enabled but with resets disabled? We could easily add this > > > >> configuration > > > >> > > later if a use case arises, but if we add it now it'll be > > difficult > > > to > > > >> > > remove. > > > >> > > > > > >> > > > > >> > The motivation is for operators of a Connect cluster to be able to > > > >> disable > > > >> > resetting the history of active topics altogether, while allowing > at > > > the > > > >> > same time to view the active sets. > > > >> > > > > >> > > > > >> > > 3. Nit - the JSON formatting in the value format/value example > > > columns > > > >> > > under the "Format of the new status record" heading is a little > > > >> > confusing. > > > >> > > Assuming the top-level value is meant to be an object, it should > > be > > > >> > wrapped > > > >> > > in braces ("{" and "}"). > > > >> > > > > > >> > > > > >> > Good catch. Fixed. > > > >> > > > > >> > > > > >> > > 4. 
The KIP focuses heavily on the use of the status topic for > > > storage > > > >> of > > > >> > > connector topic information, but presumably we'd also want this > > > >> > information > > > >> > > to be available in standalone mode. If this is the case, it'd be > > > nice > > > >> to > > > >> > > tweak the language to refer explicitly to distributed mode when > > > >> > discussing > > > >> > > the changes to the status topic and note (probably just in once > > > place) > > > >> > that > > > >> > > similar functionality will also be added to the standalone > > worker's > > > >> > > in-memory status store. > > > >> > > > > > >> > > > > >> > It's true that the design is detailed w.r.t. what should happen in > > the > > > >> > KafkaStatusBackingStore which is a Kafka-based implementation of > the > > > >> > StatusBackingStore interface. This is intentional because this > > > >> > implementation influences and informs the semantics of topic > > tracking. > > > >> I'd > > > >> > prefer not to make the language too abstract here. A KIP is not > > > exactly > > > >> a > > > >> > standard and KIPs often discuss the impact of implementation in > > > behavior > > > >> > (this KIP is a good example). But I'm happy to add a note to > mention > > > >> that > > > >> > these semantics will apply to standalone mode too. > > > >> > > > > >> > > > > >> > > 5. As far as automatic resets for sink connectors go, I agree > with > > > >> your > > > >> > > reasoning about the inherent asymmetry between sinks and > sources, > > > and > > > >> > with > > > >> > > the motivation to avoid confusing users by listing > > > no-longer-consumed > > > >> > > topics in the active topics for a sink connector. I think that > > this > > > >> > > asymmetry is worth avoiding a scenario where a connector is > > > >> reconfigured > > > >> > to > > > >> > > only consume from topic "foo" but, from a prior configuration, > > topic > > > >> > "bar" > > > >> > > is still listed in its active topics. 
> > > >> > > I do want to request clarification on the meaning of the phrase > > "any > > > >> > topics > > > >> > > no longer consumed" as used under the header "Restarting, > > > >> reconfiguring > > > >> > or > > > >> > > deleting a connector". Does this mean that the current set of > > active > > > >> > topics > > > >> > > for the connector will be filtered and any that are longer > > contained > > > >> in > > > >> > the > > > >> > > sink connector's "topics" config or matched by its > "topics.regex" > > > >> config > > > >> > > will be removed, or does it mean that all topics will be removed > > and > > > >> then > > > >> > > the active topics list will be repopulated as records are > consumed > > > >> from > > > >> > new > > > >> > > topics? > > > >> > > > > > >> > > > > >> > The intention was to imply the former. But based on Randall's > > comment > > > >> > above, I'm changing the KIP to include a reset parameter in the > PUT > > > >> > /connectors/{name}/config endpoint > > > >> > In this case, the reset will be a complete reset for both source > and > > > >> sink > > > >> > connectors. This will help keeping the behavior symmetric between > > the > > > >> two > > > >> > connector types. > > > >> > > > > >> > Best, > > > >> > Konstantine > > > >> > > > > >> > > > > >> > > Cheers, > > > >> > > > > > >> > > Chris > > > >> > > > > > >> > > On Thu, Jan 16, 2020 at 1:51 PM Konstantine Karantasis < > > > >> > > [email protected]> wrote: > > > >> > > > > > >> > > > Thanks for new comments Randall. Following up with my replies > > > inline > > > >> > > below. > > > >> > > > I'll also go ahead and update the KIP with the suggestions > that > > > are > > > >> > > > outstanding right now and post a summary of the changes. 
> > > >> > > > > > > >> > > > On Wed, Jan 15, 2020 at 2:37 PM Randall Hauch < > [email protected] > > > > > > >> > wrote: > > > >> > > > > > > >> > > > > My responses are inline: > > > >> > > > > > > > >> > > > > On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis < > > > >> > > > > [email protected]> wrote: > > > >> > > > > > > > >> > > > > > Hi Randall, Tom and Almog. I'm excited to read your > > comments. > > > >> I'll > > > >> > > > reply > > > >> > > > > in > > > >> > > > > > separate emails, in order. > > > >> > > > > > > > > >> > > > > > First, to Randall's comments, I'm replying below with a > > > >> reference > > > >> > to > > > >> > > > the > > > >> > > > > > comment number: > > > >> > > > > > > > > >> > > > > > 1. Although I can imagine we'd be interested in adding > > > >> additional > > > >> > > > > metadata > > > >> > > > > > in the record value, I didn't see the need for a timestamp > > in > > > >> this > > > >> > > > first > > > >> > > > > > draft. > > > >> > > > > > Now that you mention, the way I'd interpret a timestamp in > > the > > > >> > > > connector > > > >> > > > > > status record value would be as an approximation of since > > when > > > >> this > > > >> > > > > > connector has been using this topic. > > > >> > > > > > Happy to add this if we think this info is useful. Of > > course, > > > >> > > accuracy > > > >> > > > of > > > >> > > > > > this information depends on message retention in Kafka and > > on > > > >> how > > > >> > > long > > > >> > > > > the > > > >> > > > > > workers have been running without a restart, so this might > > > make > > > >> > this > > > >> > > > > > approximation less useful if it gets recomputed from time > to > > > >> time. > > > >> > > > > > To your reference in "Recording active topics" I'll reply > > > below, > > > >> > > > because > > > >> > > > > > that's Tom's question too. 
> > > >> > > > > > > > > >> > > > > > > > >> > > > > Makes sense that the timestamp in the connector is the > > > >> (approximate) > > > >> > > time > > > >> > > > > that the connector has been using the topic. I do think it's > > > worth > > > >> > > adding > > > >> > > > > in the record value (not relying upon Kafka record > timestamp). > > > >> > > > > > > > >> > > > > Regarding "message retention", by default Connect creates > the > > > >> status > > > >> > > > topic > > > >> > > > > with compaction but no deletion policy, which means infinite > > > >> > retention. > > > >> > > > > Don't several things become problematic if finite retention > is > > > >> used > > > >> > on > > > >> > > > the > > > >> > > > > status topic, or do we need to worry about this for the > active > > > >> topic > > > >> > > > > records. Do we need to periodically rewrite all of the > active > > > >> topic > > > >> > > > > records? If so, we could just write new records using the > > > original > > > >> > > > > timestamp as originally read by the worker. If the worker > does > > > >> > > > periodically > > > >> > > > > (maybe just on task startup) rewrite the active topic > records, > > > >> then > > > >> > > we'd > > > >> > > > > have to be sure about the semantics of and interplay with > > > >> concurrent > > > >> > > > > explicit "reset" calls. > > > >> > > > > > > > >> > > > > > > >> > > > Good point. These topics are configured to have infinite > > > retention. > > > >> > I'll > > > >> > > > add the timestamp as type 'long'. > > > >> > > > > > > >> > > > > > > >> > > > > > > > >> > > > > > > > > >> > > > > > 2. I'll explain with an example, that maybe is worth > adding > > to > > > >> the > > > >> > > KIP > > > >> > > > > > because what's expected to happen might not be as obvious > > as I > > > >> > > thought > > > >> > > > > when > > > >> > > > > > a new topic is recorded. 
> > > >> > > > > > Let's say we have two workers, W1 and W2, each running two > > > >> worker > > > >> > > tasks > > > >> > > > > T11 > > > >> > > > > > T12 and T21 T22 respectively associated with a connector > C1. > > > All > > > >> > > tasks > > > >> > > > > will > > > >> > > > > > run producers that will produce records to the same topic, > > > >> > > > "test-topic". > > > >> > > > > > When the connector starts, both workers track this > > connector's > > > >> set > > > >> > of > > > >> > > > > > active topics as empty. Given the absence of > synchronization > > > >> > (that's > > > >> > > > > good) > > > >> > > > > > in how this information is recorded and persisted in the > > > status > > > >> > > topic, > > > >> > > > > all > > > >> > > > > > four tasks might race to record status messages: > > > >> > > > > > > > > >> > > > > > For example: > > > >> > > > > > > > > >> > > > > > T11, running at worker W1, will send Kafka records with: > > > >> > > > > > key: topic-test-topic-connector-C1 > > > >> > > > > > value: "topic": { "connector": "some-source", "task": > > > >> > > > > "some-source-TT11", > > > >> > > > > > "name": "test-topic" } > > > >> > > > > > > > > >> > > > > > and T22, running at worker W2, will send Kafka records > with: > > > >> > > > > > key: topic-test-topic-connector-C1 > > > >> > > > > > value: "topic": { "connector": "some-source", "task": > > > >> > > > > "some-source-TT22", > > > >> > > > > > "name": "test-topic" } > > > >> > > > > > > > > >> > > > > > (similarly tasks T12 and T21 might send topic status > > records). > > > >> > > > > > > > > >> > > > > > These four records (they might not even be four but > there's > > > >> going > > > >> > to > > > >> > > be > > > >> > > > > at > > > >> > > > > > least one) may be written in any order. 
Because the topic is compacted and
> > > >> > > > > > these records have the same key, eventually only one message will
> > > >> > > > > > be retained.
> > > >> > > > > > The task ID of that message will be the ID of the task that wrote
> > > >> > > > > > last. I can see this being used mostly for troubleshooting.
> > > >> > > > > >
> > > >> > > > >
> > > >> > > > > Thanks for the clarification. Might be good to clarify the language
> > > >> > > > > a bit more, though I'm not convinced an example is really needed.
> > > >> > > > >
> > > >> > > >
> > > >> > > > I'll try to see how they both fit. Sure.
> > > >> > > >
> > > >> > > > > >
> > > >> > > > > > 3. I believe across the whole KIP, when I'm referring to the task
> > > >> > > > > > entity, I imply the worker task. Not the user code that is running
> > > >> > > > > > as implementation of the SourceTask or SinkTask abstract classes.
> > > >> > > > > > Didn't want to increase complexity by referring to a task as
> > > >> > > > > > worker task.
> > > >> > > > > > But I see your point and I'm going to prefer the terms "worker"
> > > >> > > > > > and "worker task" to highlight that it's the framework that is
> > > >> > > > > > aware of this feature and not the user code.
> > > >> > > > > >
> > > >> > > > >
> > > >> > > > > Thank you.
> > > >> > > > >
> > > >> > > >
> > > >> > > > +1
> > > >> > > >
> > > >> > > > > >
> > > >> > > > > > 4. I assumed that absence of changes to the public API would
> > > >> > > > > > indicate that these interfaces/abstract classes remain unchanged.
> > > >> > > > > > But definitely it's worth to explicitly mention that.
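
The race between tasks described under point 2 above resolves through log compaction: for records sharing a key, only the most recent one survives. A minimal sketch of that last-writer-wins effect follows; the `compact` function is a hypothetical in-memory model, not how the broker implements compaction, and the key and values are the ones used in the example in this thread.

```python
def compact(records):
    """Model log compaction: keep only the latest value for each key.

    records: list of (key, value) pairs in the order they were written
    """
    latest = {}
    for key, value in records:
        latest[key] = value  # a later record with the same key replaces it
    return latest


# Two tasks racing to record the same topic for connector C1.
key = "topic-test-topic-connector-C1"
written = [
    (key, {"topic": {"connector": "some-source",
                     "task": "some-source-TT11",
                     "name": "test-topic"}}),
    (key, {"topic": {"connector": "some-source",
                     "task": "some-source-TT22",
                     "name": "test-topic"}}),
]
retained = compact(written)
```

Here `retained` holds a single entry whose task is `some-source-TT22`, the task that wrote last. Had the two records arrived in the opposite order, `some-source-TT11` would have been retained instead, which is why the task ID is mainly useful for troubleshooting.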
> > > >> > > > > > > > > >> > > > > > > > >> > > > > Thanks! > > > >> > > > > > > > >> > > > > +1 > > > >> > > > > > > >> > > > > > > >> > > > > > > > >> > > > > > > > > >> > > > > > 5. That is correct. My intention is to make reset work > well > > > with > > > >> > the > > > >> > > > > > streaming programming model. Resetting (which btw is not > > > >> mandatory) > > > >> > > > means > > > >> > > > > > that you are cleaning the slate for a connector that is > > > >> currently > > > >> > > > > running, > > > >> > > > > > and its currently active topics will soon be populated > from > > > >> scratch > > > >> > > > > because > > > >> > > > > > new records will be produced or consumed. > > > >> > > > > > But resetting is not required. I see it more like a useful > > > >> > operation, > > > >> > > > in > > > >> > > > > > case users want to clean the active topics history, > without > > > >> having > > > >> > to > > > >> > > > > > delete a connector, since delete has further implications > in > > > the > > > >> > > > > > connector's progress tracking. > > > >> > > > > > > > > >> > > > > > > > >> > > > > I do think it's worth trying to clarify in the document what > > > >> happens > > > >> > > when > > > >> > > > > active topics are cleared for a connector that is currently > > > >> running. > > > >> > > > > > > > >> > > > > > > >> > > > Good point. > > > >> > > > > > > >> > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > > >> > > > > > 6. I fixed the typo - thanks! I'm very much in favor of > > > >> preserving > > > >> > > > > symmetry > > > >> > > > > > between the two connector types. This has definitely more > > long > > > >> term > > > >> > > > > > benefits and may help to avoid confusion. However, the > > > >> asymmetry is > > > >> > > > > > inherited here by the asymmetry that exists today between > > > source > > > >> > and > > > >> > > > sink > > > >> > > > > > connectors. 
> > > >> > > > > > Source connector don't list topics in their configurations > > but > > > >> sink > > > >> > > > > > connectors do. So, if a user reconfigures a sink connector > > > with > > > >> a > > > >> > > > > different > > > >> > > > > > set of topics, if we don't reset the topics based on the > new > > > >> > configs > > > >> > > > (and > > > >> > > > > > my thought here was to match the new configuration with > the > > > set > > > >> of > > > >> > > > active > > > >> > > > > > topics), the old topics, currently not listed in the > > > connectors > > > >> > > > > > configuration, will keep showing up as active topics. The > > user > > > >> will > > > >> > > > have > > > >> > > > > to > > > >> > > > > > explicitly reset the active topics after reconfiguring to > > > avoid > > > >> > this. > > > >> > > > If > > > >> > > > > > there's consensus that preserving this asymmetry is worse > > than > > > >> > having > > > >> > > > to > > > >> > > > > > reset the active topics, I'm happy to change this in the > > KIP. > > > >> > > > > > > > > >> > > > > > > > >> > > > > Would it be easier to keep the symmetric approach (the > active > > > >> topics > > > >> > > are > > > >> > > > > cleared only explicitly) if the POST connector method > > supported > > > a > > > >> new > > > >> > > > query > > > >> > > > > parameter to reset the topics before starting (but after > > > stopping > > > >> any > > > >> > > > > already running tasks)? That makes it easy to reconfigure a > > > >> connector > > > >> > > > > (source or sink) and atomically clear the active topics > before > > > the > > > >> > > > > connector is (re)started. Without that feature, I can't just > > > >> > > reconfigure > > > >> > > > a > > > >> > > > > running sink connector and be sure that the active topics > are > > > >> cleared > > > >> > > > > atomically -- unless we adopt the asymmetric behavior > > currently > > > in > > > >> > the > > > >> > > > KIP. > > > >> > > > > > > > >> > > > > WDYT? 
> > > >> > > > > > > > >> > > > > > > >> > > > I like the idea. I'll update the KIP. > > > >> > > > > > > >> > > > > > > >> > > > > > 7. What I try to avoid here is the following situation: > For > > > some > > > >> > > reason > > > >> > > > > (a > > > >> > > > > > sequence of failures to write tombstones to the status > > topic), > > > >> > stale > > > >> > > > > topic > > > >> > > > > > status records remain in that topic even after a connector > > has > > > >> been > > > >> > > > > > deleted. Requiring to restart a connector with the same > name > > > >> just > > > >> > to > > > >> > > > > apply > > > >> > > > > > a follow up reset of active topics doesn't seem > necessary. I > > > >> like > > > >> > the > > > >> > > > > idea > > > >> > > > > > of decoupling connector existence from the maintenance of > > the > > > >> > status > > > >> > > > > topic. > > > >> > > > > > Of course, a similar clean up is something that the > workers > > > >> could > > > >> > > also > > > >> > > > > > perform, but to avoid complexity and potential race > > > conditions, > > > >> I'm > > > >> > > > > leaving > > > >> > > > > > this out for the moment (it's also an implementation > > detail). > > > >> > > > > > > > > >> > > > > > 8. Indeed, a security section is warranted. I believe the > > main > > > >> > > > > implication > > > >> > > > > > is that if you are able to query a connector's status, > > config, > > > >> etc > > > >> > > you > > > >> > > > > will > > > >> > > > > > be able to also see its active topics. 
Furthermore, if you > > are > > > >> > > > allowing a > > > >> > > > > > worker task to create topics as well as produce or consume > > > from > > > >> > > topics > > > >> > > > > only > > > >> > > > > > via connector config overrides, leaving the worker configs > > > >> without > > > >> > > > > > permissions to these topics, meaning that you assign per > > > >> connector > > > >> > > > > > permissions and not across the board, then this feature > > should > > > >> > > respect > > > >> > > > > > this. The topics are still stored in common data > structures > > > >> within > > > >> > > the > > > >> > > > > > worker and are persisted in the status topic. But this > info > > > >> should > > > >> > > not > > > >> > > > be > > > >> > > > > > leaked to anyone who's not supposed to have access to the > > > status > > > >> > > topic > > > >> > > > or > > > >> > > > > > the Connect REST API endpoints. To this respect I feel > this > > > >> feature > > > >> > > > > > inherits the assumptions and security guarantees of > similar > > > >> > > information > > > >> > > > > > already stored by the Connect framework. I'm happy to add > > this > > > >> to a > > > >> > > > > > security section, if we agree that the above cover the > > > subject. > > > >> > > > > > > > > >> > > > > > > > >> > > > > I think that makes sense, and it'd be great to add that in a > > > >> Security > > > >> > > > > section. > > > >> > > > > > > > >> > > > > > > >> > > > I'll go ahead and add this info to a Security section. > > > >> > > > > > > >> > > > > > > > >> > > > > > 9. I assumed that partitioning is implied by default, > > because > > > >> > there's > > > >> > > > no > > > >> > > > > > requirement for complete ordering of topic status records. > > But > > > >> I'll > > > >> > > add > > > >> > > > > > this fact as a separate bullet. The status.storage.topic > is > > > >> > already a > > > >> > > > > > partitioned topic. > > > >> > > > > > > > > >> > > > > > > > >> > > > > Agreed. 
I think it'd be sufficient to simply mention that partition will be
> > > >> > > > > chosen based upon the active topic records' keys, ensuring that all
> > > >> > > > > active topic records for the same connector will be written to the
> > > >> > > > > same partition and will be totally ordered.
> > > >> > > > >
> > > >> > > >
> > > >> > > > I'm adding a bullet point to refer to partitioning for this topic.
> > > >> > > > Thanks
> > > >> > > >
> > > >> > > > - Konstantine
> > > >> > > >
> > > >> > > > > >
> > > >> > > > > > I'm following up with the rest of the comments, shortly.
> > > >> > > > > > Thanks,
> > > >> > > > > > Konstantine
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > On Wed, Jan 15, 2020 at 9:19 AM Almog Gavra <[email protected]>
> > > >> > > > > > wrote:
> > > >> > > > > >
> > > >> > > > > > > Hi Konstantine,
> > > >> > > > > > >
> > > >> > > > > > > Thanks for the KIP! This is going to make automatic integration
> > > >> > > > > > > with Connect much more powerful.
> > > >> > > > > > >
> > > >> > > > > > > My thoughts are mostly around freshness of the data and being
> > > >> > > > > > > able to expose that to users. Riffing on Randall's timestamp
> > > >> > > > > > > question - have we considered adding some interval at which
> > > >> > > > > > > point a connector will republish any topics that it encounters
> > > >> > > > > > > and update the timestamp? That way we have some refreshing
> > > >> > > > > > > mechanism that isn't as powerful as the complete reset (which
> > > >> > > > > > > may not be practical in many scenarios).
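
The key-based partitioning mentioned above can be illustrated with a small sketch. Kafka's default partitioner hashes the serialized key with murmur2; the version below substitutes `zlib.crc32` purely as a stand-in so the example stays self-contained, and both `partition_for` and the partition count are hypothetical. The key uses the ':' separator described elsewhere in this thread.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a record key to a partition by hashing its bytes.

    crc32 is only a stand-in for the hash the real partitioner uses.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


key = "topic-test-topic:connector-C1"
first = partition_for(key, 5)
second = partition_for(key, 5)
```

Because the partition depends only on the key bytes and the partition count, `first` and `second` are always equal, so every record for the same topic and connector pair lands on one partition and is ordered relative to the others with that key.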
> > > >> > > > > > > I also agree with Randall's other point (Would it be better to not automatically reset connector's active topics when a sink connector is restarted?). I think keeping the behavior symmetrical between sink and source connectors is a good idea.
> > > >> > > > > > > Lastly, with regards to the API, I can imagine it is also pretty useful to answer the inverse question: "which connectors write to topic X". Perhaps we can achieve this by letting the users compute it and just expose an API that returns the entire mapping at once (instead of needing to call the /connectors/{name}/topics endpoint for each connector).
> > > >> > > > > > > Otherwise, looks good to me! Hits the requirements that I had in mind on the nose.
> > > >> > > > > > > - Almog
> > > >> > > > > > > On Wed, Jan 15, 2020 at 1:14 AM Tom Bentley <[email protected]> wrote:
> > > >> > > > > > > > Hi Konstantine,
> > > >> > > > > > > > Thanks for the KIP, I can see how it could be useful.
> > > >> > > > > > > > a) Did you consider using a metric for this? I don't think it would satisfy all the use cases you have in mind, but you could mention it in the rejected alternatives.
> > > >> > > > > > > > b) If the topic name contains the string "-connector" then the key format is ambiguous. This isn't necessarily fatal because the value will disambiguate, but it could be misleading. Any reason not to just use a JSON key, and simplify the value?
> > > >> > > > > > > > c) I didn't understand this part: "As soon as a worker detects the addition of a topic to a connector's set of active topics, the worker will cease to post update messages to the status.storage.topic for that connector." I'm sure I'm overlooking something but why is this necessary? Is this where the task id in the value is used?
> > > >> > > > > > > > Thanks again,
> > > >> > > > > > > > Tom
> > > >> > > > > > > > On Wed, Jan 15, 2020 at 12:15 AM Randall Hauch <[email protected]> wrote:
> > > >> > > > > > > > > Oh, one more thing:
> > > >> > > > > > > > > 9. There's no mention of how the status topic is partitioned, or how partitioning will be used by the new topic records. The KIP should probably outline this for clarity and completeness.
> > > >> > > > > > > > > Best regards,
> > > >> > > > > > > > > Randall
> > > >> > > > > > > > > On Tue, Jan 14, 2020 at 5:25 PM Randall Hauch <[email protected]> wrote:
> > > >> > > > > > > > > > Thanks, Konstantine. Overall, this KIP looks interesting and really useful, and for the most part is spot on. I do have a number of questions/comments about specifics:
> > > >> > > > > > > > > > 1. The topic records have a value that includes the connector name, the number of the task that last reported the topic as used, and the topic name. There's no mention of record timestamps, but I wonder if it'd be useful to record this. One challenge might be that a connector does not write to a topic for a while or the task remains running for long periods of time and therefore the worker doesn't record that this topic has been newly written to since the task was restarted. IOW, the semantics of the timestamp may be a bit murky.
Have you thought about recording the timestamp, and if so what are the pros and cons?
> > > >> > > > > > > > > >    - The "Recording active topics" section says the following: "As soon as a worker detects the addition of a topic to a connector's set of active topics, all the connector's tasks that inspect source or sink records will cease to post update messages to the status.storage.topic." This probably means the timestamp won't be very useful.
> > > >> > > > > > > > > > 2. The KIP says "the Kafka record value stores the ID of the task that succeeded to store a topic status record last." However, this is a bit unclear: is it really storing the last task that successfully wrote to that topic (as this would require very frequent writes to this topic), or is it more that this is the task that was last *recorded* as having written to the topic?
(Here, "recorded" could be a bit of a gray area, since this would depend on how the worker periodically records this information.) Any kind of clarity here might be helpful.
> > > >> > > > > > > > > > 3. In the "Recording active topics" section (and the surrounding sections), the term "task" is used ambiguously. For example, "when its tasks start processing their first records ... these tasks will start inspecting which is the Kafka topic of each of these records". IIUC, the first "task" mentioned is the connector's task, and the second is the worker's task. Do we need to distinguish this more clearly?
> > > >> > > > > > > > > > 4. Maybe I missed it, but does this KIP explicitly say that the Connector API is unchanged? It's probably worth pointing out to help assuage any concerns that connector implementations have to change to make use of this feature.
> > > >> > > > > > > > > > 5. In the "Resetting a connector's set of active topics" section the behavior is not exactly clear.
Consider a user running connector "A": the connector has been fully started and is processing records, and the worker has recorded topic usage records. Then the user resets the active topics for connector A while the connector is still running. If the connector writes to no new topics before the tasks are rebalanced, then is it correct that Connect would report no active topics? And after the tasks are rebalanced, will the worker record any topics used by connector A?
> > > >> > > > > > > > > > 6. In the "Restaring" (misspelled) section: "Reconfiguring a source connector has also no altering effect for a source connector. However, when reconfiguring a sink connector if the new configuration no longer includes any of the previously tracked topics, these topics will be removed from the set of active topics for this sink connector by appending tombstone messages appropriately after the reconfiguration of the connector."
Would it be better to not automatically reset connector's active topics when a sink connector is restarted? Isn't that more consistent with the "Resetting" behavior and the goals at the top of the KIP: "it'd be useful for users, operators and applications to know which are the topics that a connector has used since it was first created"?
> > > >> > > > > > > > > > 7. The KIP says of the `PUT /connectors/{name}/topics/reset` endpoint that "this request can be reapplied after the deletion of the connector". IOW, even though a connector with that name doesn't exist, we can still make this request? How does this compare with other methods such as "status"?
> > > >> > > > > > > > > > 8. What are the security implications of this proposal?
> > > >> > > > > > > > > > As you can see, most of these can probably be addressed without much work.
> > > >> > > > > > > > > > Best regards,
> > > >> > > > > > > > > > Randall
> > > >> > > > > > > > > > On Mon, Jan 13, 2020 at 11:05 PM Konstantine Karantasis <[email protected]> wrote:
> > > >> > > > > > > > > >> Hi all.
> > > >> > > > > > > > > >> I just posted KIP-558: Track the set of actively used topics by connectors in Kafka Connect
> > > >> > > > > > > > > >> Wiki link here:
> > > >> > > > > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect
> > > >> > > > > > > > > >> I think it's a nice extension to follow up on KIP-158 and a useful feature for the ever-increasing number of applications that are built around Kafka Connect.
> > > >> > > > > > > > > >> Would love to hear what you think.
> > > >> > > > > > > > > >> Best,
> > > >> > > > > > > > > >> Konstantine
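For readers following point (b) in Tom's message, here is a minimal sketch of why a "-connector-" separator makes the record key ambiguous, and why the ':' separator adopted later in the thread avoids that. The function names and the exact key layouts are assumptions made for illustration only; they are not taken from the Connect code base. The sketch relies on the fact that Kafka topic names are limited to ASCII alphanumerics, '.', '_' and '-', so a ':' cannot appear inside the topic part.

```python
# Hypothetical helpers: the key layouts below are assumed from the discussion
# in this thread and are not the actual Kafka Connect implementation.

def build_key_dash(topic, connector):
    """Assumed earlier layout, joining the two parts with '-connector-'."""
    return "topic-{}-connector-{}".format(topic, connector)


def build_key_colon(topic, connector):
    """Assumed revised layout, joining the two parts with a single ':'."""
    return "topic-{}:connector-{}".format(topic, connector)


def parse_key_colon(key):
    """Split on the first ':' only; topic names cannot contain ':'."""
    topic_part, connector_part = key.split(":", 1)
    return topic_part[len("topic-"):], connector_part[len("connector-"):]


# Two different (topic, connector) pairs collide under the dash layout...
collision_a = build_key_dash("a-connector-b", "c")
collision_b = build_key_dash("a", "b-connector-c")

# ...but stay distinct, and can be parsed back, under the colon layout.
distinct_a = build_key_colon("a-connector-b", "c")
distinct_b = build_key_colon("a", "b-connector-c")
round_trip = parse_key_colon(distinct_a)
```

Splitting on the first ':' rather than the last is deliberate in this sketch: the topic part is the one guaranteed to be free of ':', so a connector name that happens to contain one still round-trips.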
