Sergey Alaev created KAFKA-3746:
-----------------------------------

             Summary: InvalidReceiveException when connecting to broker over SSL
                 Key: KAFKA-3746
                 URL: https://issues.apache.org/jira/browse/KAFKA-3746
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.9.0.1
         Environment: 3-node cluster on localhost
            Reporter: Sergey Alaev


When calling KafkaConsumer.poll(), the server closes the connection with an
InvalidReceiveException. Strangely, this is reproduced only with SSL enabled
between consumer and broker. We do not use SSL for inter-broker communication.

Consumer configuration:
{code}
[2016-05-23T15:07:14.806Z] [] [kafka-thread] [ConsumerConfig] [] [] [] [INFO]: ConsumerConfig values: 
        metric.reporters = []
        metadata.max.age.ms = 300000
        value.deserializer = class com.confyrm.eps.disp.kafka.SignalDeserializer
        group.id = sds
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [127.0.0.1:9092, 127.0.0.1:9094, 127.0.0.1:9096]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        enable.auto.commit = false
        ssl.key.password = [hidden]
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = [hidden]
        session.timeout.ms = 30000
        metrics.num.samples = 2
        client.id = 
        ssl.endpoint.identification.algorithm = null
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        check.crcs = true
        request.timeout.ms = 40000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = src/main/resources/ssl/kafka.client.keystore.jks
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 1000
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = SSL
        ssl.truststore.location = src/main/resources/ssl/kafka.client.truststore.jks
        ssl.keystore.password = [hidden]
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        auto.offset.reset = earliest
{code}

Server configuration:
{code}
[2016-05-23 15:04:51,707] INFO KafkaConfig values:
        advertised.host.name = null
        metric.reporters = []
        quota.producer.default = 9223372036854775807
        offsets.topic.num.partitions = 50
        log.flush.interval.messages = 9223372036854775807
        auto.create.topics.enable = true
        controller.socket.timeout.ms = 30000
        log.flush.interval.ms = null
        principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
        replica.socket.receive.buffer.bytes = 65536
        min.insync.replicas = 2
        replica.fetch.wait.max.ms = 500
        num.recovery.threads.per.data.dir = 1
        ssl.keystore.type = JKS
        default.replication.factor = 3
        ssl.truststore.password = [hidden]
        log.preallocate = false
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        fetch.purgatory.purge.interval.requests = 1000
        ssl.endpoint.identification.algorithm = null
        replica.socket.timeout.ms = 30000
        message.max.bytes = 1000012
        num.io.threads = 10
        offsets.commit.required.acks = -1
        log.flush.offset.checkpoint.interval.ms = 60000
        delete.topic.enable = true
        quota.window.size.seconds = 1
        ssl.truststore.type = JKS
        offsets.commit.timeout.ms = 5000
        quota.window.num = 11
        zookeeper.connect = 127.0.0.1:2181
        authorizer.class.name =
        num.replica.fetchers = 1
        log.retention.ms = null
        log.roll.jitter.hours = 0
        log.cleaner.enable = true
        offsets.load.buffer.size = 5242880
        log.cleaner.delete.retention.ms = 86400000
        ssl.client.auth = none
        controlled.shutdown.max.retries = 3
        queued.max.requests = 500
        offsets.topic.replication.factor = 3
        log.cleaner.threads = 1
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        socket.request.max.bytes = 104857600
        ssl.trustmanager.algorithm = PKIX
        zookeeper.session.timeout.ms = 6000
        log.retention.bytes = -1
        sasl.kerberos.min.time.before.relogin = 60000
        zookeeper.set.acl = false
        connections.max.idle.ms = 600000
        offsets.retention.minutes = 1440
        replica.fetch.backoff.ms = 1000
        inter.broker.protocol.version = 0.9.0.X
        log.retention.hours = 24
        num.partitions = 1
        broker.id.generation.enable = true
        listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        log.roll.ms = null
        log.flush.scheduler.interval.ms = 9223372036854775807
        ssl.cipher.suites = null
        log.index.size.max.bytes = 10485760
        ssl.keymanager.algorithm = SunX509
        security.inter.broker.protocol = PLAINTEXT
        replica.fetch.max.bytes = 1048576
        advertised.port = null
        log.cleaner.dedupe.buffer.size = 134217728
        replica.high.watermark.checkpoint.interval.ms = 5000
        log.cleaner.io.buffer.size = 524288
        sasl.kerberos.ticket.renew.window.factor = 0.8
        zookeeper.connection.timeout.ms = 60000
        controlled.shutdown.retry.backoff.ms = 5000
        log.roll.hours = 168
        log.cleanup.policy = delete
        host.name =
        log.roll.jitter.ms = null
        max.connections.per.ip = 2147483647
        offsets.topic.segment.bytes = 104857600
        background.threads = 10
        quota.consumer.default = 9223372036854775807
        request.timeout.ms = 30000
        log.index.interval.bytes = 4096
        log.dir = /tmp/kafka-logs
        log.segment.bytes = 1073741824
        log.cleaner.backoff.ms = 15000
        offset.metadata.max.bytes = 4096
        ssl.truststore.location = /ssl/server.truststore.jks
        group.max.session.timeout.ms = 30000
        ssl.keystore.password = [hidden]
        zookeeper.sync.time.ms = 2000
        port = 9092
        log.retention.minutes = null
        log.segment.delete.delay.ms = 60000
        log.dirs = /data
        controlled.shutdown.enable = true
        compression.type = producer
        max.connections.per.ip.overrides =
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        auto.leader.rebalance.enable = true
        leader.imbalance.check.interval.seconds = 300
        log.cleaner.min.cleanable.ratio = 0.5
        replica.lag.time.max.ms = 10000
        num.network.threads = 5
        ssl.key.password = [hidden]
        reserved.broker.max.id = 1000
        metrics.num.samples = 2
        socket.send.buffer.bytes = 102400
        ssl.protocol = TLS
        socket.receive.buffer.bytes = 102400
        ssl.keystore.location = /ssl/server.keystore.jks
        replica.fetch.min.bytes = 1
        unclean.leader.election.enable = true
        group.min.session.timeout.ms = 6000
        log.cleaner.io.buffer.load.factor = 0.9
        offsets.retention.check.interval.ms = 600000
        producer.purgatory.purge.interval.requests = 1000
        metrics.sample.window.ms = 30000
        broker.id = 1
        offsets.topic.compression.codec = 0
        log.retention.check.interval.ms = 300000
        advertised.listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
        leader.imbalance.per.broker.percentage = 10
 (kafka.server.KafkaConfig)
{code}

Client:
{code}
java.io.IOException: Broken pipe
        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
        at sun.nio.ch.IOUtil.write(IOUtil.java:65)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
        at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194)
        at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:161)
        at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:45)
        at org.apache.kafka.common.network.Selector.close(Selector.java:442)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:310)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:184)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:886)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
{code}

Server:
{code}
[2016-05-23 15:07:16,427] WARN Unexpected error from /127.0.0.1; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369296128 larger than 104857600)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:91)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
        at kafka.network.Processor.run(SocketServer.scala:413)
        at java.lang.Thread.run(Thread.java:745)
{code}
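
A possible reading of the broker-side error, offered as a sketch rather than a confirmed diagnosis: the broker treats the first four bytes on a connection as a big-endian request size, so the rejected size can be turned back into the bytes that were actually received. The snippet below does that and also looks up which security protocol the broker config above assigns to each port in the consumer's bootstrap.servers. Treating the bytes as a TLS record header is an assumption, and protocol_by_port is a hypothetical helper written only for this illustration; all input values are copied from the logs above.

```python
import struct

# Size reported in the broker's InvalidReceiveException above.
reported_size = 369296128

# Recover the four bytes the broker read as a big-endian length prefix.
prefix = struct.pack(">i", reported_size)
print(prefix.hex())

# Assumption: read those bytes as the start of a TLS record header
# (content type, then protocol version major/minor).
content_type, major, minor = prefix[0], prefix[1], prefix[2]
looks_like_tls_handshake = content_type == 0x16 and major == 0x03
print("TLS handshake record?", looks_like_tls_handshake)

# Values copied from the configuration dumps above.
listeners = "PLAINTEXT://localhost:9092,SSL://localhost:9093"
bootstrap_ports = [9092, 9094, 9096]


def protocol_by_port(listeners_value):
    """Hypothetical helper: map each listener port to its protocol name."""
    mapping = {}
    for entry in listeners_value.split(","):
        protocol, _, address = entry.partition("://")
        mapping[int(address.rsplit(":", 1)[1])] = protocol
    return mapping


ports = protocol_by_port(listeners)
for port in bootstrap_ports:
    print(port, ports.get(port, "not listed for this broker"))
```

If the bytes are indeed the start of a TLS handshake (0x16 is the handshake content type and 0x03 0x03 is the TLS 1.2 version), that would be consistent with the SSL-configured consumer reaching a PLAINTEXT listener, such as port 9092 on this broker. The listener settings of the brokers on 9094 and 9096 are not included in this report, so nothing can be concluded about them from the data here.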



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
