This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new ab6b232c897 CAMEL-18863: camel-pulsar - Support chunking to enable 
sending a large message (#8987)
ab6b232c897 is described below

commit ab6b232c897b8a6bc1aaa3310f0400e39e7d3095
Author: Kengo Seki <[email protected]>
AuthorDate: Thu Jan 5 18:11:26 2023 +0900

    CAMEL-18863: camel-pulsar - Support chunking to enable sending a large 
message (#8987)
    
    Currently, Pulsar producer can't send a message larger than maxMessageSize.
    This PR enables the producer to chunk such a message for overcoming this 
limitation.
---
 .../component/pulsar/PulsarConfiguration.java      | 14 +++++++++
 .../camel/component/pulsar/PulsarProducer.java     |  3 +-
 .../pulsar/integration/PulsarProducerInIT.java     | 35 ++++++++++++++++++++++
 3 files changed, 51 insertions(+), 1 deletion(-)

diff --git 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index fb5695ce294..d1ae8dc7f93 100644
--- 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++ 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -120,6 +120,9 @@ public class PulsarConfiguration implements Cloneable {
     private long initialSequenceId = -1;
     @UriParam(label = "producer", description = "Compression type to use", 
defaultValue = "NONE")
     private CompressionType compressionType = CompressionType.NONE;
+    @UriParam(label = "producer", description = "Control whether chunking of 
messages is enabled for the producer.",
+              defaultValue = "false")
+    private boolean chunkingEnabled;
     @UriParam(label = "producer", description = "Message Routing Mode to use", 
defaultValue = "RoundRobinPartition")
     private MessageRoutingMode messageRoutingMode = 
MessageRoutingMode.RoundRobinPartition;
     @UriParam(label = "producer", description = "Custom Message Router to use")
@@ -431,6 +434,17 @@ public class PulsarConfiguration implements Cloneable {
         return compressionType;
     }
 
+    /**
+     * Control whether chunking of messages is enabled for the producer. 
Default is false.
+     */
+    public void setChunkingEnabled(boolean chunkingEnabled) {
+        this.chunkingEnabled = chunkingEnabled;
+    }
+
+    public boolean isChunkingEnabled() {
+        return chunkingEnabled;
+    }
+
     /**
      * Set the message routing mode for the producer.
      */
diff --git 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 100340eef2e..bc28f747efc 100644
--- 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -132,7 +132,8 @@ public class PulsarProducer extends DefaultAsyncProducer {
                         
.batchingMaxMessages(configuration.getMaxPendingMessages())
                         
.enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
                         
.initialSequenceId(configuration.getInitialSequenceId())
-                        .compressionType(configuration.getCompressionType());
+                        .compressionType(configuration.getCompressionType())
+                        .enableChunking(configuration.isChunkingEnabled());
                 if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) 
{
                     
producerBuilder.messageRouter(configuration.getMessageRouter());
                 } else {
diff --git 
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerInIT.java
 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerInIT.java
index 77c1ae52fef..8790d8f8c0d 100644
--- 
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerInIT.java
+++ 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarProducerInIT.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.pulsar.integration;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Produce;
@@ -28,11 +29,16 @@ import org.apache.camel.component.pulsar.PulsarComponent;
 import org.apache.camel.component.pulsar.utils.AutoConfiguration;
 import org.apache.camel.spi.Registry;
 import org.apache.camel.support.SimpleRegistry;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.junit.jupiter.api.Test;
 
+import static 
org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 public class PulsarProducerInIT extends PulsarITSupport {
 
     private static final String TOPIC_URI = 
"persistent://public/default/camel-producer-topic";
@@ -41,14 +47,26 @@ public class PulsarProducerInIT extends PulsarITSupport {
     @Produce("direct:start")
     private ProducerTemplate producerTemplate;
 
+    @Produce("direct:start1")
+    private ProducerTemplate producerTemplate1;
+
     @EndpointInject("pulsar:" + TOPIC_URI + 
"?numberOfConsumers=1&subscriptionType=Exclusive"
                     + 
"&subscriptionName=camel-subscription&consumerQueueSize=1"
                     + "&consumerName=camel-consumer" + "&producerName=" + 
PRODUCER)
     private Endpoint from;
 
+    @EndpointInject("pulsar:" + TOPIC_URI + 
"1?numberOfConsumers=1&subscriptionType=Exclusive"
+                    + 
"&subscriptionName=camel-subscription&consumerQueueSize=1"
+                    + "&batchingEnabled=false" + "&chunkingEnabled=true"
+                    + "&consumerName=camel-consumer" + "&producerName=" + 
PRODUCER)
+    private Endpoint from1;
+
     @EndpointInject("mock:result")
     private MockEndpoint to;
 
+    @EndpointInject("mock:result1")
+    private MockEndpoint to1;
+
     @Override
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
@@ -57,6 +75,9 @@ public class PulsarProducerInIT extends PulsarITSupport {
             public void configure() {
                 from("direct:start").to(from);
                 from(from).to(to);
+
+                from("direct:start1").to(from1);
+                from(from1).to(to1);
             }
         };
     }
@@ -96,4 +117,18 @@ public class PulsarProducerInIT extends PulsarITSupport {
 
         MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
     }
+
+    @Test
+    public void testLargeMessageWithChunkingDisabled() {
+        Throwable e = assertThrows(CamelExecutionException.class,
+                () -> producerTemplate.sendBody(new byte[10 * 1024 * 1024]));
+        assertTrue(ExceptionUtils.getThrowableList(e).stream().anyMatch(ex -> 
ex instanceof InvalidMessageException));
+    }
+
+    @Test
+    public void testLargeMessageWithChunkingEnabled() throws Exception {
+        to1.expectedMessageCount(1);
+        producerTemplate1.sendBody(new byte[10 * 1024 * 1024]);
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to1);
+    }
 }

Reply via email to