This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9251a44  Issue 1118: refine HandlerBase to let only ConsumerImpl and 
ProducerImpl have client-cnx (#1354)
9251a44 is described below

commit 9251a4488c10c47e307fa0f1ebf7cb8ee3c2186a
Author: Jia Zhai <zhaiji...@gmail.com>
AuthorDate: Tue Mar 13 23:03:28 2018 -0700

    Issue 1118: refine HandlerBase to let only ConsumerImpl and ProducerImpl 
have client-cnx (#1354)
    
    * refine handlerBase
    
    * change following @sijie's comments
---
 .../client/impl/BrokerClientIntegrationTest.java   |   2 +-
 .../apache/pulsar/client/impl/MessageIdTest.java   |   7 +-
 .../{HandlerBase.java => ConnectionHandler.java}   | 129 ++++++++-------------
 .../apache/pulsar/client/impl/ConsumerBase.java    |   9 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  47 +++++++-
 .../apache/pulsar/client/impl/HandlerState.java    |  69 +++++++++++
 .../client/impl/PartitionedConsumerImpl.java       |  14 +--
 .../client/impl/PartitionedProducerImpl.java       |  10 --
 .../apache/pulsar/client/impl/ProducerBase.java    |   5 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  56 +++++++--
 .../pulsar/client/impl/TopicsConsumerImpl.java     |  14 +--
 11 files changed, 217 insertions(+), 145 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index d7d0d73..c5447b1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -59,7 +59,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.HandlerBase.State;
+import org.apache.pulsar.client.impl.HandlerState.State;
 import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index 3230006..4a9912d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -454,13 +454,8 @@ public class MessageIdTest extends BrokerTestBase {
         }
 
         // 5. Verify
-
-        // (5.1) Verify: producer's recoverChecksumError and updateChecksum 
invoked
-        verify(producer, times(1)).recoverChecksumError(any(), anyLong());
-        verify(producer, times(1)).verifyLocalBufferIsNotCorrupted(any());
-
         /**
-         * (5.3) verify: ProducerImpl.verifyLocalBufferIsNotCorrupted() => 
validates if message is corrupt
+         * verify: ProducerImpl.verifyLocalBufferIsNotCorrupted() => validates 
if message is corrupt
          */
         MessageImpl<byte[]> msg2 = (MessageImpl<byte[]>) 
MessageBuilder.create().setContent("message-1".getBytes())
                 .build();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
similarity index 54%
rename from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerBase.java
rename to 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 867424e..d04e8a8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -20,72 +20,61 @@ package org.apache.pulsar.client.impl;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.function.UnaryOperator;
-
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.HandlerState.State;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract class HandlerBase {
-    protected final PulsarClientImpl client;
-    protected final String topic;
-    private static final AtomicReferenceFieldUpdater<HandlerBase, State> 
STATE_UPDATER =
-            AtomicReferenceFieldUpdater.newUpdater(HandlerBase.class, 
State.class, "state");
-    @SuppressWarnings("unused")
-    private volatile State state = null;
-
-    private static final AtomicReferenceFieldUpdater<HandlerBase, ClientCnx> 
CLIENT_CNX_UPDATER =
-            AtomicReferenceFieldUpdater.newUpdater(HandlerBase.class, 
ClientCnx.class, "clientCnx");
+class ConnectionHandler {
+    private static final AtomicReferenceFieldUpdater<ConnectionHandler, 
ClientCnx> CLIENT_CNX_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(ConnectionHandler.class, 
ClientCnx.class, "clientCnx");
     @SuppressWarnings("unused")
     private volatile ClientCnx clientCnx = null;
+
+    protected final HandlerState state;
     protected final Backoff backoff;
 
-    enum State {
-        Uninitialized, // Not initialized
-        Connecting, // Client connecting to broker
-        Ready, // Handler is being used
-        Closing, // Close cmd has been sent to broker
-        Closed, // Broker acked the close
-        Terminated, // Topic associated with this handler
-                    // has been terminated
-        Failed // Handler is failed
-    };
-
-    public HandlerBase(PulsarClientImpl client, String topic, Backoff backoff) 
{
-        this.client = client;
-        this.topic = topic;
+    interface Connection {
+        void connectionFailed(PulsarClientException exception);
+        void connectionOpened(ClientCnx cnx);
+    }
+
+    protected Connection connection;
+
+    protected ConnectionHandler(HandlerState state, Backoff backoff, 
Connection connection) {
+        this.state = state;
+        this.connection = connection;
         this.backoff = backoff;
-        STATE_UPDATER.set(this, State.Uninitialized);
         CLIENT_CNX_UPDATER.set(this, null);
     }
 
     protected void grabCnx() {
         if (CLIENT_CNX_UPDATER.get(this) != null) {
-            log.warn("[{}] [{}] Client cnx already set, ignoring reconnection 
request", topic, getHandlerName());
+            log.warn("[{}] [{}] Client cnx already set, ignoring reconnection 
request", state.topic, state.getHandlerName());
             return;
         }
 
         if (!isValidStateForReconnection()) {
             // Ignore connection closed when we are shutting down
-            log.info("[{}] [{}] Ignoring reconnection request (state: {})", 
topic, getHandlerName(), STATE_UPDATER.get(this));
+            log.info("[{}] [{}] Ignoring reconnection request (state: {})", 
state.topic, state.getHandlerName(), state.getState());
             return;
         }
 
         try {
-            client.getConnection(topic) //
-                    .thenAccept(this::connectionOpened) //
+            state.client.getConnection(state.topic) //
+                    .thenAccept(cnx -> connection.connectionOpened(cnx)) //
                     .exceptionally(this::handleConnectionError);
         } catch (Throwable t) {
-            log.warn("[{}] [{}] Exception thrown while getting connection: ", 
topic, getHandlerName(), t);
+            log.warn("[{}] [{}] Exception thrown while getting connection: ", 
state.topic, state.getHandlerName(), t);
             reconnectLater(t);
         }
     }
 
     private Void handleConnectionError(Throwable exception) {
-        log.warn("[{}] [{}] Error connecting to broker: {}", topic, 
getHandlerName(), exception.getMessage());
-        connectionFailed(new PulsarClientException(exception));
+        log.warn("[{}] [{}] Error connecting to broker: {}", state.topic, 
state.getHandlerName(), exception.getMessage());
+        connection.connectionFailed(new PulsarClientException(exception));
 
-        State state = STATE_UPDATER.get(this);
+        State state = this.state.getState();
         if (state == State.Uninitialized || state == State.Connecting || state 
== State.Ready) {
             reconnectLater(exception);
         }
@@ -96,15 +85,15 @@ abstract class HandlerBase {
     protected void reconnectLater(Throwable exception) {
         CLIENT_CNX_UPDATER.set(this, null);
         if (!isValidStateForReconnection()) {
-            log.info("[{}] [{}] Ignoring reconnection request (state: {})", 
topic, getHandlerName(), STATE_UPDATER.get(this));
+            log.info("[{}] [{}] Ignoring reconnection request (state: {})", 
state.topic, state.getHandlerName(), state.getState());
             return;
         }
         long delayMs = backoff.next();
-        log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try 
again in {} s", topic, getHandlerName(),
+        log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try 
again in {} s", state.topic, state.getHandlerName(),
                 exception.getMessage(), delayMs / 1000.0);
-        STATE_UPDATER.set(this, State.Connecting);
-        client.timer().newTimeout(timeout -> {
-            log.info("[{}] [{}] Reconnecting after connection was closed", 
topic, getHandlerName());
+        state.setState(State.Connecting);
+        state.client.timer().newTimeout(timeout -> {
+            log.info("[{}] [{}] Reconnecting after connection was closed", 
state.topic, state.getHandlerName());
             grabCnx();
         }, delayMs, TimeUnit.MILLISECONDS);
     }
@@ -112,15 +101,15 @@ abstract class HandlerBase {
     protected void connectionClosed(ClientCnx cnx) {
         if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
             if (!isValidStateForReconnection()) {
-                log.info("[{}] [{}] Ignoring reconnection request (state: 
{})", topic, getHandlerName(), STATE_UPDATER.get(this));
+                log.info("[{}] [{}] Ignoring reconnection request (state: 
{})", state.topic, state.getHandlerName(), state.getState());
                 return;
             }
             long delayMs = backoff.next();
-            STATE_UPDATER.set(this, State.Connecting);
-            log.info("[{}] [{}] Closed connection {} -- Will try again in {} 
s", topic, getHandlerName(), cnx.channel(),
+            state.setState(State.Connecting);
+            log.info("[{}] [{}] Closed connection {} -- Will try again in {} 
s", state.topic, state.getHandlerName(), cnx.channel(),
                     delayMs / 1000.0);
-            client.timer().newTimeout(timeout -> {
-                log.info("[{}] [{}] Reconnecting after timeout", topic, 
getHandlerName());
+            state.client.timer().newTimeout(timeout -> {
+                log.info("[{}] [{}] Reconnecting after timeout", state.topic, 
state.getHandlerName());
                 grabCnx();
             }, delayMs, TimeUnit.MILLISECONDS);
         }
@@ -138,24 +127,6 @@ abstract class HandlerBase {
         return e instanceof PulsarClientException.LookupException;
     }
 
-    // moves the state to ready if it wasn't closed
-    protected boolean changeToReadyState() {
-        return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, 
State.Ready)
-                || STATE_UPDATER.compareAndSet(this, State.Connecting, 
State.Ready));
-    }
-
-    protected State getState() {
-        return STATE_UPDATER.get(this);
-    }
-
-    protected void setState(State s) {
-        STATE_UPDATER.set(this, s);
-    }
-
-    protected State getAndUpdateState(final UnaryOperator<State> updater) {
-        return STATE_UPDATER.getAndUpdate(this, updater);
-    }
-
     protected ClientCnx getClientCnx() {
         return CLIENT_CNX_UPDATER.get(this);
     }
@@ -165,28 +136,22 @@ abstract class HandlerBase {
     }
 
     private boolean isValidStateForReconnection() {
-        State state = STATE_UPDATER.get(this);
+        State state = this.state.getState();
         switch (state) {
-        case Uninitialized:
-        case Connecting:
-        case Ready:
-            // Ok
-            return true;
-
-        case Closing:
-        case Closed:
-        case Failed:
-        case Terminated:
-            return false;
+            case Uninitialized:
+            case Connecting:
+            case Ready:
+                // Ok
+                return true;
+
+            case Closing:
+            case Closed:
+            case Failed:
+            case Terminated:
+                return false;
         }
         return false;
     }
 
-    abstract void connectionFailed(PulsarClientException exception);
-
-    abstract void connectionOpened(ClientCnx cnx);
-
-    abstract String getHandlerName();
-
-    private static final Logger log = 
LoggerFactory.getLogger(HandlerBase.class);
+    private static final Logger log = 
LoggerFactory.getLogger(ConnectionHandler.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index f51b1b3..cc718f3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -43,7 +43,7 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 
-public abstract class ConsumerBase<T> extends HandlerBase implements 
Consumer<T> {
+public abstract class ConsumerBase<T> extends HandlerState implements 
Consumer<T> {
 
     enum ConsumerType {
         PARTITIONED, NON_PARTITIONED
@@ -61,9 +61,10 @@ public abstract class ConsumerBase<T> extends HandlerBase 
implements Consumer<T>
     protected int maxReceiverQueueSize;
     protected Schema<T> schema;
 
-    protected ConsumerBase(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf, int receiverQueueSize,
-                           ExecutorService listenerExecutor, 
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
-        super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, 
TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS));
+    protected ConsumerBase(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
+                           int receiverQueueSize, ExecutorService 
listenerExecutor,
+                           CompletableFuture<Consumer<T>> subscribeFuture, 
Schema<T> schema) {
+        super(client, topic);
         this.maxReceiverQueueSize = receiverQueueSize;
         this.subscription = conf.getSubscriptionName();
         this.conf = conf;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ab812a0..a76529d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -79,7 +79,7 @@ import io.netty.util.Timeout;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
-public class ConsumerImpl<T> extends ConsumerBase<T> {
+public class ConsumerImpl<T> extends ConsumerBase<T> implements 
ConnectionHandler.Connection {
     private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
 
     private final long consumerId;
@@ -123,6 +123,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
 
     private final boolean readCompacted;
 
+    private final ConnectionHandler connectionHandler;
+
     enum SubscriptionMode {
         // Make the subscription to be backed by a durable cursor that will 
retain messages and persist the current
         // position
@@ -183,9 +185,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
             metadata = Collections.unmodifiableMap(new 
HashMap<>(conf.getProperties()));
         }
 
+        this.connectionHandler = new ConnectionHandler(this,
+            new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, 
TimeUnit.MILLISECONDS),
+            this);
+
         grabCnx();
     }
 
+    public ConnectionHandler getConnectionHandler() {
+        return connectionHandler;
+    }
+
     public UnAckedMessageTracker getUnAckedMessageTracker() {
         return unAckedMessageTracker;
     }
@@ -533,7 +543,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     @Override
-    void connectionOpened(final ClientCnx cnx) {
+    public void connectionOpened(final ClientCnx cnx) {
         setClientCnx(cnx);
         cnx.registerConsumer(consumerId, this);
 
@@ -612,7 +622,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
                 return null;
             }
             log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, 
subscription, cnx.channel().remoteAddress());
-            if (e.getCause() instanceof PulsarClientException && 
isRetriableError((PulsarClientException) e.getCause())
+            if (e.getCause() instanceof PulsarClientException && 
getConnectionHandler().isRetriableError((PulsarClientException) e.getCause())
                     && System.currentTimeMillis() < subscribeTimeout) {
                 reconnectLater(e.getCause());
                 return null;
@@ -677,7 +687,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     @Override
-    void connectionFailed(PulsarClientException exception) {
+    public void connectionFailed(PulsarClientException exception) {
         if (System.currentTimeMillis() > subscribeTimeout && 
subscribeFuture.completeExceptionally(exception)) {
             setState(State.Failed);
             log.info("[{}] Consumer creation failed for consumer {}", topic, 
consumerId);
@@ -1431,6 +1441,35 @@ public class ConsumerImpl<T> extends ConsumerBase<T> {
         return Objects.hash(topic, subscription, consumerName);
     }
 
+    // wrapper for connection methods
+    ClientCnx cnx() {
+        return this.connectionHandler.cnx();
+    }
+
+    void resetBackoff() {
+        this.connectionHandler.resetBackoff();
+    }
+
+    void connectionClosed(ClientCnx cnx) {
+        this.connectionHandler.connectionClosed(cnx);
+    }
+
+    ClientCnx getClientCnx() {
+        return this.connectionHandler.getClientCnx();
+    }
+
+    void setClientCnx(ClientCnx clientCnx) {
+        this.connectionHandler.setClientCnx(clientCnx);
+    }
+
+    void reconnectLater(Throwable exception) {
+        this.connectionHandler.reconnectLater(exception);
+    }
+
+    void grabCnx() {
+        this.connectionHandler.grabCnx();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerImpl.class);
 
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
new file mode 100644
index 0000000..6189583
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.UnaryOperator;
+
+abstract class HandlerState {
+    protected final PulsarClientImpl client;
+    protected final String topic;
+
+    private static final AtomicReferenceFieldUpdater<HandlerState, State> 
STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(HandlerState.class, 
State.class, "state");
+    @SuppressWarnings("unused")
+    private volatile State state = null;
+
+    enum State {
+        Uninitialized, // Not initialized
+        Connecting, // Client connecting to broker
+        Ready, // Handler is being used
+        Closing, // Close cmd has been sent to broker
+        Closed, // Broker acked the close
+        Terminated, // Topic associated with this handler
+                    // has been terminated
+        Failed // Handler is failed
+    };
+
+    public HandlerState(PulsarClientImpl client, String topic) {
+        this.client = client;
+        this.topic = topic;
+        STATE_UPDATER.set(this, State.Uninitialized);
+    }
+
+    // moves the state to ready if it wasn't closed
+    protected boolean changeToReadyState() {
+        return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, 
State.Ready)
+                || STATE_UPDATER.compareAndSet(this, State.Connecting, 
State.Ready));
+    }
+
+    protected State getState() {
+        return STATE_UPDATER.get(this);
+    }
+
+    protected void setState(State s) {
+        STATE_UPDATER.set(this, s);
+    }
+
+    abstract String getHandlerName();
+
+    protected State getAndUpdateState(final UnaryOperator<State> updater) {
+        return STATE_UPDATER.getAndUpdate(this, updater);
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index f46c2b8..2d8ad4e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -130,7 +130,7 @@ public class PartitionedConsumerImpl<T> extends 
ConsumerBase<T> {
 
     private void starReceivingMessages() throws PulsarClientException {
         for (ConsumerImpl<T> consumer : consumers) {
-            consumer.sendFlowPermitsToBroker(consumer.cnx(), 
conf.getReceiverQueueSize());
+            
consumer.sendFlowPermitsToBroker(consumer.getConnectionHandler().cnx(), 
conf.getReceiverQueueSize());
             receiveMessageFromConsumer(consumer);
         }
     }
@@ -365,18 +365,6 @@ public class PartitionedConsumerImpl<T> extends 
ConsumerBase<T> {
         return consumers.stream().allMatch(ConsumerImpl::isConnected);
     }
 
-    @Override
-    void connectionFailed(PulsarClientException exception) {
-        // noop
-
-    }
-
-    @Override
-    void connectionOpened(ClientCnx cnx) {
-        // noop
-
-    }
-
     void messageReceived(Message<T> message) {
         lock.writeLock().lock();
         try {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 23a86e6..494fbce 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -218,16 +218,6 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
     private static final Logger log = 
LoggerFactory.getLogger(PartitionedProducerImpl.class);
 
     @Override
-    void connectionFailed(PulsarClientException exception) {
-        // noop
-    }
-
-    @Override
-    void connectionOpened(ClientCnx cnx) {
-        // noop
-    }
-
-    @Override
     String getHandlerName() {
         return "partition-producer";
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index 14a9af0..45453ae 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -30,7 +30,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 
-public abstract class ProducerBase<T> extends HandlerBase implements 
Producer<T> {
+public abstract class ProducerBase<T> extends HandlerState implements 
Producer<T> {
 
     protected final CompletableFuture<Producer<T>> producerCreatedFuture;
     protected final ProducerConfigurationData conf;
@@ -38,8 +38,7 @@ public abstract class ProducerBase<T> extends HandlerBase 
implements Producer<T>
 
     protected ProducerBase(PulsarClientImpl client, String topic, 
ProducerConfigurationData conf,
             CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> 
schema) {
-        super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, 
TimeUnit.SECONDS,
-                Math.max(100, conf.getSendTimeoutMs() - 100), 
TimeUnit.MILLISECONDS));
+        super(client, topic);
         this.producerCreatedFuture = producerCreatedFuture;
         this.conf = conf;
         this.schema = schema;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 8f0586b..75661cb 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -69,7 +69,7 @@ import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import io.netty.util.concurrent.ScheduledFuture;
 
-public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
+public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, 
ConnectionHandler.Connection {
 
     // Producer id, used to identify a producer within a single connection
     private final long producerId;
@@ -104,6 +104,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask {
 
     private final Map<String, String> metadata;
 
+    private final ConnectionHandler connectionHandler;
+
     @SuppressWarnings("rawtypes")
     private static final AtomicLongFieldUpdater<ProducerImpl> 
msgIdGeneratorUpdater = AtomicLongFieldUpdater
             .newUpdater(ProducerImpl.class, "msgIdGenerator");
@@ -172,9 +174,16 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask {
             metadata = Collections.unmodifiableMap(new 
HashMap<>(conf.getProperties()));
         }
 
+        this.connectionHandler = new ConnectionHandler(this,
+            new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 
Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS),
+            this);
         grabCnx();
     }
 
+    public ConnectionHandler getConnectionHandler() {
+        return connectionHandler;
+    }
+
     private boolean isBatchMessagingEnabled() {
         return conf.isBatchingEnabled();
     }
@@ -379,8 +388,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask {
             ByteBuf compressedPayload) throws IOException {
         ChecksumType checksumType;
 
-        if (getClientCnx() == null
-                || getClientCnx().getRemoteEndpointProtocolVersion() >= 
brokerChecksumSupportedVersion()) {
+        if (connectionHandler.getClientCnx() == null
+                || 
connectionHandler.getClientCnx().getRemoteEndpointProtocolVersion() >= 
brokerChecksumSupportedVersion()) {
             checksumType = ChecksumType.Crc32c;
         } else {
             checksumType = ChecksumType.None;
@@ -568,11 +577,11 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask {
 
     @Override
     public boolean isConnected() {
-        return getClientCnx() != null && (getState() == State.Ready);
+        return connectionHandler.getClientCnx() != null && (getState() == 
State.Ready);
     }
 
     public boolean isWritable() {
-        ClientCnx cnx = getClientCnx();
+        ClientCnx cnx = connectionHandler.getClientCnx();
         return cnx != null && cnx.channel().isWritable();
     }
 
@@ -808,10 +817,10 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask {
     }
 
     @Override
-    void connectionOpened(final ClientCnx cnx) {
+    public void connectionOpened(final ClientCnx cnx) {
         // we set the cnx reference before registering the producer on the 
cnx, so if the cnx breaks before creating the
         // producer, it will try to grab a new cnx
-        setClientCnx(cnx);
+        connectionHandler.setClientCnx(cnx);
         cnx.registerProducer(producerId, this);
 
         log.info("[{}] [{}] Creating producer on cnx {}", topic, producerName, 
cnx.ctx().channel());
@@ -892,7 +901,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask {
                         producerCreatedFuture.completeExceptionally(cause);
                         client.cleanupProducer(this);
                     } else if (producerCreatedFuture.isDone() || //
-                    (cause instanceof PulsarClientException && 
isRetriableError((PulsarClientException) cause)
+                    (cause instanceof PulsarClientException && 
connectionHandler.isRetriableError((PulsarClientException) cause)
                             && System.currentTimeMillis() < 
createProducerTimeout)) {
                         // Either we had already created the producer once 
(producerCreatedFuture.isDone()) or we are
                         // still within the initial timeout budget and we are 
dealing with a retriable error
@@ -908,7 +917,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask {
     }
 
     @Override
-    void connectionFailed(PulsarClientException exception) {
+    public void connectionFailed(PulsarClientException exception) {
         if (System.currentTimeMillis() > createProducerTimeout
                 && producerCreatedFuture.completeExceptionally(exception)) {
             log.info("[{}] Producer creation failed for producer {}", topic, 
producerId);
@@ -1248,5 +1257,34 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask {
         return producerName;
     }
 
+    // wrapper for connection methods
+    ClientCnx cnx() {
+        return this.connectionHandler.cnx();
+    }
+
+    void resetBackoff() {
+        this.connectionHandler.resetBackoff();
+    }
+
+    void connectionClosed(ClientCnx cnx) {
+        this.connectionHandler.connectionClosed(cnx);
+    }
+
+    ClientCnx getClientCnx() {
+        return this.connectionHandler.getClientCnx();
+    }
+
+    void setClientCnx(ClientCnx clientCnx) {
+        this.connectionHandler.setClientCnx(clientCnx);
+    }
+
+    void reconnectLater(Throwable exception) {
+        this.connectionHandler.reconnectLater(exception);
+    }
+
+    void grabCnx() {
+        this.connectionHandler.grabCnx();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ProducerImpl.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
index e8c018c..e39de96 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -189,7 +189,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
         }
         if (getState() == State.Ready) {
             newConsumers.forEach(consumer -> {
-                consumer.sendFlowPermitsToBroker(consumer.cnx(), 
conf.getReceiverQueueSize());
+                
consumer.sendFlowPermitsToBroker(consumer.getConnectionHandler().cnx(), 
conf.getReceiverQueueSize());
                 receiveMessageFromConsumer(consumer);
             });
         }
@@ -470,18 +470,6 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> 
{
     }
 
     @Override
-    void connectionFailed(PulsarClientException exception) {
-        // noop
-
-    }
-
-    @Override
-    void connectionOpened(ClientCnx cnx) {
-        // noop
-
-    }
-
-    @Override
     String getHandlerName() {
         return subscription;
     }

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to