This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch java_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 9c0e2fb438eba3fbad4197127ae4d25bbe1bd973 Author: Aaron Ai <[email protected]> AuthorDate: Wed Jul 13 17:08:55 2022 +0800 Refactor client telemetry --- .../java/exception/InternalErrorException.java | 4 + .../apache/rocketmq/client/java/impl/Client.java | 6 +- .../rocketmq/client/java/impl/ClientImpl.java | 101 ++++++++++----- .../client/java/impl/ClientManagerImpl.java | 8 +- .../client/java/impl/ClientManagerRegistry.java | 4 +- ...elemetrySession.java => ClientSessionImpl.java} | 138 +++++++++------------ .../java/impl/consumer/ProcessQueueImpl.java | 36 +++--- .../java/impl/producer/ClientSessionProcessor.java | 47 +++++++ .../client/java/metrics/ClientMeterProvider.java | 2 +- .../java/metrics/MessageMeterInterceptor.java | 14 +-- .../java/impl/consumer/PushConsumerImplTest.java | 4 +- .../java/impl/consumer/SimpleConsumerImplTest.java | 4 +- .../java/impl/producer/ProducerImplTest.java | 12 +- 13 files changed, 225 insertions(+), 155 deletions(-) diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java index e1a44a4..f42d369 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java @@ -27,4 +27,8 @@ public class InternalErrorException extends ClientException { public InternalErrorException(int responseCode, String message) { super(responseCode, message); } + + public InternalErrorException(Throwable cause) { + super(cause); + } } \ No newline at end of file diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java index b106db3..b0ac5f6 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java @@ -25,7 +25,7 @@ public interface Client { * * @return unique client identifier. */ - String getClientId(); + String clientId(); /** * Send heart beat to remote {@link Endpoints}. @@ -33,9 +33,9 @@ public interface Client { void doHeartbeat(); /** - * Voluntary announce settings to remote. + * Sync settings to remote. */ - void telemeterSettings(); + void syncSettings(); /** * Do some stats for client. diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java index 3cf8a36..99b292d 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java @@ -42,11 +42,13 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.errorprone.annotations.concurrent.GuardedBy; import io.grpc.Metadata; +import io.grpc.stub.StreamObserver; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -67,10 +69,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.java.exception.InternalErrorException; import org.apache.rocketmq.client.java.exception.NotFoundException; import org.apache.rocketmq.client.java.hook.MessageHookPoints; import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus; import org.apache.rocketmq.client.java.hook.MessageInterceptor; +import org.apache.rocketmq.client.java.impl.producer.ClientSessionProcessor; import org.apache.rocketmq.client.java.message.MessageCommon; import org.apache.rocketmq.client.java.metrics.ClientMeterProvider; import org.apache.rocketmq.client.java.metrics.Metric; @@ -86,7 +90,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @SuppressWarnings({"UnstableApiUsage", "NullableProblems"}) -public abstract class ClientImpl extends AbstractIdleService implements Client, MessageInterceptor { +public abstract class ClientImpl extends AbstractIdleService implements Client, ClientSessionProcessor, + MessageInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(ClientImpl.class); private static final Duration TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP = Duration.ofSeconds(3); @@ -111,9 +116,9 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, private final Map<String /* topic */, Set<SettableFuture<TopicRouteDataResult>>> inflightRouteFutureTable; private final Lock inflightRouteFutureLock; - @GuardedBy("telemetrySessionsLock") - private final ConcurrentMap<Endpoints, TelemetrySession> telemetrySessionTable; - private final ReadWriteLock telemetrySessionsLock; + @GuardedBy("endpointsSessionsLock") + private final Map<Endpoints, ClientSessionImpl> endpointsSessionTable; + private final ReadWriteLock endpointsSessionsLock; @GuardedBy("messageInterceptorsLock") private final List<MessageInterceptor> messageInterceptors; @@ -131,8 +136,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, this.inflightRouteFutureTable = new ConcurrentHashMap<>(); this.inflightRouteFutureLock = new ReentrantLock(); - this.telemetrySessionTable = new ConcurrentHashMap<>(); - this.telemetrySessionsLock = new ReentrantReadWriteLock(); + this.endpointsSessionTable = new HashMap<>(); + this.endpointsSessionsLock = new ReentrantReadWriteLock(); this.isolated = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -274,13 +279,39 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, } } + @Override + public TelemetryCommand getSettingsCommand() { + final Settings settings = this.getClientSettings().toProtobuf(); + return TelemetryCommand.newBuilder().setSettings(settings).build(); + } + + @Override + public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, + StreamObserver<TelemetryCommand> observer) throws ClientException { + try { + final Metadata metadata = this.sign(); + return clientManager.telemetry(endpoints, metadata, Duration.ofNanos(Long.MAX_VALUE), observer); + } catch (ClientException e) { + throw e; + } catch (Throwable t) { + throw new InternalErrorException(t); + } + } + + @Override + public ListenableFuture<Void> register() { + return Futures.transformAsync(this.getClientSettings().arrivedFuture, + (clientSettings) -> Futures.immediateVoidFuture(), clientCallbackExecutor); + } + /** * This method is invoked while request of printing thread stack trace is received from remote. * * @param endpoints remote endpoints. * @param command request of printing thread stack trace from remote. */ - void onPrintThreadStackCommand(Endpoints endpoints, PrintThreadStackTraceCommand command) { + @Override + public void onPrintThreadStackTraceCommand(Endpoints endpoints, PrintThreadStackTraceCommand command) { final String nonce = command.getNonce(); Runnable task = () -> { try { @@ -314,6 +345,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, * @param endpoints remote endpoints. * @param settings settings received from remote. */ + @Override public final void onSettingsCommand(Endpoints endpoints, Settings settings) { final Metric metric = new Metric(settings.getMetric()); clientMeterProvider.reset(metric); @@ -322,10 +354,10 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, } /** - * @see Client#telemeterSettings() + * @see Client#syncSettings() */ @Override - public void telemeterSettings() { + public void syncSettings() { final Settings settings = getClientSettings().toProtobuf(); final TelemetryCommand command = TelemetryCommand.newBuilder().setSettings(settings).build(); final Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints(); @@ -345,12 +377,12 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, * @param command command to telemeter. */ public void telemeter(Endpoints endpoints, TelemetryCommand command) { - final ListenableFuture<TelemetrySession> future = registerTelemetrySession(endpoints); - Futures.addCallback(future, new FutureCallback<TelemetrySession>() { + final ListenableFuture<ClientSessionImpl> future = registerTelemetrySession(endpoints); + Futures.addCallback(future, new FutureCallback<ClientSessionImpl>() { @Override - public void onSuccess(TelemetrySession session) { + public void onSuccess(ClientSessionImpl session) { try { - session.telemeter(command); + session.publish(command); } catch (Throwable t) { LOGGER.error("Failed to telemeter command, endpoints={}, command={}", endpoints, command); } @@ -364,43 +396,44 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, } private void releaseTelemetrySessions() { - telemetrySessionsLock.readLock().lock(); + endpointsSessionsLock.readLock().lock(); try { - telemetrySessionTable.values().forEach(TelemetrySession::release); + endpointsSessionTable.values().forEach(ClientSessionImpl::release); } finally { - telemetrySessionsLock.readLock().unlock(); + endpointsSessionsLock.readLock().unlock(); } } /** * Try to register telemetry session, return it directly if session is existed already. */ - public ListenableFuture<TelemetrySession> registerTelemetrySession(Endpoints endpoints) { - final SettableFuture<TelemetrySession> future0 = SettableFuture.create(); - telemetrySessionsLock.readLock().lock(); + public ListenableFuture<ClientSessionImpl> registerTelemetrySession(Endpoints endpoints) { + final SettableFuture<ClientSessionImpl> future0 = SettableFuture.create(); + endpointsSessionsLock.readLock().lock(); try { - TelemetrySession telemetrySession = telemetrySessionTable.get(endpoints); + ClientSessionImpl clientSessionImpl = endpointsSessionTable.get(endpoints); // Return is directly if session is existed already. - if (null != telemetrySession) { - future0.set(telemetrySession); + if (null != clientSessionImpl) { + future0.set(clientSessionImpl); return future0; } } finally { - telemetrySessionsLock.readLock().unlock(); + endpointsSessionsLock.readLock().unlock(); } // Future's exception has been logged during the registration. - final ListenableFuture<TelemetrySession> future = TelemetrySession.register(this, clientManager, endpoints); + final ListenableFuture<ClientSessionImpl> future = new ClientSessionImpl(this, endpoints).register(); return Futures.transform(future, session -> { - telemetrySessionsLock.writeLock().lock(); + endpointsSessionsLock.writeLock().lock(); try { - TelemetrySession existed = telemetrySessionTable.get(endpoints); + ClientSessionImpl existed = endpointsSessionTable.get(endpoints); if (null != existed) { + session.release(); return existed; } - telemetrySessionTable.put(endpoints, session); + endpointsSessionTable.put(endpoints, session); return session; } finally { - telemetrySessionsLock.writeLock().unlock(); + endpointsSessionsLock.writeLock().unlock(); } }, MoreExecutors.directExecutor()); } @@ -412,7 +445,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, */ public ListenableFuture<Void> onTopicRouteDataResultFetched(String topic, TopicRouteDataResult topicRouteDataResult) { - final ListenableFuture<List<TelemetrySession>> future = + final ListenableFuture<List<ClientSessionImpl>> future = Futures.allAsList(topicRouteDataResult.getTopicRouteData() .getMessageQueues().stream() .map(mq -> mq.getBroker().getEndpoints()) @@ -420,9 +453,9 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, .stream().map(this::registerTelemetrySession) .collect(Collectors.toList())); SettableFuture<Void> future0 = SettableFuture.create(); - Futures.addCallback(future, new FutureCallback<List<TelemetrySession>>() { + Futures.addCallback(future, new FutureCallback<List<ClientSessionImpl>>() { @Override - public void onSuccess(List<TelemetrySession> sessions) { + public void onSuccess(List<ClientSessionImpl> sessions) { LOGGER.info("Register session successfully, current route will be cached, topic={}, " + "topicRouteDataResult={}", topic, topicRouteDataResult); final TopicRouteDataResult old = topicRouteResultCache.put(topic, topicRouteDataResult); @@ -459,6 +492,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, * @param endpoints remote endpoints. * @param command request of message consume verification from remote. */ + @Override public void onVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand command) { LOGGER.warn("Ignore verify message command from remote, which is not expected, clientId={}, command={}", clientId, command); @@ -482,6 +516,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, * @param endpoints remote endpoints. * @param command request of orphaned transaction recovery from remote. */ + @Override public void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrphanedTransactionCommand command) { LOGGER.warn("Ignore orphaned transaction recovery command from remote, which is not expected, client id={}, " + "command={}", clientId, command); @@ -532,10 +567,10 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, } /** - * @see Client#getClientId() + * @see Client#clientId() */ @Override - public String getClientId() { + public String clientId() { return clientId; } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java index 337edcc..b564634 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java @@ -131,12 +131,12 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana @Override public void registerClient(Client client) { - clientTable.put(client.getClientId(), client); + clientTable.put(client.clientId(), client); } @Override public void unregisterClient(Client client) { - clientTable.remove(client.getClientId()); + clientTable.remove(client.clientId()); } @Override @@ -189,9 +189,9 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana private void syncSettings() { clientTable.values().forEach(client -> { try { - client.telemeterSettings(); + client.syncSettings(); } catch (Throwable t) { - LOGGER.error("Failed to announce settings, clientId={}", client.getClientId(), t); + LOGGER.error("Failed to announce settings, clientId={}", client.clientId(), t); } }); } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java index 42dc30d..80299da 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java @@ -57,7 +57,7 @@ public class ClientManagerRegistry { clientManager.startAsync().awaitRunning(); singletonClientManager = clientManager; } - clientIds.add(client.getClientId()); + clientIds.add(client.clientId()); singletonClientManager.registerClient(client); return singletonClientManager; } finally { @@ -77,7 +77,7 @@ public class ClientManagerRegistry { ClientManagerImpl clientManager = null; clientIdsLock.lock(); try { - clientIds.remove(client.getClientId()); + clientIds.remove(client.clientId()); singletonClientManager.unregisterClient(client); if (clientIds.isEmpty()) { clientManager = singletonClientManager; diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java similarity index 54% rename from java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java index 36d74a5..d6a19c2 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java @@ -26,14 +26,11 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; -import io.grpc.Metadata; import io.grpc.stub.StreamObserver; -import java.io.UnsupportedEncodingException; -import java.security.InvalidKeyException; -import java.security.NoSuchAlgorithmException; -import java.time.Duration; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.java.impl.producer.ClientSessionProcessor; import org.apache.rocketmq.client.java.route.Endpoints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,51 +39,39 @@ import org.slf4j.LoggerFactory; * Telemetry session is constructed before first communication between client and remote route endpoints. */ @SuppressWarnings({"UnstableApiUsage", "NullableProblems"}) -public class TelemetrySession implements StreamObserver<TelemetryCommand> { - private static final Logger LOGGER = LoggerFactory.getLogger(TelemetrySession.class); +public class ClientSessionImpl implements StreamObserver<TelemetryCommand> { + private static final Logger LOGGER = LoggerFactory.getLogger(ClientSessionImpl.class); - private final ClientImpl client; - private final ClientManager clientManager; + private final ClientSessionProcessor processor; private final Endpoints endpoints; - private volatile StreamObserver<TelemetryCommand> requestObserver; + private final ReadWriteLock observerLock; + private StreamObserver<TelemetryCommand> requestObserver = null; - private TelemetrySession(ClientImpl client, ClientManager clientManager, Endpoints endpoints) { - this.client = client; - this.clientManager = clientManager; + protected ClientSessionImpl(ClientSessionProcessor processor, Endpoints endpoints) { + this.processor = processor; this.endpoints = endpoints; + this.observerLock = new ReentrantReadWriteLock(); } - public static ListenableFuture<TelemetrySession> register(ClientImpl client, ClientManager clientManager, - Endpoints endpoints) { - return new TelemetrySession(client, clientManager, endpoints).register(); - } - - private ListenableFuture<TelemetrySession> register() { - ListenableFuture<TelemetrySession> future; + protected ListenableFuture<ClientSessionImpl> register() { + ListenableFuture<ClientSessionImpl> future; try { - this.init(); - final ClientSettings clientSettings = client.getClientSettings(); - final Settings settings = clientSettings.toProtobuf(); - final TelemetryCommand settingsCommand = TelemetryCommand.newBuilder().setSettings(settings).build(); - this.telemeter(settingsCommand); - future = Futures.transform(clientSettings.getArrivedFuture(), input -> this, - MoreExecutors.directExecutor()); + final TelemetryCommand command = processor.getSettingsCommand(); + this.publish(command); + future = Futures.transform(processor.register(), input -> this, MoreExecutors.directExecutor()); } catch (Throwable t) { - SettableFuture<TelemetrySession> future0 = SettableFuture.create(); - future0.setException(t); - future = future0; + future = Futures.immediateFailedFuture(t); } - Futures.addCallback(future, new FutureCallback<TelemetrySession>() { + final String clientId = processor.clientId(); + Futures.addCallback(future, new FutureCallback<ClientSessionImpl>() { @Override - public void onSuccess(TelemetrySession session) { - LOGGER.info("Register telemetry session successfully, endpoints={}, clientId={}", endpoints, - client.getClientId()); + public void onSuccess(ClientSessionImpl session) { + LOGGER.info("Register client session successfully, endpoints={}, clientId={}", endpoints, clientId); } @Override public void onFailure(Throwable t) { - LOGGER.error("Failed to register telemetry session, endpoints={}, clientId={}", endpoints, - client.getClientId(), t); + LOGGER.error("Failed to register client session, endpoints={}, clientId={}", endpoints, clientId, t); release(); } }, MoreExecutors.directExecutor()); @@ -96,31 +81,19 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> { /** * Release telemetry session. */ - public synchronized void release() { + public void release() { + this.observerLock.writeLock().lock(); try { if (null != requestObserver) { - requestObserver.onCompleted(); + try { + requestObserver.onCompleted(); + } catch (Throwable ignore) { + // Ignore exception on purpose. + } + requestObserver = null; } - } catch (Throwable ignore) { - // Ignore exception on purpose. - } - } - - /** - * Initialize telemetry session. - */ - private synchronized void init() throws UnsupportedEncodingException, NoSuchAlgorithmException, - InvalidKeyException, ClientException { - this.release(); - final Metadata metadata = client.sign(); - this.requestObserver = clientManager.telemetry(endpoints, metadata, Duration.ofNanos(Long.MAX_VALUE), this); - } - - private void reinit() { - try { - init(); - } catch (Throwable ignore) { - // Ignore exception on purpose. + } finally { + this.observerLock.writeLock().unlock(); } } @@ -129,13 +102,24 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> { * * @param command appointed command to telemeter */ - public void telemeter(TelemetryCommand command) { + public void publish(TelemetryCommand command) throws ClientException { + this.observerLock.readLock().lock(); try { + if (null != requestObserver) { + requestObserver.onNext(command); + return; + } + } finally { + this.observerLock.readLock().unlock(); + } + this.observerLock.writeLock().lock(); + try { + if (null == requestObserver) { + this.requestObserver = processor.telemetry(endpoints, this); + } requestObserver.onNext(command); - } catch (RuntimeException e) { - // Cancel RPC. - requestObserver.onError(e); - throw e; + } finally { + this.observerLock.writeLock().unlock(); } } @@ -146,52 +130,52 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> { case SETTINGS: { final Settings settings = command.getSettings(); LOGGER.info("Receive settings from remote, endpoints={}, clientId={}", endpoints, - client.getClientId()); - client.onSettingsCommand(endpoints, settings); + processor.clientId()); + processor.onSettingsCommand(endpoints, settings); break; } case RECOVER_ORPHANED_TRANSACTION_COMMAND: { final RecoverOrphanedTransactionCommand recoverOrphanedTransactionCommand = command.getRecoverOrphanedTransactionCommand(); LOGGER.info("Receive orphaned transaction recovery command from remote, endpoints={}, " - + "clientId={}", endpoints, client.getClientId()); - client.onRecoverOrphanedTransactionCommand(endpoints, recoverOrphanedTransactionCommand); + + "clientId={}", endpoints, processor.clientId()); + processor.onRecoverOrphanedTransactionCommand(endpoints, recoverOrphanedTransactionCommand); break; } case VERIFY_MESSAGE_COMMAND: { final VerifyMessageCommand verifyMessageCommand = command.getVerifyMessageCommand(); LOGGER.info("Receive message verification command from remote, endpoints={}, clientId={}", - endpoints, client.getClientId()); - client.onVerifyMessageCommand(endpoints, verifyMessageCommand); + endpoints, processor.clientId()); + processor.onVerifyMessageCommand(endpoints, verifyMessageCommand); break; } case PRINT_THREAD_STACK_TRACE_COMMAND: { final PrintThreadStackTraceCommand printThreadStackTraceCommand = command.getPrintThreadStackTraceCommand(); LOGGER.info("Receive thread stack print command from remote, endpoints={}, clientId={}", - endpoints, client.getClientId()); - client.onPrintThreadStackCommand(endpoints, printThreadStackTraceCommand); + endpoints, processor.clientId()); + processor.onPrintThreadStackTraceCommand(endpoints, printThreadStackTraceCommand); break; } default: LOGGER.warn("Receive unrecognized command from remote, endpoints={}, command={}, clientId={}", - endpoints, command, client.getClientId()); + endpoints, command, processor.clientId()); } } catch (Throwable t) { LOGGER.error("[Bug] unexpected exception raised while receiving command from remote, command={}, " - + "clientId={}", command, client.getClientId(), t); + + "clientId={}", command, processor.clientId(), t); } } @Override public void onError(Throwable throwable) { LOGGER.error("Exception raised from stream response observer, clientId={}, endpoints={}", - client.getClientId(), endpoints, throwable); - reinit(); + processor.clientId(), endpoints, throwable); + this.release(); } @Override public void onCompleted() { - reinit(); + this.release(); } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java index c914dce..bdc6a6d 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java @@ -128,7 +128,7 @@ class ProcessQueueImpl implements ProcessQueue { return false; } LOGGER.warn("Process queue is idle, idleDuration={}, maxIdleDuration={}, mq={}, clientId={}", idleDuration, - maxIdleDuration, mq, consumer.getClientId()); + maxIdleDuration, mq, consumer.clientId()); return true; } @@ -157,12 +157,12 @@ class ProcessQueueImpl implements ProcessQueue { final MessageId messageId = messageView.getMessageId(); if (consumer.getPushConsumerSettings().isFifo()) { LOGGER.error("Message is corrupted, forward it to dead letter queue in fifo mode, mq={}, " + - "messageId={}, clientId={}", mq, messageId, consumer.getClientId()); + "messageId={}, clientId={}", mq, messageId, consumer.clientId()); forwardToDeadLetterQueue(messageView); return; } LOGGER.error("Message is corrupted, nack it in standard mode, mq={}, messageId={}, clientId={}", mq, - messageId, consumer.getClientId()); + messageId, consumer.clientId()); nackMessage(messageView); }); } @@ -194,7 +194,7 @@ class ProcessQueueImpl implements ProcessQueue { } // Should never reach here. LOGGER.error("[Bug] Failed to schedule receive message request, mq={}, clientId={}", mq, - consumer.getClientId(), t); + consumer.clientId(), t); receiveMessageLater(); } } @@ -202,12 +202,12 @@ class ProcessQueueImpl implements ProcessQueue { public void receiveMessage() { if (dropped) { LOGGER.info("Process queue has been dropped, no longer receive message, mq={}, clientId={}", mq, - consumer.getClientId()); + consumer.clientId()); return; } if (this.isCacheFull()) { LOGGER.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq, - consumer.getClientId()); + consumer.clientId()); receiveMessageLater(); return; } @@ -217,7 +217,7 @@ class ProcessQueueImpl implements ProcessQueue { private void receiveMessageImmediately() { if (!consumer.isRunning()) { LOGGER.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq, - consumer.getClientId()); + consumer.clientId()); return; } try { @@ -251,7 +251,7 @@ class ProcessQueueImpl implements ProcessQueue { // Should never reach here. LOGGER.error("[Bug] Exception raised while handling receive result, would receive later," + " mq={}, endpoints={}, clientId={}", - mq, endpoints, consumer.getClientId(), t); + mq, endpoints, consumer.clientId(), t); receiveMessageLater(); } } @@ -264,14 +264,14 @@ class ProcessQueueImpl implements ProcessQueue { MessageHookPointsStatus.ERROR); LOGGER.error("Exception raised while message reception, would receive later, mq={}, endpoints={}," + - " clientId={}", mq, endpoints, consumer.getClientId(), t); + " clientId={}", mq, endpoints, consumer.clientId(), t); receiveMessageLater(); } }, MoreExecutors.directExecutor()); consumer.getReceptionTimes().getAndIncrement(); } catch (Throwable t) { LOGGER.error("Exception raised while message reception, would receive later, mq={}, clientId={}", mq, - consumer.getClientId(), t); + consumer.clientId(), t); receiveMessageLater(); } } @@ -282,7 +282,7 @@ class ProcessQueueImpl implements ProcessQueue { if (cacheMessageCountThresholdPerQueue <= actualMessagesQuantity) { LOGGER.warn("Process queue total cached messages quantity exceeds the threshold, threshold={}, actual={}," + " mq={}, clientId={}", - cacheMessageCountThresholdPerQueue, actualMessagesQuantity, mq, consumer.getClientId()); + cacheMessageCountThresholdPerQueue, actualMessagesQuantity, mq, consumer.clientId()); return true; } final int cacheMessageBytesThresholdPerQueue = consumer.cacheMessageBytesThresholdPerQueue(); @@ -290,7 +290,7 @@ class ProcessQueueImpl implements ProcessQueue { if (cacheMessageBytesThresholdPerQueue <= actualCachedMessagesBytes) { LOGGER.warn("Process queue total cached messages memory exceeds the threshold, threshold={} bytes," + " actual={} bytes, mq={}, clientId={}", - cacheMessageBytesThresholdPerQueue, actualCachedMessagesBytes, mq, consumer.getClientId()); + cacheMessageBytesThresholdPerQueue, actualCachedMessagesBytes, mq, consumer.clientId()); return true; } return false; @@ -384,7 +384,7 @@ class ProcessQueueImpl implements ProcessQueue { } private void ackMessage(MessageViewImpl messageView) { - final String clientId = consumer.getClientId(); + final String clientId = consumer.clientId(); final String consumerGroup = consumer.getConsumerGroup(); final MessageId messageId = messageView.getMessageId(); final Endpoints endpoints = messageView.getEndpoints(); @@ -450,7 +450,7 @@ class ProcessQueueImpl implements ProcessQueue { int attempt = messageView.getDeliveryAttempt(); final MessageId messageId = messageView.getMessageId(); final ConsumeService service = consumer.getConsumeService(); - final String clientId = consumer.getClientId(); + final String clientId = consumer.clientId(); if (ConsumeResult.FAILURE.equals(consumeResult) && attempt < maxAttempts) { final Duration nextAttemptDelay = retryPolicy.getNextAttemptDelay(attempt); attempt = messageView.incrementAndGetDeliveryAttempt(); @@ -482,7 +482,7 @@ class ProcessQueueImpl implements ProcessQueue { final SettableFuture<Void> future0) { final ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> future = consumer.forwardMessageToDeadLetterQueue(messageView); - final String clientId = consumer.getClientId(); + final String clientId = consumer.clientId(); final String consumerGroup = consumer.getConsumerGroup(); final MessageId messageId = messageView.getMessageId(); final Endpoints endpoints = messageView.getEndpoints(); @@ -528,7 +528,7 @@ class ProcessQueueImpl implements ProcessQueue { private void forwardToDeadLetterQueueLater(final MessageViewImpl messageView, final int attempt, final SettableFuture<Void> future0) { final MessageId messageId = messageView.getMessageId(); - final String clientId = consumer.getClientId(); + final String clientId = consumer.clientId(); // Process queue is dropped, no need to proceed. if (dropped) { LOGGER.info("Process queue was dropped, give up to forward message to dead letter queue, mq={}," + @@ -558,7 +558,7 @@ class ProcessQueueImpl implements ProcessQueue { private void ackFifoMessage(final MessageViewImpl messageView, final int attempt, final SettableFuture<Void> future0) { - final String clientId = consumer.getClientId(); + final String clientId = consumer.clientId(); final String consumerGroup = consumer.getConsumerGroup(); final MessageId messageId = messageView.getMessageId(); final Endpoints endpoints = messageView.getEndpoints(); @@ -606,7 +606,7 @@ class ProcessQueueImpl implements ProcessQueue { private void ackFifoMessageLater(final MessageViewImpl messageView, final int attempt, final SettableFuture<Void> future0) { final MessageId messageId = messageView.getMessageId(); - final String clientId = consumer.getClientId(); + final String clientId = consumer.clientId(); // Process queue is dropped, no need to proceed. if (dropped) { LOGGER.info("Process queue was dropped, give up to ack message, mq={}, messageId={}, clientId={}", diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java new file mode 100644 index 0000000..8bdc019 --- /dev/null +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.java.impl.producer; + +import apache.rocketmq.v2.PrintThreadStackTraceCommand; +import apache.rocketmq.v2.RecoverOrphanedTransactionCommand; +import apache.rocketmq.v2.Settings; +import apache.rocketmq.v2.TelemetryCommand; +import apache.rocketmq.v2.VerifyMessageCommand; +import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.stub.StreamObserver; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.java.route.Endpoints; + +public interface ClientSessionProcessor { + ListenableFuture<Void> register(); + + String clientId(); + + TelemetryCommand getSettingsCommand(); + + StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, StreamObserver<TelemetryCommand> observer) + throws ClientException; + + void onSettingsCommand(Endpoints endpoints, Settings settings); + + void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrphanedTransactionCommand command); + + void onVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand command); + + void onPrintThreadStackTraceCommand(Endpoints endpoints, PrintThreadStackTraceCommand command); +} diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java index 9451028..3547a96 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java @@ -73,7 +73,7 @@ public class ClientMeterProvider { } public synchronized void reset(Metric metric) { - final String clientId = client.getClientId(); + final String clientId = client.clientId(); try { if (clientMeter.satisfy(metric)) { LOGGER.debug("Metric settings is satisfied by the current message meter, clientId={}", clientId); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java index d9bd316..6cfb776 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java @@ -55,7 +55,7 @@ public class MessageMeterInterceptor implements MessageInterceptor { InvocationStatus invocationStatus = MessageHookPointsStatus.OK.equals(status) ? InvocationStatus.SUCCESS : InvocationStatus.FAILURE; Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic()) - .put(MetricLabels.CLIENT_ID, clientMeterProvider.getClient().getClientId()) + .put(MetricLabels.CLIENT_ID, clientMeterProvider.getClient().clientId()) .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName()).build(); histogram.record(duration.toMillis(), attributes); } @@ -74,7 +74,7 @@ public class MessageMeterInterceptor implements MessageInterceptor { consumerGroup = ((SimpleConsumer) client).getConsumerGroup(); } if (null == consumerGroup) { - LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}", client.getClientId()); + LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}", client.clientId()); return; } final MessageCommon messageCommon = messageCommons.iterator().next(); @@ -92,7 +92,7 @@ public class MessageMeterInterceptor implements MessageInterceptor { final DoubleHistogram histogram = optionalHistogram.get(); final Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic()) .put(MetricLabels.CONSUMER_GROUP, consumerGroup) - .put(MetricLabels.CLIENT_ID, client.getClientId()).build(); + .put(MetricLabels.CLIENT_ID, client.clientId()).build(); histogram.record(latency, attributes); } @@ -103,7 +103,7 @@ public class MessageMeterInterceptor implements MessageInterceptor { consumerGroup = ((PushConsumer) client).getConsumerGroup(); } if (null == consumerGroup) { - LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}", client.getClientId()); + LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}", client.clientId()); return; } final MessageCommon messageCommon = messageCommons.iterator().next(); @@ -114,7 +114,7 @@ public class MessageMeterInterceptor implements MessageInterceptor { final Duration durationAfterDecoding = optionalDurationAfterDecoding.get(); Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic()) .put(MetricLabels.CONSUMER_GROUP, consumerGroup) - .put(MetricLabels.CLIENT_ID, client.getClientId()).build(); + .put(MetricLabels.CLIENT_ID, client.clientId()).build(); final Optional<DoubleHistogram> optionalHistogram = clientMeterProvider.getHistogramByName(MetricName.AWAIT_TIME); if (!optionalHistogram.isPresent()) { @@ -129,7 +129,7 @@ public class MessageMeterInterceptor implements MessageInterceptor { final ClientImpl client = clientMeterProvider.getClient(); if (!(client instanceof PushConsumer)) { // Should never reach here. - LOGGER.error("[Bug] current client is not push consumer, clientId={}", client.getClientId()); + LOGGER.error("[Bug] current client is not push consumer, clientId={}", client.clientId()); return; } PushConsumer pushConsumer = (PushConsumer) client; @@ -138,7 +138,7 @@ public class MessageMeterInterceptor implements MessageInterceptor { InvocationStatus.FAILURE; Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic()) .put(MetricLabels.CONSUMER_GROUP, pushConsumer.getConsumerGroup()) - .put(MetricLabels.CLIENT_ID, clientMeterProvider.getClient().getClientId()) + .put(MetricLabels.CLIENT_ID, clientMeterProvider.getClient().clientId()) .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName()) .build(); final Optional<DoubleHistogram> optionalHistogram = diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java index a7b5357..8b88702 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java @@ -46,7 +46,7 @@ import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.MessageListener; import org.apache.rocketmq.client.java.impl.ClientManagerImpl; import org.apache.rocketmq.client.java.impl.ClientManagerRegistry; -import org.apache.rocketmq.client.java.impl.TelemetrySession; +import org.apache.rocketmq.client.java.impl.ClientSessionImpl; import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl; import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.tool.TestBase; @@ -87,7 +87,7 @@ public class PushConsumerImplTest extends TestBase { any(Duration.class))) .thenReturn(okQueryRouteResponseFuture()); when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), - any(TelemetrySession.class))) + any(ClientSessionImpl.class))) .thenReturn(telemetryRequestObserver); final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl( "TestScheduler")); diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java index 88445d1..e6ede4e 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java @@ -59,7 +59,7 @@ import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.message.MessageView; import org.apache.rocketmq.client.java.impl.ClientManagerImpl; import org.apache.rocketmq.client.java.impl.ClientManagerRegistry; -import org.apache.rocketmq.client.java.impl.TelemetrySession; +import org.apache.rocketmq.client.java.impl.ClientSessionImpl; import org.apache.rocketmq.client.java.message.MessageViewImpl; import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl; import org.apache.rocketmq.client.java.route.Endpoints; @@ -105,7 +105,7 @@ public class SimpleConsumerImplTest extends TestBase { any(Duration.class))) .thenReturn(future0); when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), - any(TelemetrySession.class))) + any(ClientSessionImpl.class))) .thenReturn(telemetryRequestObserver); final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("TestScheduler")); diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java index 619538e..f069b52 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java @@ -58,7 +58,7 @@ import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.apache.rocketmq.client.java.impl.ClientManagerImpl; import org.apache.rocketmq.client.java.impl.ClientManagerRegistry; -import org.apache.rocketmq.client.java.impl.TelemetrySession; +import org.apache.rocketmq.client.java.impl.ClientSessionImpl; import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl; import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.rpc.InvocationContext; @@ -112,7 +112,7 @@ public class ProducerImplTest extends TestBase { any(Duration.class))) .thenReturn(future0); when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), - any(TelemetrySession.class))) + any(ClientSessionImpl.class))) .thenReturn(telemetryRequestObserver); final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl( "TestScheduler")); @@ -145,7 +145,7 @@ public class ProducerImplTest extends TestBase { verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class)); verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), - any(Duration.class), any(TelemetrySession.class)); + any(Duration.class), any(ClientSessionImpl.class)); final Message message = fakeMessage(FAKE_TOPIC_0); final ListenableFuture<InvocationContext<SendMessageResponse>> future = okSendMessageResponseFutureWithSingleEntry(); @@ -165,7 +165,7 @@ public class ProducerImplTest extends TestBase { verify(clientManager, never()).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class)); verify(clientManager, never()).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), - any(TelemetrySession.class)); + any(ClientSessionImpl.class)); final Message message = fakeMessage(FAKE_TOPIC_0); final ListenableFuture<InvocationContext<SendMessageResponse>> future = okSendMessageResponseFutureWithSingleEntry(); @@ -177,7 +177,7 @@ public class ProducerImplTest extends TestBase { verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class)); verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), - any(Duration.class), any(TelemetrySession.class)); + any(Duration.class), any(ClientSessionImpl.class)); final apache.rocketmq.v2.SendResultEntry receipt = response.getEntriesList().iterator().next(); assertEquals(receipt.getMessageId(), sendReceipt.getMessageId().toString()); shutdown(producerWithoutTopicBinding); @@ -189,7 +189,7 @@ public class ProducerImplTest extends TestBase { verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class)); verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), - any(TelemetrySession.class)); + any(ClientSessionImpl.class)); final ListenableFuture<InvocationContext<SendMessageResponse>> future = failureSendMessageResponseFuture(); when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future);
