This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 11f0002a3c [ISSUE #8806] fix autoBatch bug when connecting multiple RocketMQ clusters. (#8807)
11f0002a3c is described below

commit 11f0002a3c85ae449d72f911b03b7b74e275b265
Author: luozongle01 <[email protected]>
AuthorDate: Fri Oct 11 15:29:18 2024 +0800

    [ISSUE #8806] fix autoBatch bug when connecting multiple RocketMQ clusters. (#8807)
---
 .../impl/producer/DefaultMQProducerImpl.java       |   2 +
 .../client/producer/DefaultMQProducer.java         |  55 +++++--
 .../client/producer/DefaultMQProducerTest.java     | 168 +++++++++++++++++++--
 3 files changed, 196 insertions(+), 29 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 74a2516174..3d4fdbec37 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -250,6 +250,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
 
                 this.mQClientFactory = 
MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer,
 rpcHook);
 
+                defaultMQProducer.initProduceAccumulator();
+
                 boolean registerOK = 
mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), 
this);
                 if (!registerOK) {
                     this.serviceState = ServiceState.CREATE_JUST;
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index f0842de8ba..a8bf7cee85 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -174,6 +174,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      */
     private int backPressureForAsyncSendSize = 100 * 1024 * 1024;
 
+    /**
+     * Maximum hold time of accumulator.
+     */
+    private int batchMaxDelayMs = -1;
+
+    /**
+     * Maximum accumulation message body size for a single messageAccumulation.
+     */
+    private long batchMaxBytes = -1;
+
+    /**
+     * Maximum message body size for produceAccumulator.
+     */
+    private long totalBatchMaxBytes = -1;
+
     private RPCHook rpcHook = null;
 
     /**
@@ -293,7 +308,6 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         this.enableTrace = enableMsgTrace;
         this.traceTopic = customizedTraceTopic;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
-        produceAccumulator = 
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
     }
 
     /**
@@ -320,7 +334,6 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         this.producerGroup = producerGroup;
         this.rpcHook = rpcHook;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
-        produceAccumulator = 
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
     }
 
     /**
@@ -1168,10 +1181,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     }
 
     public void batchMaxDelayMs(int holdMs) {
-        if (this.produceAccumulator == null) {
-            throw new UnsupportedOperationException("The currently constructed 
producer does not support autoBatch");
+        this.batchMaxDelayMs = holdMs;
+        if (this.produceAccumulator != null) {
+            this.produceAccumulator.batchMaxDelayMs(holdMs);
         }
-        this.produceAccumulator.batchMaxDelayMs(holdMs);
     }
 
     public long getBatchMaxBytes() {
@@ -1182,10 +1195,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     }
 
     public void batchMaxBytes(long holdSize) {
-        if (this.produceAccumulator == null) {
-            throw new UnsupportedOperationException("The currently constructed 
producer does not support autoBatch");
+        this.batchMaxBytes = holdSize;
+        if (this.produceAccumulator != null) {
+            this.produceAccumulator.batchMaxBytes(holdSize);
         }
-        this.produceAccumulator.batchMaxBytes(holdSize);
     }
 
     public long getTotalBatchMaxBytes() {
@@ -1196,10 +1209,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     }
 
     public void totalBatchMaxBytes(long totalHoldSize) {
-        if (this.produceAccumulator == null) {
-            throw new UnsupportedOperationException("The currently constructed 
producer does not support autoBatch");
+        this.totalBatchMaxBytes = totalHoldSize;
+        if (this.produceAccumulator != null) {
+            this.produceAccumulator.totalBatchMaxBytes(totalHoldSize);
         }
-        this.produceAccumulator.totalBatchMaxBytes(totalHoldSize);
     }
 
     public boolean getAutoBatch() {
@@ -1210,9 +1223,6 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     }
 
     public void setAutoBatch(boolean autoBatch) {
-        if (this.produceAccumulator == null) {
-            throw new UnsupportedOperationException("The currently constructed 
producer does not support autoBatch");
-        }
         this.autoBatch = autoBatch;
     }
 
@@ -1439,4 +1449,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     public Compressor getCompressor() {
         return compressor;
     }
+
+    public void initProduceAccumulator() {
+        this.produceAccumulator = 
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
+
+        if (this.batchMaxDelayMs > -1) {
+            this.produceAccumulator.batchMaxDelayMs(this.batchMaxDelayMs);
+        }
+
+        if (this.batchMaxBytes > -1) {
+            this.produceAccumulator.batchMaxBytes(this.batchMaxBytes);
+        }
+
+        if (this.totalBatchMaxBytes > -1) {
+            
this.produceAccumulator.totalBatchMaxBytes(this.totalBatchMaxBytes);
+        }
+
+    }
 }
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 4cf899f970..33cf0df390 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -68,6 +68,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -659,9 +660,9 @@ public class DefaultMQProducerTest {
         assertNotNull(producer1);
         assertEquals(producerGroupTemp, producer1.getProducerGroup());
         assertNotNull(producer1.getDefaultMQProducerImpl());
-        assertTrue(producer1.getTotalBatchMaxBytes() > 0);
-        assertTrue(producer1.getBatchMaxBytes() > 0);
-        assertTrue(producer1.getBatchMaxDelayMs() > 0);
+        assertEquals(0, producer1.getTotalBatchMaxBytes());
+        assertEquals(0, producer1.getBatchMaxBytes());
+        assertEquals(0, producer1.getBatchMaxDelayMs());
         assertNull(producer1.getTopics());
         assertFalse(producer1.isEnableTrace());
         assertTrue(UtilAll.isBlank(producer1.getTraceTopic()));
@@ -669,9 +670,9 @@ public class DefaultMQProducerTest {
         assertNotNull(producer2);
         assertEquals(producerGroupTemp, producer2.getProducerGroup());
         assertNotNull(producer2.getDefaultMQProducerImpl());
-        assertTrue(producer2.getTotalBatchMaxBytes() > 0);
-        assertTrue(producer2.getBatchMaxBytes() > 0);
-        assertTrue(producer2.getBatchMaxDelayMs() > 0);
+        assertEquals(0, producer2.getTotalBatchMaxBytes());
+        assertEquals(0, producer2.getBatchMaxBytes());
+        assertEquals(0, producer2.getBatchMaxDelayMs());
         assertNull(producer2.getTopics());
         assertFalse(producer2.isEnableTrace());
         assertTrue(UtilAll.isBlank(producer2.getTraceTopic()));
@@ -679,9 +680,9 @@ public class DefaultMQProducerTest {
         assertNotNull(producer3);
         assertEquals(producerGroupTemp, producer3.getProducerGroup());
         assertNotNull(producer3.getDefaultMQProducerImpl());
-        assertTrue(producer3.getTotalBatchMaxBytes() > 0);
-        assertTrue(producer3.getBatchMaxBytes() > 0);
-        assertTrue(producer3.getBatchMaxDelayMs() > 0);
+        assertEquals(0, producer3.getTotalBatchMaxBytes());
+        assertEquals(0, producer3.getBatchMaxBytes());
+        assertEquals(0, producer3.getBatchMaxDelayMs());
         assertNotNull(producer3.getTopics());
         assertEquals(1, producer3.getTopics().size());
         assertFalse(producer3.isEnableTrace());
@@ -690,9 +691,9 @@ public class DefaultMQProducerTest {
         assertNotNull(producer4);
         assertEquals(producerGroupTemp, producer4.getProducerGroup());
         assertNotNull(producer4.getDefaultMQProducerImpl());
-        assertTrue(producer4.getTotalBatchMaxBytes() > 0);
-        assertTrue(producer4.getBatchMaxBytes() > 0);
-        assertTrue(producer4.getBatchMaxDelayMs() > 0);
+        assertEquals(0, producer4.getTotalBatchMaxBytes());
+        assertEquals(0, producer4.getBatchMaxBytes());
+        assertEquals(0, producer4.getBatchMaxDelayMs());
         assertNull(producer4.getTopics());
         assertTrue(producer4.isEnableTrace());
         assertEquals("custom_trace_topic", producer4.getTraceTopic());
@@ -700,9 +701,9 @@ public class DefaultMQProducerTest {
         assertNotNull(producer5);
         assertEquals(producerGroupTemp, producer5.getProducerGroup());
         assertNotNull(producer5.getDefaultMQProducerImpl());
-        assertTrue(producer5.getTotalBatchMaxBytes() > 0);
-        assertTrue(producer5.getBatchMaxBytes() > 0);
-        assertTrue(producer5.getBatchMaxDelayMs() > 0);
+        assertEquals(0, producer5.getTotalBatchMaxBytes());
+        assertEquals(0, producer5.getBatchMaxBytes());
+        assertEquals(0, producer5.getBatchMaxDelayMs());
         assertNotNull(producer5.getTopics());
         assertEquals(1, producer5.getTopics().size());
         assertTrue(producer5.isEnableTrace());
@@ -810,6 +811,136 @@ public class DefaultMQProducerTest {
         assertEquals(0L, producer.getTotalBatchMaxBytes());
     }
 
+    @Test
+    public void assertProduceAccumulatorStart() throws NoSuchFieldException, 
IllegalAccessException, MQClientException {
+        String producerGroupTemp = producerGroupPrefix + System.nanoTime();
+        DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp);
+        assertEquals(0, producer.getTotalBatchMaxBytes());
+        assertEquals(0, producer.getBatchMaxBytes());
+        assertEquals(0, producer.getBatchMaxDelayMs());
+        assertNull(getField(producer, "produceAccumulator", 
ProduceAccumulator.class));
+        producer.start();
+        assertTrue(producer.getTotalBatchMaxBytes() > 0);
+        assertTrue(producer.getBatchMaxBytes() > 0);
+        assertTrue(producer.getBatchMaxDelayMs() > 0);
+        assertNotNull(getField(producer, "produceAccumulator", 
ProduceAccumulator.class));
+    }
+
+    @Test
+    public void assertProduceAccumulatorBeforeStartSet() throws 
NoSuchFieldException, IllegalAccessException, MQClientException {
+        String producerGroupTemp = producerGroupPrefix + System.nanoTime();
+        DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp);
+        producer.totalBatchMaxBytes(64 * 1024 * 100);
+        producer.batchMaxBytes(64 * 1024);
+        producer.batchMaxDelayMs(10);
+
+        producer.start();
+        assertEquals(64 * 1024, producer.getBatchMaxBytes());
+        assertEquals(10, producer.getBatchMaxDelayMs());
+        assertNotNull(getField(producer, "produceAccumulator", 
ProduceAccumulator.class));
+    }
+
+    @Test
+    public void assertProduceAccumulatorAfterStartSet() throws 
NoSuchFieldException, IllegalAccessException, MQClientException {
+        String producerGroupTemp = producerGroupPrefix + System.nanoTime();
+        DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp);
+        producer.start();
+
+        assertNotNull(getField(producer, "produceAccumulator", 
ProduceAccumulator.class));
+
+        producer.totalBatchMaxBytes(64 * 1024 * 100);
+        producer.batchMaxBytes(64 * 1024);
+        producer.batchMaxDelayMs(10);
+
+        assertEquals(64 * 1024, producer.getBatchMaxBytes());
+        assertEquals(10, producer.getBatchMaxDelayMs());
+    }
+
+    @Test
+    public void assertProduceAccumulatorUnit() throws NoSuchFieldException, 
IllegalAccessException, MQClientException {
+        String producerGroupTemp = producerGroupPrefix + System.nanoTime();
+        DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp);
+        producer1.setUnitName("unit1");
+        DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp);
+        producer2.setUnitName("unit2");
+
+        producer1.start();
+        producer2.start();
+
+        ProduceAccumulator producer1Accumulator = getField(producer1, 
"produceAccumulator", ProduceAccumulator.class);
+        ProduceAccumulator producer2Accumulator = getField(producer2, 
"produceAccumulator", ProduceAccumulator.class);
+
+        assertNotNull(producer1Accumulator);
+        assertNotNull(producer2Accumulator);
+
+        assertNotEquals(producer1Accumulator, producer2Accumulator);
+    }
+
+    @Test
+    public void assertProduceAccumulator() throws NoSuchFieldException, 
IllegalAccessException, MQClientException {
+        String producerGroupTemp1 = producerGroupPrefix + System.nanoTime();
+        DefaultMQProducer producer1 = new 
DefaultMQProducer(producerGroupTemp1);
+        producer1.setInstanceName("instanceName1");
+        String producerGroupTemp2 = producerGroupPrefix + System.nanoTime();
+        DefaultMQProducer producer2 = new 
DefaultMQProducer(producerGroupTemp2);
+        producer2.setInstanceName("instanceName2");
+
+        producer1.start();
+        producer2.start();
+
+        ProduceAccumulator producer1Accumulator = getField(producer1, 
"produceAccumulator", ProduceAccumulator.class);
+        ProduceAccumulator producer2Accumulator = getField(producer2, 
"produceAccumulator", ProduceAccumulator.class);
+
+        assertNotNull(producer1Accumulator);
+        assertNotNull(producer2Accumulator);
+
+        assertNotEquals(producer1Accumulator, producer2Accumulator);
+    }
+
+    @Test
+    public void assertProduceAccumulatorInstanceEqual() throws 
NoSuchFieldException, IllegalAccessException, MQClientException {
+        String producerGroupTemp1 = producerGroupPrefix + System.nanoTime();
+        DefaultMQProducer producer1 = new 
DefaultMQProducer(producerGroupTemp1);
+        producer1.setInstanceName("equalInstance");
+        String producerGroupTemp2 = producerGroupPrefix + System.nanoTime();
+        DefaultMQProducer producer2 = new 
DefaultMQProducer(producerGroupTemp2);
+        producer2.setInstanceName("equalInstance");
+
+        producer1.start();
+        producer2.start();
+
+        ProduceAccumulator producer1Accumulator = getField(producer1, 
"produceAccumulator", ProduceAccumulator.class);
+        ProduceAccumulator producer2Accumulator = getField(producer2, 
"produceAccumulator", ProduceAccumulator.class);
+
+        assertNotNull(producer1Accumulator);
+        assertNotNull(producer2Accumulator);
+
+        assertEquals(producer1Accumulator, producer2Accumulator);
+    }
+
+    @Test
+    public void assertProduceAccumulatorInstanceAndUnitNameEqual() throws 
NoSuchFieldException, IllegalAccessException, MQClientException {
+        String producerGroupTemp1 = producerGroupPrefix + System.nanoTime();
+        DefaultMQProducer producer1 = new 
DefaultMQProducer(producerGroupTemp1);
+        producer1.setInstanceName("equalInstance");
+        producer1.setUnitName("equalUnitName");
+        String producerGroupTemp2 = producerGroupPrefix + System.nanoTime();
+        DefaultMQProducer producer2 = new 
DefaultMQProducer(producerGroupTemp2);
+        producer2.setInstanceName("equalInstance");
+        producer2.setUnitName("equalUnitName");
+
+        producer1.start();
+        producer2.start();
+
+        ProduceAccumulator producer1Accumulator = getField(producer1, 
"produceAccumulator", ProduceAccumulator.class);
+        ProduceAccumulator producer2Accumulator = getField(producer2, 
"produceAccumulator", ProduceAccumulator.class);
+
+        assertNotNull(producer1Accumulator);
+        assertNotNull(producer2Accumulator);
+
+        assertEquals(producer1Accumulator, producer2Accumulator);
+    }
+
     @Test
     public void assertGetRetryResponseCodes() {
         assertNotNull(producer.getRetryResponseCodes());
@@ -875,4 +1006,11 @@ public class DefaultMQProducerTest {
         field.setAccessible(true);
         field.set(target, newValue);
     }
+
+    private <T> T getField(final Object target, final String fieldName, final 
Class<T> fieldClassType) throws NoSuchFieldException, IllegalAccessException {
+        Class<?> targetClazz = target.getClass();
+        Field field = targetClazz.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return fieldClassType.cast(field.get(target));
+    }
 }

Reply via email to