zhangyue19921010 opened a new pull request #10551:
URL: https://github.com/apache/druid/pull/10551


   
   Fixes https://github.com/apache/druid/issues/8279
   
   ### Description
   In our production environment we run two versions of Kafka, 0.10.2.1 and 2.4.1. Open source Druid (0.15.0 through 0.20.0) can only consume messages from Kafka 2.4.1; when consuming from Kafka 0.10.2.1 it throws an `UnsupportedVersionException`:
   ```
   2020-11-03T07:40:03,539 WARN [KafkaSupervisor-xxxxx] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Exception in supervisor run loop for dataSource [xxxxx]
   org.apache.druid.indexing.seekablestream.common.StreamException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [2,5]. The supported range is [0,1].
        at org.apache.druid.indexing.kafka.KafkaRecordSupplier.wrapExceptions(KafkaRecordSupplier.java:261) ~[?:?]
        at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getPosition(KafkaRecordSupplier.java:158) ~[?:?]
        at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getLatestSequenceNumber(KafkaRecordSupplier.java:138) ~[?:?]
        at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getLatestSequenceNumber(KafkaRecordSupplier.java:53) ~[?:?]
   …
   
   Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [2,5]. The supported range is [0,1].
   ```
   As the Druid docs say:
   ```
   The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. These changes make the Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or better before using this functionality. Refer Kafka upgrade guide if you are using older version of Kafka brokers.
   ```
   
   The root cause of this exception (the bundled Kafka 2.6 client SDK talking to a Kafka 0.10.2.1 broker) is the hard-coded `props.put("isolation.level", "read_committed");` in `KafkaConsumerConfigs`, which is unnecessary.
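   For context, here is a minimal sketch of the defaults `KafkaConsumerConfigs` used to build before this change (the property list is abbreviated to values visible in the Overlord logs below; the exact method body in Druid differs):
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.common.serialization.ByteArrayDeserializer;
   
   public class KafkaConsumerConfigs
   {
     public static Map<String, Object> getConsumerProperties()
     {
       final Map<String, Object> props = new HashMap<>();
       props.put("metadata.max.age.ms", "10000");
       props.put("key.deserializer", ByteArrayDeserializer.class.getName());
       props.put("value.deserializer", ByteArrayDeserializer.class.getName());
       props.put("auto.offset.reset", "none");
       props.put("enable.auto.commit", "false");
       // The hard-coded line this PR removes: forcing read_committed makes the
       // consumer send LIST_OFFSETS v2+, which pre-0.11 brokers cannot serve.
       props.put("isolation.level", "read_committed");
       return props;
     }
   }
   ```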
   This PR stops setting this parameter by default. Users who need it can set it in the `consumerProperties` of the supervisor spec, which still gives Druid the ability to consume transactional topics, like:
   ```json
   {
     "type": "kafka",
     "dataSchema": {
       "dataSource": "metrics-kafka",
       "timestampSpec": {
         "column": "timestamp",
         "format": "auto"
       },
       "dimensionsSpec": {
         "dimensions": [],
         "dimensionExclusions": [
           "timestamp",
           "value"
         ]
       },
       "metricsSpec": [
         {
           "name": "count",
           "type": "count"
         },
         {
           "name": "value_sum",
           "fieldName": "value",
           "type": "doubleSum"
         },
         {
           "name": "value_min",
           "fieldName": "value",
           "type": "doubleMin"
         },
         {
           "name": "value_max",
           "fieldName": "value",
           "type": "doubleMax"
         }
       ],
       "granularitySpec": {
         "type": "uniform",
         "segmentGranularity": "HOUR",
         "queryGranularity": "NONE"
       }
     },
     "ioConfig": {
       "topic": "metrics",
       "inputFormat": {
         "type": "json"
       },
       "consumerProperties": {
         "bootstrap.servers": "localhost:9092",
         "isolation.level", "read_committed"
       },
       "taskCount": 1,
       "replicas": 1,
       "taskDuration": "PT1H"
     },
     "tuningConfig": {
       "type": "kafka",
       "maxRowsPerSegment": 5000000
     }
   }
   ```
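   For reference, a spec like the one above is submitted to the Overlord's documented supervisor endpoint, `POST /druid/indexer/v1/supervisor`. A sketch in plain Java, assuming a hypothetical Overlord at `localhost:9090`'s usual port `8090` and the spec saved as `metrics-kafka-supervisor.json` (both placeholders; a plain `curl` POST works equally well):
   ```java
   import java.net.URI;
   import java.net.http.HttpClient;
   import java.net.http.HttpRequest;
   import java.net.http.HttpResponse;
   import java.nio.file.Path;
   
   public class SubmitSupervisorSpec
   {
     public static void main(String[] args) throws Exception
     {
       // POST the JSON supervisor spec to the Overlord.
       HttpRequest request = HttpRequest.newBuilder()
           .uri(URI.create("http://localhost:8090/druid/indexer/v1/supervisor"))
           .header("Content-Type", "application/json")
           .POST(HttpRequest.BodyPublishers.ofFile(Path.of("metrics-kafka-supervisor.json")))
           .build();
       HttpResponse<String> response =
           HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
       System.out.println(response.statusCode() + " " + response.body());
     }
   }
   ```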
   
   With this change, Druid can consume from both Kafka 2.4.1 and Kafka 0.10.2.1 brokers through different supervisor configurations, as the sketch below illustrates.
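   A simplified sketch of the expected property layering, reusing the `KafkaConsumerConfigs` sketch above (`specConsumerProperties` is a stand-in for the spec's `consumerProperties` map; the real merge happens inside Druid's Kafka indexing code and differs in detail):
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   
   public class ConsumerPropsLayering
   {
     public static KafkaConsumer<byte[], byte[]> buildConsumer(Map<String, Object> specConsumerProperties)
     {
       // Start from Druid's defaults, which no longer pin isolation.level...
       final Map<String, Object> props = new HashMap<>(KafkaConsumerConfigs.getConsumerProperties());
       // ...then overlay the supervisor spec's consumerProperties, so
       // "isolation.level": "read_committed" applies only when a user opts in.
       props.putAll(specConsumerProperties);
       return new KafkaConsumer<>(props);
     }
   }
   ```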
   Here are the Overlord service logs for verification.
   Supervisor with `"isolation.level": "read_committed"` in its `consumerProperties`:
   ```
   2020-11-03T08:31:15,470 INFO [LeaderSelector[/druid/overlord/_OVERLORD]] org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = none
        bootstrap.servers = [xxx, xxx, xxx]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-kafka-supervisor-hhphhhag-1
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = kafka-supervisor-hhphhhag
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_committed
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 10000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   ```
   
   Supervisor without `"isolation.level"` in its `consumerProperties` (the Kafka client default `read_uncommitted` applies):
   
   ```
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = none
        bootstrap.servers = [xxx, xxx, xxx]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-kafka-supervisor-bhfahfke-1
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = kafka-supervisor-bhfahfke
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 10000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   ```
   
   
   <hr>
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `KafkaConsumerConfigs`
   

