This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 1a25089 Add more unit tests (#148)
1a25089 is described below
commit 1a25089115669dfa9ce573302d17975a39cfccf3
Author: Aaron Ai <[email protected]>
AuthorDate: Thu Aug 11 15:43:47 2022 +0800
Add more unit tests (#148)
---
.../rocketmq/client/java/impl/ClientImpl.java | 49 ++++----
.../client/java/impl/ClientManagerImpl.java | 47 ++++----
.../client/java/impl/ClientSessionImpl.java | 27 ++---
.../impl/{ClientSettings.java => Settings.java} | 20 +---
.../client/java/impl/consumer/ConsumerImpl.java | 26 ++---
.../java/impl/consumer/ProcessQueueImpl.java | 2 +-
.../java/impl/consumer/PushConsumerImpl.java | 26 ++---
...Settings.java => PushSubscriptionSettings.java} | 29 +++--
.../java/impl/consumer/SimpleConsumerImpl.java | 12 +-
...ttings.java => SimpleSubscriptionSettings.java} | 34 +++---
.../java/impl/producer/ClientSessionHandler.java | 6 -
.../client/java/impl/producer/ProducerImpl.java | 27 +++--
...oducerSettings.java => PublishingSettings.java} | 21 ++--
.../client/java/impl/producer/TransactionImpl.java | 10 +-
.../client/java/message/PublishingMessageImpl.java | 6 +-
.../client/java/message/protocol/Resource.java | 12 +-
.../rocketmq/client/java/misc/Utilities.java | 14 ++-
.../client/java/impl/ClientManagerImplTest.java | 5 +-
.../java/impl/consumer/ConsumeServiceTest.java | 114 +++++++++++++++++++
.../client/java/impl/consumer/ConsumeTaskTest.java | 55 +++++++++
.../java/impl/consumer/ConsumerImplTest.java | 123 +++++++++++++++++++++
...erviceTest.java => FifoConsumeServiceTest.java} | 36 +++---
.../java/impl/consumer/ProcessQueueImplTest.java | 14 +--
.../impl/consumer/PushConsumerBuilderImplTest.java | 4 +-
.../java/impl/consumer/PushConsumerImplTest.java | 4 +-
.../consumer/PushSubscriptionSettingsTest.java | 120 ++++++++++++++++++++
.../impl/consumer/SimpleConsumerBuilderTest.java | 4 +-
.../java/impl/consumer/SimpleConsumerImplTest.java | 20 ++--
.../consumer/SimpleSubscriptionSettingsTest.java | 94 ++++++++++++++++
.../impl/consumer/StandardConsumeServiceTest.java | 10 +-
.../impl/producer/ProducerBuilderImplTest.java | 35 ++++++
.../java/impl/producer/ProducerImplTest.java | 4 +-
.../java/impl/producer/TransactionImplTest.java | 93 ++++++++++++++++
.../protocol/EncodingTest.java} | 34 +++---
.../client/java/message/protocol/ResourceTest.java | 54 +++++++++
.../rocketmq/client/java/misc/UtilitiesTest.java | 64 +++++++++++
.../apache/rocketmq/client/java/tool/TestBase.java | 10 +-
java/style/spotbugs-suppressions.xml | 8 +-
38 files changed, 1003 insertions(+), 270 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 0a3257a..e7e6fd6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -29,7 +29,6 @@ import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v2.Resource;
-import apache.rocketmq.v2.Settings;
import apache.rocketmq.v2.Status;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.ThreadStackTrace;
@@ -100,7 +99,6 @@ public abstract class ClientImpl extends AbstractIdleService
implements Client,
*/
private static final Duration TELEMETRY_TIMEOUT = Duration.ofDays(60 *
365);
- protected final ClientManager clientManager;
protected final ClientConfiguration clientConfiguration;
protected final Endpoints endpoints;
protected final Set<String> topics;
@@ -114,6 +112,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
protected final ThreadPoolExecutor telemetryCommandExecutor;
protected final String clientId;
+ private final ClientManager clientManager;
private volatile ScheduledFuture<?> updateRouteCacheFuture;
private final ConcurrentMap<String, TopicRouteData> topicRouteCache;
@@ -276,7 +275,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
@Override
public TelemetryCommand settingsCommand() {
- final Settings settings = this.getClientSettings().toProtobuf();
+ final apache.rocketmq.v2.Settings settings =
this.getSettings().toProtobuf();
return TelemetryCommand.newBuilder().setSettings(settings).build();
}
@@ -299,12 +298,6 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
return !totalRouteEndpoints.contains(endpoints);
}
- @Override
- public ListenableFuture<Void> awaitSettingSynchronized() {
- return Futures.transformAsync(this.getClientSettings().arrivedFuture,
- (clientSettings) -> Futures.immediateVoidFuture(),
clientCallbackExecutor);
- }
-
/**
* This method is invoked while request of printing thread stack trace is
received from remote.
*
@@ -338,7 +331,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
}
- public abstract ClientSettings getClientSettings();
+ public abstract Settings getSettings();
/**
* Apply setting from remote.
@@ -347,10 +340,10 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
* @param settings settings received from remote.
*/
@Override
- public final void onSettingsCommand(Endpoints endpoints, Settings
settings) {
+ public final void onSettingsCommand(Endpoints endpoints,
apache.rocketmq.v2.Settings settings) {
final Metric metric = new Metric(settings.getMetric());
clientMeterProvider.reset(metric);
- this.getClientSettings().applySettingsCommand(settings);
+ this.getSettings().sync(settings);
}
/**
@@ -358,7 +351,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
*/
@Override
public void syncSettings() {
- final Settings settings = getClientSettings().toProtobuf();
+ final apache.rocketmq.v2.Settings settings =
getSettings().toProtobuf();
final TelemetryCommand command =
TelemetryCommand.newBuilder().setSettings(settings).build();
final Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints();
for (Endpoints endpoints : totalRouteEndpoints) {
@@ -373,7 +366,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
public void telemetry(Endpoints endpoints, TelemetryCommand command) {
try {
final ClientSessionImpl clientSession =
getClientSession(endpoints);
- clientSession.fireWrite(command);
+ clientSession.write(command);
} catch (Throwable t) {
LOGGER.error("Failed to fire write telemetry command, clientId={},
endpoints={}", clientId, endpoints, t);
}
@@ -412,15 +405,6 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
}
- private ListenableFuture<Void> syncSettingsSafely(Endpoints endpoints) {
- try {
- final ClientSessionImpl clientSession =
getClientSession(endpoints);
- return clientSession.syncSettingsSafely();
- } catch (Throwable t) {
- return Futures.immediateFailedFuture(t);
- }
- }
-
/**
* Triggered when {@link TopicRouteData} is fetched from remote.
*
@@ -432,15 +416,18 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
.map(mq -> mq.getBroker().getEndpoints())
.collect(Collectors.toSet());
final Set<Endpoints> existRouteEndpoints = getTotalRouteEndpoints();
- final Set<Endpoints> newEndpoints = new
HashSet<>(Sets.difference(routeEndpoints,
- existRouteEndpoints));
- final List<ListenableFuture<Void>> futures =
-
newEndpoints.stream().map(this::syncSettingsSafely).collect(Collectors.toList());
- return Futures.whenAllSucceed(futures).callAsync(() -> {
+ final Set<Endpoints> newEndpoints = new
HashSet<>(Sets.difference(routeEndpoints, existRouteEndpoints));
+ try {
+ for (Endpoints endpoints : newEndpoints) {
+ final ClientSessionImpl clientSession =
getClientSession(endpoints);
+ clientSession.syncSettings();
+ }
topicRouteCache.put(topic, topicRouteData);
onTopicRouteDataUpdate0(topic, topicRouteData);
return Futures.immediateFuture(topicRouteData);
- }, clientCallbackExecutor);
+ } catch (Throwable t) {
+ return Futures.immediateFailedFuture(t);
+ }
}
public void onTopicRouteDataUpdate0(String topic, TopicRouteData
topicRouteData) {
@@ -523,6 +510,10 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
}
+ public ClientManager getClientManager() {
+ return clientManager;
+ }
+
/**
* @see Client#clientId()
*/
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index fc3e0eb..e9bc30a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -59,6 +59,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLException;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.misc.ExecutorServices;
+import org.apache.rocketmq.client.java.misc.MetadataUtils;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.rpc.RpcClient;
@@ -79,7 +80,7 @@ public class ClientManagerImpl extends ClientManager {
public static final Duration HEART_BEAT_INITIAL_DELAY =
Duration.ofSeconds(1);
public static final Duration HEART_BEAT_PERIOD = Duration.ofSeconds(10);
- public static final Duration LOG_STATS_INITIAL_DELAY =
Duration.ofSeconds(60);
+ public static final Duration LOG_STATS_INITIAL_DELAY =
Duration.ofSeconds(1);
public static final Duration LOG_STATS_PERIOD = Duration.ofSeconds(60);
public static final Duration SYNC_SETTINGS_DELAY = Duration.ofSeconds(1);
@@ -88,6 +89,7 @@ public class ClientManagerImpl extends ClientManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClientManagerImpl.class);
private final Client client;
+
@GuardedBy("rpcClientTableLock")
private final Map<Endpoints, RpcClient> rpcClientTable;
private final ReadWriteLock rpcClientTableLock;
@@ -132,14 +134,15 @@ public class ClientManagerImpl extends ClientManager {
while (it.hasNext()) {
final Map.Entry<Endpoints, RpcClient> entry = it.next();
final Endpoints endpoints = entry.getKey();
- final RpcClient client = entry.getValue();
+ final RpcClient rpcClient = entry.getValue();
- final Duration idleDuration = client.idleDuration();
+ final Duration idleDuration = rpcClient.idleDuration();
if (idleDuration.compareTo(RPC_CLIENT_MAX_IDLE_DURATION) > 0) {
it.remove();
- client.shutdown();
+ rpcClient.shutdown();
LOGGER.info("Rpc client has been idle for a long time,
endpoints={}, idleDuration={}, " +
- "rpcClientMaxIdleDuration={}", endpoints,
idleDuration, RPC_CLIENT_MAX_IDLE_DURATION);
+ "rpcClientMaxIdleDuration={}, clientId={}",
endpoints, idleDuration,
+ RPC_CLIENT_MAX_IDLE_DURATION, client.clientId());
}
}
} finally {
@@ -176,7 +179,7 @@ public class ClientManagerImpl extends ClientManager {
try {
rpcClient = new RpcClientImpl(endpoints);
} catch (SSLException e) {
- LOGGER.error("Failed to get rpc client, endpoints={}",
endpoints);
+ LOGGER.error("Failed to get RPC client, endpoints={},
clientId={}", endpoints, client.clientId(), e);
throw new ClientException("Failed to generate RPC client", e);
}
rpcClientTable.put(endpoints, rpcClient);
@@ -310,13 +313,14 @@ public class ClientManagerImpl extends ClientManager {
@Override
protected void startUp() {
- LOGGER.info("Begin to start the client manager");
+ final String clientId = client.clientId();
+ LOGGER.info("Begin to start the client manager, clientId={}",
clientId);
scheduler.scheduleWithFixedDelay(
() -> {
try {
clearIdleRpcClients();
} catch (Throwable t) {
- LOGGER.error("Exception raised while clear idle rpc
clients.", t);
+ LOGGER.error("Exception raised during the clearing of idle
rpc clients, clientId={}", clientId, t);
}
},
RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY.toNanos(),
@@ -329,7 +333,7 @@ public class ClientManagerImpl extends ClientManager {
try {
client.doHeartbeat();
} catch (Throwable t) {
- LOGGER.error("Exception raised while heartbeat.", t);
+ LOGGER.error("Exception raised during heartbeat,
clientId={}", clientId, t);
}
},
HEART_BEAT_INITIAL_DELAY.toNanos(),
@@ -340,9 +344,11 @@ public class ClientManagerImpl extends ClientManager {
scheduler.scheduleWithFixedDelay(
() -> {
try {
+ LOGGER.info("Start to log statistics, clientVersion={},
clientWrapperVersion={}, clientId={}",
+ MetadataUtils.getVersion(),
MetadataUtils.getWrapperVersion(), clientId);
client.doStats();
} catch (Throwable t) {
- LOGGER.error("Exception raised while log stats.", t);
+ LOGGER.error("Exception raised during statistics logging,
clientId={}", clientId, t);
}
},
LOG_STATS_INITIAL_DELAY.toNanos(),
@@ -355,25 +361,26 @@ public class ClientManagerImpl extends ClientManager {
try {
client.syncSettings();
} catch (Throwable t) {
- LOGGER.error("Exception raised during the setting
synchronizing.", t);
+ LOGGER.error("Exception raised during the setting
synchronization, clientId={}", clientId, t);
}
},
SYNC_SETTINGS_DELAY.toNanos(),
SYNC_SETTINGS_PERIOD.toNanos(),
TimeUnit.NANOSECONDS
);
- LOGGER.info("The client manager starts successfully");
+ LOGGER.info("The client manager starts successfully, clientId={}",
clientId);
}
@Override
protected void shutDown() throws IOException {
- LOGGER.info("Begin to shutdown the client manager");
+ final String clientId = client.clientId();
+ LOGGER.info("Begin to shutdown the client manager, clientId={}",
clientId);
scheduler.shutdown();
try {
if (!ExecutorServices.awaitTerminated(scheduler)) {
- LOGGER.error("[Bug] Timeout to shutdown the client scheduler");
+ LOGGER.error("[Bug] Timeout to shutdown the client scheduler,
clientId={}", clientId);
} else {
- LOGGER.info("Shutdown the client scheduler successfully");
+ LOGGER.info("Shutdown the client scheduler successfully,
clientId={}", clientId);
}
rpcClientTableLock.writeLock().lock();
try {
@@ -387,17 +394,17 @@ public class ClientManagerImpl extends ClientManager {
} finally {
rpcClientTableLock.writeLock().unlock();
}
- LOGGER.info("Shutdown all rpc client(s) successfully");
+ LOGGER.info("Shutdown all rpc client(s) successfully,
clientId={}", clientId);
asyncWorker.shutdown();
if (!ExecutorServices.awaitTerminated(asyncWorker)) {
- LOGGER.error("[Bug] Timeout to shutdown the client async
worker");
+ LOGGER.error("[Bug] Timeout to shutdown the client async
worker, clientId={}", clientId);
} else {
- LOGGER.info("Shutdown the client async worker successfully");
+ LOGGER.info("Shutdown the client async worker successfully,
clientId={}", clientId);
}
} catch (InterruptedException e) {
- LOGGER.error("[Bug] Unexpected exception raised while shutdown
client manager", e);
+ LOGGER.error("[Bug] Unexpected exception raised while shutdown
client manager, clientId={}", clientId, e);
throw new IOException(e);
}
- LOGGER.info("Shutdown the client manager successfully");
+ LOGGER.info("Shutdown the client manager successfully, clientId={}",
clientId);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index b699ab0..f2edc70 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -22,11 +22,12 @@ import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v2.Settings;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.VerifyMessageCommand;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
import org.apache.rocketmq.client.java.route.Endpoints;
@@ -39,14 +40,17 @@ import org.slf4j.LoggerFactory;
public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClientSessionImpl.class);
private static final Duration REQUEST_OBSERVER_RENEW_BACKOFF_DELAY =
Duration.ofSeconds(1);
+ private static final long SETTINGS_INITIALIZATION_TIMEOUT_MILLIS = 3000;
private final ClientSessionHandler sessionHandler;
private final Endpoints endpoints;
+ private final SettableFuture<Settings> settingsSettableFuture;
private volatile StreamObserver<TelemetryCommand> requestObserver;
protected ClientSessionImpl(ClientSessionHandler sessionHandler, Endpoints
endpoints) throws ClientException {
this.sessionHandler = sessionHandler;
this.endpoints = endpoints;
+ this.settingsSettableFuture = SettableFuture.create();
this.requestObserver = sessionHandler.telemetry(endpoints, this);
}
@@ -64,21 +68,17 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.toNanos(),
TimeUnit.NANOSECONDS);
return;
}
- syncSettings();
+ syncSettings0();
}
- protected ListenableFuture<Void> syncSettingsSafely() {
- try {
- this.syncSettings();
- return sessionHandler.awaitSettingSynchronized();
- } catch (Throwable t) {
- return Futures.immediateFailedFuture(t);
- }
+ protected void syncSettings() throws TimeoutException, ExecutionException,
InterruptedException {
+ this.syncSettings0();
+ settingsSettableFuture.get(SETTINGS_INITIALIZATION_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS);
}
- private void syncSettings() {
+ private void syncSettings0() {
final TelemetryCommand settings = sessionHandler.settingsCommand();
- fireWrite(settings);
+ write(settings);
}
/**
@@ -95,7 +95,7 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
}
}
- void fireWrite(TelemetryCommand command) {
+ void write(TelemetryCommand command) {
if (null == requestObserver) {
LOGGER.error("Request observer does not exist, ignore current
command, endpoints={}, command={}",
endpoints, command);
@@ -113,6 +113,7 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
final Settings settings = command.getSettings();
LOGGER.info("Receive settings from remote, endpoints={},
clientId={}", endpoints, clientId);
sessionHandler.onSettingsCommand(endpoints, settings);
+ settingsSettableFuture.set(settings);
break;
}
case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
similarity index 73%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
index bc50c4f..e18c82c 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
@@ -17,47 +17,39 @@
package org.apache.rocketmq.client.java.impl;
-import apache.rocketmq.v2.Settings;
import com.google.common.base.MoreObjects;
-import com.google.common.util.concurrent.SettableFuture;
import java.time.Duration;
import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
-public abstract class ClientSettings {
+public abstract class Settings {
protected final String clientId;
protected final ClientType clientType;
protected final Endpoints accessPoint;
protected volatile RetryPolicy retryPolicy;
protected final Duration requestTimeout;
- protected final SettableFuture<Void> arrivedFuture;
- public ClientSettings(String clientId, ClientType clientType, Endpoints
accessPoint,
- RetryPolicy retryPolicy, Duration requestTimeout) {
+ public Settings(String clientId, ClientType clientType, Endpoints
accessPoint, RetryPolicy retryPolicy,
+ Duration requestTimeout) {
this.clientId = clientId;
this.clientType = clientType;
this.accessPoint = accessPoint;
this.retryPolicy = retryPolicy;
this.requestTimeout = requestTimeout;
- this.arrivedFuture = SettableFuture.create();
}
- public ClientSettings(String clientId, ClientType clientType, Endpoints
accessPoint, Duration requestTimeout) {
+ public Settings(String clientId, ClientType clientType, Endpoints
accessPoint, Duration requestTimeout) {
this(clientId, clientType, accessPoint, null, requestTimeout);
}
- public abstract Settings toProtobuf();
+ public abstract apache.rocketmq.v2.Settings toProtobuf();
- public abstract void applySettingsCommand(Settings settings);
+ public abstract void sync(apache.rocketmq.v2.Settings settings);
public RetryPolicy getRetryPolicy() {
return retryPolicy;
}
- public SettableFuture<Void> getArrivedFuture() {
- return arrivedFuture;
- }
-
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 791a8f2..58b19b5 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -36,7 +36,6 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Durations;
import io.grpc.Metadata;
@@ -83,7 +82,7 @@ abstract class ConsumerImpl extends ClientImpl {
final Duration tolerance = clientConfiguration.getRequestTimeout();
final Duration timeout = Duration.ofNanos(awaitDuration.toNanos()
+ tolerance.toNanos());
final
ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> future =
- clientManager.receiveMessage(endpoints, metadata, request,
timeout);
+ this.getClientManager().receiveMessage(endpoints, metadata,
request, timeout);
return Futures.transformAsync(future, invocation -> {
final Iterator<ReceiveMessageResponse> it =
invocation.getResponse();
Status status =
Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
@@ -151,11 +150,10 @@ abstract class ConsumerImpl extends ClientImpl {
try {
final AckMessageRequest request =
wrapAckMessageRequest(messageView);
final Metadata metadata = sign();
- future = clientManager.ackMessage(endpoints, metadata, request,
clientConfiguration.getRequestTimeout());
+ final Duration requestTimeout =
clientConfiguration.getRequestTimeout();
+ future = this.getClientManager().ackMessage(endpoints, metadata,
request, requestTimeout);
} catch (Throwable t) {
- final SettableFuture<RpcInvocation<AckMessageResponse>> future0 =
SettableFuture.create();
- future0.setException(t);
- future = future0;
+ future = Futures.immediateFailedFuture(t);
}
Futures.addCallback(future, new
FutureCallback<RpcInvocation<AckMessageResponse>>() {
@Override
@@ -178,7 +176,7 @@ abstract class ConsumerImpl extends ClientImpl {
return future;
}
- public ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>>
changeInvisibleDuration(
+ ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>>
changeInvisibleDuration(
MessageViewImpl messageView, Duration invisibleDuration) {
final Endpoints endpoints = messageView.getEndpoints();
ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>>
future;
@@ -187,14 +185,12 @@ abstract class ConsumerImpl extends ClientImpl {
final List<MessageCommon> messageCommons =
Collections.singletonList(messageView.getMessageCommon());
doBefore(MessageHookPoints.CHANGE_INVISIBLE_DURATION, messageCommons);
try {
- final ChangeInvisibleDurationRequest request =
wrapChangeInvisibleDuration(messageView, invisibleDuration);
final Metadata metadata = sign();
- future = clientManager.changeInvisibleDuration(endpoints,
metadata, request,
- clientConfiguration.getRequestTimeout());
+ final ChangeInvisibleDurationRequest request =
wrapChangeInvisibleDuration(messageView, invisibleDuration);
+ final Duration requestTimeout =
clientConfiguration.getRequestTimeout();
+ future =
this.getClientManager().changeInvisibleDuration(endpoints, metadata, request,
requestTimeout);
} catch (Throwable t) {
- final
SettableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> future0 =
SettableFuture.create();
- future0.setException(t);
- future = future0;
+ future = Futures.immediateFailedFuture(t);
}
final MessageId messageId = messageView.getMessageId();
Futures.addCallback(future, new
FutureCallback<RpcInvocation<ChangeInvisibleDurationResponse>>() {
@@ -254,14 +250,14 @@ abstract class ConsumerImpl extends ClientImpl {
return expressionBuilder.build();
}
- public ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize,
MessageQueueImpl mq,
+ ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize,
MessageQueueImpl mq,
FilterExpression filterExpression) {
return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
.setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression))
.setBatchSize(batchSize).setAutoRenew(true).build();
}
- public ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize,
MessageQueueImpl mq,
+ ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize,
MessageQueueImpl mq,
FilterExpression filterExpression, Duration invisibleDuration) {
final com.google.protobuf.Duration duration =
Durations.fromNanos(invisibleDuration.toNanos());
return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index ac9814f..12f6f61 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -133,7 +133,7 @@ class ProcessQueueImpl implements ProcessQueue {
@Override
public boolean expired() {
- final PushConsumerSettings settings =
consumer.getPushConsumerSettings();
+ final PushSubscriptionSettings settings =
consumer.getPushConsumerSettings();
Duration maxIdleDuration = Duration.ofNanos(2 *
(settings.getLongPollingTimeout().toNanos()
+
consumer.getClientConfiguration().getRequestTimeout().toNanos()));
final Duration idleDuration = Duration.ofNanos(System.nanoTime() -
activityNanoTime);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index d4f329c..a1e4332 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -61,7 +61,7 @@ import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.java.exception.StatusChecker;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
-import org.apache.rocketmq.client.java.impl.ClientSettings;
+import org.apache.rocketmq.client.java.impl.Settings;
import org.apache.rocketmq.client.java.message.MessageCommon;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.message.protocol.Resource;
@@ -92,7 +92,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer, MessageCach
final AtomicLong consumptionErrorQuantity;
private final ClientConfiguration clientConfiguration;
- private final PushConsumerSettings pushConsumerSettings;
+ private final PushSubscriptionSettings pushSubscriptionSettings;
private final String consumerGroup;
private final Map<String /* topic */, FilterExpression>
subscriptionExpressions;
private final ConcurrentMap<String /* topic */, Assignments>
cacheAssignments;
@@ -125,7 +125,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer, MessageCach
super(clientConfiguration, consumerGroup,
subscriptionExpressions.keySet());
this.clientConfiguration = clientConfiguration;
Resource groupResource = new Resource(consumerGroup);
- this.pushConsumerSettings = new PushConsumerSettings(clientId,
endpoints, groupResource,
+ this.pushSubscriptionSettings = new PushSubscriptionSettings(clientId,
endpoints, groupResource,
clientConfiguration.getRequestTimeout(), subscriptionExpressions);
this.consumerGroup = consumerGroup;
this.subscriptionExpressions = subscriptionExpressions;
@@ -155,7 +155,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer, MessageCach
LOGGER.info("Begin to start the rocketmq push consumer,
clientId={}", clientId);
super.startUp();
clientMeterProvider.setMessageCacheObserver(this);
- final ScheduledExecutorService scheduler =
clientManager.getScheduler();
+ final ScheduledExecutorService scheduler =
this.getClientManager().getScheduler();
this.consumeService = createConsumeService();
this.consumeService.startAsync().awaitRunning();
// Scan assignments periodically.
@@ -188,8 +188,8 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer, MessageCach
}
private ConsumeService createConsumeService() {
- final ScheduledExecutorService scheduler =
clientManager.getScheduler();
- if (pushConsumerSettings.isFifo()) {
+ final ScheduledExecutorService scheduler =
this.getClientManager().getScheduler();
+ if (pushSubscriptionSettings.isFifo()) {
return new FifoConsumeService(clientId, processQueueTable,
messageListener,
consumptionExecutor, this, scheduler);
}
@@ -205,8 +205,8 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer, MessageCach
return consumerGroup;
}
- public PushConsumerSettings getPushConsumerSettings() {
- return pushConsumerSettings;
+ public PushSubscriptionSettings getPushConsumerSettings() {
+ return pushSubscriptionSettings;
}
/**
@@ -270,7 +270,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer, MessageCach
final Metadata metadata = sign();
final QueryAssignmentRequest request =
wrapQueryAssignmentRequest(topic);
final Duration requestTimeout =
clientConfiguration.getRequestTimeout();
- return clientManager.queryAssignment(endpoints, metadata,
request, requestTimeout);
+ return this.getClientManager().queryAssignment(endpoints,
metadata, request, requestTimeout);
}, MoreExecutors.directExecutor());
return Futures.transformAsync(responseFuture, invocation -> {
final QueryAssignmentResponse response = invocation.getResponse();
@@ -415,8 +415,8 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer, MessageCach
}
@Override
- public ClientSettings getClientSettings() {
- return pushConsumerSettings;
+ public Settings getSettings() {
+ return pushSubscriptionSettings;
}
/**
@@ -519,7 +519,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer, MessageCach
final ForwardMessageToDeadLetterQueueRequest request =
wrapForwardMessageToDeadLetterQueueRequest(messageView);
final Metadata metadata = sign();
- future = clientManager.forwardMessageToDeadLetterQueue(endpoints,
metadata, request,
+ future =
this.getClientManager().forwardMessageToDeadLetterQueue(endpoints, metadata,
request,
clientConfiguration.getRequestTimeout());
} catch (Throwable t) {
future = Futures.immediateFailedFuture(t);
@@ -560,7 +560,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer, MessageCach
}
public RetryPolicy getRetryPolicy() {
- return pushConsumerSettings.getRetryPolicy();
+ return pushSubscriptionSettings.getRetryPolicy();
}
public ThreadPoolExecutor getConsumptionExecutor() {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
similarity index 83%
rename from
java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
rename to
java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
index 05bfb2f..5784517 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
@@ -19,11 +19,9 @@ package org.apache.rocketmq.client.java.impl.consumer;
import apache.rocketmq.v2.FilterType;
import apache.rocketmq.v2.RetryPolicy;
-import apache.rocketmq.v2.Settings;
import apache.rocketmq.v2.Subscription;
import apache.rocketmq.v2.SubscriptionEntry;
import com.google.common.base.MoreObjects;
-import com.google.common.util.concurrent.Futures;
import com.google.protobuf.util.Durations;
import java.time.Duration;
import java.util.ArrayList;
@@ -31,8 +29,8 @@ import java.util.List;
import java.util.Map;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
-import org.apache.rocketmq.client.java.impl.ClientSettings;
import org.apache.rocketmq.client.java.impl.ClientType;
+import org.apache.rocketmq.client.java.impl.Settings;
import org.apache.rocketmq.client.java.impl.UserAgent;
import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
@@ -41,8 +39,8 @@ import org.apache.rocketmq.client.java.route.Endpoints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PushConsumerSettings extends ClientSettings {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PushConsumerSettings.class);
+public class PushSubscriptionSettings extends Settings {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PushSubscriptionSettings.class);
private final Resource group;
private final Map<String, FilterExpression> subscriptionExpressions;
@@ -50,9 +48,9 @@ public class PushConsumerSettings extends ClientSettings {
private volatile int receiveBatchSize = 32;
private volatile Duration longPollingTimeout = Duration.ofSeconds(30);
- public PushConsumerSettings(String clientId, Endpoints accessPoint,
Resource group, Duration requestTimeout,
- Map<String, FilterExpression> subscriptionExpression) {
- super(clientId, ClientType.PUSH_CONSUMER, accessPoint, requestTimeout);
+ public PushSubscriptionSettings(String clientId, Endpoints endpoints,
Resource group,
+ Duration requestTimeout, Map<String, FilterExpression>
subscriptionExpression) {
+ super(clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout);
this.group = group;
this.subscriptionExpressions = subscriptionExpression;
}
@@ -70,7 +68,7 @@ public class PushConsumerSettings extends ClientSettings {
}
@Override
- public Settings toProtobuf() {
+ public apache.rocketmq.v2.Settings toProtobuf() {
List<SubscriptionEntry> subscriptionEntries = new ArrayList<>();
for (Map.Entry<String, FilterExpression> entry :
subscriptionExpressions.entrySet()) {
final FilterExpression filterExpression = entry.getValue();
@@ -95,15 +93,15 @@ public class PushConsumerSettings extends ClientSettings {
}
Subscription subscription =
Subscription.newBuilder().setGroup(group.toProtobuf()).addAllSubscriptions(subscriptionEntries).build();
- return
Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
-
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setSubscription(subscription)
- .setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
+ return
apache.rocketmq.v2.Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf())
+
.setClientType(clientType.toProtobuf()).setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos()))
+
.setSubscription(subscription).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
}
@Override
- public void applySettingsCommand(Settings settings) {
- final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
- if (!Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
+ public void sync(apache.rocketmq.v2.Settings settings) {
+ final apache.rocketmq.v2.Settings.PubSubCase pubSubCase =
settings.getPubSubCase();
+ if
(!apache.rocketmq.v2.Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
LOGGER.error("[Bug] Issued settings not match with the client
type, clientId={}, pubSubCase={}, "
+ "clientType={}", clientId, pubSubCase, clientType);
return;
@@ -123,7 +121,6 @@ public class PushConsumerSettings extends ClientSettings {
default:
throw new IllegalArgumentException("Unrecognized backoff
policy strategy.");
}
- this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
}
@Override
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index e5f9618..bb1500d 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -43,7 +43,7 @@ import
org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.java.exception.StatusChecker;
-import org.apache.rocketmq.client.java.impl.ClientSettings;
+import org.apache.rocketmq.client.java.impl.Settings;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
private static final Logger LOGGER =
LoggerFactory.getLogger(SimpleConsumerImpl.class);
- private final SimpleConsumerSettings simpleConsumerSettings;
+ private final SimpleSubscriptionSettings simpleSubscriptionSettings;
private final String consumerGroup;
private final Duration awaitDuration;
@@ -72,8 +72,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements
SimpleConsumer {
Map<String, FilterExpression> subscriptionExpressions) {
super(clientConfiguration, consumerGroup,
subscriptionExpressions.keySet());
Resource groupResource = new Resource(consumerGroup);
- this.simpleConsumerSettings = new SimpleConsumerSettings(clientId,
endpoints, groupResource,
- clientConfiguration.getRequestTimeout(), awaitDuration,
subscriptionExpressions);
+ this.simpleSubscriptionSettings = new
SimpleSubscriptionSettings(clientId, endpoints,
+ groupResource, clientConfiguration.getRequestTimeout(),
awaitDuration, subscriptionExpressions);
this.consumerGroup = consumerGroup;
this.awaitDuration = awaitDuration;
@@ -294,8 +294,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements
SimpleConsumer {
}
@Override
- public ClientSettings getClientSettings() {
- return simpleConsumerSettings;
+ public Settings getSettings() {
+ return simpleSubscriptionSettings;
}
public void onTopicRouteDataUpdate0(String topic, TopicRouteData
topicRouteData) {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
similarity index 76%
rename from
java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
rename to
java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
index 93816c8..0d4b9f4 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
@@ -18,11 +18,9 @@
package org.apache.rocketmq.client.java.impl.consumer;
import apache.rocketmq.v2.FilterType;
-import apache.rocketmq.v2.Settings;
import apache.rocketmq.v2.Subscription;
import apache.rocketmq.v2.SubscriptionEntry;
import com.google.common.base.MoreObjects;
-import com.google.common.util.concurrent.Futures;
import com.google.protobuf.util.Durations;
import java.time.Duration;
import java.util.ArrayList;
@@ -30,36 +28,36 @@ import java.util.List;
import java.util.Map;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
-import org.apache.rocketmq.client.java.impl.ClientSettings;
import org.apache.rocketmq.client.java.impl.ClientType;
+import org.apache.rocketmq.client.java.impl.Settings;
import org.apache.rocketmq.client.java.impl.UserAgent;
import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SimpleConsumerSettings extends ClientSettings {
- private static final Logger LOGGER =
LoggerFactory.getLogger(SimpleConsumerSettings.class);
+public class SimpleSubscriptionSettings extends Settings {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SimpleSubscriptionSettings.class);
private final Resource group;
private final Duration longPollingTimeout;
private final Map<String, FilterExpression> subscriptionExpressions;
- public SimpleConsumerSettings(String clientId, Endpoints accessPoint,
Resource group, Duration requestTimeout,
- Duration longPollingTimeout, Map<String, FilterExpression>
subscriptionExpression) {
- super(clientId, ClientType.SIMPLE_CONSUMER, accessPoint,
requestTimeout);
+ public SimpleSubscriptionSettings(String clientId, Endpoints endpoints,
Resource group,
+ Duration requestTimeout, Duration longPollingTimeout, Map<String,
FilterExpression> subscriptionExpression) {
+ super(clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout);
this.group = group;
this.subscriptionExpressions = subscriptionExpression;
this.longPollingTimeout = longPollingTimeout;
}
@Override
- public Settings toProtobuf() {
+ public apache.rocketmq.v2.Settings toProtobuf() {
List<SubscriptionEntry> subscriptionEntries = new ArrayList<>();
for (Map.Entry<String, FilterExpression> entry :
subscriptionExpressions.entrySet()) {
final FilterExpression filterExpression = entry.getValue();
- apache.rocketmq.v2.Resource topic =
-
apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
+ apache.rocketmq.v2.Resource topic =
apache.rocketmq.v2.Resource.newBuilder()
+ .setName(entry.getKey()).build();
final apache.rocketmq.v2.FilterExpression.Builder
expressionBuilder =
apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
final FilterExpressionType type =
filterExpression.getFilterExpressionType();
@@ -80,20 +78,18 @@ public class SimpleConsumerSettings extends ClientSettings {
Subscription subscription =
Subscription.newBuilder().setGroup(group.toProtobuf())
.setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
.addAllSubscriptions(subscriptionEntries).build();
- return
Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
-
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setSubscription(subscription)
- .setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
+ return
apache.rocketmq.v2.Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf())
+
.setClientType(clientType.toProtobuf()).setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos()))
+
.setSubscription(subscription).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
}
@Override
- public void applySettingsCommand(Settings settings) {
- final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
- if (!Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
+ public void sync(apache.rocketmq.v2.Settings settings) {
+ final apache.rocketmq.v2.Settings.PubSubCase pubSubCase =
settings.getPubSubCase();
+ if
(!apache.rocketmq.v2.Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
LOGGER.error("[Bug] Issued settings not match with the client
type, clientId={}, pubSubCase={}, "
+ "clientType={}", clientId, pubSubCase, clientType);
- return;
}
- this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
}
@Override
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
index 231de9c..cf7743c 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
@@ -22,7 +22,6 @@ import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v2.Settings;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.VerifyMessageCommand;
-import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.rocketmq.client.apis.ClientException;
@@ -47,11 +46,6 @@ public interface ClientSessionHandler {
*/
boolean isEndpointsDeprecated(Endpoints endpoints);
- /**
- * Await the settings to be synchronized with the server.
- */
- ListenableFuture<Void> awaitSettingSynchronized();
-
/**
* Indicates the client identifier.
*
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 59a2bd9..39c9391 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -64,7 +64,7 @@ import
org.apache.rocketmq.client.java.exception.TooManyRequestsException;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.impl.ClientImpl;
-import org.apache.rocketmq.client.java.impl.ClientSettings;
+import org.apache.rocketmq.client.java.impl.Settings;
import org.apache.rocketmq.client.java.message.MessageCommon;
import org.apache.rocketmq.client.java.message.MessageType;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
@@ -87,7 +87,7 @@ import org.slf4j.LoggerFactory;
class ProducerImpl extends ClientImpl implements Producer {
private static final Logger LOGGER =
LoggerFactory.getLogger(ProducerImpl.class);
- protected final ProducerSettings producerSettings;
+ protected final PublishingSettings publishingSettings;
final ConcurrentMap<String/* topic */, PublishingLoadBalancer>
publishingRouteDataCache;
private final TransactionChecker checker;
@@ -99,7 +99,7 @@ class ProducerImpl extends ClientImpl implements Producer {
TransactionChecker checker) {
super(clientConfiguration, topics);
ExponentialBackoffRetryPolicy retryPolicy =
ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
- this.producerSettings = new ProducerSettings(clientId, endpoints,
retryPolicy,
+ this.publishingSettings = new PublishingSettings(clientId, endpoints,
retryPolicy,
clientConfiguration.getRequestTimeout(), topics);
this.checker = checker;
this.publishingRouteDataCache = new ConcurrentHashMap<>();
@@ -177,8 +177,8 @@ class ProducerImpl extends ClientImpl implements Producer {
}
@Override
- public ClientSettings getClientSettings() {
- return producerSettings;
+ public Settings getSettings() {
+ return publishingSettings;
}
@Override
@@ -216,8 +216,7 @@ class ProducerImpl extends ClientImpl implements Producer {
} catch (Throwable t) {
throw new ClientException(t);
}
- final ListenableFuture<List<SendReceiptImpl>> future =
send(Collections.singletonList(publishingMessage),
- true);
+ final ListenableFuture<List<SendReceiptImpl>> future =
send(Collections.singletonList(publishingMessage), true);
final List<SendReceiptImpl> receipts = handleClientFuture(future);
final SendReceiptImpl sendReceipt = receipts.iterator().next();
((TransactionImpl) transaction).tryAddReceipt(publishingMessage,
sendReceipt);
@@ -282,7 +281,7 @@ class ProducerImpl extends ClientImpl implements Producer {
doBefore(messageHookPoints, messageCommons);
final ListenableFuture<RpcInvocation<EndTransactionResponse>> future =
- clientManager.endTransaction(endpoints, metadata, request,
requestTimeout);
+ this.getClientManager().endTransaction(endpoints, metadata,
request, requestTimeout);
Futures.addCallback(future, new
FutureCallback<RpcInvocation<EndTransactionResponse>>() {
@Override
public void onSuccess(RpcInvocation<EndTransactionResponse>
invocation) {
@@ -318,7 +317,7 @@ class ProducerImpl extends ClientImpl implements Producer {
}
private RetryPolicy getRetryPolicy() {
- return producerSettings.getRetryPolicy();
+ return publishingSettings.getRetryPolicy();
}
/**
@@ -343,7 +342,7 @@ class ProducerImpl extends ClientImpl implements Producer {
List<PublishingMessageImpl> pubMessages = new ArrayList<>();
for (Message message : messages) {
try {
- final PublishingMessageImpl pubMessage = new
PublishingMessageImpl(message, producerSettings,
+ final PublishingMessageImpl pubMessage = new
PublishingMessageImpl(message, publishingSettings,
txEnabled);
pubMessages.add(pubMessage);
} catch (Throwable t) {
@@ -428,7 +427,7 @@ class ProducerImpl extends ClientImpl implements Producer {
List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
final SendMessageRequest request = wrapSendMessageRequest(pubMessages,
mq);
final ListenableFuture<RpcInvocation<SendMessageResponse>> future0 =
- clientManager.sendMessage(endpoints, metadata, request,
clientConfiguration.getRequestTimeout());
+ this.getClientManager().sendMessage(endpoints, metadata, request,
clientConfiguration.getRequestTimeout());
return Futures.transformAsync(future0,
invocation ->
Futures.immediateFuture(SendReceiptImpl.processResponseInvocation(mq,
invocation)),
MoreExecutors.directExecutor());
@@ -447,7 +446,7 @@ class ProducerImpl extends ClientImpl implements Producer {
// Calculate the current message queue.
final MessageQueueImpl mq = candidates.get(IntMath.mod(attempt - 1,
candidates.size()));
final List<MessageType> acceptMessageTypes =
mq.getAcceptMessageTypes();
- if (producerSettings.isValidateMessageType() &&
!acceptMessageTypes.contains(messageType)) {
+ if (publishingSettings.isValidateMessageType() &&
!acceptMessageTypes.contains(messageType)) {
final IllegalArgumentException e = new
IllegalArgumentException("Current message type not match with "
+ "topic accept message types, topic=" + topic + ",
actualMessageType=" + messageType + ", "
+ "acceptMessageTypes=" + acceptMessageTypes);
@@ -536,8 +535,8 @@ class ProducerImpl extends ClientImpl implements Producer {
LOGGER.warn("Failed to send message due to too many requests,
would attempt to resend after {}, "
+ "maxAttempts={}, attempt={}, topic={},
messageId(s)={}, endpoints={}, clientId={}", delay,
maxAttempts, attempt, topic, messageIds, endpoints,
clientId, t);
- clientManager.getScheduler().schedule(() -> send0(future0,
topic, messageType, candidates, messages,
- nextAttempt), delay.toNanos(), TimeUnit.NANOSECONDS);
+
ProducerImpl.this.getClientManager().getScheduler().schedule(() ->
send0(future0, topic, messageType,
+ candidates, messages, nextAttempt), delay.toNanos(),
TimeUnit.NANOSECONDS);
}
}, clientCallbackExecutor);
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
similarity index 84%
rename from
java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
rename to
java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
index 4919927..2649ea9 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
@@ -19,15 +19,13 @@ package org.apache.rocketmq.client.java.impl.producer;
import apache.rocketmq.v2.Publishing;
import apache.rocketmq.v2.Resource;
-import apache.rocketmq.v2.Settings;
import com.google.common.base.MoreObjects;
-import com.google.common.util.concurrent.Futures;
import com.google.protobuf.util.Durations;
import java.time.Duration;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.rocketmq.client.java.impl.ClientSettings;
import org.apache.rocketmq.client.java.impl.ClientType;
+import org.apache.rocketmq.client.java.impl.Settings;
import org.apache.rocketmq.client.java.impl.UserAgent;
import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
import org.apache.rocketmq.client.java.retry.RetryPolicy;
@@ -35,8 +33,8 @@ import org.apache.rocketmq.client.java.route.Endpoints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ProducerSettings extends ClientSettings {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProducerSettings.class);
+public class PublishingSettings extends Settings {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PublishingSettings.class);
private final Set<String> topics;
/**
@@ -45,7 +43,7 @@ public class ProducerSettings extends ClientSettings {
private volatile int maxBodySizeBytes = 4 * 1024 * 1024;
private volatile boolean validateMessageType = true;
- public ProducerSettings(String clientId, Endpoints accessPoint,
ExponentialBackoffRetryPolicy retryPolicy,
+ public PublishingSettings(String clientId, Endpoints accessPoint,
ExponentialBackoffRetryPolicy retryPolicy,
Duration requestTimeout, Set<String> topics) {
super(clientId, ClientType.PRODUCER, accessPoint, retryPolicy,
requestTimeout);
this.topics = topics;
@@ -60,20 +58,20 @@ public class ProducerSettings extends ClientSettings {
}
@Override
- public Settings toProtobuf() {
+ public apache.rocketmq.v2.Settings toProtobuf() {
final Publishing publishing = Publishing.newBuilder()
.addAllTopics(topics.stream().map(name ->
Resource.newBuilder().setName(name).build())
.collect(Collectors.toList())).setValidateMessageType(validateMessageType).build();
- final Settings.Builder builder = Settings.newBuilder()
+ final apache.rocketmq.v2.Settings.Builder builder =
apache.rocketmq.v2.Settings.newBuilder()
.setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
return
builder.setBackoffPolicy(retryPolicy.toProtobuf()).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
}
@Override
- public void applySettingsCommand(Settings settings) {
- final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
- if (!Settings.PubSubCase.PUBLISHING.equals(pubSubCase)) {
+ public void sync(apache.rocketmq.v2.Settings settings) {
+ final apache.rocketmq.v2.Settings.PubSubCase pubSubCase =
settings.getPubSubCase();
+ if
(!apache.rocketmq.v2.Settings.PubSubCase.PUBLISHING.equals(pubSubCase)) {
LOGGER.error("[Bug] Issued settings not match with the client
type, clientId={}, pubSubCase={}, "
+ "clientType={}", clientId, pubSubCase, clientType);
return;
@@ -84,7 +82,6 @@ public class ProducerSettings extends ClientSettings {
this.retryPolicy = exist.inheritBackoff(backoffPolicy);
this.validateMessageType =
settings.getPublishing().getValidateMessageType();
this.maxBodySizeBytes = publishing.getMaxBodySize();
- this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
}
@Override
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
index aef1098..986e8b4 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
@@ -31,11 +31,8 @@ import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
class TransactionImpl implements Transaction {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProducerImpl.class);
private static final int MAX_MESSAGE_NUM = 1;
private final ProducerImpl producerImpl;
@@ -54,7 +51,7 @@ class TransactionImpl implements Transaction {
public PublishingMessageImpl tryAddMessage(Message message) throws
IOException {
messagesLock.readLock().lock();
try {
- if (messages.size() > MAX_MESSAGE_NUM) {
+ if (messages.size() >= MAX_MESSAGE_NUM) {
throw new IllegalArgumentException("Message in transaction has
exceeded the threshold: " +
MAX_MESSAGE_NUM);
}
@@ -68,7 +65,7 @@ class TransactionImpl implements Transaction {
MAX_MESSAGE_NUM);
}
final PublishingMessageImpl publishingMessage = new
PublishingMessageImpl(message,
- producerImpl.producerSettings, true);
+ producerImpl.publishingSettings, true);
messages.add(publishingMessage);
return publishingMessage;
} finally {
@@ -80,8 +77,7 @@ class TransactionImpl implements Transaction {
messagesLock.readLock().lock();
try {
if (!messages.contains(publishingMessage)) {
- LOGGER.warn("message(s) is not contained in current
transaction");
- return;
+ throw new IllegalArgumentException("Message not in
transaction");
}
messageSendReceiptMap.put(publishingMessage, sendReceipt);
} finally {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 5edfab1..306ff5c 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -25,7 +25,7 @@ import java.io.IOException;
import java.util.Optional;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
-import org.apache.rocketmq.client.java.impl.producer.ProducerSettings;
+import org.apache.rocketmq.client.java.impl.producer.PublishingSettings;
import org.apache.rocketmq.client.java.message.protocol.Encoding;
import org.apache.rocketmq.client.java.misc.Utilities;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
@@ -39,12 +39,12 @@ public class PublishingMessageImpl extends MessageImpl {
private final MessageType messageType;
private volatile String traceContext;
- public PublishingMessageImpl(Message message, ProducerSettings
producerSettings, boolean txEnabled)
+ public PublishingMessageImpl(Message message, PublishingSettings
publishingSettings, boolean txEnabled)
throws IOException {
super(message);
this.traceContext = null;
final int length = message.getBody().remaining();
- final int maxBodySizeBytes = producerSettings.getMaxBodySizeBytes();
+ final int maxBodySizeBytes = publishingSettings.getMaxBodySizeBytes();
if (length > maxBodySizeBytes) {
throw new IOException("Message body size exceeds the threshold,
max size=" + maxBodySizeBytes + " bytes");
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
index 6b76d78..d0297eb 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
@@ -22,8 +22,8 @@ import com.google.common.base.Objects;
import org.apache.commons.lang3.StringUtils;
public class Resource {
- private String namespace;
- private String name;
+ private final String namespace;
+ private final String name;
public Resource(String namespace, String name) {
this.namespace = namespace;
@@ -52,14 +52,6 @@ public class Resource {
return this.name;
}
- public void setNamespace(String namespace) {
- this.namespace = namespace;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
index d036c14..ef8803f 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
@@ -56,9 +56,9 @@ public class Utilities {
private static final String HOST_NAME_NOT_FOUND = "HOST_NAME_NOT_FOUND";
- private static String protocolVersion = null;
- private static String hostName = null;
- private static byte[] macAddress = null;
+ private static final ThreadLocal<String> PROTOCOL_VERSION_THREAD_LOCAL =
new ThreadLocal<>();
+ private static final ThreadLocal<String> HOST_NAME_THREAD_LOCAL = new
ThreadLocal<>();
+ private static final ThreadLocal<byte[]> MAC_ADDRESS_THREAD_LOCAL = new
ThreadLocal<>();
/**
* Used to build output as Hex
@@ -78,6 +78,7 @@ public class Utilities {
}
public static byte[] macAddress() {
+ byte[] macAddress = MAC_ADDRESS_THREAD_LOCAL.get();
if (null != macAddress) {
return macAddress.clone();
}
@@ -90,6 +91,7 @@ public class Utilities {
continue;
}
macAddress = mac;
+ MAC_ADDRESS_THREAD_LOCAL.set(macAddress);
return macAddress.clone();
}
} catch (Throwable ignore) {
@@ -98,14 +100,17 @@ public class Utilities {
byte[] randomBytes = new byte[6];
RANDOM.nextBytes(randomBytes);
macAddress = randomBytes;
+ MAC_ADDRESS_THREAD_LOCAL.set(macAddress);
return macAddress.clone();
}
public static String getProtocolVersion() {
+ String protocolVersion = PROTOCOL_VERSION_THREAD_LOCAL.get();
if (null != protocolVersion) {
return protocolVersion;
}
protocolVersion =
ReceiveMessageRequest.class.getName().split("\\.")[2];
+ PROTOCOL_VERSION_THREAD_LOCAL.set(protocolVersion);
return protocolVersion;
}
@@ -125,14 +130,17 @@ public class Utilities {
}
public static String hostName() {
+ String hostName = HOST_NAME_THREAD_LOCAL.get();
if (null != hostName) {
return hostName;
}
try {
hostName = InetAddress.getLocalHost().getHostName();
+ HOST_NAME_THREAD_LOCAL.set(hostName);
return hostName;
} catch (Throwable ignore) {
hostName = HOST_NAME_NOT_FOUND;
+ HOST_NAME_THREAD_LOCAL.set(hostName);
return hostName;
}
}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
index 6a240cc..5f84699 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
@@ -33,12 +33,15 @@ import org.apache.rocketmq.client.java.tool.TestBase;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.Mockito;
public class ClientManagerImplTest extends TestBase {
- private static final ClientManagerImpl CLIENT_MANAGER = new ClientManagerImpl(null);
+ private static final Client CLIENT = Mockito.mock(Client.class);
+ private static final ClientManagerImpl CLIENT_MANAGER = new ClientManagerImpl(CLIENT);
@BeforeClass
public static void setUp() {
+ Mockito.when(CLIENT.clientId()).thenReturn("clientId");
CLIENT_MANAGER.startAsync().awaitRunning();
}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java
new file mode 100644
index 0000000..8dd7be5
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.time.Duration;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
+import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
+import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ConsumeServiceTest extends TestBase {
+ private final String clientId = "clientId";
+ private final ConcurrentMap<MessageQueueImpl, ProcessQueue> table = new
ConcurrentHashMap<>();
+ private final MessageInterceptor interceptor =
Mockito.mock(MessageInterceptor.class);
+ private final ThreadPoolExecutor consumptionExecutor = new
ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(), new
ThreadFactoryImpl("TestMessageConsumption"));
+ private final ScheduledExecutorService scheduler = new
ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
+ "TestScheduler"));
+
+
+ @Test
+ public void testConsumeSuccess() throws ExecutionException,
InterruptedException, TimeoutException {
+ final MessageListener messageListener = messageView ->
ConsumeResult.SUCCESS;
+ final ConsumeService consumeService = new ConsumeService(clientId,
table, messageListener,
+ consumptionExecutor, interceptor, scheduler) {
+ @Override
+ public void dispatch() {
+ }
+ };
+ final MessageViewImpl messageView = fakeMessageViewImpl();
+ final ListenableFuture<ConsumeResult> future =
consumeService.consume(messageView);
+ final ConsumeResult consumeResult = future.get(1000,
TimeUnit.MILLISECONDS);
+ assertEquals(ConsumeResult.SUCCESS, consumeResult);
+ }
+
+ @Test
+ public void testConsumeFailure() throws ExecutionException,
InterruptedException, TimeoutException {
+ final MessageListener messageListener = messageView ->
ConsumeResult.FAILURE;
+ final ConsumeService consumeService = new ConsumeService(clientId,
table, messageListener,
+ consumptionExecutor, interceptor, scheduler) {
+ @Override
+ public void dispatch() {
+ }
+ };
+ final MessageViewImpl messageView = fakeMessageViewImpl();
+ final ListenableFuture<ConsumeResult> future =
consumeService.consume(messageView);
+ final ConsumeResult consumeResult = future.get(1000,
TimeUnit.MILLISECONDS);
+ assertEquals(ConsumeResult.FAILURE, consumeResult);
+ }
+
+ @Test
+ public void testConsumeWithException() throws ExecutionException,
InterruptedException, TimeoutException {
+ final MessageListener messageListener = messageView -> {
+ throw new RuntimeException();
+ };
+ final ConsumeService consumeService = new ConsumeService(clientId,
table, messageListener,
+ consumptionExecutor, interceptor, scheduler) {
+ @Override
+ public void dispatch() {
+ }
+ };
+ final MessageViewImpl messageView = fakeMessageViewImpl();
+ final ListenableFuture<ConsumeResult> future =
consumeService.consume(messageView);
+ final ConsumeResult consumeResult = future.get(1000,
TimeUnit.MILLISECONDS);
+ assertEquals(ConsumeResult.FAILURE, consumeResult);
+ }
+
+ @Test
+ public void testConsumeWithDelay() throws ExecutionException,
InterruptedException {
+ final MessageListener messageListener = messageView ->
ConsumeResult.SUCCESS;
+ final ConsumeService consumeService = new ConsumeService(clientId,
table, messageListener,
+ consumptionExecutor, interceptor, scheduler) {
+ @Override
+ public void dispatch() {
+ }
+ };
+ final MessageViewImpl messageView = fakeMessageViewImpl();
+ final ListenableFuture<ConsumeResult> future =
consumeService.consume(messageView, Duration.ofMillis(500));
+ final ConsumeResult consumeResult = future.get();
+ assertEquals(ConsumeResult.SUCCESS, consumeResult);
+ }
+}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java
new file mode 100644
index 0000000..3cc63f0
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.java.hook.MessageInterceptor;
+import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ConsumeTaskTest extends TestBase {
+
+ @Test
+ public void testCallWithConsumeSuccess() {
+ String clientId = "foobar";
+ final MessageViewImpl messageView = fakeMessageViewImpl();
+ final MessageListener messageListener =
Mockito.mock(MessageListener.class);
+ Mockito.when(messageListener.consume(messageView)).thenReturn(ConsumeResult.SUCCESS);
+ final MessageInterceptor messageInterceptor =
Mockito.mock(MessageInterceptor.class);
+ final ConsumeTask consumeTask = new ConsumeTask(clientId,
messageListener, messageView, messageInterceptor);
+ final ConsumeResult consumeResult = consumeTask.call();
+ assertEquals(ConsumeResult.SUCCESS, consumeResult);
+ }
+
+ @Test
+ public void testCallWithConsumeException() {
+ String clientId = "foobar";
+ final MessageViewImpl messageView = fakeMessageViewImpl();
+ final MessageListener messageListener =
Mockito.mock(MessageListener.class);
+ Mockito.when(messageListener.consume(messageView)).thenThrow(new
RuntimeException());
+ final MessageInterceptor messageInterceptor =
Mockito.mock(MessageInterceptor.class);
+ final ConsumeTask consumeTask = new ConsumeTask(clientId,
messageListener, messageView, messageInterceptor);
+ final ConsumeResult consumeResult = consumeTask.call();
+ assertEquals(ConsumeResult.FAILURE, consumeResult);
+ }
+}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
new file mode 100644
index 0000000..b115fb2
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+
+import apache.rocketmq.v2.AckMessageRequest;
+import apache.rocketmq.v2.AckMessageResponse;
+import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
+import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
+import apache.rocketmq.v2.ReceiveMessageRequest;
+import apache.rocketmq.v2.ReceiveMessageResponse;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.Metadata;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.java.impl.ClientManager;
+import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ConsumerImplTest extends TestBase {
+ private final Map<String, FilterExpression> subscriptionExpressions =
createSubscriptionExpressions(FAKE_TOPIC_0);
+ private final MessageListener messageListener = messageView ->
ConsumeResult.SUCCESS;
+ private final ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
+ .setEndpoints(FAKE_ENDPOINTS).build();
+
+ @Test
+ public void testReceiveMessage() throws ExecutionException,
InterruptedException {
+ int maxCacheMessageCount = 8;
+ int maxCacheMessageSizeInBytes = 1024;
+ int consumptionThreadCount = 4;
+ PushConsumerImpl pushConsumer = Mockito.spy(new
PushConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0,
+ subscriptionExpressions, messageListener, maxCacheMessageCount,
maxCacheMessageSizeInBytes,
+ consumptionThreadCount));
+ final ClientManager clientManager = Mockito.mock(ClientManager.class);
+ Mockito.doReturn(clientManager).when(pushConsumer).getClientManager();
+ int receivedMessageCount = 1;
+ final
ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> future =
+ okReceiveMessageResponsesFuture(FAKE_TOPIC_0,
receivedMessageCount);
+
Mockito.doReturn(future).when(clientManager).receiveMessage(any(Endpoints.class),
any(Metadata.class),
+ any(ReceiveMessageRequest.class), any(Duration.class));
+ final MessageQueueImpl mq = fakeMessageQueueImpl(FAKE_TOPIC_0);
+ final ReceiveMessageRequest request =
pushConsumer.wrapReceiveMessageRequest(1,
+ mq, new FilterExpression(), Duration.ofSeconds(15));
+ final ListenableFuture<ReceiveMessageResult> future0 =
+ pushConsumer.receiveMessage(request, mq, Duration.ofSeconds(15));
+ final ReceiveMessageResult receiveMessageResult = future0.get();
+ Assert.assertEquals(receiveMessageResult.getMessageViews().size(),
receivedMessageCount);
+ }
+
+ @Test
+ public void testAckMessage() throws ExecutionException,
InterruptedException {
+ int maxCacheMessageCount = 8;
+ int maxCacheMessageSizeInBytes = 1024;
+ int consumptionThreadCount = 4;
+ PushConsumerImpl pushConsumer = Mockito.spy(new
PushConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0,
+ subscriptionExpressions, messageListener, maxCacheMessageCount,
maxCacheMessageSizeInBytes,
+ consumptionThreadCount));
+ final ClientManager clientManager = Mockito.mock(ClientManager.class);
+ Mockito.doReturn(clientManager).when(pushConsumer).getClientManager();
+ final ListenableFuture<RpcInvocation<AckMessageResponse>> future =
+ okAckMessageResponseFuture();
+
Mockito.doReturn(future).when(clientManager).ackMessage(any(Endpoints.class),
any(Metadata.class),
+ any(AckMessageRequest.class), any(Duration.class));
+ final MessageViewImpl messageView = fakeMessageViewImpl();
+ final ListenableFuture<RpcInvocation<AckMessageResponse>> future0 =
+ pushConsumer.ackMessage(messageView);
+ final RpcInvocation<AckMessageResponse> rpcInvocation = future0.get();
+ Assert.assertEquals(rpcInvocation, future.get());
+ }
+
+ @Test
+ public void testChangeInvisibleDuration() throws ExecutionException,
InterruptedException {
+ int maxCacheMessageCount = 8;
+ int maxCacheMessageSizeInBytes = 1024;
+ int consumptionThreadCount = 4;
+ PushConsumerImpl pushConsumer = Mockito.spy(new
PushConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0,
+ subscriptionExpressions, messageListener, maxCacheMessageCount,
maxCacheMessageSizeInBytes,
+ consumptionThreadCount));
+ final ClientManager clientManager = Mockito.mock(ClientManager.class);
+ Mockito.doReturn(clientManager).when(pushConsumer).getClientManager();
+ final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>>
future =
+ okChangeInvisibleDurationCtxFuture();
+
Mockito.doReturn(future).when(clientManager).changeInvisibleDuration(any(Endpoints.class),
any(Metadata.class),
+ any(ChangeInvisibleDurationRequest.class), any(Duration.class));
+ final MessageViewImpl messageView = fakeMessageViewImpl();
+ final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>>
future0 =
+ pushConsumer.changeInvisibleDuration(messageView,
Duration.ofSeconds(15));
+ final RpcInvocation<ChangeInvisibleDurationResponse> rpcInvocation =
future0.get();
+ Assert.assertEquals(rpcInvocation, future.get());
+ }
+
+}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeServiceTest.java
similarity index 68%
copy from
java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
copy to
java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeServiceTest.java
index e1298bd..19eb29b 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeServiceTest.java
@@ -24,8 +24,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
@@ -37,14 +37,12 @@ import
org.apache.rocketmq.client.java.message.MessageCommon;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.tool.TestBase;
-import org.junit.Ignore;
import org.junit.Test;
-public class StandardConsumeServiceTest extends TestBase {
+public class FifoConsumeServiceTest extends TestBase {
@Test
- @Ignore
- public void testDispatch0() {
+ public void testDispatch() throws InterruptedException {
final ProcessQueue processQueue0 = mock(ProcessQueue.class);
final ProcessQueue processQueue1 = mock(ProcessQueue.class);
@@ -55,14 +53,20 @@ public class StandardConsumeServiceTest extends TestBase {
processQueueTable.put(messageQueue0, processQueue0);
processQueueTable.put(messageQueue1, processQueue1);
- final MessageViewImpl messageView0 =
fakeMessageViewImpl(messageQueue0);
- final MessageViewImpl messageView1 =
fakeMessageViewImpl(messageQueue1);
+ final MessageViewImpl messageView00 =
fakeMessageViewImpl(messageQueue0);
+ final MessageViewImpl messageView01 =
fakeMessageViewImpl(messageQueue0);
+ List<MessageViewImpl> messageViewList0 = new ArrayList<>();
+ messageViewList0.add(messageView00);
+ messageViewList0.add(messageView01);
- when(processQueue0.tryTakeMessage()).thenReturn(Optional.of(messageView0));
- when(processQueue1.tryTakeMessage()).thenReturn(Optional.of(messageView1));
+ final MessageViewImpl messageView10 =
fakeMessageViewImpl(messageQueue1);
+ List<MessageViewImpl> messageViewList1 = new ArrayList<>();
+ messageViewList1.add(messageView10);
- MessageListener listener = messageView -> ConsumeResult.SUCCESS;
+ when(processQueue0.tryTakeFifoMessages()).thenReturn(messageViewList0.iterator());
+ when(processQueue1.tryTakeFifoMessages()).thenReturn(messageViewList1.iterator());
+ MessageListener listener = messageView -> ConsumeResult.SUCCESS;
MessageInterceptor interceptor = new MessageInterceptor() {
@Override
public void doBefore(MessageHookPoints messageHookPoints,
List<MessageCommon> messageCommons) {
@@ -73,10 +77,14 @@ public class StandardConsumeServiceTest extends TestBase {
Duration duration, MessageHookPointsStatus status) {
}
};
- final StandardConsumeService service = new
StandardConsumeService(FAKE_CLIENT_ID, processQueueTable, listener,
+ final FifoConsumeService service = new
FifoConsumeService(FAKE_CLIENT_ID, processQueueTable, listener,
SINGLE_THREAD_POOL_EXECUTOR, interceptor, SCHEDULER);
- service.dispatch0();
- verify(processQueue0, times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
- verify(processQueue0, times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
+ service.dispatch();
+ Thread.sleep(1000);
+ verify(processQueue0, times(1)).tryTakeFifoMessages();
+ verify(processQueue1, times(1)).tryTakeFifoMessages();
+ verify(processQueue0, times(2)).eraseFifoMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
+ verify(processQueue1, times(1)).eraseFifoMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
+
}
}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 7af67d5..bc92717 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -67,7 +67,7 @@ public class ProcessQueueImplTest extends TestBase {
@Mock
private ConsumeService consumeService;
@Mock
- private PushConsumerSettings pushConsumerSettings;
+ private PushSubscriptionSettings pushSubscriptionSettings;
@Mock
private RetryPolicy retryPolicy;
@@ -94,7 +94,7 @@ public class ProcessQueueImplTest extends TestBase {
field1.setAccessible(true);
field1.set(pushConsumer, consumptionErrorQuantity);
- when(pushConsumer.getPushConsumerSettings()).thenReturn(pushConsumerSettings);
+ when(pushConsumer.getPushConsumerSettings()).thenReturn(pushSubscriptionSettings);
when(pushConsumer.getScheduler()).thenReturn(SCHEDULER);
this.receivedMessagesQuantity = new AtomicLong(0);
@@ -104,9 +104,9 @@ public class ProcessQueueImplTest extends TestBase {
@Test
public void testExpired() {
- when(pushConsumerSettings.getLongPollingTimeout()).thenReturn(Duration.ofSeconds(3));
+ when(pushSubscriptionSettings.getLongPollingTimeout()).thenReturn(Duration.ofSeconds(3));
when(pushConsumer.getClientConfiguration()).thenReturn(ClientConfiguration.newBuilder()
- .setEndpoints(FAKE_ACCESS_POINT).build());
+ .setEndpoints(FAKE_ENDPOINTS).build());
assertFalse(processQueue.expired());
}
@@ -124,7 +124,7 @@ public class ProcessQueueImplTest extends TestBase {
messageViewList.add(corruptedMessageView0);
when(retryPolicy.getNextAttemptDelay(anyInt())).thenReturn(Duration.ofSeconds(1));
when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
- when(pushConsumerSettings.isFifo()).thenReturn(false);
+ when(pushSubscriptionSettings.isFifo()).thenReturn(false);
when(pushConsumer.changeInvisibleDuration(any(MessageViewImpl.class),
any(Duration.class)))
.thenReturn(okChangeInvisibleDurationCtxFuture());
processQueue.cacheMessages(messageViewList);
@@ -137,7 +137,7 @@ public class ProcessQueueImplTest extends TestBase {
final MessageViewImpl corruptedMessageView0 =
fakeMessageViewImpl(true);
List<MessageViewImpl> messageViewList = new ArrayList<>();
messageViewList.add(corruptedMessageView0);
- when(pushConsumerSettings.isFifo()).thenReturn(true);
+ when(pushSubscriptionSettings.isFifo()).thenReturn(true);
when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class)))
.thenReturn(okForwardMessageToDeadLetterQueueResponseFuture());
processQueue.cacheMessages(messageViewList);
@@ -167,7 +167,7 @@ public class ProcessQueueImplTest extends TestBase {
future0.set(receiveMessageResult);
when(pushConsumer.receiveMessage(any(ReceiveMessageRequest.class),
any(MessageQueueImpl.class),
any(Duration.class))).thenReturn(future0);
- when(pushConsumerSettings.getReceiveBatchSize()).thenReturn(32);
+ when(pushSubscriptionSettings.getReceiveBatchSize()).thenReturn(32);
ReceiveMessageRequest request =
ReceiveMessageRequest.newBuilder().build();
when(pushConsumer.wrapReceiveMessageRequest(anyInt(),
any(MessageQueueImpl.class),
any(FilterExpression.class))).thenReturn(request);
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
index b1251fb..8eaaf52 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
@@ -65,8 +65,8 @@ public class PushConsumerBuilderImplTest extends TestBase {
public void testBuildWithoutExpressions() throws ClientException {
final PushConsumerBuilderImpl builder = new PushConsumerBuilderImpl();
ClientConfiguration clientConfiguration =
-
ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
-
builder.setClientConfiguration(clientConfiguration).setConsumerGroup(FAKE_GROUP_0)
+
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+
builder.setClientConfiguration(clientConfiguration).setConsumerGroup(FAKE_CONSUMER_GROUP_0)
.setMessageListener(messageView -> ConsumeResult.SUCCESS)
.build();
}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
index ba65c05..673e711 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
@@ -52,10 +52,10 @@ public class PushConsumerImplTest extends TestBase {
private final int consumptionThreadCount = 4;
private final ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(FAKE_ACCESS_POINT).build();
+ .setEndpoints(FAKE_ENDPOINTS).build();
@Spy
- private final PushConsumerImpl pushConsumer = new
PushConsumerImpl(clientConfiguration, FAKE_GROUP_0,
+ private final PushConsumerImpl pushConsumer = new
PushConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0,
subscriptionExpressions, messageListener, maxCacheMessageCount,
maxCacheMessageSizeInBytes,
consumptionThreadCount);
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
new file mode 100644
index 0000000..90bbdeb
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import apache.rocketmq.v2.ClientType;
+import apache.rocketmq.v2.CustomizedBackoff;
+import apache.rocketmq.v2.FilterType;
+import apache.rocketmq.v2.Settings;
+import apache.rocketmq.v2.Subscription;
+import apache.rocketmq.v2.SubscriptionEntry;
+import com.google.protobuf.util.Durations;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.java.message.protocol.Resource;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PushSubscriptionSettingsTest extends TestBase {
+
+ @Test
+ public void testToProtobuf() {
+ Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+ String clientId = "clientId";
+ Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
+ subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression());
+ final Duration requestTimeout = Duration.ofSeconds(3);
+ final PushSubscriptionSettings pushSubscriptionSettings = new
PushSubscriptionSettings(clientId,
+ fakeEndpoints(), groupResource, requestTimeout,
subscriptionExpression);
+ final Settings settings = pushSubscriptionSettings.toProtobuf();
+ Assert.assertEquals(settings.getClientType(),
ClientType.PUSH_CONSUMER);
+ Assert.assertEquals(settings.getRequestTimeout(),
Durations.fromNanos(requestTimeout.toNanos()));
+ Assert.assertTrue(settings.hasSubscription());
+ final Subscription subscription = settings.getSubscription();
+ Assert.assertEquals(subscription.getGroup(),
+
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+ Assert.assertFalse(subscription.getFifo());
+ final List<SubscriptionEntry> subscriptionsList =
subscription.getSubscriptionsList();
+ Assert.assertEquals(subscriptionsList.size(), 1);
+ final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
+ Assert.assertEquals(subscriptionEntry.getExpression().getType(),
FilterType.TAG);
+ Assert.assertEquals(subscriptionEntry.getTopic(),
+
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+ }
+
+ @Test
+ public void testToProtobufWithSqlExpression() {
+ Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+ String clientId = "clientId";
+ Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
+ subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 AND a < 100) OR (b IS NOT NULL AND "
+ + "b=TRUE)", FilterExpressionType.SQL92));
+ final Duration requestTimeout = Duration.ofSeconds(3);
+ final PushSubscriptionSettings pushSubscriptionSettings = new
PushSubscriptionSettings(clientId,
+ fakeEndpoints(), groupResource, requestTimeout,
subscriptionExpression);
+ final Settings settings = pushSubscriptionSettings.toProtobuf();
+ Assert.assertEquals(settings.getClientType(),
ClientType.PUSH_CONSUMER);
+ Assert.assertEquals(settings.getRequestTimeout(),
Durations.fromNanos(requestTimeout.toNanos()));
+ Assert.assertTrue(settings.hasSubscription());
+ final Subscription subscription = settings.getSubscription();
+ Assert.assertEquals(subscription.getGroup(),
+
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+ Assert.assertFalse(subscription.getFifo());
+ final List<SubscriptionEntry> subscriptionsList =
subscription.getSubscriptionsList();
+ Assert.assertEquals(subscriptionsList.size(), 1);
+ final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
+ Assert.assertEquals(subscriptionEntry.getExpression().getType(),
FilterType.SQL);
+ Assert.assertEquals(subscriptionEntry.getTopic(),
+
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+ }
+
+ @Test
+ public void testSync() {
+ com.google.protobuf.Duration duration0 = Durations.fromSeconds(1);
+ com.google.protobuf.Duration duration1 = Durations.fromSeconds(2);
+ com.google.protobuf.Duration duration2 = Durations.fromSeconds(3);
+ List<com.google.protobuf.Duration> durations = new ArrayList<>();
+ durations.add(duration0);
+ durations.add(duration1);
+ durations.add(duration2);
+ CustomizedBackoff customizedBackoff =
CustomizedBackoff.newBuilder().addAllNext(durations).build();
+ apache.rocketmq.v2.RetryPolicy retryPolicy =
apache.rocketmq.v2.RetryPolicy.newBuilder()
+ .setCustomizedBackoff(customizedBackoff).setMaxAttempts(3).build();
+ boolean fifo = true;
+ int receiveBatchSize = 96;
+ com.google.protobuf.Duration longPollingTimeout =
Durations.fromSeconds(60);
+ Subscription subscription =
Subscription.newBuilder().setFifo(fifo).setReceiveBatchSize(receiveBatchSize)
+ .setLongPollingTimeout(longPollingTimeout).build();
+ Settings settings =
Settings.newBuilder().setSubscription(subscription).setBackoffPolicy(retryPolicy).build();
+ Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+ String clientId = "clientId";
+ Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
+ subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 AND a < 100) OR (b IS NOT NULL AND "
+ + "b=TRUE)", FilterExpressionType.SQL92));
+ final Duration requestTimeout = Duration.ofSeconds(3);
+ final PushSubscriptionSettings pushSubscriptionSettings = new
PushSubscriptionSettings(clientId,
+ fakeEndpoints(), groupResource, requestTimeout,
subscriptionExpression);
+ pushSubscriptionSettings.sync(settings);
+ }
+}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
index f3479e8..3f6002e 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
@@ -40,8 +40,8 @@ public class SimpleConsumerBuilderTest extends TestBase {
public void testBuildWithoutExpressions() throws ClientException {
final SimpleConsumerBuilderImpl builder = new
SimpleConsumerBuilderImpl();
ClientConfiguration clientConfiguration =
-
ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
-
builder.setClientConfiguration(clientConfiguration).setConsumerGroup(FAKE_GROUP_0)
+
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+
builder.setClientConfiguration(clientConfiguration).setConsumerGroup(FAKE_CONSUMER_GROUP_0)
.build();
}
}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
index 00dfe5a..6fc0709 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
@@ -56,7 +56,7 @@ import org.mockito.junit.MockitoJUnitRunner;
public class SimpleConsumerImplTest extends TestBase {
@InjectMocks
private final ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(FAKE_ACCESS_POINT).build();
+ .setEndpoints(FAKE_ENDPOINTS).build();
private final Duration awaitDuration = Duration.ofSeconds(3);
private final Map<String, FilterExpression> subExpressions =
createSubscriptionExpressions(FAKE_TOPIC_0);
@@ -65,31 +65,35 @@ public class SimpleConsumerImplTest extends TestBase {
@Test(expected = IllegalStateException.class)
public void testReceiveWithoutStart() throws ClientException {
- simpleConsumer = new SimpleConsumerImpl(clientConfiguration,
FAKE_GROUP_0, awaitDuration, subExpressions);
+ simpleConsumer = new SimpleConsumerImpl(clientConfiguration,
FAKE_CONSUMER_GROUP_0, awaitDuration,
+ subExpressions);
simpleConsumer.receive(1, Duration.ofSeconds(1));
}
@Test(expected = IllegalStateException.class)
public void testAckWithoutStart() throws ClientException {
- simpleConsumer = new SimpleConsumerImpl(clientConfiguration,
FAKE_GROUP_0, awaitDuration, subExpressions);
+ simpleConsumer = new SimpleConsumerImpl(clientConfiguration,
FAKE_CONSUMER_GROUP_0, awaitDuration,
+ subExpressions);
simpleConsumer.ack(fakeMessageViewImpl());
}
@Test(expected = IllegalStateException.class)
public void testSubscribeWithoutStart() throws ClientException {
- simpleConsumer = new SimpleConsumerImpl(clientConfiguration,
FAKE_GROUP_0, awaitDuration, subExpressions);
+ simpleConsumer = new SimpleConsumerImpl(clientConfiguration,
FAKE_CONSUMER_GROUP_0, awaitDuration,
+ subExpressions);
simpleConsumer.subscribe(FAKE_TOPIC_1, FilterExpression.SUB_ALL);
}
@Test(expected = IllegalStateException.class)
public void testUnsubscribeWithoutStart() {
- simpleConsumer = new SimpleConsumerImpl(clientConfiguration,
FAKE_GROUP_0, awaitDuration, subExpressions);
+ simpleConsumer = new SimpleConsumerImpl(clientConfiguration,
FAKE_CONSUMER_GROUP_0, awaitDuration,
+ subExpressions);
simpleConsumer.unsubscribe(FAKE_TOPIC_0);
}
@Test
public void testReceiveAsyncWithZeroMaxMessageNum() throws
InterruptedException {
- simpleConsumer = Mockito.spy(new
SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration,
+ simpleConsumer = Mockito.spy(new
SimpleConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0, awaitDuration,
subExpressions));
when(simpleConsumer.isRunning()).thenReturn(true);
final CompletableFuture<List<MessageView>> future =
simpleConsumer.receiveAsync(0,
@@ -104,7 +108,7 @@ public class SimpleConsumerImplTest extends TestBase {
@Test
public void testAckAsync() throws ExecutionException, InterruptedException
{
- simpleConsumer = Mockito.spy(new
SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration,
+ simpleConsumer = Mockito.spy(new
SimpleConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0, awaitDuration,
subExpressions));
when(simpleConsumer.isRunning()).thenReturn(true);
final MessageViewImpl messageView = fakeMessageViewImpl(false);
@@ -263,7 +267,7 @@ public class SimpleConsumerImplTest extends TestBase {
@Test
public void testChangeInvisibleDurationAsync() throws ExecutionException,
InterruptedException {
- simpleConsumer = Mockito.spy(new
SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration,
+ simpleConsumer = Mockito.spy(new
SimpleConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0, awaitDuration,
subExpressions));
when(simpleConsumer.isRunning()).thenReturn(true);
final MessageViewImpl messageView = fakeMessageViewImpl(false);
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
new file mode 100644
index 0000000..ba0acf0
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.consumer;
+
+import apache.rocketmq.v2.ClientType;
+import apache.rocketmq.v2.FilterType;
+import apache.rocketmq.v2.Settings;
+import apache.rocketmq.v2.Subscription;
+import apache.rocketmq.v2.SubscriptionEntry;
+import com.google.protobuf.util.Durations;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.java.message.protocol.Resource;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SimpleSubscriptionSettingsTest extends TestBase {
+
+ @Test
+ public void testToProtobuf() {
+ Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+ String clientId = "clientId";
+ Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
+ subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression());
+ final Duration requestTimeout = Duration.ofSeconds(3);
+ final Duration longPollingTimeout = Duration.ofSeconds(15);
+ final SimpleSubscriptionSettings simpleSubscriptionSettings = new
SimpleSubscriptionSettings(clientId,
+ fakeEndpoints(), groupResource, requestTimeout,
longPollingTimeout, subscriptionExpression);
+ final Settings settings = simpleSubscriptionSettings.toProtobuf();
+ Assert.assertEquals(settings.getClientType(),
ClientType.SIMPLE_CONSUMER);
+ Assert.assertEquals(settings.getRequestTimeout(),
Durations.fromNanos(requestTimeout.toNanos()));
+ Assert.assertTrue(settings.hasSubscription());
+ final Subscription subscription = settings.getSubscription();
+ Assert.assertEquals(subscription.getGroup(),
+
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+ Assert.assertFalse(subscription.getFifo());
+ Assert.assertEquals(subscription.getLongPollingTimeout(),
Durations.fromNanos(longPollingTimeout.toNanos()));
+ final List<SubscriptionEntry> subscriptionsList =
subscription.getSubscriptionsList();
+ Assert.assertEquals(subscriptionsList.size(), 1);
+ final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
+ Assert.assertEquals(subscriptionEntry.getExpression().getType(),
FilterType.TAG);
+ Assert.assertEquals(subscriptionEntry.getTopic(),
+
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+ }
+
+ @Test
+ public void testToProtobufWithSqlExpression() {
+ Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+ String clientId = "clientId";
+ Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
+ subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10
AND a < 100) OR (b IS NOT NULL AND "
+ + "b=TRUE)", FilterExpressionType.SQL92));
+ final Duration requestTimeout = Duration.ofSeconds(3);
+ final Duration longPollingTimeout = Duration.ofSeconds(15);
+ final SimpleSubscriptionSettings simpleSubscriptionSettings = new
SimpleSubscriptionSettings(clientId,
+ fakeEndpoints(), groupResource, requestTimeout,
longPollingTimeout, subscriptionExpression);
+ final Settings settings = simpleSubscriptionSettings.toProtobuf();
+ Assert.assertEquals(settings.getClientType(),
ClientType.SIMPLE_CONSUMER);
+ Assert.assertEquals(settings.getRequestTimeout(),
Durations.fromNanos(requestTimeout.toNanos()));
+ Assert.assertTrue(settings.hasSubscription());
+ final Subscription subscription = settings.getSubscription();
+ Assert.assertEquals(subscription.getGroup(),
+
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+ Assert.assertFalse(subscription.getFifo());
+ Assert.assertEquals(subscription.getLongPollingTimeout(),
Durations.fromNanos(longPollingTimeout.toNanos()));
+ final List<SubscriptionEntry> subscriptionsList =
subscription.getSubscriptionsList();
+ Assert.assertEquals(subscriptionsList.size(), 1);
+ final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
+ Assert.assertEquals(subscriptionEntry.getExpression().getType(),
FilterType.SQL);
+ Assert.assertEquals(subscriptionEntry.getTopic(),
+
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+ }
+
+}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
index e1298bd..45d9c91 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
@@ -37,14 +37,12 @@ import
org.apache.rocketmq.client.java.message.MessageCommon;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.tool.TestBase;
-import org.junit.Ignore;
import org.junit.Test;
public class StandardConsumeServiceTest extends TestBase {
@Test
- @Ignore
- public void testDispatch0() {
+ public void testDispatch() throws InterruptedException {
final ProcessQueue processQueue0 = mock(ProcessQueue.class);
final ProcessQueue processQueue1 = mock(ProcessQueue.class);
@@ -62,7 +60,6 @@ public class StandardConsumeServiceTest extends TestBase {
when(processQueue1.tryTakeMessage()).thenReturn(Optional.of(messageView1));
MessageListener listener = messageView -> ConsumeResult.SUCCESS;
-
MessageInterceptor interceptor = new MessageInterceptor() {
@Override
public void doBefore(MessageHookPoints messageHookPoints,
List<MessageCommon> messageCommons) {
@@ -76,7 +73,10 @@ public class StandardConsumeServiceTest extends TestBase {
final StandardConsumeService service = new
StandardConsumeService(FAKE_CLIENT_ID, processQueueTable, listener,
SINGLE_THREAD_POOL_EXECUTOR, interceptor, SCHEDULER);
service.dispatch0();
+ Thread.sleep(1000);
+ verify(processQueue0, times(1)).tryTakeMessage();
+ verify(processQueue1, times(1)).tryTakeMessage();
verify(processQueue0,
times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
- verify(processQueue0,
times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
+ verify(processQueue1,
times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
}
}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
index 6f98f3f..9680a71 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.client.java.impl.producer;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@@ -37,21 +40,53 @@ public class ProducerBuilderImplTest {
builder.setTopics(null);
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testSetIllegalTopic() {
+ final ProducerBuilderImpl builder = new ProducerBuilderImpl();
+ builder.setTopics("\t");
+ }
+
+ @Test
+ public void testSetTopic() {
+ final ProducerBuilderImpl builder = new ProducerBuilderImpl();
+ builder.setTopics("abc");
+ }
+
@Test(expected = IllegalArgumentException.class)
public void testSetNegativeMaxAttempts() {
final ProducerBuilderImpl builder = new ProducerBuilderImpl();
builder.setMaxAttempts(-1);
}
+ @Test
+ public void testSetMaxAttempts() {
+ final ProducerBuilderImpl builder = new ProducerBuilderImpl();
+ builder.setMaxAttempts(3);
+ }
+
@Test(expected = NullPointerException.class)
public void testSetTransactionCheckerWithNull() {
final ProducerBuilderImpl builder = new ProducerBuilderImpl();
builder.setTransactionChecker(null);
}
+ @Test
+ public void testSetTransactionChecker() {
+ final ProducerBuilderImpl builder = new ProducerBuilderImpl();
+ builder.setTransactionChecker(messageView ->
TransactionResolution.COMMIT);
+ }
+
@Test(expected = NullPointerException.class)
public void testBuildWithoutClientConfiguration() {
final ProducerBuilderImpl builder = new ProducerBuilderImpl();
builder.build();
}
+
+ @Test
+ public void testBuild() throws ClientException {
+ ClientConfiguration clientConfiguration =
+
ClientConfiguration.newBuilder().setEndpoints("foobar.com:8081").build();
+ final ProducerBuilderImpl builder = new ProducerBuilderImpl();
+ builder.setClientConfiguration(clientConfiguration).build();
+ }
}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index ef94469..b4dae64 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -51,7 +51,7 @@ import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class ProducerImplTest extends TestBase {
private final ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(FAKE_ACCESS_POINT).build();
+ .setEndpoints(FAKE_ENDPOINTS).build();
@SuppressWarnings("SameParameterValue")
private ProducerImpl createProducerWithTopic(String topic) {
@@ -106,7 +106,7 @@ public class ProducerImplTest extends TestBase {
Mockito.doReturn(Futures.immediateFailedFuture(exception))
.when(producer).send0(any(Metadata.class), any(Endpoints.class),
anyList(), any(MessageQueueImpl.class));
producer.send(message);
- final int maxAttempts =
producer.producerSettings.getRetryPolicy().getMaxAttempts();
+ final int maxAttempts =
producer.publishingSettings.getRetryPolicy().getMaxAttempts();
verify(producer, times(maxAttempts)).send0(any(Metadata.class),
any(Endpoints.class), anyList(),
any(MessageQueueImpl.class));
producer.close();
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
index e7670ed..d7d547f 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
@@ -17,11 +17,28 @@
package org.apache.rocketmq.client.java.impl.producer;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.message.MessageId;
+import org.apache.rocketmq.client.apis.producer.TransactionResolution;
+import org.apache.rocketmq.client.java.message.MessageCommon;
+import org.apache.rocketmq.client.java.message.MessageType;
+import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
+import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
@@ -29,6 +46,52 @@ public class TransactionImplTest extends TestBase {
@Mock
ProducerImpl producer;
+ @Test
+ public void testTryAddMessage() throws IOException {
+ Set<String> set = new HashSet<>();
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+ final ProducerImpl producer = Mockito.spy(new
ProducerImpl(clientConfiguration, set, 1, null));
+ final TransactionImpl transaction = new TransactionImpl(producer);
+ final Message message = fakeMessage(FAKE_TOPIC_0);
+ final PublishingMessageImpl publishingMessage =
transaction.tryAddMessage(message);
+ Assert.assertEquals(publishingMessage.getMessageType(),
MessageType.TRANSACTION);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testTryAddExceededMessages() throws IOException {
+ Set<String> set = new HashSet<>();
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+ final ProducerImpl producer = Mockito.spy(new
ProducerImpl(clientConfiguration, set, 1, null));
+ final TransactionImpl transaction = new TransactionImpl(producer);
+ final Message message0 = fakeMessage(FAKE_TOPIC_0);
+ transaction.tryAddMessage(message0);
+ final Message message1 = fakeMessage(FAKE_TOPIC_0);
+ transaction.tryAddMessage(message1);
+ }
+
+ @Test
+ public void testTryAddReceipt() throws IOException, ClientException,
ExecutionException, InterruptedException {
+ Set<String> set = new HashSet<>();
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+ final ProducerImpl producer = Mockito.spy(new
ProducerImpl(clientConfiguration, set, 1, null));
+ final TransactionImpl transaction = new TransactionImpl(producer);
+ final Message message = fakeMessage(FAKE_TOPIC_0);
+ final PublishingMessageImpl publishingMessage =
transaction.tryAddMessage(message);
+ final SendReceiptImpl sendReceipt =
fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
+ transaction.tryAddReceipt(publishingMessage, sendReceipt);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testTryAddReceiptNotContained() throws ClientException,
ExecutionException, InterruptedException {
+ PublishingMessageImpl publishingMessage =
Mockito.mock(PublishingMessageImpl.class);
+ Set<String> set = new HashSet<>();
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+ final ProducerImpl producer = Mockito.spy(new
ProducerImpl(clientConfiguration, set, 1, null));
+ final TransactionImpl transaction = new TransactionImpl(producer);
+ final SendReceiptImpl sendReceipt =
fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
+ transaction.tryAddReceipt(publishingMessage, sendReceipt);
+ }
+
@Test(expected = IllegalStateException.class)
public void testCommitWithNoReceipts() throws ClientException {
final TransactionImpl transaction = new TransactionImpl(producer);
@@ -40,4 +103,34 @@ public class TransactionImplTest extends TestBase {
final TransactionImpl transaction = new TransactionImpl(producer);
transaction.rollback();
}
+
+ @Test
+ public void testCommit() throws IOException, ClientException,
ExecutionException, InterruptedException {
+ Set<String> set = new HashSet<>();
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+ final ProducerImpl producer = Mockito.spy(new
ProducerImpl(clientConfiguration, set, 1, null));
+ final TransactionImpl transaction = new TransactionImpl(producer);
+ final Message message = fakeMessage(FAKE_TOPIC_0);
+ final PublishingMessageImpl publishingMessage =
transaction.tryAddMessage(message);
+ final SendReceiptImpl sendReceipt =
fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
+ transaction.tryAddReceipt(publishingMessage, sendReceipt);
+
Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class),
any(MessageCommon.class),
+ any(MessageId.class), anyString(),
any(TransactionResolution.class));
+ transaction.commit();
+ }
+
+ @Test
+ public void testRollback() throws IOException, ClientException,
ExecutionException, InterruptedException {
+ Set<String> set = new HashSet<>();
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder().setEndpoints(FAKE_ENDPOINTS).build();
+ final ProducerImpl producer = Mockito.spy(new
ProducerImpl(clientConfiguration, set, 1, null));
+ final TransactionImpl transaction = new TransactionImpl(producer);
+ final Message message = fakeMessage(FAKE_TOPIC_0);
+ final PublishingMessageImpl publishingMessage =
transaction.tryAddMessage(message);
+ final SendReceiptImpl sendReceipt =
fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
+ transaction.tryAddReceipt(publishingMessage, sendReceipt);
+
Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class),
any(MessageCommon.class),
+ any(MessageId.class), anyString(),
any(TransactionResolution.class));
+ transaction.rollback();
+ }
}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/protocol/EncodingTest.java
similarity index 50%
copy from
java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
copy to
java/client/src/test/java/org/apache/rocketmq/client/java/message/protocol/EncodingTest.java
index e7670ed..9030761 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/protocol/EncodingTest.java
@@ -15,29 +15,29 @@
* limitations under the License.
*/
-package org.apache.rocketmq.client.java.impl.producer;
+package org.apache.rocketmq.client.java.message.protocol;
-import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-@RunWith(MockitoJUnitRunner.class)
-public class TransactionImplTest extends TestBase {
- @Mock
- ProducerImpl producer;
+public class EncodingTest extends TestBase {
- @Test(expected = IllegalStateException.class)
- public void testCommitWithNoReceipts() throws ClientException {
- final TransactionImpl transaction = new TransactionImpl(producer);
- transaction.commit();
+ @Test
+ public void testToProtobuf() {
+ Assert.assertEquals(Encoding.toProtobuf(Encoding.IDENTITY),
apache.rocketmq.v2.Encoding.IDENTITY);
+ Assert.assertEquals(Encoding.toProtobuf(Encoding.GZIP),
apache.rocketmq.v2.Encoding.GZIP);
}
- @Test(expected = IllegalStateException.class)
- public void testRollbackWithNoReceipts() throws ClientException {
- final TransactionImpl transaction = new TransactionImpl(producer);
- transaction.rollback();
+ @Test
+ public void testFromProtobuf() {
+
Assert.assertEquals(Encoding.fromProtobuf(apache.rocketmq.v2.Encoding.IDENTITY),
Encoding.IDENTITY);
+
Assert.assertEquals(Encoding.fromProtobuf(apache.rocketmq.v2.Encoding.GZIP),
Encoding.GZIP);
+ }
+
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ @Test(expected = IllegalArgumentException.class)
+ public void testFromProtobufWithUnspecified() {
+
Encoding.fromProtobuf(apache.rocketmq.v2.Encoding.ENCODING_UNSPECIFIED);
}
}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/protocol/ResourceTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/protocol/ResourceTest.java
new file mode 100644
index 0000000..d2638d6
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/protocol/ResourceTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.message.protocol;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ResourceTest extends TestBase {
+
+ @Test
+ public void testGetterAndSetter() {
+ Resource resource = new Resource("foobar");
+ Assert.assertEquals(resource.getName(), "foobar");
+ Assert.assertEquals(resource.getNamespace(), StringUtils.EMPTY);
+
+ resource = new Resource("foo", "bar");
+ Assert.assertEquals(resource.getName(), "bar");
+ Assert.assertEquals(resource.getNamespace(), "foo");
+ }
+
+ @Test
+ public void testToProtobuf() {
+ Resource resource = new Resource("foo", "bar");
+ final apache.rocketmq.v2.Resource protobuf = resource.toProtobuf();
+ Assert.assertEquals(protobuf.getResourceNamespace(), "foo");
+ Assert.assertEquals(protobuf.getName(), "bar");
+ }
+
+ @Test
+ public void testEqual() {
+ Resource resource0 = new Resource("foo", "bar");
+ Resource resource1 = new Resource("foo", "bar");
+ Assert.assertEquals(resource0, resource1);
+ Resource resource2 = new Resource("foo0", "bar");
+ Assert.assertNotEquals(resource0, resource2);
+ }
+}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/misc/UtilitiesTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/misc/UtilitiesTest.java
new file mode 100644
index 0000000..41f408d
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/misc/UtilitiesTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.misc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.NoSuchAlgorithmException;
+import org.junit.Test;
+
+public class UtilitiesTest {
+ private final String body = "foobar";
+
+ @Test
+ public void testCompressAndUncompressByteArray() throws IOException {
+ final byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
+ final byte[] compressedBytes = Utilities.compressBytesGzip(bytes, 5);
+ final byte[] originalBytes =
Utilities.uncompressBytesGzip(compressedBytes);
+ assertEquals(new String(originalBytes, StandardCharsets.UTF_8), body);
+ }
+
+ @Test
+ public void testCrc32CheckSum() {
+ final byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
+ assertEquals("9EF61F95", Utilities.crc32CheckSum(bytes));
+ }
+
+ @Test
+ public void testMd5CheckSum() throws NoSuchAlgorithmException {
+ final byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
+ assertEquals("3858F62230AC3C915F300C664312C63F",
Utilities.md5CheckSum(bytes));
+ }
+
+ @Test
+ public void testSha1CheckSum() throws NoSuchAlgorithmException {
+ final byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
+ assertEquals("8843D7F92416211DE9EBB963FF4CE28125932878",
Utilities.sha1CheckSum(bytes));
+ }
+
+ @Test
+ public void testStackTrace() {
+ final String stackTrace = Utilities.stackTrace();
+ assertNotNull(stackTrace);
+ assertTrue(stackTrace.length() > 0);
+ }
+}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index cd4cb85..3c1fd06 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -62,7 +62,7 @@ import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
-import org.apache.rocketmq.client.java.impl.producer.ProducerSettings;
+import org.apache.rocketmq.client.java.impl.producer.PublishingSettings;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.apache.rocketmq.client.java.message.MessageIdCodec;
@@ -94,7 +94,7 @@ public class TestBase {
protected static final String FAKE_RECEIPT_HANDLE_0 = "foo-bar-handle-0";
protected static final String FAKE_RECEIPT_HANDLE_1 = "foo-bar-handle-1";
- protected static final String FAKE_ACCESS_POINT = "127.0.0.1:9876";
+ protected static final String FAKE_ENDPOINTS = "127.0.0.1:9876";
protected static final String FAKE_HOST_0 = "127.0.0.1";
protected static final int FAKE_PORT_0 = 8080;
@@ -102,7 +102,7 @@ public class TestBase {
protected static final String FAKE_HOST_1 = "127.0.0.2";
protected static final int FAKE_PORT_1 = 8081;
- protected static final String FAKE_GROUP_0 = "foo-bar-group-0";
+ protected static final String FAKE_CONSUMER_GROUP_0 = "foo-bar-group-0";
protected static final String FAKE_TRANSACTION_ID =
"foo-bar-transaction-id";
protected static final long FAKE_OFFSET = 1;
@@ -358,8 +358,8 @@ public class TestBase {
return new ExponentialBackoffRetryPolicy(3, Duration.ofMillis(100),
Duration.ofSeconds(3), 2);
}
- protected ProducerSettings fakeProducerSettings() {
- return new ProducerSettings(FAKE_CLIENT_ID, fakeEndpoints(),
fakeExponentialBackoffRetryPolicy(),
+ protected PublishingSettings fakeProducerSettings() {
+ return new PublishingSettings(FAKE_CLIENT_ID, fakeEndpoints(),
fakeExponentialBackoffRetryPolicy(),
Duration.ofSeconds(1), new HashSet<>());
}
diff --git a/java/style/spotbugs-suppressions.xml
b/java/style/spotbugs-suppressions.xml
index 4e28e14..1be718a 100644
--- a/java/style/spotbugs-suppressions.xml
+++ b/java/style/spotbugs-suppressions.xml
@@ -40,25 +40,25 @@
</Match>
<Match>
- <Class
name="org.apache.rocketmq.client.java.impl.consumer.PushConsumerSettings" />
+ <Class
name="org.apache.rocketmq.client.java.impl.consumer.PushSubscriptionSettings" />
<Method name="applySettingsCommand" />
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
</Match>
<Match>
- <Class
name="org.apache.rocketmq.client.java.impl.consumer.PushConsumerSettings" />
+ <Class
name="org.apache.rocketmq.client.java.impl.consumer.PushSubscriptionSettings" />
<Method name="applySettingsCommand" />
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
</Match>
<Match>
- <Class
name="org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerSettings" />
+ <Class
name="org.apache.rocketmq.client.java.impl.consumer.SimpleSubscriptionSettings"
/>
<Method name="applySettingsCommand" />
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
</Match>
<Match>
- <Class
name="org.apache.rocketmq.client.java.impl.producer.ProducerSettings" />
+ <Class
name="org.apache.rocketmq.client.java.impl.producer.PublishingSettings" />
<Method name="applySettingsCommand" />
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
</Match>