merlimat closed pull request #1354: Issue 1118: refine HandlerBase to let only 
ConsumerImpl and ProducerImpl have client-cnx
URL: https://github.com/apache/incubator-pulsar/pull/1354
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff once the pull request is merged, so the full diff is reproduced below:

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index d7d0d73b3..c5447b199 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -59,7 +59,7 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.HandlerBase.State;
+import org.apache.pulsar.client.impl.HandlerState.State;
 import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index 323000622..4a9912dce 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -454,13 +454,8 @@ public void testCorruptMessageRemove() throws Exception {
         }
 
         // 5. Verify
-
-        // (5.1) Verify: producer's recoverChecksumError and updateChecksum 
invoked
-        verify(producer, times(1)).recoverChecksumError(any(), anyLong());
-        verify(producer, times(1)).verifyLocalBufferIsNotCorrupted(any());
-
         /**
-         * (5.3) verify: ProducerImpl.verifyLocalBufferIsNotCorrupted() => 
validates if message is corrupt
+         * verify: ProducerImpl.verifyLocalBufferIsNotCorrupted() => validates 
if message is corrupt
          */
         MessageImpl<byte[]> msg2 = (MessageImpl<byte[]>) 
MessageBuilder.create().setContent("message-1".getBytes())
                 .build();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
similarity index 54%
rename from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerBase.java
rename to 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 867424e39..d04e8a847 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -20,72 +20,61 @@
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.function.UnaryOperator;
-
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.HandlerState.State;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract class HandlerBase {
-    protected final PulsarClientImpl client;
-    protected final String topic;
-    private static final AtomicReferenceFieldUpdater<HandlerBase, State> 
STATE_UPDATER =
-            AtomicReferenceFieldUpdater.newUpdater(HandlerBase.class, 
State.class, "state");
-    @SuppressWarnings("unused")
-    private volatile State state = null;
-
-    private static final AtomicReferenceFieldUpdater<HandlerBase, ClientCnx> 
CLIENT_CNX_UPDATER =
-            AtomicReferenceFieldUpdater.newUpdater(HandlerBase.class, 
ClientCnx.class, "clientCnx");
+class ConnectionHandler {
+    private static final AtomicReferenceFieldUpdater<ConnectionHandler, 
ClientCnx> CLIENT_CNX_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(ConnectionHandler.class, 
ClientCnx.class, "clientCnx");
     @SuppressWarnings("unused")
     private volatile ClientCnx clientCnx = null;
+
+    protected final HandlerState state;
     protected final Backoff backoff;
 
-    enum State {
-        Uninitialized, // Not initialized
-        Connecting, // Client connecting to broker
-        Ready, // Handler is being used
-        Closing, // Close cmd has been sent to broker
-        Closed, // Broker acked the close
-        Terminated, // Topic associated with this handler
-                    // has been terminated
-        Failed // Handler is failed
-    };
-
-    public HandlerBase(PulsarClientImpl client, String topic, Backoff backoff) 
{
-        this.client = client;
-        this.topic = topic;
+    interface Connection {
+        void connectionFailed(PulsarClientException exception);
+        void connectionOpened(ClientCnx cnx);
+    }
+
+    protected Connection connection;
+
+    protected ConnectionHandler(HandlerState state, Backoff backoff, 
Connection connection) {
+        this.state = state;
+        this.connection = connection;
         this.backoff = backoff;
-        STATE_UPDATER.set(this, State.Uninitialized);
         CLIENT_CNX_UPDATER.set(this, null);
     }
 
     protected void grabCnx() {
         if (CLIENT_CNX_UPDATER.get(this) != null) {
-            log.warn("[{}] [{}] Client cnx already set, ignoring reconnection 
request", topic, getHandlerName());
+            log.warn("[{}] [{}] Client cnx already set, ignoring reconnection 
request", state.topic, state.getHandlerName());
             return;
         }
 
         if (!isValidStateForReconnection()) {
             // Ignore connection closed when we are shutting down
-            log.info("[{}] [{}] Ignoring reconnection request (state: {})", 
topic, getHandlerName(), STATE_UPDATER.get(this));
+            log.info("[{}] [{}] Ignoring reconnection request (state: {})", 
state.topic, state.getHandlerName(), state.getState());
             return;
         }
 
         try {
-            client.getConnection(topic) //
-                    .thenAccept(this::connectionOpened) //
+            state.client.getConnection(state.topic) //
+                    .thenAccept(cnx -> connection.connectionOpened(cnx)) //
                     .exceptionally(this::handleConnectionError);
         } catch (Throwable t) {
-            log.warn("[{}] [{}] Exception thrown while getting connection: ", 
topic, getHandlerName(), t);
+            log.warn("[{}] [{}] Exception thrown while getting connection: ", 
state.topic, state.getHandlerName(), t);
             reconnectLater(t);
         }
     }
 
     private Void handleConnectionError(Throwable exception) {
-        log.warn("[{}] [{}] Error connecting to broker: {}", topic, 
getHandlerName(), exception.getMessage());
-        connectionFailed(new PulsarClientException(exception));
+        log.warn("[{}] [{}] Error connecting to broker: {}", state.topic, 
state.getHandlerName(), exception.getMessage());
+        connection.connectionFailed(new PulsarClientException(exception));
 
-        State state = STATE_UPDATER.get(this);
+        State state = this.state.getState();
         if (state == State.Uninitialized || state == State.Connecting || state 
== State.Ready) {
             reconnectLater(exception);
         }
@@ -96,15 +85,15 @@ private Void handleConnectionError(Throwable exception) {
     protected void reconnectLater(Throwable exception) {
         CLIENT_CNX_UPDATER.set(this, null);
         if (!isValidStateForReconnection()) {
-            log.info("[{}] [{}] Ignoring reconnection request (state: {})", 
topic, getHandlerName(), STATE_UPDATER.get(this));
+            log.info("[{}] [{}] Ignoring reconnection request (state: {})", 
state.topic, state.getHandlerName(), state.getState());
             return;
         }
         long delayMs = backoff.next();
-        log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try 
again in {} s", topic, getHandlerName(),
+        log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try 
again in {} s", state.topic, state.getHandlerName(),
                 exception.getMessage(), delayMs / 1000.0);
-        STATE_UPDATER.set(this, State.Connecting);
-        client.timer().newTimeout(timeout -> {
-            log.info("[{}] [{}] Reconnecting after connection was closed", 
topic, getHandlerName());
+        state.setState(State.Connecting);
+        state.client.timer().newTimeout(timeout -> {
+            log.info("[{}] [{}] Reconnecting after connection was closed", 
state.topic, state.getHandlerName());
             grabCnx();
         }, delayMs, TimeUnit.MILLISECONDS);
     }
@@ -112,15 +101,15 @@ protected void reconnectLater(Throwable exception) {
     protected void connectionClosed(ClientCnx cnx) {
         if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
             if (!isValidStateForReconnection()) {
-                log.info("[{}] [{}] Ignoring reconnection request (state: 
{})", topic, getHandlerName(), STATE_UPDATER.get(this));
+                log.info("[{}] [{}] Ignoring reconnection request (state: 
{})", state.topic, state.getHandlerName(), state.getState());
                 return;
             }
             long delayMs = backoff.next();
-            STATE_UPDATER.set(this, State.Connecting);
-            log.info("[{}] [{}] Closed connection {} -- Will try again in {} 
s", topic, getHandlerName(), cnx.channel(),
+            state.setState(State.Connecting);
+            log.info("[{}] [{}] Closed connection {} -- Will try again in {} 
s", state.topic, state.getHandlerName(), cnx.channel(),
                     delayMs / 1000.0);
-            client.timer().newTimeout(timeout -> {
-                log.info("[{}] [{}] Reconnecting after timeout", topic, 
getHandlerName());
+            state.client.timer().newTimeout(timeout -> {
+                log.info("[{}] [{}] Reconnecting after timeout", state.topic, 
state.getHandlerName());
                 grabCnx();
             }, delayMs, TimeUnit.MILLISECONDS);
         }
@@ -138,24 +127,6 @@ protected boolean isRetriableError(PulsarClientException 
e) {
         return e instanceof PulsarClientException.LookupException;
     }
 
-    // moves the state to ready if it wasn't closed
-    protected boolean changeToReadyState() {
-        return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, 
State.Ready)
-                || STATE_UPDATER.compareAndSet(this, State.Connecting, 
State.Ready));
-    }
-
-    protected State getState() {
-        return STATE_UPDATER.get(this);
-    }
-
-    protected void setState(State s) {
-        STATE_UPDATER.set(this, s);
-    }
-
-    protected State getAndUpdateState(final UnaryOperator<State> updater) {
-        return STATE_UPDATER.getAndUpdate(this, updater);
-    }
-
     protected ClientCnx getClientCnx() {
         return CLIENT_CNX_UPDATER.get(this);
     }
@@ -165,28 +136,22 @@ protected void setClientCnx(ClientCnx clientCnx) {
     }
 
     private boolean isValidStateForReconnection() {
-        State state = STATE_UPDATER.get(this);
+        State state = this.state.getState();
         switch (state) {
-        case Uninitialized:
-        case Connecting:
-        case Ready:
-            // Ok
-            return true;
-
-        case Closing:
-        case Closed:
-        case Failed:
-        case Terminated:
-            return false;
+            case Uninitialized:
+            case Connecting:
+            case Ready:
+                // Ok
+                return true;
+
+            case Closing:
+            case Closed:
+            case Failed:
+            case Terminated:
+                return false;
         }
         return false;
     }
 
-    abstract void connectionFailed(PulsarClientException exception);
-
-    abstract void connectionOpened(ClientCnx cnx);
-
-    abstract String getHandlerName();
-
-    private static final Logger log = 
LoggerFactory.getLogger(HandlerBase.class);
+    private static final Logger log = 
LoggerFactory.getLogger(ConnectionHandler.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index f51b1b3db..cc718f3d5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -43,7 +43,7 @@
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 
-public abstract class ConsumerBase<T> extends HandlerBase implements 
Consumer<T> {
+public abstract class ConsumerBase<T> extends HandlerState implements 
Consumer<T> {
 
     enum ConsumerType {
         PARTITIONED, NON_PARTITIONED
@@ -61,9 +61,10 @@
     protected int maxReceiverQueueSize;
     protected Schema<T> schema;
 
-    protected ConsumerBase(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf, int receiverQueueSize,
-                           ExecutorService listenerExecutor, 
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
-        super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, 
TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS));
+    protected ConsumerBase(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
+                           int receiverQueueSize, ExecutorService 
listenerExecutor,
+                           CompletableFuture<Consumer<T>> subscribeFuture, 
Schema<T> schema) {
+        super(client, topic);
         this.maxReceiverQueueSize = receiverQueueSize;
         this.subscription = conf.getSubscriptionName();
         this.conf = conf;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ab812a0b3..a76529dcf 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -79,7 +79,7 @@
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
-public class ConsumerImpl<T> extends ConsumerBase<T> {
+public class ConsumerImpl<T> extends ConsumerBase<T> implements 
ConnectionHandler.Connection {
     private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
 
     private final long consumerId;
@@ -123,6 +123,8 @@
 
     private final boolean readCompacted;
 
+    private final ConnectionHandler connectionHandler;
+
     enum SubscriptionMode {
         // Make the subscription to be backed by a durable cursor that will 
retain messages and persist the current
         // position
@@ -183,9 +185,17 @@
             metadata = Collections.unmodifiableMap(new 
HashMap<>(conf.getProperties()));
         }
 
+        this.connectionHandler = new ConnectionHandler(this,
+            new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, 
TimeUnit.MILLISECONDS),
+            this);
+
         grabCnx();
     }
 
+    public ConnectionHandler getConnectionHandler() {
+        return connectionHandler;
+    }
+
     public UnAckedMessageTracker getUnAckedMessageTracker() {
         return unAckedMessageTracker;
     }
@@ -533,7 +543,7 @@ public void operationComplete(Future<Void> future) throws 
Exception {
     }
 
     @Override
-    void connectionOpened(final ClientCnx cnx) {
+    public void connectionOpened(final ClientCnx cnx) {
         setClientCnx(cnx);
         cnx.registerConsumer(consumerId, this);
 
@@ -612,7 +622,7 @@ void connectionOpened(final ClientCnx cnx) {
                 return null;
             }
             log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, 
subscription, cnx.channel().remoteAddress());
-            if (e.getCause() instanceof PulsarClientException && 
isRetriableError((PulsarClientException) e.getCause())
+            if (e.getCause() instanceof PulsarClientException && 
getConnectionHandler().isRetriableError((PulsarClientException) e.getCause())
                     && System.currentTimeMillis() < subscribeTimeout) {
                 reconnectLater(e.getCause());
                 return null;
@@ -677,7 +687,7 @@ void sendFlowPermitsToBroker(ClientCnx cnx, int 
numMessages) {
     }
 
     @Override
-    void connectionFailed(PulsarClientException exception) {
+    public void connectionFailed(PulsarClientException exception) {
         if (System.currentTimeMillis() > subscribeTimeout && 
subscribeFuture.completeExceptionally(exception)) {
             setState(State.Failed);
             log.info("[{}] Consumer creation failed for consumer {}", topic, 
consumerId);
@@ -1431,6 +1441,35 @@ public int hashCode() {
         return Objects.hash(topic, subscription, consumerName);
     }
 
+    // wrapper for connection methods
+    ClientCnx cnx() {
+        return this.connectionHandler.cnx();
+    }
+
+    void resetBackoff() {
+        this.connectionHandler.resetBackoff();
+    }
+
+    void connectionClosed(ClientCnx cnx) {
+        this.connectionHandler.connectionClosed(cnx);
+    }
+
+    ClientCnx getClientCnx() {
+        return this.connectionHandler.getClientCnx();
+    }
+
+    void setClientCnx(ClientCnx clientCnx) {
+        this.connectionHandler.setClientCnx(clientCnx);
+    }
+
+    void reconnectLater(Throwable exception) {
+        this.connectionHandler.reconnectLater(exception);
+    }
+
+    void grabCnx() {
+        this.connectionHandler.grabCnx();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerImpl.class);
 
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
new file mode 100644
index 000000000..618958305
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.UnaryOperator;
+
+abstract class HandlerState {
+    protected final PulsarClientImpl client;
+    protected final String topic;
+
+    private static final AtomicReferenceFieldUpdater<HandlerState, State> 
STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(HandlerState.class, 
State.class, "state");
+    @SuppressWarnings("unused")
+    private volatile State state = null;
+
+    enum State {
+        Uninitialized, // Not initialized
+        Connecting, // Client connecting to broker
+        Ready, // Handler is being used
+        Closing, // Close cmd has been sent to broker
+        Closed, // Broker acked the close
+        Terminated, // Topic associated with this handler
+                    // has been terminated
+        Failed // Handler is failed
+    };
+
+    public HandlerState(PulsarClientImpl client, String topic) {
+        this.client = client;
+        this.topic = topic;
+        STATE_UPDATER.set(this, State.Uninitialized);
+    }
+
+    // moves the state to ready if it wasn't closed
+    protected boolean changeToReadyState() {
+        return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, 
State.Ready)
+                || STATE_UPDATER.compareAndSet(this, State.Connecting, 
State.Ready));
+    }
+
+    protected State getState() {
+        return STATE_UPDATER.get(this);
+    }
+
+    protected void setState(State s) {
+        STATE_UPDATER.set(this, s);
+    }
+
+    abstract String getHandlerName();
+
+    protected State getAndUpdateState(final UnaryOperator<State> updater) {
+        return STATE_UPDATER.getAndUpdate(this, updater);
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index f46c2b8b4..2d8ad4ed8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -130,7 +130,7 @@ private void start() {
 
     private void starReceivingMessages() throws PulsarClientException {
         for (ConsumerImpl<T> consumer : consumers) {
-            consumer.sendFlowPermitsToBroker(consumer.cnx(), 
conf.getReceiverQueueSize());
+            
consumer.sendFlowPermitsToBroker(consumer.getConnectionHandler().cnx(), 
conf.getReceiverQueueSize());
             receiveMessageFromConsumer(consumer);
         }
     }
@@ -365,18 +365,6 @@ public boolean isConnected() {
         return consumers.stream().allMatch(ConsumerImpl::isConnected);
     }
 
-    @Override
-    void connectionFailed(PulsarClientException exception) {
-        // noop
-
-    }
-
-    @Override
-    void connectionOpened(ClientCnx cnx) {
-        // noop
-
-    }
-
     void messageReceived(Message<T> message) {
         lock.writeLock().lock();
         try {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 23a86e612..494fbce27 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -217,16 +217,6 @@ public synchronized ProducerStatsRecorderImpl getStats() {
 
     private static final Logger log = 
LoggerFactory.getLogger(PartitionedProducerImpl.class);
 
-    @Override
-    void connectionFailed(PulsarClientException exception) {
-        // noop
-    }
-
-    @Override
-    void connectionOpened(ClientCnx cnx) {
-        // noop
-    }
-
     @Override
     String getHandlerName() {
         return "partition-producer";
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index 14a9af0c8..45453aedb 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -30,7 +30,7 @@
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 
-public abstract class ProducerBase<T> extends HandlerBase implements 
Producer<T> {
+public abstract class ProducerBase<T> extends HandlerState implements 
Producer<T> {
 
     protected final CompletableFuture<Producer<T>> producerCreatedFuture;
     protected final ProducerConfigurationData conf;
@@ -38,8 +38,7 @@
 
     protected ProducerBase(PulsarClientImpl client, String topic, 
ProducerConfigurationData conf,
             CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> 
schema) {
-        super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, 
TimeUnit.SECONDS,
-                Math.max(100, conf.getSendTimeoutMs() - 100), 
TimeUnit.MILLISECONDS));
+        super(client, topic);
         this.producerCreatedFuture = producerCreatedFuture;
         this.conf = conf;
         this.schema = schema;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 8f0586bbd..75661cbc9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -69,7 +69,7 @@
 import io.netty.util.TimerTask;
 import io.netty.util.concurrent.ScheduledFuture;
 
-public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask {
+public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, 
ConnectionHandler.Connection {
 
     // Producer id, used to identify a producer within a single connection
     private final long producerId;
@@ -104,6 +104,8 @@
 
     private final Map<String, String> metadata;
 
+    private final ConnectionHandler connectionHandler;
+
     @SuppressWarnings("rawtypes")
     private static final AtomicLongFieldUpdater<ProducerImpl> 
msgIdGeneratorUpdater = AtomicLongFieldUpdater
             .newUpdater(ProducerImpl.class, "msgIdGenerator");
@@ -172,9 +174,16 @@ public ProducerImpl(PulsarClientImpl client, String topic, 
ProducerConfiguration
             metadata = Collections.unmodifiableMap(new 
HashMap<>(conf.getProperties()));
         }
 
+        this.connectionHandler = new ConnectionHandler(this,
+            new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 
Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS),
+            this);
         grabCnx();
     }
 
+    public ConnectionHandler getConnectionHandler() {
+        return connectionHandler;
+    }
+
     private boolean isBatchMessagingEnabled() {
         return conf.isBatchingEnabled();
     }
@@ -379,8 +388,8 @@ private ByteBufPair sendMessage(long producerId, long 
sequenceId, int numMessage
             ByteBuf compressedPayload) throws IOException {
         ChecksumType checksumType;
 
-        if (getClientCnx() == null
-                || getClientCnx().getRemoteEndpointProtocolVersion() >= 
brokerChecksumSupportedVersion()) {
+        if (connectionHandler.getClientCnx() == null
+                || 
connectionHandler.getClientCnx().getRemoteEndpointProtocolVersion() >= 
brokerChecksumSupportedVersion()) {
             checksumType = ChecksumType.Crc32c;
         } else {
             checksumType = ChecksumType.None;
@@ -568,11 +577,11 @@ protected WriteInEventLoopCallback 
newObject(Handle<WriteInEventLoopCallback> ha
 
     @Override
     public boolean isConnected() {
-        return getClientCnx() != null && (getState() == State.Ready);
+        return connectionHandler.getClientCnx() != null && (getState() == 
State.Ready);
     }
 
     public boolean isWritable() {
-        ClientCnx cnx = getClientCnx();
+        ClientCnx cnx = connectionHandler.getClientCnx();
         return cnx != null && cnx.channel().isWritable();
     }
 
@@ -808,10 +817,10 @@ protected OpSendMsg newObject(Handle<OpSendMsg> handle) {
     }
 
     @Override
-    void connectionOpened(final ClientCnx cnx) {
+    public void connectionOpened(final ClientCnx cnx) {
         // we set the cnx reference before registering the producer on the 
cnx, so if the cnx breaks before creating the
         // producer, it will try to grab a new cnx
-        setClientCnx(cnx);
+        connectionHandler.setClientCnx(cnx);
         cnx.registerProducer(producerId, this);
 
         log.info("[{}] [{}] Creating producer on cnx {}", topic, producerName, 
cnx.ctx().channel());
@@ -892,7 +901,7 @@ void connectionOpened(final ClientCnx cnx) {
                         producerCreatedFuture.completeExceptionally(cause);
                         client.cleanupProducer(this);
                     } else if (producerCreatedFuture.isDone() || //
-                    (cause instanceof PulsarClientException && 
isRetriableError((PulsarClientException) cause)
+                    (cause instanceof PulsarClientException && 
connectionHandler.isRetriableError((PulsarClientException) cause)
                             && System.currentTimeMillis() < 
createProducerTimeout)) {
                         // Either we had already created the producer once 
(producerCreatedFuture.isDone()) or we are
                         // still within the initial timeout budget and we are 
dealing with a retriable error
@@ -908,7 +917,7 @@ void connectionOpened(final ClientCnx cnx) {
     }
 
     @Override
-    void connectionFailed(PulsarClientException exception) {
+    public void connectionFailed(PulsarClientException exception) {
         if (System.currentTimeMillis() > createProducerTimeout
                 && producerCreatedFuture.completeExceptionally(exception)) {
             log.info("[{}] Producer creation failed for producer {}", topic, 
producerId);
@@ -1248,5 +1257,34 @@ public String getProducerName() {
         return producerName;
     }
 
+    // wrapper for connection methods
+    ClientCnx cnx() {
+        return this.connectionHandler.cnx();
+    }
+
+    void resetBackoff() {
+        this.connectionHandler.resetBackoff();
+    }
+
+    void connectionClosed(ClientCnx cnx) {
+        this.connectionHandler.connectionClosed(cnx);
+    }
+
+    ClientCnx getClientCnx() {
+        return this.connectionHandler.getClientCnx();
+    }
+
+    void setClientCnx(ClientCnx clientCnx) {
+        this.connectionHandler.setClientCnx(clientCnx);
+    }
+
+    void reconnectLater(Throwable exception) {
+        this.connectionHandler.reconnectLater(exception);
+    }
+
+    void grabCnx() {
+        this.connectionHandler.grabCnx();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ProducerImpl.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
index e8c018cdf..e39de96c1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -189,7 +189,7 @@ private void startReceivingMessages(List<ConsumerImpl<T>> 
newConsumers) throws P
         }
         if (getState() == State.Ready) {
             newConsumers.forEach(consumer -> {
-                consumer.sendFlowPermitsToBroker(consumer.cnx(), 
conf.getReceiverQueueSize());
+                
consumer.sendFlowPermitsToBroker(consumer.getConnectionHandler().cnx(), 
conf.getReceiverQueueSize());
                 receiveMessageFromConsumer(consumer);
             });
         }
@@ -469,18 +469,6 @@ public boolean isConnected() {
         return consumers.values().stream().allMatch(consumer -> 
consumer.isConnected());
     }
 
-    @Override
-    void connectionFailed(PulsarClientException exception) {
-        // noop
-
-    }
-
-    @Override
-    void connectionOpened(ClientCnx cnx) {
-        // noop
-
-    }
-
     @Override
     String getHandlerName() {
         return subscription;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to