This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 11f0002a3c [ISSUE #8806] fix autoBatch bug when connecting multiple
RocketMQ clusters. (#8807)
11f0002a3c is described below
commit 11f0002a3c85ae449d72f911b03b7b74e275b265
Author: luozongle01 <[email protected]>
AuthorDate: Fri Oct 11 15:29:18 2024 +0800
[ISSUE #8806] fix autoBatch bug when connecting multiple RocketMQ clusters.
(#8807)
---
.../impl/producer/DefaultMQProducerImpl.java | 2 +
.../client/producer/DefaultMQProducer.java | 55 +++++--
.../client/producer/DefaultMQProducerTest.java | 168 +++++++++++++++++++--
3 files changed, 196 insertions(+), 29 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 74a2516174..3d4fdbec37 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -250,6 +250,8 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
this.mQClientFactory =
MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer,
rpcHook);
+ defaultMQProducer.initProduceAccumulator();
+
boolean registerOK =
mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(),
this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index f0842de8ba..a8bf7cee85 100644
---
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -174,6 +174,21 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
*/
private int backPressureForAsyncSendSize = 100 * 1024 * 1024;
+ /**
+ * Maximum hold time of the accumulator, in milliseconds. A negative value (the default, -1) is skipped by initProduceAccumulator().
+ */
+ private int batchMaxDelayMs = -1;
+
+ /**
+ * Maximum accumulated message body size, in bytes, for a single message accumulation. A negative value (the default, -1) is skipped by initProduceAccumulator().
+ */
+ private long batchMaxBytes = -1;
+
+ /**
+ * Maximum total message body size, in bytes, for the produceAccumulator. A negative value (the default, -1) is skipped by initProduceAccumulator().
+ */
+ private long totalBatchMaxBytes = -1;
+
private RPCHook rpcHook = null;
/**
@@ -293,7 +308,6 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
this.enableTrace = enableMsgTrace;
this.traceTopic = customizedTraceTopic;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
- produceAccumulator =
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
/**
@@ -320,7 +334,6 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
this.producerGroup = producerGroup;
this.rpcHook = rpcHook;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
- produceAccumulator =
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
/**
@@ -1168,10 +1181,10 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
}
public void batchMaxDelayMs(int holdMs) {
- if (this.produceAccumulator == null) {
- throw new UnsupportedOperationException("The currently constructed
producer does not support autoBatch");
+ this.batchMaxDelayMs = holdMs;
+ if (this.produceAccumulator != null) {
+ this.produceAccumulator.batchMaxDelayMs(holdMs);
}
- this.produceAccumulator.batchMaxDelayMs(holdMs);
}
public long getBatchMaxBytes() {
@@ -1182,10 +1195,10 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
}
public void batchMaxBytes(long holdSize) {
- if (this.produceAccumulator == null) {
- throw new UnsupportedOperationException("The currently constructed
producer does not support autoBatch");
+ this.batchMaxBytes = holdSize;
+ if (this.produceAccumulator != null) {
+ this.produceAccumulator.batchMaxBytes(holdSize);
}
- this.produceAccumulator.batchMaxBytes(holdSize);
}
public long getTotalBatchMaxBytes() {
@@ -1196,10 +1209,10 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
}
public void totalBatchMaxBytes(long totalHoldSize) {
- if (this.produceAccumulator == null) {
- throw new UnsupportedOperationException("The currently constructed
producer does not support autoBatch");
+ this.totalBatchMaxBytes = totalHoldSize;
+ if (this.produceAccumulator != null) {
+ this.produceAccumulator.totalBatchMaxBytes(totalHoldSize);
}
- this.produceAccumulator.totalBatchMaxBytes(totalHoldSize);
}
public boolean getAutoBatch() {
@@ -1210,9 +1223,6 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
}
public void setAutoBatch(boolean autoBatch) {
- if (this.produceAccumulator == null) {
- throw new UnsupportedOperationException("The currently constructed
producer does not support autoBatch");
- }
this.autoBatch = autoBatch;
}
@@ -1439,4 +1449,21 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
public Compressor getCompressor() {
return compressor;
}
+
+ public void initProduceAccumulator() {
+ this.produceAccumulator =
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
+
+ if (this.batchMaxDelayMs > -1) {
+ this.produceAccumulator.batchMaxDelayMs(this.batchMaxDelayMs);
+ }
+
+ if (this.batchMaxBytes > -1) {
+ this.produceAccumulator.batchMaxBytes(this.batchMaxBytes);
+ }
+
+ if (this.totalBatchMaxBytes > -1) {
+ this.produceAccumulator.totalBatchMaxBytes(this.totalBatchMaxBytes);
+ }
+
+ }
}
diff --git
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 4cf899f970..33cf0df390 100644
---
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -68,6 +68,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -659,9 +660,9 @@ public class DefaultMQProducerTest {
assertNotNull(producer1);
assertEquals(producerGroupTemp, producer1.getProducerGroup());
assertNotNull(producer1.getDefaultMQProducerImpl());
- assertTrue(producer1.getTotalBatchMaxBytes() > 0);
- assertTrue(producer1.getBatchMaxBytes() > 0);
- assertTrue(producer1.getBatchMaxDelayMs() > 0);
+ assertEquals(0, producer1.getTotalBatchMaxBytes());
+ assertEquals(0, producer1.getBatchMaxBytes());
+ assertEquals(0, producer1.getBatchMaxDelayMs());
assertNull(producer1.getTopics());
assertFalse(producer1.isEnableTrace());
assertTrue(UtilAll.isBlank(producer1.getTraceTopic()));
@@ -669,9 +670,9 @@ public class DefaultMQProducerTest {
assertNotNull(producer2);
assertEquals(producerGroupTemp, producer2.getProducerGroup());
assertNotNull(producer2.getDefaultMQProducerImpl());
- assertTrue(producer2.getTotalBatchMaxBytes() > 0);
- assertTrue(producer2.getBatchMaxBytes() > 0);
- assertTrue(producer2.getBatchMaxDelayMs() > 0);
+ assertEquals(0, producer2.getTotalBatchMaxBytes());
+ assertEquals(0, producer2.getBatchMaxBytes());
+ assertEquals(0, producer2.getBatchMaxDelayMs());
assertNull(producer2.getTopics());
assertFalse(producer2.isEnableTrace());
assertTrue(UtilAll.isBlank(producer2.getTraceTopic()));
@@ -679,9 +680,9 @@ public class DefaultMQProducerTest {
assertNotNull(producer3);
assertEquals(producerGroupTemp, producer3.getProducerGroup());
assertNotNull(producer3.getDefaultMQProducerImpl());
- assertTrue(producer3.getTotalBatchMaxBytes() > 0);
- assertTrue(producer3.getBatchMaxBytes() > 0);
- assertTrue(producer3.getBatchMaxDelayMs() > 0);
+ assertEquals(0, producer3.getTotalBatchMaxBytes());
+ assertEquals(0, producer3.getBatchMaxBytes());
+ assertEquals(0, producer3.getBatchMaxDelayMs());
assertNotNull(producer3.getTopics());
assertEquals(1, producer3.getTopics().size());
assertFalse(producer3.isEnableTrace());
@@ -690,9 +691,9 @@ public class DefaultMQProducerTest {
assertNotNull(producer4);
assertEquals(producerGroupTemp, producer4.getProducerGroup());
assertNotNull(producer4.getDefaultMQProducerImpl());
- assertTrue(producer4.getTotalBatchMaxBytes() > 0);
- assertTrue(producer4.getBatchMaxBytes() > 0);
- assertTrue(producer4.getBatchMaxDelayMs() > 0);
+ assertEquals(0, producer4.getTotalBatchMaxBytes());
+ assertEquals(0, producer4.getBatchMaxBytes());
+ assertEquals(0, producer4.getBatchMaxDelayMs());
assertNull(producer4.getTopics());
assertTrue(producer4.isEnableTrace());
assertEquals("custom_trace_topic", producer4.getTraceTopic());
@@ -700,9 +701,9 @@ public class DefaultMQProducerTest {
assertNotNull(producer5);
assertEquals(producerGroupTemp, producer5.getProducerGroup());
assertNotNull(producer5.getDefaultMQProducerImpl());
- assertTrue(producer5.getTotalBatchMaxBytes() > 0);
- assertTrue(producer5.getBatchMaxBytes() > 0);
- assertTrue(producer5.getBatchMaxDelayMs() > 0);
+ assertEquals(0, producer5.getTotalBatchMaxBytes());
+ assertEquals(0, producer5.getBatchMaxBytes());
+ assertEquals(0, producer5.getBatchMaxDelayMs());
assertNotNull(producer5.getTopics());
assertEquals(1, producer5.getTopics().size());
assertTrue(producer5.isEnableTrace());
@@ -810,6 +811,136 @@ public class DefaultMQProducerTest {
assertEquals(0L, producer.getTotalBatchMaxBytes());
}
+ @Test
+ public void assertProduceAccumulatorStart() throws NoSuchFieldException,
IllegalAccessException, MQClientException {
+ String producerGroupTemp = producerGroupPrefix + System.nanoTime();
+ DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp);
+ assertEquals(0, producer.getTotalBatchMaxBytes());
+ assertEquals(0, producer.getBatchMaxBytes());
+ assertEquals(0, producer.getBatchMaxDelayMs());
+ assertNull(getField(producer, "produceAccumulator",
ProduceAccumulator.class));
+ producer.start();
+ assertTrue(producer.getTotalBatchMaxBytes() > 0);
+ assertTrue(producer.getBatchMaxBytes() > 0);
+ assertTrue(producer.getBatchMaxDelayMs() > 0);
+ assertNotNull(getField(producer, "produceAccumulator",
ProduceAccumulator.class));
+ }
+
+ @Test
+ public void assertProduceAccumulatorBeforeStartSet() throws
NoSuchFieldException, IllegalAccessException, MQClientException {
+ String producerGroupTemp = producerGroupPrefix + System.nanoTime();
+ DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp);
+ producer.totalBatchMaxBytes(64 * 1024 * 100);
+ producer.batchMaxBytes(64 * 1024);
+ producer.batchMaxDelayMs(10);
+
+ producer.start();
+ assertEquals(64 * 1024, producer.getBatchMaxBytes());
+ assertEquals(10, producer.getBatchMaxDelayMs());
+ assertNotNull(getField(producer, "produceAccumulator",
ProduceAccumulator.class));
+ }
+
+ @Test
+ public void assertProduceAccumulatorAfterStartSet() throws
NoSuchFieldException, IllegalAccessException, MQClientException {
+ String producerGroupTemp = producerGroupPrefix + System.nanoTime();
+ DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp);
+ producer.start();
+
+ assertNotNull(getField(producer, "produceAccumulator",
ProduceAccumulator.class));
+
+ producer.totalBatchMaxBytes(64 * 1024 * 100);
+ producer.batchMaxBytes(64 * 1024);
+ producer.batchMaxDelayMs(10);
+
+ assertEquals(64 * 1024, producer.getBatchMaxBytes());
+ assertEquals(10, producer.getBatchMaxDelayMs());
+ }
+
+ @Test
+ public void assertProduceAccumulatorUnit() throws NoSuchFieldException,
IllegalAccessException, MQClientException {
+ String producerGroupTemp = producerGroupPrefix + System.nanoTime();
+ DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp);
+ producer1.setUnitName("unit1");
+ DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp);
+ producer2.setUnitName("unit2");
+
+ producer1.start();
+ producer2.start();
+
+ ProduceAccumulator producer1Accumulator = getField(producer1,
"produceAccumulator", ProduceAccumulator.class);
+ ProduceAccumulator producer2Accumulator = getField(producer2,
"produceAccumulator", ProduceAccumulator.class);
+
+ assertNotNull(producer1Accumulator);
+ assertNotNull(producer2Accumulator);
+
+ assertNotEquals(producer1Accumulator, producer2Accumulator);
+ }
+
+ @Test
+ public void assertProduceAccumulator() throws NoSuchFieldException,
IllegalAccessException, MQClientException {
+ String producerGroupTemp1 = producerGroupPrefix + System.nanoTime();
+ DefaultMQProducer producer1 = new
DefaultMQProducer(producerGroupTemp1);
+ producer1.setInstanceName("instanceName1");
+ String producerGroupTemp2 = producerGroupPrefix + System.nanoTime();
+ DefaultMQProducer producer2 = new
DefaultMQProducer(producerGroupTemp2);
+ producer2.setInstanceName("instanceName2");
+
+ producer1.start();
+ producer2.start();
+
+ ProduceAccumulator producer1Accumulator = getField(producer1,
"produceAccumulator", ProduceAccumulator.class);
+ ProduceAccumulator producer2Accumulator = getField(producer2,
"produceAccumulator", ProduceAccumulator.class);
+
+ assertNotNull(producer1Accumulator);
+ assertNotNull(producer2Accumulator);
+
+ assertNotEquals(producer1Accumulator, producer2Accumulator);
+ }
+
+ @Test
+ public void assertProduceAccumulatorInstanceEqual() throws
NoSuchFieldException, IllegalAccessException, MQClientException {
+ String producerGroupTemp1 = producerGroupPrefix + System.nanoTime();
+ DefaultMQProducer producer1 = new
DefaultMQProducer(producerGroupTemp1);
+ producer1.setInstanceName("equalInstance");
+ String producerGroupTemp2 = producerGroupPrefix + System.nanoTime();
+ DefaultMQProducer producer2 = new
DefaultMQProducer(producerGroupTemp2);
+ producer2.setInstanceName("equalInstance");
+
+ producer1.start();
+ producer2.start();
+
+ ProduceAccumulator producer1Accumulator = getField(producer1,
"produceAccumulator", ProduceAccumulator.class);
+ ProduceAccumulator producer2Accumulator = getField(producer2,
"produceAccumulator", ProduceAccumulator.class);
+
+ assertNotNull(producer1Accumulator);
+ assertNotNull(producer2Accumulator);
+
+ assertEquals(producer1Accumulator, producer2Accumulator);
+ }
+
+ @Test
+ public void assertProduceAccumulatorInstanceAndUnitNameEqual() throws
NoSuchFieldException, IllegalAccessException, MQClientException {
+ String producerGroupTemp1 = producerGroupPrefix + System.nanoTime();
+ DefaultMQProducer producer1 = new
DefaultMQProducer(producerGroupTemp1);
+ producer1.setInstanceName("equalInstance");
+ producer1.setUnitName("equalUnitName");
+ String producerGroupTemp2 = producerGroupPrefix + System.nanoTime();
+ DefaultMQProducer producer2 = new
DefaultMQProducer(producerGroupTemp2);
+ producer2.setInstanceName("equalInstance");
+ producer2.setUnitName("equalUnitName");
+
+ producer1.start();
+ producer2.start();
+
+ ProduceAccumulator producer1Accumulator = getField(producer1,
"produceAccumulator", ProduceAccumulator.class);
+ ProduceAccumulator producer2Accumulator = getField(producer2,
"produceAccumulator", ProduceAccumulator.class);
+
+ assertNotNull(producer1Accumulator);
+ assertNotNull(producer2Accumulator);
+
+ assertEquals(producer1Accumulator, producer2Accumulator);
+ }
+
@Test
public void assertGetRetryResponseCodes() {
assertNotNull(producer.getRetryResponseCodes());
@@ -875,4 +1006,11 @@ public class DefaultMQProducerTest {
field.setAccessible(true);
field.set(target, newValue);
}
+
+ private <T> T getField(final Object target, final String fieldName, final
Class<T> fieldClassType) throws NoSuchFieldException, IllegalAccessException {
+ Class<?> targetClazz = target.getClass();
+ Field field = targetClazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return fieldClassType.cast(field.get(target));
+ }
}