This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch java_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 0af0a27819ba3fb8ba38fb592ff773022e3caf14 Author: Aaron Ai <[email protected]> AuthorDate: Thu Jul 28 14:01:22 2022 +0800 Polish code --- .../rocketmq/client/java/impl/ClientImpl.java | 346 ++++++++++----------- .../client/java/impl/ClientSessionImpl.java | 137 ++++---- .../java/impl/consumer/ProcessQueueImpl.java | 12 +- .../java/impl/consumer/PushConsumerImpl.java | 15 +- .../java/impl/consumer/SimpleConsumerImpl.java | 13 +- .../impl/consumer/SubscriptionLoadBalancer.java | 37 +-- ...ionProcessor.java => ClientSessionHandler.java} | 14 +- .../client/java/impl/producer/ProducerImpl.java | 29 +- .../java/impl/producer/PublishingLoadBalancer.java | 42 +-- .../client/java/route/TopicRouteDataResult.java | 125 -------- 10 files changed, 289 insertions(+), 481 deletions(-) diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java index cc93fe9..bc2f68f 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import apache.rocketmq.v2.Code; import apache.rocketmq.v2.HeartbeatRequest; import apache.rocketmq.v2.HeartbeatResponse; +import apache.rocketmq.v2.MessageQueue; import apache.rocketmq.v2.NotifyClientTerminationRequest; import apache.rocketmq.v2.PrintThreadStackTraceCommand; import apache.rocketmq.v2.QueryRouteRequest; @@ -69,12 +70,16 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.java.exception.BadRequestException; import org.apache.rocketmq.client.java.exception.InternalErrorException; import org.apache.rocketmq.client.java.exception.NotFoundException; +import org.apache.rocketmq.client.java.exception.ProxyTimeoutException; +import org.apache.rocketmq.client.java.exception.TooManyRequestsException; +import org.apache.rocketmq.client.java.exception.UnsupportedException; import org.apache.rocketmq.client.java.hook.MessageHookPoints; import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus; import org.apache.rocketmq.client.java.hook.MessageInterceptor; -import org.apache.rocketmq.client.java.impl.producer.ClientSessionProcessor; +import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler; import org.apache.rocketmq.client.java.message.MessageCommon; import org.apache.rocketmq.client.java.metrics.ClientMeterProvider; import org.apache.rocketmq.client.java.metrics.Metric; @@ -82,18 +87,16 @@ import org.apache.rocketmq.client.java.misc.ExecutorServices; import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl; import org.apache.rocketmq.client.java.misc.Utilities; import org.apache.rocketmq.client.java.route.Endpoints; -import org.apache.rocketmq.client.java.route.TopicRouteDataResult; +import org.apache.rocketmq.client.java.route.TopicRouteData; import org.apache.rocketmq.client.java.rpc.RpcInvocation; import org.apache.rocketmq.client.java.rpc.Signature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @SuppressWarnings({"UnstableApiUsage", "NullableProblems"}) -public abstract class ClientImpl extends AbstractIdleService implements Client, ClientSessionProcessor, +public abstract class ClientImpl extends AbstractIdleService implements Client, ClientSessionHandler, MessageInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(ClientImpl.class); - private static final Duration TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP = Duration.ofSeconds(3); - private static final Duration TELEMETRY_TIMEOUT = Duration.ofDays(102 * 365); protected final ClientManager clientManager; @@ -111,15 +114,15 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, protected final String clientId; private volatile ScheduledFuture<?> updateRouteCacheFuture; - private final ConcurrentMap<String, TopicRouteDataResult> topicRouteResultCache; + private final ConcurrentMap<String, TopicRouteData> topicRouteCache; @GuardedBy("inflightRouteFutureLock") - private final Map<String /* topic */, Set<SettableFuture<TopicRouteDataResult>>> inflightRouteFutureTable; + private final Map<String /* topic */, Set<SettableFuture<TopicRouteData>>> inflightRouteFutureTable; private final Lock inflightRouteFutureLock; - @GuardedBy("endpointsSessionsLock") - private final Map<Endpoints, ClientSessionImpl> endpointsSessionTable; - private final ReadWriteLock endpointsSessionsLock; + @GuardedBy("sessionsLock") + private final Map<Endpoints, ClientSessionImpl> sessionsTable; + private final ReadWriteLock sessionsLock; @GuardedBy("messageInterceptorsLock") private final List<MessageInterceptor> messageInterceptors; @@ -132,13 +135,13 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, // Generate client id firstly. this.clientId = Utilities.genClientId(); - this.topicRouteResultCache = new ConcurrentHashMap<>(); + this.topicRouteCache = new ConcurrentHashMap<>(); this.inflightRouteFutureTable = new ConcurrentHashMap<>(); this.inflightRouteFutureLock = new ReentrantLock(); - this.endpointsSessionTable = new HashMap<>(); - this.endpointsSessionsLock = new ReentrantReadWriteLock(); + this.sessionsTable = new HashMap<>(); + this.sessionsLock = new ReentrantReadWriteLock(); this.isolated = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -180,21 +183,10 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, // Fetch topic route from remote. LOGGER.info("Begin to fetch topic(s) route data from remote during client startup, clientId={}, topics={}", clientId, topics); - // Aggregate all topic route data futures into a composited future. - final List<ListenableFuture<TopicRouteDataResult>> futures = topics.stream() - .map(this::getRouteDataResult) - .collect(Collectors.toList()); - List<TopicRouteDataResult> results; - try { - results = Futures.allAsList(futures).get(TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP.toNanos(), - TimeUnit.NANOSECONDS); - } catch (Throwable t) { - LOGGER.error("Failed to get topic route data result from remote during client startup, clientId={}, " - + "topics={}", clientId, topics, t); - throw new NotFoundException(t); - } - for (TopicRouteDataResult result : results) { - result.checkAndGetTopicRouteData(); + final List<ListenableFuture<TopicRouteData>> futures = + topics.stream().map(this::fetchTopicRoute).collect(Collectors.toList()); + for (ListenableFuture<TopicRouteData> future : futures) { + future.get(); } LOGGER.info("Fetch topic route data from remote successfully during startup, clientId={}, topics={}", clientId, topics); @@ -282,7 +274,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, } @Override - public TelemetryCommand getSettingsCommand() { + public TelemetryCommand settingsCommand() { final Settings settings = this.getClientSettings().toProtobuf(); return TelemetryCommand.newBuilder().setSettings(settings).build(); } @@ -301,7 +293,13 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, } @Override - public ListenableFuture<Void> register() { + public boolean isEndpointsDeprecated(Endpoints endpoints) { + final Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints(); + return totalRouteEndpoints.contains(endpoints); + } + + @Override + public ListenableFuture<Void> awaitSettingSynchronized() { return Futures.transformAsync(this.getClientSettings().arrivedFuture, (clientSettings) -> Futures.immediateVoidFuture(), clientCallbackExecutor); } @@ -325,7 +323,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, .setThreadStackTrace(threadStackTrace) .setStatus(status) .build(); - telemeter(endpoints, telemetryCommand); + telemetry(endpoints, telemetryCommand); } catch (Throwable t) { LOGGER.error("Failed to send thread stack trace to remote, endpoints={}, nonce={}, clientId={}", endpoints, nonce, clientId, t); @@ -351,7 +349,6 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, public final void onSettingsCommand(Endpoints endpoints, Settings settings) { final Metric metric = new Metric(settings.getMetric()); clientMeterProvider.reset(metric); - LOGGER.info("Receive settings from remote, endpoints={}", endpoints); this.getClientSettings().applySettingsCommand(settings); } @@ -365,127 +362,81 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, final Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints(); for (Endpoints endpoints : totalRouteEndpoints) { try { - telemeter(endpoints, command); + telemetry(endpoints, command); } catch (Throwable t) { LOGGER.error("Failed to telemeter settings, clientId={}, endpoints={}", clientId, endpoints, t); } } } - /** - * Telemeter command to remote endpoints. - * - * @param endpoints remote endpoints to telemeter. - * @param command command to telemeter. - */ - public void telemeter(Endpoints endpoints, TelemetryCommand command) { - final ListenableFuture<ClientSessionImpl> future = registerTelemetrySession(endpoints); - Futures.addCallback(future, new FutureCallback<ClientSessionImpl>() { - @Override - public void onSuccess(ClientSessionImpl session) { - try { - session.publish(command); - } catch (Throwable t) { - LOGGER.error("Failed to telemeter command, endpoints={}, command={}", endpoints, command); - } - } - - @Override - public void onFailure(Throwable t) { - LOGGER.error("Failed to telemeter command to remote, endpoints={}, command={}", endpoints, command, t); - } - }, MoreExecutors.directExecutor()); + public void telemetry(Endpoints endpoints, TelemetryCommand command) { + final ClientSessionImpl clientSession = getClientSession(endpoints); + try { + clientSession.fireWrite(command); + } catch (Throwable t) { + LOGGER.error("Failed to fire write telemetry command, clientId={}, endpoints={}", clientId, endpoints, t); + } } private void releaseClientSessions() { - endpointsSessionsLock.readLock().lock(); + sessionsLock.readLock().lock(); try { - endpointsSessionTable.values().forEach(ClientSessionImpl::release); + sessionsTable.values().forEach(ClientSessionImpl::release); } finally { - endpointsSessionsLock.readLock().unlock(); + sessionsLock.readLock().unlock(); } } - /** - * Try to register telemetry session, return it directly if session is existed already. - */ - public ListenableFuture<ClientSessionImpl> registerTelemetrySession(Endpoints endpoints) { - final SettableFuture<ClientSessionImpl> future0 = SettableFuture.create(); - endpointsSessionsLock.readLock().lock(); + public ClientSessionImpl getClientSession(Endpoints endpoints) { + sessionsLock.readLock().lock(); try { - ClientSessionImpl clientSessionImpl = endpointsSessionTable.get(endpoints); - // Return is directly if session is existed already. - if (null != clientSessionImpl) { - future0.set(clientSessionImpl); - return future0; + final ClientSessionImpl session = sessionsTable.get(endpoints); + if (null != session) { + return session; } } finally { - endpointsSessionsLock.readLock().unlock(); + sessionsLock.readLock().unlock(); } - // Future's exception has been logged during the registration. - final ListenableFuture<ClientSessionImpl> future = new ClientSessionImpl(this, endpoints).register(); - return Futures.transform(future, session -> { - endpointsSessionsLock.writeLock().lock(); - try { - ClientSessionImpl existed = endpointsSessionTable.get(endpoints); - if (null != existed) { - session.release(); - return existed; - } - endpointsSessionTable.put(endpoints, session); + sessionsLock.writeLock().lock(); + try { + ClientSessionImpl session = sessionsTable.get(endpoints); + if (null != session) { return session; - } finally { - endpointsSessionsLock.writeLock().unlock(); } - }, MoreExecutors.directExecutor()); + session = new ClientSessionImpl(this, endpoints); + sessionsTable.put(endpoints, session); + return session; + } finally { + sessionsLock.writeLock().unlock(); + } + } + + public ListenableFuture<Void> syncSettingsSafely(Endpoints endpoints) { + final ClientSessionImpl clientSession = getClientSession(endpoints); + return clientSession.syncSettingsSafely(); } /** - * Triggered when {@link TopicRouteDataResult} is fetched from remote. + * Triggered when {@link TopicRouteData} is fetched from remote. * * <p>Never thrown any exception. */ - public ListenableFuture<Void> onTopicRouteDataResultFetched(String topic, - TopicRouteDataResult topicRouteDataResult) { - final ListenableFuture<List<ClientSessionImpl>> future = - Futures.allAsList(topicRouteDataResult.getTopicRouteData() - .getMessageQueues().stream() - .map(mq -> mq.getBroker().getEndpoints()) - .collect(Collectors.toSet()) - .stream().map(this::registerTelemetrySession) - .collect(Collectors.toList())); - SettableFuture<Void> future0 = SettableFuture.create(); - Futures.addCallback(future, new FutureCallback<List<ClientSessionImpl>>() { - @Override - public void onSuccess(List<ClientSessionImpl> sessions) { - LOGGER.info("Register session successfully, current route will be cached, topic={}, " - + "topicRouteDataResult={}", topic, topicRouteDataResult); - final TopicRouteDataResult old = topicRouteResultCache.put(topic, topicRouteDataResult); - if (topicRouteDataResult.equals(old)) { - // Log if topic route result remains the same. - LOGGER.info("Topic route result remains the same, topic={}, route={}, clientId={}", topic, old, - clientId); - } else { - // Log if topic route result is updated. - LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId, - old, topicRouteDataResult); - } - future0.setFuture(Futures.immediateVoidFuture()); - onTopicRouteDataResultUpdate0(topic, topicRouteDataResult); - } - - @Override - public void onFailure(Throwable t) { - // Note: Topic route would not be updated if failed to register session. - LOGGER.error("Failed to register session, current route will NOT be cached, topic={}, " - + "topicRouteDataResult={}", topic, topicRouteDataResult); - future0.setException(t); - } - }, MoreExecutors.directExecutor()); - return future0; + public ListenableFuture<TopicRouteData> onTopicRouteDataFetched(String topic, TopicRouteData topicRouteData) { + final List<ListenableFuture<Void>> futures = topicRouteData + .getMessageQueues().stream() + .map(mq -> mq.getBroker().getEndpoints()) + .collect(Collectors.toSet()) + .stream().map(this::syncSettingsSafely) + .collect(Collectors.toList()); + // TODO: Record exception. + return Futures.whenAllSucceed(futures).callAsync(() -> { + topicRouteCache.put(topic, topicRouteData); + onTopicRouteDataUpdate0(topic, topicRouteData); + return Futures.immediateFuture(topicRouteData); + }, clientCallbackExecutor); } - public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) { + public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) { } /** @@ -506,7 +457,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, .setStatus(status) .build(); try { - telemeter(endpoints, telemetryCommand); + telemetry(endpoints, telemetryCommand); } catch (Throwable t) { LOGGER.warn("Failed to send message verification result, clientId={}", clientId, t); } @@ -520,20 +471,17 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, */ @Override public void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrphanedTransactionCommand command) { - LOGGER.warn("Ignore orphaned transaction recovery command from remote, which is not expected, client id={}, " + LOGGER.warn("Ignore orphaned transaction recovery command from remote, which is not expected, clientId={}, " + "command={}", clientId, command); } private void updateRouteCache() { LOGGER.info("Start to update route cache for a new round, clientId={}", clientId); - topicRouteResultCache.keySet().forEach(topic -> { - // Set timeout for future on purpose. - final ListenableFuture<TopicRouteDataResult> future = Futures.withTimeout(fetchTopicRoute(topic), - TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP, getScheduler()); - Futures.addCallback(future, new FutureCallback<TopicRouteDataResult>() { + topicRouteCache.keySet().forEach(topic -> { + final ListenableFuture<TopicRouteData> future = fetchTopicRoute(topic); + Futures.addCallback(future, new FutureCallback<TopicRouteData>() { @Override - public void onSuccess(TopicRouteDataResult topicRouteDataResult) { - onTopicRouteDataResultFetched(topic, topicRouteDataResult); + public void onSuccess(TopicRouteData topicRouteData) { } @Override @@ -648,7 +596,25 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, public void doStats() { } - private ListenableFuture<TopicRouteDataResult> fetchTopicRoute(final String topic) { + private ListenableFuture<TopicRouteData> fetchTopicRoute(final String topic) { + final ListenableFuture<TopicRouteData> future = Futures.transformAsync(fetchTopicRoute0(topic), + topicRouteData -> onTopicRouteDataFetched(topic, topicRouteData), MoreExecutors.directExecutor()); + Futures.addCallback(future, new FutureCallback<TopicRouteData>() { + @Override + public void onSuccess(TopicRouteData topicRouteData) { + LOGGER.info("Fetch topic route successfully, clientId={}, topic={}, topicRouteData={}", clientId, + topic, topicRouteData); + } + + @Override + public void onFailure(Throwable t) { + LOGGER.error("Failed to fetch topic route, clientId={}, topic={}", clientId, topic, t); + } + }, MoreExecutors.directExecutor()); + return future; + } + + private ListenableFuture<TopicRouteData> fetchTopicRoute0(final String topic) { try { Resource topicResource = Resource.newBuilder().setName(topic).build(); final QueryRouteRequest request = QueryRouteRequest.newBuilder().setTopic(topicResource) @@ -656,17 +622,37 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, final Metadata metadata = sign(); final ListenableFuture<RpcInvocation<QueryRouteResponse>> future = clientManager.queryRoute(endpoints, metadata, request, clientConfiguration.getRequestTimeout()); - return Futures.transform(future, invocation -> { + return Futures.transformAsync(future, invocation -> { final QueryRouteResponse response = invocation.getResponse(); final String requestId = invocation.getContext().getRequestId(); final Status status = response.getStatus(); + final String statusMessage = status.getMessage(); final Code code = status.getCode(); - if (Code.OK != code) { - LOGGER.error("Exception raised while fetch topic route from remote, topic={}, " + - "clientId={}, requestId={}, endpoints={}, code={}, status message=[{}]", topic, clientId, - requestId, endpoints, code, status.getMessage()); + final int codeNumber = code.getNumber(); + switch (code) { + case OK: + break; + case BAD_REQUEST: + case ILLEGAL_ACCESS_POINT: + case ILLEGAL_TOPIC: + case CLIENT_ID_REQUIRED: + throw new BadRequestException(codeNumber, requestId, statusMessage); + case NOT_FOUND: + case TOPIC_NOT_FOUND: + throw new NotFoundException(codeNumber, requestId, statusMessage); + case TOO_MANY_REQUESTS: + throw new TooManyRequestsException(codeNumber, requestId, statusMessage); + case INTERNAL_ERROR: + case INTERNAL_SERVER_ERROR: + throw new InternalErrorException(codeNumber, requestId, statusMessage); + case PROXY_TIMEOUT: + throw new ProxyTimeoutException(codeNumber, requestId, statusMessage); + default: + throw new UnsupportedException(codeNumber, requestId, statusMessage); } - return new TopicRouteDataResult(invocation); + final List<MessageQueue> messageQueuesList = response.getMessageQueuesList(); + final TopicRouteData topicRouteData = new TopicRouteData(messageQueuesList); + return Futures.immediateFuture(topicRouteData); }, MoreExecutors.directExecutor()); } catch (Throwable t) { return Futures.immediateFailedFuture(t); @@ -675,29 +661,29 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, protected Set<Endpoints> getTotalRouteEndpoints() { Set<Endpoints> totalRouteEndpoints = new HashSet<>(); - for (TopicRouteDataResult result : topicRouteResultCache.values()) { - totalRouteEndpoints.addAll(result.getTopicRouteData().getTotalEndpoints()); + for (TopicRouteData topicRouteData : topicRouteCache.values()) { + totalRouteEndpoints.addAll(topicRouteData.getTotalEndpoints()); } return totalRouteEndpoints; } - protected ListenableFuture<TopicRouteDataResult> getRouteDataResult(final String topic) { - SettableFuture<TopicRouteDataResult> future0 = SettableFuture.create(); - TopicRouteDataResult topicRouteDataResult = topicRouteResultCache.get(topic); + protected ListenableFuture<TopicRouteData> getRouteData(final String topic) { + SettableFuture<TopicRouteData> future0 = SettableFuture.create(); + TopicRouteData topicRouteData = topicRouteCache.get(topic); // If route result was cached before, get it directly. - if (null != topicRouteDataResult) { - future0.set(topicRouteDataResult); + if (null != topicRouteData) { + future0.set(topicRouteData); return future0; } inflightRouteFutureLock.lock(); try { // If route was fetched by last in-flight request, get it directly. - topicRouteDataResult = topicRouteResultCache.get(topic); - if (null != topicRouteDataResult) { - future0.set(topicRouteDataResult); + topicRouteData = topicRouteCache.get(topic); + if (null != topicRouteData) { + future0.set(topicRouteData); return future0; } - Set<SettableFuture<TopicRouteDataResult>> inflightFutures = inflightRouteFutureTable.get(topic); + Set<SettableFuture<TopicRouteData>> inflightFutures = inflightRouteFutureTable.get(topic); // Request is in-flight, return future directly. if (null != inflightFutures) { inflightFutures.add(future0); @@ -709,52 +695,48 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, } finally { inflightRouteFutureLock.unlock(); } - final ListenableFuture<TopicRouteDataResult> future = fetchTopicRoute(topic); - Futures.addCallback(future, new FutureCallback<TopicRouteDataResult>() { + final ListenableFuture<TopicRouteData> future = fetchTopicRoute(topic); + Futures.addCallback(future, new FutureCallback<TopicRouteData>() { @Override - public void onSuccess(TopicRouteDataResult result) { - final ListenableFuture<Void> updateFuture = onTopicRouteDataResultFetched(topic, result); - // TODO: all succeed? - Futures.whenAllSucceed(updateFuture).run(() -> { - inflightRouteFutureLock.lock(); - try { - final Set<SettableFuture<TopicRouteDataResult>> newFutureSet = - inflightRouteFutureTable.remove(topic); - if (null == newFutureSet) { - // Should never reach here. - LOGGER.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic, - clientId); - return; - } - LOGGER.debug("Fetch topic route successfully, topic={}, in-flight route future " - + "size={}, clientId={}", topic, newFutureSet.size(), clientId); - for (SettableFuture<TopicRouteDataResult> newFuture : newFutureSet) { - newFuture.set(result); - } - } catch (Throwable t) { + public void onSuccess(TopicRouteData topicRouteData) { + inflightRouteFutureLock.lock(); + try { + final Set<SettableFuture<TopicRouteData>> newFutureSet = + inflightRouteFutureTable.remove(topic); + if (null == newFutureSet) { // Should never reach here. - LOGGER.error("[Bug] Exception raised while update route data, topic={}, clientId={}", topic, - clientId, t); - } finally { - inflightRouteFutureLock.unlock(); + LOGGER.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic, + clientId); + return; } - }, MoreExecutors.directExecutor()); + LOGGER.debug("Fetch topic route successfully, topic={}, in-flight route future " + + "size={}, clientId={}", topic, newFutureSet.size(), clientId); + for (SettableFuture<TopicRouteData> newFuture : newFutureSet) { + newFuture.set(topicRouteData); + } + } catch (Throwable t) { + // Should never reach here. + LOGGER.error("[Bug] Exception raised while update route data, topic={}, clientId={}", topic, + clientId, t); + } finally { + inflightRouteFutureLock.unlock(); + } } @Override public void onFailure(Throwable t) { inflightRouteFutureLock.lock(); try { - final Set<SettableFuture<TopicRouteDataResult>> newFutureSet = + final Set<SettableFuture<TopicRouteData>> newFutureSet = inflightRouteFutureTable.remove(topic); if (null == newFutureSet) { // Should never reach here. LOGGER.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic, clientId); return; } - LOGGER.error("Failed to fetch topic route, topic={}, in-flight route future " + + LOGGER.debug("Failed to fetch topic route, topic={}, in-flight route future " + "size={}, clientId={}", topic, newFutureSet.size(), clientId, t); - for (SettableFuture<TopicRouteDataResult> future : newFutureSet) { + for (SettableFuture<TopicRouteData> future : newFutureSet) { future.setException(t); } } finally { diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java index d6a19c2..90402eb 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java @@ -22,15 +22,11 @@ import apache.rocketmq.v2.RecoverOrphanedTransactionCommand; import apache.rocketmq.v2.Settings; import apache.rocketmq.v2.TelemetryCommand; import apache.rocketmq.v2.VerifyMessageCommand; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import io.grpc.stub.StreamObserver; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.rocketmq.client.apis.ClientException; -import org.apache.rocketmq.client.java.impl.producer.ClientSessionProcessor; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler; import org.apache.rocketmq.client.java.route.Endpoints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,144 +34,125 @@ import org.slf4j.LoggerFactory; /** * Telemetry session is constructed before first communication between client and remote route endpoints. */ -@SuppressWarnings({"UnstableApiUsage", "NullableProblems"}) public class ClientSessionImpl implements StreamObserver<TelemetryCommand> { private static final Logger LOGGER = LoggerFactory.getLogger(ClientSessionImpl.class); - private final ClientSessionProcessor processor; + private final ClientSessionHandler handler; private final Endpoints endpoints; - private final ReadWriteLock observerLock; - private StreamObserver<TelemetryCommand> requestObserver = null; + private volatile StreamObserver<TelemetryCommand> requestObserver; - protected ClientSessionImpl(ClientSessionProcessor processor, Endpoints endpoints) { - this.processor = processor; + protected ClientSessionImpl(ClientSessionHandler handler, Endpoints endpoints) { + this.handler = handler; this.endpoints = endpoints; - this.observerLock = new ReentrantReadWriteLock(); + renewRequestObserver(); } - protected ListenableFuture<ClientSessionImpl> register() { - ListenableFuture<ClientSessionImpl> future; + private void renewRequestObserver() { try { - final TelemetryCommand command = processor.getSettingsCommand(); - this.publish(command); - future = Futures.transform(processor.register(), input -> this, MoreExecutors.directExecutor()); + if (handler.isEndpointsDeprecated(endpoints)) { + LOGGER.info("Endpoints is deprecated, no longer to renew requestObserver, endpoints={}", endpoints); + return; + } + this.requestObserver = handler.telemetry(endpoints, this); } catch (Throwable t) { - future = Futures.immediateFailedFuture(t); + handler.getScheduler().schedule(this::renewRequestObserver, 3, TimeUnit.SECONDS); } - final String clientId = processor.clientId(); - Futures.addCallback(future, new FutureCallback<ClientSessionImpl>() { - @Override - public void onSuccess(ClientSessionImpl session) { - LOGGER.info("Register client session successfully, endpoints={}, clientId={}", endpoints, clientId); - } + } - @Override - public void onFailure(Throwable t) { - LOGGER.error("Failed to register client session, endpoints={}, clientId={}", endpoints, clientId, t); - release(); - } - }, MoreExecutors.directExecutor()); - return future; + protected ListenableFuture<Void> syncSettingsSafely() { + try { + final TelemetryCommand settings = handler.settingsCommand(); + fireWrite(settings); + return handler.awaitSettingSynchronized(); + } catch (Throwable t) { + return Futures.immediateFailedFuture(t); + } } /** * Release telemetry session. */ public void release() { - this.observerLock.writeLock().lock(); + if (null == requestObserver) { + return; + } try { - if (null != requestObserver) { - try { - requestObserver.onCompleted(); - } catch (Throwable ignore) { - // Ignore exception on purpose. - } - requestObserver = null; - } - } finally { - this.observerLock.writeLock().unlock(); + requestObserver.onCompleted(); + } catch (Throwable ignore) { + // Ignore exception on purpose. } } - /** - * Telemeter command to remote. - * - * @param command appointed command to telemeter - */ - public void publish(TelemetryCommand command) throws ClientException { - this.observerLock.readLock().lock(); - try { - if (null != requestObserver) { - requestObserver.onNext(command); - return; - } - } finally { - this.observerLock.readLock().unlock(); - } - this.observerLock.writeLock().lock(); - try { - if (null == requestObserver) { - this.requestObserver = processor.telemetry(endpoints, this); - } - requestObserver.onNext(command); - } finally { - this.observerLock.writeLock().unlock(); + public void fireWrite(TelemetryCommand command) { + if (null == requestObserver) { + LOGGER.error("Request observer does not exist, ignore current command, endpoints={}, command={}", + endpoints, command); + return; } + requestObserver.onNext(command); } @Override public void onNext(TelemetryCommand command) { + final String clientId = handler.clientId(); try { switch (command.getCommandCase()) { case SETTINGS: { final Settings settings = command.getSettings(); - LOGGER.info("Receive settings from remote, endpoints={}, clientId={}", endpoints, - processor.clientId()); - processor.onSettingsCommand(endpoints, settings); + LOGGER.info("Receive settings from remote, endpoints={}, clientId={}", endpoints, clientId); + handler.onSettingsCommand(endpoints, settings); break; } case RECOVER_ORPHANED_TRANSACTION_COMMAND: { final RecoverOrphanedTransactionCommand recoverOrphanedTransactionCommand = command.getRecoverOrphanedTransactionCommand(); LOGGER.info("Receive orphaned transaction recovery command from remote, endpoints={}, " - + "clientId={}", endpoints, processor.clientId()); - processor.onRecoverOrphanedTransactionCommand(endpoints, recoverOrphanedTransactionCommand); + + "clientId={}", endpoints, clientId); + handler.onRecoverOrphanedTransactionCommand(endpoints, recoverOrphanedTransactionCommand); break; } case VERIFY_MESSAGE_COMMAND: { final VerifyMessageCommand verifyMessageCommand = command.getVerifyMessageCommand(); LOGGER.info("Receive message verification command from remote, endpoints={}, clientId={}", - endpoints, processor.clientId()); - processor.onVerifyMessageCommand(endpoints, verifyMessageCommand); + endpoints, clientId); + handler.onVerifyMessageCommand(endpoints, verifyMessageCommand); break; } case PRINT_THREAD_STACK_TRACE_COMMAND: { final PrintThreadStackTraceCommand printThreadStackTraceCommand = command.getPrintThreadStackTraceCommand(); LOGGER.info("Receive thread stack print command from remote, endpoints={}, clientId={}", - endpoints, processor.clientId()); - processor.onPrintThreadStackTraceCommand(endpoints, printThreadStackTraceCommand); + endpoints, clientId); + handler.onPrintThreadStackTraceCommand(endpoints, printThreadStackTraceCommand); break; } default: LOGGER.warn("Receive unrecognized command from remote, endpoints={}, command={}, clientId={}", - endpoints, command, processor.clientId()); + endpoints, command, clientId); } } catch (Throwable t) { LOGGER.error("[Bug] unexpected exception raised while receiving command from remote, command={}, " - + "clientId={}", command, processor.clientId(), t); + + "clientId={}", command, clientId, t); } } @Override public void onError(Throwable throwable) { LOGGER.error("Exception raised from stream response observer, clientId={}, endpoints={}", - processor.clientId(), endpoints, throwable); - this.release(); + handler.clientId(), endpoints, throwable); + release(); + if (!handler.isRunning()) { + return; + } + handler.getScheduler().schedule(this::renewRequestObserver, 3, TimeUnit.SECONDS); } @Override public void onCompleted() { - this.release(); + release(); + if (!handler.isRunning()) { + return; + } + handler.getScheduler().schedule(this::renewRequestObserver, 3, TimeUnit.SECONDS); } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java index d4271bf..965507c 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java @@ -497,18 +497,18 @@ class ProcessQueueImpl implements ProcessQueue { forwardToDeadLetterQueue(messageView, 1 + attempt, future0); return; } + // Set result if message is forwarded successfully. + future0.setFuture(Futures.immediateVoidFuture()); // Log retries. if (1 < attempt) { LOGGER.info("Re-forward message to dead letter queue successfully, clientId={}, consumerGroup={}, " + "attempt={}, messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, attempt, messageId, mq, endpoints, requestId); - } else { - LOGGER.debug("Forward message to dead letter queue successfully, clientId={}, consumerGroup={}, " - + "messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq, - endpoints, requestId); + return; } - // Set result if message is forwarded successfully. - future0.setFuture(Futures.immediateVoidFuture()); + LOGGER.info("Forward message to dead letter queue successfully, clientId={}, consumerGroup={}, " + + "messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq, + endpoints, requestId); } @Override diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java index 2c18a92..86bef80 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java @@ -77,7 +77,7 @@ import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl; import org.apache.rocketmq.client.java.retry.RetryPolicy; import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.route.MessageQueueImpl; -import org.apache.rocketmq.client.java.route.TopicRouteDataResult; +import org.apache.rocketmq.client.java.route.TopicRouteData; import org.apache.rocketmq.client.java.rpc.RpcInvocation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -234,9 +234,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach this.state(), clientId); throw new IllegalStateException("Push consumer is not running now"); } - final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic); - TopicRouteDataResult topicRouteDataResult = handleClientFuture(future); - topicRouteDataResult.checkAndGetTopicRouteData(); + final ListenableFuture<TopicRouteData> future = getRouteData(topic); + handleClientFuture(future); subscriptionExpressions.put(topic, filterExpression); return this; } @@ -257,9 +256,9 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach } private ListenableFuture<Endpoints> pickEndpointsToQueryAssignments(String topic) { - final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic); - return Futures.transformAsync(future, topicRouteDataResult -> { - Endpoints endpoints = topicRouteDataResult.checkAndGetTopicRouteData().pickEndpointsToQueryAssignments(); + final ListenableFuture<TopicRouteData> future = getRouteData(topic); + return Futures.transformAsync(future, topicRouteData -> { + Endpoints endpoints = topicRouteData.pickEndpointsToQueryAssignments(); return Futures.immediateFuture(endpoints); }, MoreExecutors.directExecutor()); } @@ -513,7 +512,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach .setStatus(status) .build(); try { - telemeter(endpoints, command); + telemetry(endpoints, command); } catch (Throwable t) { LOGGER.error("Failed to send message verification result command, endpoints={}, command={}, " + "messageId={}, clientId={}", endpoints, command, messageId, clientId, t); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java index d90edf8..d8ce5d8 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java @@ -55,7 +55,7 @@ import org.apache.rocketmq.client.java.impl.ClientSettings; import org.apache.rocketmq.client.java.message.MessageViewImpl; import org.apache.rocketmq.client.java.message.protocol.Resource; import org.apache.rocketmq.client.java.route.MessageQueueImpl; -import org.apache.rocketmq.client.java.route.TopicRouteDataResult; +import org.apache.rocketmq.client.java.route.TopicRouteData; import org.apache.rocketmq.client.java.rpc.RpcInvocation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,9 +130,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer { this.state(), clientId); throw new IllegalStateException("Simple consumer is not running now"); } - final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic); - TopicRouteDataResult topicRouteDataResult = handleClientFuture(future); - topicRouteDataResult.checkAndGetTopicRouteData(); + final ListenableFuture<TopicRouteData> future = getRouteData(topic); + handleClientFuture(future); subscriptionExpressions.put(topic, filterExpression); return this; } @@ -359,9 +358,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer { return simpleConsumerSettings; } - public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) { + public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) { final SubscriptionLoadBalancer subscriptionLoadBalancer = - new SubscriptionLoadBalancer(topicRouteDataResult); + new SubscriptionLoadBalancer(topicRouteData); subTopicRouteDataResultCache.put(topic, subscriptionLoadBalancer); } @@ -372,7 +371,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer { future0.set(result); return future0; } - final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic); + final ListenableFuture<TopicRouteData> future = getRouteData(topic); return Futures.transform(future, topicRouteDataResult -> { final SubscriptionLoadBalancer subscriptionLoadBalancer = new SubscriptionLoadBalancer(topicRouteDataResult); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java index 21610b2..012d441 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java @@ -17,20 +17,20 @@ package org.apache.rocketmq.client.java.impl.consumer; +import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.math.IntMath; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import javax.annotation.concurrent.Immutable; import org.apache.commons.lang3.RandomUtils; -import org.apache.rocketmq.client.apis.ClientException; -import org.apache.rocketmq.client.java.exception.NotFoundException; import org.apache.rocketmq.client.java.misc.Utilities; import org.apache.rocketmq.client.java.route.MessageQueueImpl; -import org.apache.rocketmq.client.java.route.TopicRouteDataResult; +import org.apache.rocketmq.client.java.route.TopicRouteData; @Immutable public class SubscriptionLoadBalancer { - private final TopicRouteDataResult topicRouteDataResult; /** * Index for round-robin. */ @@ -40,30 +40,19 @@ public class SubscriptionLoadBalancer { */ private final ImmutableList<MessageQueueImpl> messageQueues; - public SubscriptionLoadBalancer(TopicRouteDataResult topicRouteDataResult) { - this.topicRouteDataResult = topicRouteDataResult; + public SubscriptionLoadBalancer(TopicRouteData topicRouteData) { this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)); - final ImmutableList.Builder<MessageQueueImpl> builder = ImmutableList.builder(); - if (!topicRouteDataResult.ok()) { - this.messageQueues = builder.build(); - return; + final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream() + .filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isReadable() && + Utilities.MASTER_BROKER_ID == mq.getBroker().getId()) + .collect(Collectors.toList()); + if (mqs.isEmpty()) { + throw new IllegalArgumentException("No readable message queue found, topiRouteData=" + topicRouteData); } - for (MessageQueueImpl messageQueue : topicRouteDataResult.getTopicRouteData().getMessageQueues()) { - if (!messageQueue.getPermission().isReadable() || - Utilities.MASTER_BROKER_ID != messageQueue.getBroker().getId()) { - continue; - } - builder.add(messageQueue); - } - this.messageQueues = builder.build(); + this.messageQueues = ImmutableList.<MessageQueueImpl>builder().addAll(mqs).build(); } - public MessageQueueImpl takeMessageQueue() throws ClientException { - topicRouteDataResult.checkAndGetTopicRouteData(); - if (messageQueues.isEmpty()) { - // Should never reach here. - throw new NotFoundException("Failed to take message queue due to readable message queue doesn't exist"); - } + public MessageQueueImpl takeMessageQueue() { final int next = index.getAndIncrement(); return messageQueues.get(IntMath.mod(next, messageQueues.size())); } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java similarity index 83% rename from java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java index 8bdc019..817e24f 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java @@ -24,15 +24,23 @@ import apache.rocketmq.v2.TelemetryCommand; import apache.rocketmq.v2.VerifyMessageCommand; import com.google.common.util.concurrent.ListenableFuture; import io.grpc.stub.StreamObserver; +import java.util.concurrent.ScheduledExecutorService; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.java.route.Endpoints; -public interface ClientSessionProcessor { - ListenableFuture<Void> register(); +public interface ClientSessionHandler { + @SuppressWarnings("BooleanMethodIsAlwaysInverted") + boolean isRunning(); + + ScheduledExecutorService getScheduler(); + + boolean isEndpointsDeprecated(Endpoints endpoints); + + ListenableFuture<Void> awaitSettingSynchronized(); String clientId(); - TelemetryCommand getSettingsCommand(); + TelemetryCommand settingsCommand(); StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, StreamObserver<TelemetryCommand> observer) throws ClientException; diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java index bb4ae49..08483f0 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java @@ -72,7 +72,7 @@ import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy; import org.apache.rocketmq.client.java.retry.RetryPolicy; import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.route.MessageQueueImpl; -import org.apache.rocketmq.client.java.route.TopicRouteDataResult; +import org.apache.rocketmq.client.java.route.TopicRouteData; import org.apache.rocketmq.client.java.rpc.RpcInvocation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,7 +89,7 @@ class ProducerImpl extends ClientImpl implements Producer { protected final ProducerSettings producerSettings; private final TransactionChecker checker; - private final ConcurrentMap<String/* topic */, PublishingLoadBalancer> publishingRouteDataResultCache; + private final ConcurrentMap<String/* topic */, PublishingLoadBalancer> publishingRouteDataCache; /** * The caller is supposed to have validated the arguments and handled throwing exception or @@ -102,7 +102,7 @@ class ProducerImpl extends ClientImpl implements Producer { this.producerSettings = new ProducerSettings(clientId, endpoints, retryPolicy, clientConfiguration.getRequestTimeout(), topics); this.checker = checker; - this.publishingRouteDataResultCache = new ConcurrentHashMap<>(); + this.publishingRouteDataCache = new ConcurrentHashMap<>(); } @Override @@ -540,26 +540,21 @@ class ProducerImpl extends ClientImpl implements Producer { } @Override - public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) { + public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) { final PublishingLoadBalancer publishingLoadBalancer = - new PublishingLoadBalancer(topicRouteDataResult); - publishingRouteDataResultCache.put(topic, publishingLoadBalancer); + new PublishingLoadBalancer(topicRouteData); + publishingRouteDataCache.put(topic, publishingLoadBalancer); } private ListenableFuture<PublishingLoadBalancer> getPublishingTopicRouteResult(final String topic) { - SettableFuture<PublishingLoadBalancer> future0 = SettableFuture.create(); - final PublishingLoadBalancer result = publishingRouteDataResultCache.get(topic); + final PublishingLoadBalancer result = publishingRouteDataCache.get(topic); if (null != result) { - future0.set(result); - return future0; + return Futures.immediateFuture(result); } - return Futures.transformAsync(getRouteDataResult(topic), topicRouteDataResult -> { - SettableFuture<PublishingLoadBalancer> future = SettableFuture.create(); - final PublishingLoadBalancer publishingLoadBalancer = - new PublishingLoadBalancer(topicRouteDataResult); - publishingRouteDataResultCache.put(topic, publishingLoadBalancer); - future.set(publishingLoadBalancer); - return future; + return Futures.transformAsync(getRouteData(topic), topicRouteDataResult -> { + final PublishingLoadBalancer loadBalancer = new PublishingLoadBalancer(topicRouteDataResult); + publishingRouteDataCache.put(topic, loadBalancer); + return Futures.immediateFuture(loadBalancer); }, MoreExecutors.directExecutor()); } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java index 9b326de..feb9616 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.client.java.impl.producer; import com.google.common.base.Objects; +import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.hash.Hashing; import com.google.common.math.IntMath; @@ -28,19 +29,17 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import javax.annotation.concurrent.Immutable; import org.apache.commons.lang3.RandomUtils; -import org.apache.rocketmq.client.apis.ClientException; -import org.apache.rocketmq.client.java.exception.NotFoundException; import org.apache.rocketmq.client.java.misc.Utilities; import org.apache.rocketmq.client.java.route.Broker; import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.route.MessageQueueImpl; -import org.apache.rocketmq.client.java.route.TopicRouteDataResult; +import org.apache.rocketmq.client.java.route.TopicRouteData; @Immutable public class PublishingLoadBalancer { - private final TopicRouteDataResult topicRouteDataResult; /** * Index for round-robin. */ @@ -50,40 +49,25 @@ public class PublishingLoadBalancer { */ private final ImmutableList<MessageQueueImpl> messageQueues; - public PublishingLoadBalancer(TopicRouteDataResult topicRouteDataResult) { - this.topicRouteDataResult = topicRouteDataResult; + public PublishingLoadBalancer(TopicRouteData topicRouteData) { this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)); - final ImmutableList.Builder<MessageQueueImpl> builder = ImmutableList.builder(); - if (!topicRouteDataResult.ok()) { - this.messageQueues = builder.build(); - return; - } - for (MessageQueueImpl messageQueue : topicRouteDataResult.getTopicRouteData().getMessageQueues()) { - if (!messageQueue.getPermission().isWritable() || - Utilities.MASTER_BROKER_ID != messageQueue.getBroker().getId()) { - continue; - } - builder.add(messageQueue); - } - this.messageQueues = builder.build(); - } - - private void preconditionCheckBeforeTakingMessageQueue() throws ClientException { - topicRouteDataResult.checkAndGetTopicRouteData(); - if (messageQueues.isEmpty()) { - throw new NotFoundException("Failed to take message due to writable message queue doesn't exist"); + final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream() + .filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isWritable() && + Utilities.MASTER_BROKER_ID == mq.getBroker().getId()) + .collect(Collectors.toList()); + if (mqs.isEmpty()) { + throw new IllegalArgumentException("No writable message queue found, topiRouteData=" + topicRouteData); } + this.messageQueues = ImmutableList.<MessageQueueImpl>builder().addAll(mqs).build(); } - public MessageQueueImpl takeMessageQueueByMessageGroup(String messageGroup) throws ClientException { - preconditionCheckBeforeTakingMessageQueue(); + public MessageQueueImpl takeMessageQueueByMessageGroup(String messageGroup) { final long hashCode = Hashing.sipHash24().hashBytes(messageGroup.getBytes(StandardCharsets.UTF_8)).asLong(); final int index = LongMath.mod(hashCode, messageQueues.size()); return messageQueues.get(index); } - public List<MessageQueueImpl> takeMessageQueues(Set<Endpoints> excluded, int count) throws ClientException { - preconditionCheckBeforeTakingMessageQueue(); + public List<MessageQueueImpl> takeMessageQueues(Set<Endpoints> excluded, int count) { int next = index.getAndIncrement(); List<MessageQueueImpl> candidates = new ArrayList<>(); Set<String> candidateBrokerNames = new HashSet<>(); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java deleted file mode 100644 index 94adee4..0000000 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.client.java.route; - -import apache.rocketmq.v2.Code; -import apache.rocketmq.v2.MessageQueue; -import apache.rocketmq.v2.QueryRouteResponse; -import apache.rocketmq.v2.Status; -import com.google.common.base.MoreObjects; -import com.google.common.base.Objects; -import java.util.List; -import javax.annotation.concurrent.Immutable; -import org.apache.rocketmq.client.apis.ClientException; -import org.apache.rocketmq.client.java.exception.BadRequestException; -import org.apache.rocketmq.client.java.exception.InternalErrorException; -import org.apache.rocketmq.client.java.exception.NotFoundException; -import org.apache.rocketmq.client.java.exception.ProxyTimeoutException; -import org.apache.rocketmq.client.java.exception.TooManyRequestsException; -import org.apache.rocketmq.client.java.exception.UnsupportedException; -import org.apache.rocketmq.client.java.rpc.RpcInvocation; - -/** - * Result topic route data fetched from remote. - */ -@Immutable -public class TopicRouteDataResult { - private final TopicRouteData topicRouteData; - private final ClientException exception; - - public TopicRouteDataResult(RpcInvocation<QueryRouteResponse> invocation) { - final QueryRouteResponse response = invocation.getResponse(); - final String requestId = invocation.getContext().getRequestId(); - final List<MessageQueue> messageQueuesList = response.getMessageQueuesList(); - final TopicRouteData topicRouteData = new TopicRouteData(messageQueuesList); - final Status status = response.getStatus(); - this.topicRouteData = topicRouteData; - final Code code = status.getCode(); - final int codeNumber = code.getNumber(); - final String statusMessage = status.getMessage(); - switch (code) { - case OK: - this.exception = null; - break; - case BAD_REQUEST: - case ILLEGAL_ACCESS_POINT: - case ILLEGAL_TOPIC: - case CLIENT_ID_REQUIRED: - this.exception = new BadRequestException(codeNumber, requestId, statusMessage); - break; - case NOT_FOUND: - case TOPIC_NOT_FOUND: - this.exception = new NotFoundException(codeNumber, requestId, statusMessage); - break; - case TOO_MANY_REQUESTS: - this.exception = new TooManyRequestsException(codeNumber, requestId, statusMessage); - break; - case INTERNAL_ERROR: - case INTERNAL_SERVER_ERROR: - this.exception = new InternalErrorException(codeNumber, requestId, statusMessage); - break; - case PROXY_TIMEOUT: - this.exception = new ProxyTimeoutException(codeNumber, requestId, statusMessage); - break; - default: - this.exception = new UnsupportedException(codeNumber, requestId, statusMessage); - } - } - - public TopicRouteData getTopicRouteData() { - return topicRouteData; - } - - public TopicRouteData checkAndGetTopicRouteData() throws ClientException { - if (null != exception) { - throw exception; - } - return topicRouteData; - } - - public boolean ok() { - return null == exception; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TopicRouteDataResult result = (TopicRouteDataResult) o; - return Objects.equal(topicRouteData, result.topicRouteData) && Objects.equal(exception, result.exception); - } - - @Override - public int hashCode() { - return Objects.hashCode(topicRouteData, exception); - } - - @Override - public String toString() { - final MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this) - .add("topicRouteData", this.topicRouteData); - if (null == exception) { - return helper.toString(); - } - return helper.add("exception", this.exception).toString(); - } -}
