This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new c804906 Upgrade Apache Pulsar
c804906 is described below
commit c80490699134378c49611f66275cba890779b05d
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri May 31 07:19:43 2019 +0200
Upgrade Apache Pulsar
---
.../camel/component/pulsar/PulsarComponent.java | 14 +++-----------
.../pulsar/configuration/PulsarConfiguration.java | 4 ++--
.../camel/component/pulsar/utils/PulsarUtils.java | 20 +++++++++++++++++---
.../camel/component/pulsar/PulsarTestSupport.java | 2 +-
parent/pom.xml | 4 ++--
5 files changed, 25 insertions(+), 19 deletions(-)
diff --git
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
index e726203..e02de5c 100644
---
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
+++
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
@@ -30,19 +30,11 @@ import org.apache.pulsar.client.api.PulsarClient;
@Component("pulsar")
public class PulsarComponent extends DefaultComponent {
- @Metadata
+ @Metadata(label = "advanced")
private AutoConfiguration autoConfiguration;
- @Metadata
+ @Metadata(label = "advanced")
private PulsarClient pulsarClient;
- public PulsarComponent() {
- this(null);
- }
-
- public PulsarComponent(CamelContext context) {
- super(context);
- }
-
@Override
protected Endpoint createEndpoint(final String uri, final String path,
final Map<String, Object> parameters) throws Exception {
final PulsarConfiguration configuration = new PulsarConfiguration();
@@ -64,7 +56,7 @@ public class PulsarComponent extends DefaultComponent {
}
/**
- * The pulsar autoconfiguration
+ * The pulsar auto configuration
*/
public void setAutoConfiguration(AutoConfiguration autoConfiguration) {
this.autoConfiguration = autoConfiguration;
diff --git
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 5e76604..8834164 100644
---
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -25,9 +25,9 @@ import static
org.apache.camel.component.pulsar.utils.consumers.SubscriptionType
@UriParams
public class PulsarConfiguration {
- @UriParam(label = "consumer", defaultValue = "subscription")
+ @UriParam(label = "consumer", defaultValue = "subs")
private String subscriptionName = "subs";
- @UriParam(label = "consumer", enums = "EXCLUSIVE, SHARED, FAILOVER",
defaultValue = "EXCLUSIVE")
+ @UriParam(label = "consumer", defaultValue = "EXCLUSIVE")
private SubscriptionType subscriptionType = EXCLUSIVE;
@UriParam(label = "consumer", defaultValue = "1")
private int numberOfConsumers = 1;
diff --git
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarUtils.java
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarUtils.java
index a4bd1b3..7698152 100644
---
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarUtils.java
+++
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarUtils.java
@@ -18,11 +18,16 @@ package org.apache.camel.component.pulsar.utils;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class PulsarUtils {
-
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PulsarUtils.class);
+
private PulsarUtils() {
}
@@ -30,8 +35,17 @@ public final class PulsarUtils {
while (!consumers.isEmpty()) {
Consumer<byte[]> consumer = consumers.poll();
if (consumer != null) {
- consumer.unsubscribe();
- consumer.close();
+ try {
+ consumer.unsubscribe();
+ consumer.close();
+ } catch (Exception e) {
+ // ignore during stopping
+ if (e instanceof
PulsarClientException.AlreadyClosedException) {
+ // ignore
+ } else {
+ LOG.debug("Error stopping consumer: " + consumer + "
due to " + e.getMessage() + ". This exception is ignored", e);
+ }
+ }
}
}
diff --git
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java
index 1826a2c..ad84334 100644
---
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java
+++
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java
@@ -22,7 +22,7 @@ import org.testcontainers.containers.GenericContainer;
public class PulsarTestSupport extends ContainerAwareTestSupport {
- public static final String CONTAINER_IMAGE = "apachepulsar/pulsar:2.2.0";
+ public static final String CONTAINER_IMAGE = "apachepulsar/pulsar:2.3.1";
public static final String CONTAINER_NAME = "pulsar";
public static final int BROKER_PORT = 6650;
public static final int BROKER_HTTP_PORT = 8080;
diff --git a/parent/pom.xml b/parent/pom.xml
index f9ebf8a..9a16eaee 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -562,8 +562,8 @@
<protobuf-maven-plugin-version>0.5.1</protobuf-maven-plugin-version>
<protonpack-version>1.8</protonpack-version>
<pubnub-version>4.21.0</pubnub-version>
- <pulsar-version>2.2.1</pulsar-version>
- <pulsar-bundle-version>2.2.1_1</pulsar-bundle-version>
+ <pulsar-version>2.3.1</pulsar-version>
+ <pulsar-bundle-version>2.3.1_1</pulsar-bundle-version>
<qpid-broker-version>7.1.3</qpid-broker-version>
<qpid-proton-j-version>0.33.0</qpid-proton-j-version>
<qpid-jms-client-version>0.42.0</qpid-jms-client-version>