zhangyue19921010 opened a new pull request #10551: URL: https://github.com/apache/druid/pull/10551
<!-- Thanks for trying to help us make Apache Druid be the best it can be! Please fill out as much of the following information as is possible (where relevant, and remove it when irrelevant) to help make the intention and scope of this PR clear in order to ease review. --> Fixes https://github.com/apache/druid/issues/8279 <!-- Replace XXXX with the id of the issue fixed in this PR. Remove this section if there is no corresponding issue. Don't reference the issue in the title of this pull-request. --> <!-- If you are a committer, follow the PR action item checklist for committers: https://github.com/apache/druid/blob/master/dev/committer-instructions.md#pr-and-issue-action-item-checklist-for-committers. --> ### Description In our PRD Environment, we deployed two versions of Kafka, which are 0.10.2.1 and 2.4.1. And open source druid(from 0.15.0 to 0.20.0) only can support to consume messages from Kafka 2.4.1. And when consume Kafka 0.10.2.1, it throws UnsupportedVersionException : ```json 2020-11-03T07:40:03,539 WARN [KafkaSupervisor-xxxxx] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Exception in supervisor run loop for dataSource [xxxxx] org.apache.druid.indexing.seekablestream.common.StreamException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [2,5]. The supported range is [0,1]. at org.apache.druid.indexing.kafka.KafkaRecordSupplier.wrapExceptions(KafkaRecordSupplier.java:261) ~[?:?] at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getPosition(KafkaRecordSupplier.java:158) ~[?:?] at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getLatestSequenceNumber(KafkaRecordSupplier.java:138) ~[?:?] at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getLatestSequenceNumber(KafkaRecordSupplier.java:53) ~[?:?] … Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [2,5]. The supported range is [0,1]. ``` And as the Druid docs says ``` The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. These changes make the Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or better before using this functionality. Refer Kafka upgrade guide if you are using older version of Kafka brokers. ``` The root cause of this exception (SDK 0.2.6 + Kafka 0.10.2.1) is Hard-coded ‘props.put("isolation.level", "read_committed");’ in the ‘KafkaConsumerConfigs’ which is unnecessary. This PR cancel setting this parameter. And Users could set this parameter in consumerProperties of Supervisor spec to make druid have the ability to consume transactional topics, like ```json { "type": "kafka", "dataSchema": { "dataSource": "metrics-kafka", "timestampSpec": { "column": "timestamp", "format": "auto" }, "dimensionsSpec": { "dimensions": [], "dimensionExclusions": [ "timestamp", "value" ] }, "metricsSpec": [ { "name": "count", "type": "count" }, { "name": "value_sum", "fieldName": "value", "type": "doubleSum" }, { "name": "value_min", "fieldName": "value", "type": "doubleMin" }, { "name": "value_max", "fieldName": "value", "type": "doubleMax" } ], "granularitySpec": { "type": "uniform", "segmentGranularity": "HOUR", "queryGranularity": "NONE" } }, "ioConfig": { "topic": "metrics", "inputFormat": { "type": "json" }, "consumerProperties": { "bootstrap.servers": "localhost:9092", "isolation.level", "read_committed" }, "taskCount": 1, "replicas": 1, "taskDuration": "PT1H" }, "tuningConfig": { "type": "kafka", "maxRowsPerSegment": 5000000 } } ``` Enable this feature, Druid can both consumes Kafka 2.4.1 and Kafka 0.10.2.1 through different configurations of supervisors. Here are logs of overlord service: Supervisors with "isolation.level", "read_committed" ``` 2020-11-03T08:31:15,470 INFO [LeaderSelector[/druid/overlord/_OVERLORD]] org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = none bootstrap.servers = [xxx, xxx, xxx] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = consumer-kafka-supervisor-hhphhhag-1 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = kafka-supervisor-hhphhhag group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_committed key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 10000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.2 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer ``` Supervisors without "isolation.level", "read_committed" ``` allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = none bootstrap.servers = [xxx, xxx, xxx] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = consumer-kafka-supervisor-bhfahfke-1 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = kafka-supervisor-bhfahfke group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 10000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.2 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer ``` <!-- Describe the goal of this PR, what problem are you fixing. If there is a corresponding issue (referenced above), it's not necessary to repeat the description here, however, you may choose to keep one summary sentence. --> <!-- Describe your patch: what did you change in code? How did you fix the problem? --> <!-- If there are several relatively logically separate changes in this PR, create a mini-section for each of them. For example: --> #### Fixed the bug ... #### Renamed the class ... #### Added a forbidden-apis entry ... <!-- In each section, please describe design decisions made, including: - Choice of algorithms - Behavioral aspects. What configuration values are acceptable? How are corner cases and error conditions handled, such as when there are insufficient resources? - Class organization and design (how the logic is split between classes, inheritance, composition, design patterns) - Method organization and design (how the logic is split between methods, parameters and return types) - Naming (class, method, API, configuration, HTTP endpoint, names of emitted metrics) --> <!-- It's good to describe an alternative design (or mention an alternative name) for every design (or naming) decision point and compare the alternatives with the designs that you've implemented (or the names you've chosen) to highlight the advantages of the chosen designs and names. --> <!-- If there was a discussion of the design of the feature implemented in this PR elsewhere (e. g. a "Proposal" issue, any other issue, or a thread in the development mailing list), link to that discussion from this PR description and explain what have changed in your final design compared to your original proposal or the consensus version in the end of the discussion. If something hasn't changed since the original discussion, you can omit a detailed discussion of those aspects of the design here, perhaps apart from brief mentioning for the sake of readability of this PR description. --> <!-- Some of the aspects mentioned above may be omitted for simple and small changes. --> <hr> This PR has: - [ ] been self-reviewed. - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.) - [ ] added documentation for new or modified features or behaviors. - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links. - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml) - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader. - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met. - [ ] added integration tests. - [ ] been tested in a test Druid cluster. <!-- Check the items by putting "x" in the brackets for the done things. Not all of these items apply to every PR. Remove the items which are not done or not relevant to the PR. None of the items from the checklist above are strictly necessary, but it would be very helpful if you at least self-review the PR. --> <hr> ##### Key changed/added classes in this PR * `KafkaConsumerConfigs` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
