This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new ab6b232c897 CAMEL-18863: camel-pulsar - Support chunking to enable
sending a large message (#8987)
ab6b232c897 is described below
commit ab6b232c897b8a6bc1aaa3310f0400e39e7d3095
Author: Kengo Seki <[email protected]>
AuthorDate: Thu Jan 5 18:11:26 2023 +0900
CAMEL-18863: camel-pulsar - Support chunking to enable sending a large
message (#8987)
Currently, the Pulsar producer can't send a message larger than maxMessageSize.
This PR enables the producer to chunk such messages in order to overcome this
limitation.
---
.../component/pulsar/PulsarConfiguration.java | 14 +++++++++
.../camel/component/pulsar/PulsarProducer.java | 3 +-
.../pulsar/integration/PulsarProducerInIT.java | 35 ++++++++++++++++++++++
3 files changed, 51 insertions(+), 1 deletion(-)
diff --git
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index fb5695ce294..d1ae8dc7f93 100644
---
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -120,6 +120,9 @@ public class PulsarConfiguration implements Cloneable {
private long initialSequenceId = -1;
@UriParam(label = "producer", description = "Compression type to use",
defaultValue = "NONE")
private CompressionType compressionType = CompressionType.NONE;
+ @UriParam(label = "producer", description = "Control whether chunking of
messages is enabled for the producer.",
+ defaultValue = "false")
+ private boolean chunkingEnabled;
@UriParam(label = "producer", description = "Message Routing Mode to use",
defaultValue = "RoundRobinPartition")
private MessageRoutingMode messageRoutingMode =
MessageRoutingMode.RoundRobinPartition;
@UriParam(label = "producer", description = "Custom Message Router to use")
@@ -431,6 +434,17 @@ public class PulsarConfiguration implements Cloneable {
return compressionType;
}
+ /**
+ * Control whether chunking of messages is enabled for the producer.
Default is false.
+ */
+ public void setChunkingEnabled(boolean chunkingEnabled) {
+ this.chunkingEnabled = chunkingEnabled;
+ }
+
+ public boolean isChunkingEnabled() {
+ return chunkingEnabled;
+ }
+
/**
* Set the message routing mode for the producer.
*/
diff --git
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 100340eef2e..bc28f747efc 100644
---
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -132,7 +132,8 @@ public class PulsarProducer extends DefaultAsyncProducer {
.batchingMaxMessages(configuration.getMaxPendingMessages())
.enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
.initialSequenceId(configuration.getInitialSequenceId())
- .compressionType(configuration.getCompressionType());
+ .compressionType(configuration.getCompressionType())
+ .enableChunking(configuration.isChunkingEnabled());
if (ObjectHelper.isNotEmpty(configuration.getMessageRouter()))
{
producerBuilder.messageRouter(configuration.getMessageRouter());
} else {
diff --git
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerInIT.java
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerInIT.java
index 77c1ae52fef..8790d8f8c0d 100644
---
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerInIT.java
+++
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerInIT.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.pulsar.integration;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.CamelExecutionException;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointInject;
import org.apache.camel.Produce;
@@ -28,11 +29,16 @@ import org.apache.camel.component.pulsar.PulsarComponent;
import org.apache.camel.component.pulsar.utils.AutoConfiguration;
import org.apache.camel.spi.Registry;
import org.apache.camel.support.SimpleRegistry;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.junit.jupiter.api.Test;
+import static
org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
public class PulsarProducerInIT extends PulsarITSupport {
private static final String TOPIC_URI =
"persistent://public/default/camel-producer-topic";
@@ -41,14 +47,26 @@ public class PulsarProducerInIT extends PulsarITSupport {
@Produce("direct:start")
private ProducerTemplate producerTemplate;
+ @Produce("direct:start1")
+ private ProducerTemplate producerTemplate1;
+
@EndpointInject("pulsar:" + TOPIC_URI +
"?numberOfConsumers=1&subscriptionType=Exclusive"
+
"&subscriptionName=camel-subscription&consumerQueueSize=1"
+ "&consumerName=camel-consumer" + "&producerName=" +
PRODUCER)
private Endpoint from;
+ @EndpointInject("pulsar:" + TOPIC_URI +
"1?numberOfConsumers=1&subscriptionType=Exclusive"
+ +
"&subscriptionName=camel-subscription&consumerQueueSize=1"
+ + "&batchingEnabled=false" + "&chunkingEnabled=true"
+ + "&consumerName=camel-consumer" + "&producerName=" +
PRODUCER)
+ private Endpoint from1;
+
@EndpointInject("mock:result")
private MockEndpoint to;
+ @EndpointInject("mock:result1")
+ private MockEndpoint to1;
+
@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@@ -57,6 +75,9 @@ public class PulsarProducerInIT extends PulsarITSupport {
public void configure() {
from("direct:start").to(from);
from(from).to(to);
+
+ from("direct:start1").to(from1);
+ from(from1).to(to1);
}
};
}
@@ -96,4 +117,18 @@ public class PulsarProducerInIT extends PulsarITSupport {
MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
}
+
+ @Test
+ public void testLargeMessageWithChunkingDisabled() {
+ Throwable e = assertThrows(CamelExecutionException.class,
+ () -> producerTemplate.sendBody(new byte[10 * 1024 * 1024]));
+ assertTrue(ExceptionUtils.getThrowableList(e).stream().anyMatch(ex ->
ex instanceof InvalidMessageException));
+ }
+
+ @Test
+ public void testLargeMessageWithChunkingEnabled() throws Exception {
+ to1.expectedMessageCount(1);
+ producerTemplate1.sendBody(new byte[10 * 1024 * 1024]);
+ MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to1);
+ }
}