This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 01cebf6 CAMEL-14692: Add Key_Shared subscription type (#3630)
01cebf6 is described below
commit 01cebf6792accb70743ba2dcbdfe8f4e05f5a90a
Author: William Thompson <[email protected]>
AuthorDate: Fri Mar 13 04:27:27 2020 -0400
CAMEL-14692: Add Key_Shared subscription type (#3630)
* CAMEL-14692: Add Key_Shared subscription type
* CAMEL-14692: Regenerate docs/components
---
.../pulsar/PulsarComponentConfigurer.java | 2 +
.../component/pulsar/PulsarEndpointConfigurer.java | 2 +
.../org/apache/camel/component/pulsar/pulsar.json | 6 ++-
.../src/main/docs/pulsar-component.adoc | 10 ++--
.../component/pulsar/PulsarConfiguration.java | 20 ++++++-
.../camel/component/pulsar/PulsarProducer.java | 3 +-
.../consumers/ConsumerCreationStrategyFactory.java | 2 +
.../utils/consumers/KeySharedConsumerStrategy.java | 63 ++++++++++++++++++++++
.../pulsar/utils/consumers/SubscriptionType.java | 2 +-
.../ConsumerCreationStrategyFactoryTest.java | 9 ++++
.../dsl/PulsarComponentBuilderFactory.java | 19 ++++++-
.../endpoint/dsl/PulsarEndpointBuilderFactory.java | 39 ++++++++++++--
12 files changed, 161 insertions(+), 16 deletions(-)
diff --git
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
index bcb5191..5b07ccf 100644
---
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
+++
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
@@ -32,6 +32,8 @@ public class PulsarComponentConfigurer extends
PropertyConfigurerSupport impleme
case "autoConfiguration":
target.setAutoConfiguration(property(camelContext,
org.apache.camel.component.pulsar.utils.AutoConfiguration.class, value));
return true;
case "basicpropertybinding":
case "basicPropertyBinding":
target.setBasicPropertyBinding(property(camelContext, boolean.class, value));
return true;
+ case "batcherbuilder":
+ case "batcherBuilder":
getOrCreateConfiguration(target).setBatcherBuilder(property(camelContext,
org.apache.pulsar.client.api.BatcherBuilder.class, value)); return true;
case "batchingenabled":
case "batchingEnabled":
getOrCreateConfiguration(target).setBatchingEnabled(property(camelContext,
boolean.class, value)); return true;
case "batchingmaxmessages":
diff --git
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
index 6bcf208..4e4895d 100644
---
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
+++
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
@@ -23,6 +23,8 @@ public class PulsarEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "allowManualAcknowledgement":
target.getPulsarConfiguration().setAllowManualAcknowledgement(property(camelContext,
boolean.class, value)); return true;
case "basicpropertybinding":
case "basicPropertyBinding":
target.setBasicPropertyBinding(property(camelContext, boolean.class, value));
return true;
+ case "batcherbuilder":
+ case "batcherBuilder":
target.getPulsarConfiguration().setBatcherBuilder(property(camelContext,
org.apache.pulsar.client.api.BatcherBuilder.class, value)); return true;
case "batchingenabled":
case "batchingEnabled":
target.getPulsarConfiguration().setBatchingEnabled(property(camelContext,
boolean.class, value)); return true;
case "batchingmaxmessages":
diff --git
a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index c202ca0..2bdec63 100644
---
a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++
b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -33,8 +33,9 @@
"numberOfConsumers": { "kind": "property", "displayName": "Number Of
Consumers", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "secret": false,
"defaultValue": "1", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Number of consumers - defaults to 1" },
"subscriptionInitialPosition": { "kind": "property", "displayName":
"Subscription Initial Position", "group": "consumer", "label": "consumer",
"required": false, "type": "object", "javaType":
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition",
"enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false,
"defaultValue": "LATEST", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", [...]
"subscriptionName": { "kind": "property", "displayName": "Subscription
Name", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false,
"defaultValue": "subs", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Name of the subscription to use" },
- "subscriptionType": { "kind": "property", "displayName": "Subscription
Type", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType":
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionType", "enum": [
"EXCLUSIVE", "SHARED", "FAILOVER" ], "deprecated": false, "secret": false,
"defaultValue": "EXCLUSIVE", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "T [...]
+ "subscriptionType": { "kind": "property", "displayName": "Subscription
Type", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType":
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionType", "enum": [
"EXCLUSIVE", "SHARED", "FAILOVER", "KEY_SHARED" ], "deprecated": false,
"secret": false, "defaultValue": "EXCLUSIVE", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "de [...]
"pulsarMessageReceiptFactory": { "kind": "property", "displayName":
"Pulsar Message Receipt Factory", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.component.pulsar.PulsarMessageReceiptFactory", "deprecated":
false, "secret": false, "description": "Provide a factory to create an
alternate implementation of PulsarMessageReceipt." },
+ "batcherBuilder": { "kind": "property", "displayName": "Batcher Builder",
"group": "producer", "label": "producer", "required": false, "type": "object",
"javaType": "org.apache.pulsar.client.api.BatcherBuilder", "deprecated": false,
"secret": false, "defaultValue": "DEFAULT", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Control batching method used by the producer."
},
"batchingEnabled": { "kind": "property", "displayName": "Batching
Enabled", "group": "producer", "label": "producer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "secret": false,
"defaultValue": "true", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Control whether automatic batching of messages
is enabled for the producer." },
"batchingMaxMessages": { "kind": "property", "displayName": "Batching Max
Messages", "group": "producer", "label": "producer", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "secret": false,
"defaultValue": "1000", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "The maximum size to batch messages." },
"batchingMaxPublishDelayMicros": { "kind": "property", "displayName":
"Batching Max Publish Delay Micros", "group": "producer", "label": "producer",
"required": false, "type": "integer", "javaType": "long", "deprecated": false,
"secret": false, "defaultValue": "1000", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "The maximum time period within which the
messages sent will be batched if batchingEna [...]
@@ -70,9 +71,10 @@
"numberOfConsumers": { "kind": "parameter", "displayName": "Number Of
Consumers", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "secret": false,
"defaultValue": "1", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Number of consumers - defaults to 1" },
"subscriptionInitialPosition": { "kind": "parameter", "displayName":
"Subscription Initial Position", "group": "consumer", "label": "consumer",
"required": false, "type": "object", "javaType":
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition",
"enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false,
"defaultValue": "LATEST", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfigur [...]
"subscriptionName": { "kind": "parameter", "displayName": "Subscription
Name", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false,
"defaultValue": "subs", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Name of the subscription to use" },
- "subscriptionType": { "kind": "parameter", "displayName": "Subscription
Type", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType":
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionType", "enum": [
"EXCLUSIVE", "SHARED", "FAILOVER" ], "deprecated": false, "secret": false,
"defaultValue": "EXCLUSIVE", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "descripti [...]
+ "subscriptionType": { "kind": "parameter", "displayName": "Subscription
Type", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType":
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionType", "enum": [
"EXCLUSIVE", "SHARED", "FAILOVER", "KEY_SHARED" ], "deprecated": false,
"secret": false, "defaultValue": "EXCLUSIVE", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguratio [...]
"exceptionHandler": { "kind": "parameter", "displayName": "Exception
Handler", "group": "consumer (advanced)", "label": "consumer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "secret": false, "description": "To let the consumer use a
custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled
then this option is not in use. By default the consumer will deal with [...]
"exchangePattern": { "kind": "parameter", "displayName": "Exchange
Pattern", "group": "consumer (advanced)", "label": "consumer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut",
"InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
+ "batcherBuilder": { "kind": "parameter", "displayName": "Batcher Builder",
"group": "producer", "label": "producer", "required": false, "type": "object",
"javaType": "org.apache.pulsar.client.api.BatcherBuilder", "deprecated": false,
"secret": false, "defaultValue": "DEFAULT", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Control batching method used by the
producer." },
"batchingEnabled": { "kind": "parameter", "displayName": "Batching
Enabled", "group": "producer", "label": "producer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "secret": false,
"defaultValue": "true", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Control whether automatic batching of
messages is enabled for the producer." },
"batchingMaxMessages": { "kind": "parameter", "displayName": "Batching Max
Messages", "group": "producer", "label": "producer", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "secret": false,
"defaultValue": "1000", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "The maximum size to batch messages." },
"batchingMaxPublishDelayMicros": { "kind": "parameter", "displayName":
"Batching Max Publish Delay Micros", "group": "producer", "label": "producer",
"required": false, "type": "integer", "javaType": "long", "deprecated": false,
"secret": false, "defaultValue": "1000", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "The maximum time period within which the
messages sent will be batched if batc [...]
diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index e9de8d2..8f78530 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -31,7 +31,7 @@ pulsar:[persistent|non-persistent]://tenant/namespace/topic
// component options: START
-The Pulsar component supports 32 options, which are listed below.
+The Pulsar component supports 33 options, which are listed below.
@@ -52,8 +52,9 @@ The Pulsar component supports 32 options, which are listed
below.
| *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 |
int
| *subscriptionInitialPosition* (consumer) | Control the initial position in
the topic of a newly created subscription. Default is latest message. The value
can be one of: EARLIEST, LATEST | LATEST | SubscriptionInitialPosition
| *subscriptionName* (consumer) | Name of the subscription to use | subs |
String
-| *subscriptionType* (consumer) | Type of the subscription
EXCLUSIVESHAREDFAILOVER, defaults to EXCLUSIVE. The value can be one of:
EXCLUSIVE, SHARED, FAILOVER | EXCLUSIVE | SubscriptionType
+| *subscriptionType* (consumer) | Type of the subscription
EXCLUSIVESHAREDFAILOVERKEY_SHARED, defaults to EXCLUSIVE. The value can be one
of: EXCLUSIVE, SHARED, FAILOVER, KEY_SHARED | EXCLUSIVE | SubscriptionType
| *pulsarMessageReceiptFactory* (consumer) | Provide a factory to create an
alternate implementation of PulsarMessageReceipt. | |
PulsarMessageReceiptFactory
+| *batcherBuilder* (producer) | Control batching method used by the producer.
| DEFAULT | BatcherBuilder
| *batchingEnabled* (producer) | Control whether automatic batching of
messages is enabled for the producer. | true | boolean
| *batchingMaxMessages* (producer) | The maximum size to batch messages. |
1000 | int
| *batchingMaxPublishDelayMicros* (producer) | The maximum time period within
which the messages sent will be batched if batchingEnabled is true. | 1000 |
long
@@ -99,7 +100,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (31 parameters):
+=== Query Parameters (32 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -118,9 +119,10 @@ with the following path and query parameters:
| *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 |
int
| *subscriptionInitialPosition* (consumer) | Control the initial position in
the topic of a newly created subscription. Default is latest message. The value
can be one of: EARLIEST, LATEST | LATEST | SubscriptionInitialPosition
| *subscriptionName* (consumer) | Name of the subscription to use | subs |
String
-| *subscriptionType* (consumer) | Type of the subscription
EXCLUSIVESHAREDFAILOVER, defaults to EXCLUSIVE. The value can be one of:
EXCLUSIVE, SHARED, FAILOVER | EXCLUSIVE | SubscriptionType
+| *subscriptionType* (consumer) | Type of the subscription
EXCLUSIVESHAREDFAILOVERKEY_SHARED, defaults to EXCLUSIVE. The value can be one
of: EXCLUSIVE, SHARED, FAILOVER, KEY_SHARED | EXCLUSIVE | SubscriptionType
| *exceptionHandler* (consumer) | To let the consumer use a custom
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this
option is not in use. By default the consumer will deal with exceptions, that
will be logged at WARN or ERROR level and ignored. | | ExceptionHandler
| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer
creates an exchange. The value can be one of: InOnly, InOut, InOptionalOut | |
ExchangePattern
+| *batcherBuilder* (producer) | Control batching method used by the producer.
| DEFAULT | BatcherBuilder
| *batchingEnabled* (producer) | Control whether automatic batching of
messages is enabled for the producer. | true | boolean
| *batchingMaxMessages* (producer) | The maximum size to batch messages. |
1000 | int
| *batchingMaxPublishDelayMicros* (producer) | The maximum time period within
which the messages sent will be batched if batchingEnabled is true. | 1000 |
long
diff --git
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index ad7d011..615e9b7 100644
---
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -23,6 +23,7 @@ import
org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosi
import org.apache.camel.component.pulsar.utils.consumers.SubscriptionType;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
+import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -76,6 +77,8 @@ public class PulsarConfiguration implements Cloneable {
private int batchingMaxMessages = 1000;
@UriParam(label = "producer", description = "Control whether automatic
batching of messages is enabled for the producer.", defaultValue = "true")
private boolean batchingEnabled = true;
+ @UriParam(label = "producer", description = "Control batching method used
by the producer.", defaultValue = "DEFAULT")
+ private BatcherBuilder batcherBuilder = BatcherBuilder.DEFAULT;
@UriParam(label = "producer", description = "The first message published
will have a sequence Id of initialSequenceId 1.", defaultValue = "-1")
private long initialSequenceId = -1;
@UriParam(label = "producer", description = "Compression type to use",
defaultValue = "NONE")
@@ -113,7 +116,7 @@ public class PulsarConfiguration implements Cloneable {
}
/**
- * Type of the subscription [EXCLUSIVE|SHARED|FAILOVER], defaults to
+ * Type of the subscription [EXCLUSIVE|SHARED|FAILOVER|KEY_SHARED],
defaults to
* EXCLUSIVE
*/
public void setSubscriptionType(SubscriptionType subscriptionType) {
@@ -308,6 +311,21 @@ public class PulsarConfiguration implements Cloneable {
}
/**
+ * Control batching method of the Pulsar producer.
+ * KEY_BASED batches based on the Pulsar message key.
+ * DEFAULT batches all messages together regardless of key;
+ * this may cause only a single consumer to work when consuming using a
KEY_SHARED subscription.
+ * Default is DEFAULT.
+ */
+ public void setBatcherBuilder(BatcherBuilder batcherBuilder) {
+ this.batcherBuilder = batcherBuilder;
+ }
+
+ public BatcherBuilder getBatcherBuilder() {
+ return batcherBuilder;
+ }
+
+ /**
* Control the initial position in the topic of a newly created
subscription. Default is latest message.
*/
public void setSubscriptionInitialPosition(SubscriptionInitialPosition
subscriptionInitialPosition) {
diff --git
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index c372fdbc..399ed59 100644
---
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -88,7 +88,8 @@ public class PulsarProducer extends DefaultProducer {
.sendTimeout(configuration.getSendTimeoutMs(),
TimeUnit.MILLISECONDS).blockIfQueueFull(configuration.isBlockIfQueueFull())
.maxPendingMessages(configuration.getMaxPendingMessages()).maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
.batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(),
TimeUnit.MICROSECONDS).batchingMaxMessages(configuration.getMaxPendingMessages())
-
.enableBatching(configuration.isBatchingEnabled()).initialSequenceId(configuration.getInitialSequenceId()).compressionType(configuration.getCompressionType());
+
.enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder())
+
.initialSequenceId(configuration.getInitialSequenceId()).compressionType(configuration.getCompressionType());
if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
producerBuilder.messageRouter(configuration.getMessageRouter());
} else {
diff --git
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactory.java
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactory.java
index 90bfd4f..fd3e2d7 100644
---
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactory.java
+++
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactory.java
@@ -47,6 +47,8 @@ public final class ConsumerCreationStrategyFactory {
return new ExclusiveConsumerStrategy(pulsarConsumer);
case FAILOVER:
return new FailoverConsumerStrategy(pulsarConsumer);
+ case KEY_SHARED:
+ return new KeySharedConsumerStrategy(pulsarConsumer);
default:
return new ExclusiveConsumerStrategy(pulsarConsumer);
}
diff --git
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/KeySharedConsumerStrategy.java
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/KeySharedConsumerStrategy.java
new file mode 100644
index 0000000..2a14f85
--- /dev/null
+++
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/KeySharedConsumerStrategy.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar.utils.consumers;
+
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.camel.component.pulsar.PulsarConfiguration;
+import org.apache.camel.component.pulsar.PulsarConsumer;
+import org.apache.camel.component.pulsar.PulsarEndpoint;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KeySharedConsumerStrategy implements ConsumerCreationStrategy {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(KeySharedConsumerStrategy.class);
+
+ private final PulsarConsumer pulsarConsumer;
+
+ KeySharedConsumerStrategy(PulsarConsumer pulsarConsumer) {
+ this.pulsarConsumer = pulsarConsumer;
+ }
+
+ @Override
+ public Collection<Consumer<byte[]>> create(final PulsarEndpoint
pulsarEndpoint) {
+ return createMultipleConsumers(pulsarEndpoint);
+ }
+
+ private Collection<Consumer<byte[]>> createMultipleConsumers(final
PulsarEndpoint pulsarEndpoint) {
+ final Collection<Consumer<byte[]>> consumers = new LinkedList<>();
+ final PulsarConfiguration configuration =
pulsarEndpoint.getPulsarConfiguration();
+
+ for (int i = 0; i < configuration.getNumberOfConsumers(); i++) {
+ final String consumerName = configuration.getConsumerNamePrefix()
+ i;
+ try {
+ ConsumerBuilder<byte[]> builder =
CommonCreationStrategyImpl.create(consumerName, pulsarEndpoint, pulsarConsumer);
+
+
consumers.add(builder.subscriptionType(SubscriptionType.Key_Shared).subscribe());
+ } catch (PulsarClientException exception) {
+ LOGGER.error("A PulsarClientException occurred when creating
Consumer {}, {}", consumerName, exception);
+ }
+ }
+ return consumers;
+ }
+}
diff --git
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/SubscriptionType.java
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/SubscriptionType.java
index 5543946..800abb7 100644
---
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/SubscriptionType.java
+++
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/SubscriptionType.java
@@ -17,5 +17,5 @@
package org.apache.camel.component.pulsar.utils.consumers;
public enum SubscriptionType {
- EXCLUSIVE, SHARED, FAILOVER
+ EXCLUSIVE, SHARED, FAILOVER, KEY_SHARED
}
diff --git
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactoryTest.java
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactoryTest.java
index 01ba587..de45ca7 100644
---
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactoryTest.java
+++
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/consumers/ConsumerCreationStrategyFactoryTest.java
@@ -65,6 +65,15 @@ public class ConsumerCreationStrategyFactoryTest {
}
@Test
+ public void verifyKeySharedStrategy() {
+ ConsumerCreationStrategyFactory factory =
ConsumerCreationStrategyFactory.create(mock(PulsarConsumer.class));
+
+ ConsumerCreationStrategy strategy =
factory.getStrategy(SubscriptionType.KEY_SHARED);
+
+ assertEquals(KeySharedConsumerStrategy.class, strategy.getClass());
+ }
+
+ @Test
public void verifyDefaultStrategyIsExclusiveStrategy() {
ConsumerCreationStrategyFactory factory =
ConsumerCreationStrategyFactory.create(mock(PulsarConsumer.class));
diff --git
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
index 5881705..49920a7 100644
---
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
+++
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
@@ -248,8 +248,8 @@ public interface PulsarComponentBuilderFactory {
return this;
}
/**
- * Type of the subscription EXCLUSIVESHAREDFAILOVER, defaults to
- * EXCLUSIVE.
+ * Type of the subscription EXCLUSIVESHAREDFAILOVERKEY_SHARED, defaults
+ * to EXCLUSIVE.
*
* The option is a:
*
<code>org.apache.camel.component.pulsar.utils.consumers.SubscriptionType</code>
type.
@@ -277,6 +277,20 @@ public interface PulsarComponentBuilderFactory {
return this;
}
/**
+ * Control batching method used by the producer.
+ *
+ * The option is a:
+ * <code>org.apache.pulsar.client.api.BatcherBuilder</code> type.
+ *
+ * Default: DEFAULT
+ * Group: producer
+ */
+ default PulsarComponentBuilder batcherBuilder(
+ org.apache.pulsar.client.api.BatcherBuilder batcherBuilder) {
+ doSetProperty("batcherBuilder", batcherBuilder);
+ return this;
+ }
+ /**
* Control whether automatic batching of messages is enabled for the
* producer.
*
@@ -538,6 +552,7 @@ public interface PulsarComponentBuilderFactory {
case "subscriptionName":
getOrCreateConfiguration((PulsarComponent)
component).setSubscriptionName((java.lang.String) value); return true;
case "subscriptionType":
getOrCreateConfiguration((PulsarComponent)
component).setSubscriptionType((org.apache.camel.component.pulsar.utils.consumers.SubscriptionType)
value); return true;
case "pulsarMessageReceiptFactory": ((PulsarComponent)
component).setPulsarMessageReceiptFactory((org.apache.camel.component.pulsar.PulsarMessageReceiptFactory)
value); return true;
+ case "batcherBuilder": getOrCreateConfiguration((PulsarComponent)
component).setBatcherBuilder((org.apache.pulsar.client.api.BatcherBuilder)
value); return true;
case "batchingEnabled": getOrCreateConfiguration((PulsarComponent)
component).setBatchingEnabled((boolean) value); return true;
case "batchingMaxMessages":
getOrCreateConfiguration((PulsarComponent)
component).setBatchingMaxMessages((int) value); return true;
case "batchingMaxPublishDelayMicros":
getOrCreateConfiguration((PulsarComponent)
component).setBatchingMaxPublishDelayMicros((long) value); return true;
diff --git
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
index d5e78fe..e11c7c3 100644
---
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
+++
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
@@ -362,8 +362,8 @@ public interface PulsarEndpointBuilderFactory {
return this;
}
/**
- * Type of the subscription EXCLUSIVESHAREDFAILOVER, defaults to
- * EXCLUSIVE.
+ * Type of the subscription EXCLUSIVESHAREDFAILOVERKEY_SHARED, defaults
+ * to EXCLUSIVE.
*
* The option is a:
*
<code>org.apache.camel.component.pulsar.utils.consumers.SubscriptionType</code>
type.
@@ -377,8 +377,8 @@ public interface PulsarEndpointBuilderFactory {
return this;
}
/**
- * Type of the subscription EXCLUSIVESHAREDFAILOVER, defaults to
- * EXCLUSIVE.
+ * Type of the subscription EXCLUSIVESHAREDFAILOVERKEY_SHARED, defaults
+ * to EXCLUSIVE.
*
* The option will be converted to a
*
<code>org.apache.camel.component.pulsar.utils.consumers.SubscriptionType</code>
type.
@@ -527,6 +527,34 @@ public interface PulsarEndpointBuilderFactory {
return (AdvancedPulsarEndpointProducerBuilder) this;
}
/**
+ * Control batching method used by the producer.
+ *
+ * The option is a:
+ * <code>org.apache.pulsar.client.api.BatcherBuilder</code> type.
+ *
+ * Default: DEFAULT
+ * Group: producer
+ */
+ default PulsarEndpointProducerBuilder batcherBuilder(
+ Object batcherBuilder) {
+ doSetProperty("batcherBuilder", batcherBuilder);
+ return this;
+ }
+ /**
+ * Control batching method used by the producer.
+ *
+ * The option will be converted to a
+ * <code>org.apache.pulsar.client.api.BatcherBuilder</code> type.
+ *
+ * Default: DEFAULT
+ * Group: producer
+ */
+ default PulsarEndpointProducerBuilder batcherBuilder(
+ String batcherBuilder) {
+ doSetProperty("batcherBuilder", batcherBuilder);
+ return this;
+ }
+ /**
* Control whether automatic batching of messages is enabled for the
* producer.
*
@@ -1045,7 +1073,8 @@ public interface PulsarEndpointBuilderFactory {
enum SubscriptionType {
EXCLUSIVE,
SHARED,
- FAILOVER;
+ FAILOVER,
+ KEY_SHARED;
}
/**