Copilot commented on code in PR #10564:
URL: https://github.com/apache/rocketmq/pull/10564#discussion_r3503201859
##########
common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java:
##########
@@ -52,6 +52,7 @@ public class MessageConst {
public static final String PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET =
"TRAN_PREPARED_QUEUE_OFFSET";
public static final String PROPERTY_TRANSACTION_ID = "__transactionId__";
public static final String PROPERTY_TRANSACTION_CHECK_TIMES =
"TRANSACTION_CHECK_TIMES";
+ public static final String PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID =
"__TXN_PRODUCER_CID__";
Review Comment:
The new internal transaction property key is not added to
MessageConst.STRING_HASH_SET. That set is used to reserve system property names
so user code cannot accidentally set them via putUserProperty. Consider adding
PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID to STRING_HASH_SET in the static
initializer to keep it consistent with other transaction/broker-internal
properties.
##########
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java:
##########
@@ -1444,6 +1444,7 @@ public TransactionSendResult
sendMessageInTransaction(final Message msg,
SendResult sendResult = null;
MessageAccessor.putProperty(msg,
MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP,
this.defaultMQProducer.getProducerGroup());
+ MessageAccessor.putProperty(msg,
MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID,
this.mQClientFactory.getClientId());
Review Comment:
sendMessageInTransaction now dereferences mQClientFactory to get clientId,
but this method does not call makeSureStateOK() first. If the producer is not
started yet (serviceState != RUNNING), mQClientFactory can be null and this
will throw NullPointerException instead of the expected MQClientException.
##########
broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java:
##########
@@ -316,6 +316,30 @@ public Channel getAvailableChannel(String groupId) {
return lastActiveChannel;
}
+ /**
+ * Get an available channel for the given group, preferring the producer
that originally sent the message.
+ * Falls back to round-robin if the preferred producer is not found or not
available.
+ *
+ * @param groupId producer group
+ * @param preferredClientId the clientId of the original producer (from
half message properties), may be null
+ * @return an available channel, or null if none found
+ */
+ public Channel getAvailableChannel(String groupId, String
preferredClientId) {
+ if (preferredClientId != null) {
+ ConcurrentMap<Channel, ClientChannelInfo> channelMap =
groupChannelTable.get(groupId);
+ if (channelMap != null) {
+ for (Map.Entry<Channel, ClientChannelInfo> entry :
channelMap.entrySet()) {
+ if
(preferredClientId.equals(entry.getValue().getClientId())
+ && entry.getKey().isActive() &&
entry.getKey().isWritable()) {
+ return entry.getKey();
+ }
+ }
+ }
+ }
+ // Fall back to round-robin selection
+ return getAvailableChannel(groupId);
+ }
Review Comment:
New overload calls groupChannelTable.get(groupId) when preferredClientId is
non-null, but unlike the original getAvailableChannel(String) it doesn't guard
against groupId==null. Because groupChannelTable is a ConcurrentHashMap,
get(null) will throw NullPointerException. This can happen if the half message
is missing PGROUP (groupId is null) while the new clientId property is present.
##########
broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java:
##########
@@ -225,4 +225,37 @@ public void testGetAvailableChannel() {
assertThat(c).isNull();
}
+ @Test
+ public void testGetAvailableChannelWithPreferredClientId() {
+ producerManager.registerProducer(group, clientInfo);
+ when(channel.isActive()).thenReturn(true);
+ when(channel.isWritable()).thenReturn(true);
+
+ // Match: preferred clientId matches registered producer
+ Channel c = producerManager.getAvailableChannel(group, "clientId");
+ assertThat(c).isSameAs(channel);
+ }
+
+ @Test
+ public void testGetAvailableChannelWithPreferredClientIdNotFound() {
+ producerManager.registerProducer(group, clientInfo);
+ when(channel.isActive()).thenReturn(true);
+ when(channel.isWritable()).thenReturn(true);
+
+ // No match: falls back to round-robin (returns some channel from
group)
+ Channel c = producerManager.getAvailableChannel(group,
"nonExistentClientId");
+ assertThat(c).isNotNull(); // should fall back to round-robin
+ }
+
+ @Test
+ public void testGetAvailableChannelWithNullPreferredClientId() {
+ producerManager.registerProducer(group, clientInfo);
+ when(channel.isActive()).thenReturn(true);
+ when(channel.isWritable()).thenReturn(true);
+
+ // null clientId: should behave exactly like original
getAvailableChannel
+ Channel c = producerManager.getAvailableChannel(group, null);
+ assertThat(c).isNotNull();
+ }
+
}
Review Comment:
Add a regression test for the null groupId case when preferredClientId is
non-null. The new ProducerManager#getAvailableChannel(String,String) should
match the old method's behavior (return null) and must not throw
(ConcurrentHashMap#get(null) would NPE without a guard).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]