Copilot commented on code in PR #10564:
URL: https://github.com/apache/rocketmq/pull/10564#discussion_r3503201859


##########
common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java:
##########
@@ -52,6 +52,7 @@ public class MessageConst {
     public static final String PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = 
"TRAN_PREPARED_QUEUE_OFFSET";
     public static final String PROPERTY_TRANSACTION_ID = "__transactionId__";
     public static final String PROPERTY_TRANSACTION_CHECK_TIMES = 
"TRANSACTION_CHECK_TIMES";
+    public static final String PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID = 
"__TXN_PRODUCER_CID__";

Review Comment:
   The new internal transaction property key is not added to 
MessageConst.STRING_HASH_SET. That set is used to reserve system property names 
so user code cannot accidentally set them via putUserProperty. Consider adding 
PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID to STRING_HASH_SET in the static 
initializer to keep it consistent with other transaction/broker-internal 
properties.



##########
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java:
##########
@@ -1444,6 +1444,7 @@ public TransactionSendResult 
sendMessageInTransaction(final Message msg,
         SendResult sendResult = null;
         MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
         MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, 
this.defaultMQProducer.getProducerGroup());
+        MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID, 
this.mQClientFactory.getClientId());

Review Comment:
   sendMessageInTransaction now dereferences mQClientFactory to get clientId, 
but this method does not call makeSureStateOK() first. If the producer is not 
started yet (serviceState != RUNNING), mQClientFactory can be null and this 
will throw NullPointerException instead of the expected MQClientException.



##########
broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java:
##########
@@ -316,6 +316,30 @@ public Channel getAvailableChannel(String groupId) {
         return lastActiveChannel;
     }
 
+    /**
+     * Get an available channel for the given group, preferring the producer 
that originally sent the message.
+     * Falls back to round-robin if the preferred producer is not found or not 
available.
+     *
+     * @param groupId producer group
+     * @param preferredClientId the clientId of the original producer (from 
half message properties), may be null
+     * @return an available channel, or null if none found
+     */
+    public Channel getAvailableChannel(String groupId, String 
preferredClientId) {
+        if (preferredClientId != null) {
+            ConcurrentMap<Channel, ClientChannelInfo> channelMap = 
groupChannelTable.get(groupId);
+            if (channelMap != null) {
+                for (Map.Entry<Channel, ClientChannelInfo> entry : 
channelMap.entrySet()) {
+                    if 
(preferredClientId.equals(entry.getValue().getClientId())
+                        && entry.getKey().isActive() && 
entry.getKey().isWritable()) {
+                        return entry.getKey();
+                    }
+                }
+            }
+        }
+        // Fall back to round-robin selection
+        return getAvailableChannel(groupId);
+    }

Review Comment:
   New overload calls groupChannelTable.get(groupId) when preferredClientId is 
non-null, but unlike the original getAvailableChannel(String) it doesn't guard 
against groupId==null. Because groupChannelTable is a ConcurrentHashMap, 
get(null) will throw NullPointerException. This can happen if the half message 
is missing PGROUP (groupId is null) while the new clientId property is present.



##########
broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java:
##########
@@ -225,4 +225,37 @@ public void testGetAvailableChannel() {
         assertThat(c).isNull();
     }
 
+    @Test
+    public void testGetAvailableChannelWithPreferredClientId() {
+        producerManager.registerProducer(group, clientInfo);
+        when(channel.isActive()).thenReturn(true);
+        when(channel.isWritable()).thenReturn(true);
+
+        // Match: preferred clientId matches registered producer
+        Channel c = producerManager.getAvailableChannel(group, "clientId");
+        assertThat(c).isSameAs(channel);
+    }
+
+    @Test
+    public void testGetAvailableChannelWithPreferredClientIdNotFound() {
+        producerManager.registerProducer(group, clientInfo);
+        when(channel.isActive()).thenReturn(true);
+        when(channel.isWritable()).thenReturn(true);
+
+        // No match: falls back to round-robin (returns some channel from 
group)
+        Channel c = producerManager.getAvailableChannel(group, 
"nonExistentClientId");
+        assertThat(c).isNotNull(); // should fall back to round-robin
+    }
+
+    @Test
+    public void testGetAvailableChannelWithNullPreferredClientId() {
+        producerManager.registerProducer(group, clientInfo);
+        when(channel.isActive()).thenReturn(true);
+        when(channel.isWritable()).thenReturn(true);
+
+        // null clientId: should behave exactly like original 
getAvailableChannel
+        Channel c = producerManager.getAvailableChannel(group, null);
+        assertThat(c).isNotNull();
+    }
+
 }

Review Comment:
   Add a regression test for the null groupId case when preferredClientId is 
non-null. The new ProducerManager#getAvailableChannel(String,String) should 
match the old method's behavior (return null) and must not throw 
(ConcurrentHashMap#get(null) would NPE without a guard).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to