This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new ae66435 Add module for Jackson (#9)
ae66435 is described below
commit ae664352afc6936ede65dee0f901713f788a6207
Author: Christophe Bornet <[email protected]>
AuthorDate: Mon Sep 26 11:18:42 2022 +0200
Add module for Jackson (#9)
---
checkstyle/checkstyle.xml | 1 +
gradle/libs.versions.toml | 3 +
.../api/ImmutableReactiveMessageConsumerSpec.java | 47 ++
.../api/ImmutableReactiveMessageReaderSpec.java | 15 +
.../api/ImmutableReactiveMessageSenderSpec.java | 38 ++
pulsar-client-reactive-jackson/build.gradle | 17 +
.../pulsar/reactive/client/jackson/ClassConf.java | 31 +-
.../reactive/client/jackson/ConverterUtils.java | 131 +++++
.../client/jackson/DeadLetterPolicyConf.java | 62 +++
.../ImmutableReactiveMessageConsumerSpecMixin.java | 76 +++
.../ImmutableReactiveMessageReaderSpecMixin.java | 44 ++
.../ImmutableReactiveMessageSenderSpecMixin.java | 68 +++
.../MutableReactiveMessageConsumerSpecMixin.java | 12 +-
.../MutableReactiveMessageReaderSpecMixin.java | 12 +-
.../MutableReactiveMessageSenderSpecMixin.java | 12 +-
.../client/jackson/PulsarReactiveClientModule.java | 182 +++++++
.../pulsar/reactive/client/jackson/RangeConf.java | 29 +-
.../jackson/PulsarReactiveClientModuleTest.java | 554 +++++++++++++++++++++
settings.gradle | 1 +
19 files changed, 1315 insertions(+), 20 deletions(-)
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 1ad50d8..7534cfd 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -3,6 +3,7 @@
"-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
"https://checkstyle.org/dtds/configuration_1_3.dtd">
<module name="com.puppycrawl.tools.checkstyle.Checker">
+ <module name="SuppressWithPlainTextCommentFilter"/>
<module name="io.spring.javaformat.checkstyle.SpringChecks">
<property name="excludes"
value="com.puppycrawl.tools.checkstyle.checks.javadoc.JavadocPackageCheck" />
</module>
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index d4defec..dd0a6f8 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -8,6 +8,7 @@ assertj = "3.23.1"
testcontainers = "1.17.3"
jctools = "3.3.0"
caffeine = "2.9.3"
+jackson = "2.13.4"
checkstyle = '8.45.1'
spring-javaformat = '0.0.34'
licenser = "0.6.1"
@@ -27,6 +28,8 @@ slf4j-api = { module = "org.slf4j:slf4j-api", version.ref =
"slf4j" }
testcontainers-pulsar = { module = "org.testcontainers:pulsar", version.ref =
"testcontainers" }
jctools-core = { module = "org.jctools:jctools-core", version.ref = "jctools" }
caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version.ref =
"caffeine" }
+jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind",
version.ref = "jackson" }
+jackson-datatype-jsr310 = { module =
"com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref =
"jackson" }
spring-javaformat-checkstyle = { module =
"io.spring.javaformat:spring-javaformat-checkstyle", version.ref =
"spring-javaformat" }
spring-javaformat-gradle-plugin = { module =
"io.spring.javaformat:spring-javaformat-gradle-plugin", version.ref =
"spring-javaformat" }
licenser = { module = "gradle.plugin.org.cadixdev.gradle:licenser",
version.ref = "licenser" }
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java
index 96da0c3..4f4878e 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java
@@ -168,6 +168,53 @@ public class ImmutableReactiveMessageConsumerSpec
implements ReactiveMessageCons
this.expireTimeOfIncompleteChunkedMessage =
consumerSpec.getExpireTimeOfIncompleteChunkedMessage();
}
+ public ImmutableReactiveMessageConsumerSpec(List<String> topicNames,
Pattern topicsPattern,
+ RegexSubscriptionMode topicsPatternSubscriptionMode,
Duration topicsPatternAutoDiscoveryPeriod,
+ String subscriptionName, SubscriptionMode
subscriptionMode, SubscriptionType subscriptionType,
+ KeySharedPolicy keySharedPolicy, Boolean
replicateSubscriptionState,
+ Map<String, String> subscriptionProperties, String
consumerName, Map<String, String> properties,
+ Integer priorityLevel, Boolean readCompacted, Boolean
batchIndexAckEnabled, Duration ackTimeout,
+ Duration ackTimeoutTickTime, Duration
acknowledgementsGroupTime, Boolean acknowledgeAsynchronously,
+ Scheduler acknowledgeScheduler, Duration
negativeAckRedeliveryDelay, DeadLetterPolicy deadLetterPolicy,
+ Boolean retryLetterTopicEnable, Integer
receiverQueueSize,
+ Integer maxTotalReceiverQueueSizeAcrossPartitions,
Boolean autoUpdatePartitions,
+ Duration autoUpdatePartitionsInterval, CryptoKeyReader
cryptoKeyReader,
+ ConsumerCryptoFailureAction cryptoFailureAction,
Integer maxPendingChunkedMessage,
+ Boolean autoAckOldestChunkedMessageOnQueueFull,
Duration expireTimeOfIncompleteChunkedMessage) {
+ this.topicNames = topicNames;
+ this.topicsPattern = topicsPattern;
+ this.topicsPatternSubscriptionMode =
topicsPatternSubscriptionMode;
+ this.topicsPatternAutoDiscoveryPeriod =
topicsPatternAutoDiscoveryPeriod;
+ this.subscriptionName = subscriptionName;
+ this.subscriptionMode = subscriptionMode;
+ this.subscriptionType = subscriptionType;
+ this.keySharedPolicy = keySharedPolicy;
+ this.replicateSubscriptionState = replicateSubscriptionState;
+ this.subscriptionProperties = subscriptionProperties;
+ this.consumerName = consumerName;
+ this.properties = properties;
+ this.priorityLevel = priorityLevel;
+ this.readCompacted = readCompacted;
+ this.batchIndexAckEnabled = batchIndexAckEnabled;
+ this.ackTimeout = ackTimeout;
+ this.ackTimeoutTickTime = ackTimeoutTickTime;
+ this.acknowledgementsGroupTime = acknowledgementsGroupTime;
+ this.acknowledgeAsynchronously = acknowledgeAsynchronously;
+ this.acknowledgeScheduler = acknowledgeScheduler;
+ this.negativeAckRedeliveryDelay = negativeAckRedeliveryDelay;
+ this.deadLetterPolicy = deadLetterPolicy;
+ this.retryLetterTopicEnable = retryLetterTopicEnable;
+ this.receiverQueueSize = receiverQueueSize;
+ this.maxTotalReceiverQueueSizeAcrossPartitions =
maxTotalReceiverQueueSizeAcrossPartitions;
+ this.autoUpdatePartitions = autoUpdatePartitions;
+ this.autoUpdatePartitionsInterval =
autoUpdatePartitionsInterval;
+ this.cryptoKeyReader = cryptoKeyReader;
+ this.cryptoFailureAction = cryptoFailureAction;
+ this.maxPendingChunkedMessage = maxPendingChunkedMessage;
+ this.autoAckOldestChunkedMessageOnQueueFull =
autoAckOldestChunkedMessageOnQueueFull;
+ this.expireTimeOfIncompleteChunkedMessage =
expireTimeOfIncompleteChunkedMessage;
+ }
+
public List<String> getTopicNames() {
return this.topicNames;
}
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageReaderSpec.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageReaderSpec.java
index 6d75992..b89536a 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageReaderSpec.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageReaderSpec.java
@@ -44,6 +44,21 @@ public class ImmutableReactiveMessageReaderSpec implements
ReactiveMessageReader
private final ConsumerCryptoFailureAction cryptoFailureAction;
+ public ImmutableReactiveMessageReaderSpec(List<String> topicNames,
String readerName, String subscriptionName,
+ String generatedSubscriptionNamePrefix, Integer
receiverQueueSize, Boolean readCompacted,
+ List<Range> keyHashRanges, CryptoKeyReader
cryptoKeyReader,
+ ConsumerCryptoFailureAction cryptoFailureAction) {
+ this.topicNames = topicNames;
+ this.readerName = readerName;
+ this.subscriptionName = subscriptionName;
+ this.generatedSubscriptionNamePrefix =
generatedSubscriptionNamePrefix;
+ this.receiverQueueSize = receiverQueueSize;
+ this.readCompacted = readCompacted;
+ this.keyHashRanges = keyHashRanges;
+ this.cryptoKeyReader = cryptoKeyReader;
+ this.cryptoFailureAction = cryptoFailureAction;
+ }
+
public ImmutableReactiveMessageReaderSpec(ReactiveMessageReaderSpec
readerSpec) {
this.topicNames = (readerSpec.getTopicNames() != null &&
!readerSpec.getTopicNames().isEmpty())
? Collections.unmodifiableList(new
ArrayList<>(readerSpec.getTopicNames())) : null;
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageSenderSpec.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageSenderSpec.java
index d70b847..8b01a2d 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageSenderSpec.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageSenderSpec.java
@@ -86,6 +86,44 @@ public class ImmutableReactiveMessageSenderSpec implements
ReactiveMessageSender
private final Map<String, String> properties;
+ public ImmutableReactiveMessageSenderSpec(String topicName, String
producerName, Duration sendTimeout,
+ Integer maxPendingMessages, Integer
maxPendingMessagesAcrossPartitions,
+ MessageRoutingMode messageRoutingMode, HashingScheme
hashingScheme,
+ ProducerCryptoFailureAction cryptoFailureAction,
MessageRouter messageRouter,
+ Duration batchingMaxPublishDelay, Integer
roundRobinRouterBatchingPartitionSwitchFrequency,
+ Integer batchingMaxMessages, Integer batchingMaxBytes,
Boolean batchingEnabled,
+ BatcherBuilder batcherBuilder, Boolean chunkingEnabled,
CryptoKeyReader cryptoKeyReader,
+ Set<String> encryptionKeys, CompressionType
compressionType, Long initialSequenceId,
+ Boolean autoUpdatePartitions, Duration
autoUpdatePartitionsInterval, Boolean multiSchema,
+ ProducerAccessMode accessMode, Boolean
lazyStartPartitionedProducers, Map<String, String> properties) {
+ this.topicName = topicName;
+ this.producerName = producerName;
+ this.sendTimeout = sendTimeout;
+ this.maxPendingMessages = maxPendingMessages;
+ this.maxPendingMessagesAcrossPartitions =
maxPendingMessagesAcrossPartitions;
+ this.messageRoutingMode = messageRoutingMode;
+ this.hashingScheme = hashingScheme;
+ this.cryptoFailureAction = cryptoFailureAction;
+ this.messageRouter = messageRouter;
+ this.batchingMaxPublishDelay = batchingMaxPublishDelay;
+ this.roundRobinRouterBatchingPartitionSwitchFrequency =
roundRobinRouterBatchingPartitionSwitchFrequency;
+ this.batchingMaxMessages = batchingMaxMessages;
+ this.batchingMaxBytes = batchingMaxBytes;
+ this.batchingEnabled = batchingEnabled;
+ this.batcherBuilder = batcherBuilder;
+ this.chunkingEnabled = chunkingEnabled;
+ this.cryptoKeyReader = cryptoKeyReader;
+ this.encryptionKeys = encryptionKeys;
+ this.compressionType = compressionType;
+ this.initialSequenceId = initialSequenceId;
+ this.autoUpdatePartitions = autoUpdatePartitions;
+ this.autoUpdatePartitionsInterval =
autoUpdatePartitionsInterval;
+ this.multiSchema = multiSchema;
+ this.accessMode = accessMode;
+ this.lazyStartPartitionedProducers =
lazyStartPartitionedProducers;
+ this.properties = properties;
+ }
+
public ImmutableReactiveMessageSenderSpec(ReactiveMessageSenderSpec
senderSpec) {
this.topicName = senderSpec.getTopicName();
this.producerName = senderSpec.getProducerName();
diff --git a/pulsar-client-reactive-jackson/build.gradle
b/pulsar-client-reactive-jackson/build.gradle
new file mode 100644
index 0000000..538af59
--- /dev/null
+++ b/pulsar-client-reactive-jackson/build.gradle
@@ -0,0 +1,17 @@
+plugins {
+ id 'pulsar-client-reactive.codestyle-conventions'
+ id 'pulsar-client-reactive.library-conventions'
+ id 'pulsar-client-reactive.integration-test-conventions'
+}
+
+dependencies {
+ api project(':pulsar-client-reactive-api')
+ api libs.jackson.databind
+ api libs.jackson.datatype.jsr310
+
+ testImplementation libs.pulsar.client.shaded
+ testImplementation libs.junit.jupiter
+ testImplementation libs.assertj.core
+}
+
+description = "Jackson module for the reactive client configuration"
diff --git a/settings.gradle
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ClassConf.java
similarity index 58%
copy from settings.gradle
copy to
pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ClassConf.java
index 221fd1c..a18c178 100644
--- a/settings.gradle
+++
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ClassConf.java
@@ -14,8 +14,31 @@
* limitations under the License.
*/
-rootProject.name = 'pulsar-client-reactive'
-include 'pulsar-client-reactive-api'
-include 'pulsar-client-reactive-adapter'
-include 'pulsar-client-reactive-producer-cache-caffeine'
+package org.apache.pulsar.reactive.client.jackson;
+import java.util.Map;
+
+@SuppressWarnings("unused")
+class ClassConf {
+
+ private String className;
+
+ private Map<String, Object> args;
+
+ String getClassName() {
+ return this.className;
+ }
+
+ void setClassName(String className) {
+ this.className = className;
+ }
+
+ Map<String, Object> getArgs() {
+ return this.args;
+ }
+
+ void setArgs(Map<String, Object> args) {
+ this.args = args;
+ }
+
+}
diff --git
a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ConverterUtils.java
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ConverterUtils.java
new file mode 100644
index 0000000..864d8a3
--- /dev/null
+++
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ConverterUtils.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.jackson;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.Range;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+
+abstract class ConverterUtils {
+
+ static DeadLetterPolicy toDeadLetterPolicy(DeadLetterPolicyConf
deadLetterPolicy) {
+ if (deadLetterPolicy == null) {
+ return null;
+ }
+ return
DeadLetterPolicy.builder().maxRedeliverCount(deadLetterPolicy.getMaxRedeliverCount())
+
.deadLetterTopic(deadLetterPolicy.getDeadLetterTopic())
+
.retryLetterTopic(deadLetterPolicy.getRetryLetterTopic())
+
.initialSubscriptionName(deadLetterPolicy.getInitialSubscriptionName()).build();
+ }
+
+ static <T> T toClass(ClassConf conf) {
+ if (conf == null) {
+ return null;
+ }
+ Class<?> klass;
+ try {
+ klass = loadClass(conf.getClassName(),
Thread.currentThread().getContextClassLoader());
+ }
+ catch (ClassNotFoundException ex) {
+ throw new RuntimeException(String.format("Failed to
load class %s", conf.getClassName()));
+ }
+
+ try {
+ if (conf.getArgs() != null) {
+ Constructor<?> ctor =
klass.getConstructor(Map.class);
+ return (T) ctor.newInstance(conf.getArgs());
+ }
+ else {
+ Constructor<?> ctor = klass.getConstructor();
+ return (T) ctor.newInstance();
+ }
+ }
+ catch (NoSuchMethodException ex) {
+ throw new RuntimeException(
+ String.format("Class %s does not have a
no-arg constructor or a constructor that accepts map",
+ klass.getName()),
+ ex);
+ }
+ catch (IllegalAccessException | InstantiationException |
InvocationTargetException ex) {
+ throw new RuntimeException(String.format("Failed to
create instance for %s", klass.getName()), ex);
+ }
+ }
+
+ static KeySharedPolicy toKeySharedPolicy(String keySharedPolicy) {
+ if (keySharedPolicy == null) {
+ return null;
+ }
+ switch (keySharedPolicy) {
+ case "AUTO_SPLIT":
+ return KeySharedPolicy.autoSplitHashRange();
+ case "STICKY":
+ return KeySharedPolicy.stickyHashRange();
+ default:
+ throw new IllegalArgumentException("Unsupported
keySharedPolicy: " + keySharedPolicy);
+ }
+ }
+
+ static Scheduler toScheduler(String scheduler) {
+ if (scheduler == null) {
+ return null;
+ }
+ switch (scheduler) {
+ case "parallel":
+ return Schedulers.parallel();
+ case "single":
+ return Schedulers.single();
+ case "boundedElastic":
+ return Schedulers.boundedElastic();
+ case "elastic":
+ return Schedulers.elastic();
+ case "immediate":
+ return Schedulers.immediate();
+ default:
+ throw new IllegalArgumentException("Unsupported
scheduler: " + scheduler);
+ }
+ }
+
+ static Range toRange(RangeConf rangeConf) {
+ if (rangeConf == null) {
+ return null;
+ }
+ return new Range(rangeConf.getStart(), rangeConf.getEnd());
+ }
+
+ static Class<?> loadClass(String className, ClassLoader classLoader)
throws ClassNotFoundException {
+ Class<?> objectClass;
+ try {
+ objectClass = Class.forName(className);
+ }
+ catch (ClassNotFoundException | NoClassDefFoundError ex) {
+ if (classLoader != null) {
+ objectClass = classLoader.loadClass(className);
+ }
+ else {
+ throw ex;
+ }
+ }
+ return objectClass;
+ }
+
+}
diff --git
a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/DeadLetterPolicyConf.java
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/DeadLetterPolicyConf.java
new file mode 100644
index 0000000..1260b0b
--- /dev/null
+++
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/DeadLetterPolicyConf.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.jackson;
+
+@SuppressWarnings("unused")
+class DeadLetterPolicyConf {
+
+ private int maxRedeliverCount;
+
+ private String retryLetterTopic;
+
+ private String deadLetterTopic;
+
+ private String initialSubscriptionName;
+
+ int getMaxRedeliverCount() {
+ return this.maxRedeliverCount;
+ }
+
+ void setMaxRedeliverCount(int maxRedeliverCount) {
+ this.maxRedeliverCount = maxRedeliverCount;
+ }
+
+ String getRetryLetterTopic() {
+ return this.retryLetterTopic;
+ }
+
+ void setRetryLetterTopic(String retryLetterTopic) {
+ this.retryLetterTopic = retryLetterTopic;
+ }
+
+ String getDeadLetterTopic() {
+ return this.deadLetterTopic;
+ }
+
+ void setDeadLetterTopic(String deadLetterTopic) {
+ this.deadLetterTopic = deadLetterTopic;
+ }
+
+ String getInitialSubscriptionName() {
+ return this.initialSubscriptionName;
+ }
+
+ void setInitialSubscriptionName(String initialSubscriptionName) {
+ this.initialSubscriptionName = initialSubscriptionName;
+ }
+
+}
diff --git
a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageConsumerSpecMixin.java
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageConsumerSpecMixin.java
new file mode 100644
index 0000000..878cca7
--- /dev/null
+++
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageConsumerSpecMixin.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.jackson;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+import
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec;
+import reactor.core.scheduler.Scheduler;
+
+@JsonDeserialize(as = ImmutableReactiveMessageConsumerSpec.class)
+abstract class ImmutableReactiveMessageConsumerSpecMixin {
+
+ @SuppressWarnings("unused")
+ @JsonCreator
+ ImmutableReactiveMessageConsumerSpecMixin(@JsonProperty("topicNames")
List<String> topicNames,
+ @JsonProperty("topicsPattern") Pattern topicsPattern,
+ @JsonProperty("topicsPatternSubscriptionMode")
RegexSubscriptionMode topicsPatternSubscriptionMode,
+ @JsonProperty("topicsPatternAutoDiscoveryPeriod")
Duration topicsPatternAutoDiscoveryPeriod,
+ @JsonProperty("subscriptionName") String
subscriptionName,
+ @JsonProperty("subscriptionMode") SubscriptionMode
subscriptionMode,
+ @JsonProperty("subscriptionType") SubscriptionType
subscriptionType,
+ @JsonProperty("keySharedPolicy") KeySharedPolicy
keySharedPolicy,
+ @JsonProperty("replicateSubscriptionState") Boolean
replicateSubscriptionState,
+ @JsonProperty("subscriptionProperties") Map<String,
String> subscriptionProperties,
+ @JsonProperty("consumerName") String consumerName,
+ @JsonProperty("properties") Map<String, String>
properties,
+ @JsonProperty("priorityLevel") Integer priorityLevel,
@JsonProperty("readCompacted") Boolean readCompacted,
+ @JsonProperty("batchIndexAckEnabled") Boolean
batchIndexAckEnabled,
+ @JsonProperty("ackTimeout") Duration ackTimeout,
+ @JsonProperty("ackTimeoutTickTime") Duration
ackTimeoutTickTime,
+ @JsonProperty("acknowledgementsGroupTime") Duration
acknowledgementsGroupTime,
+ @JsonProperty("acknowledgeAsynchronously") Boolean
acknowledgeAsynchronously,
+ @JsonProperty("acknowledgeScheduler") Scheduler
acknowledgeScheduler,
+ @JsonProperty("negativeAckRedeliveryDelay") Duration
negativeAckRedeliveryDelay,
+ @JsonProperty("deadLetterPolicy") DeadLetterPolicy
deadLetterPolicy,
+ @JsonProperty("retryLetterTopicEnable") Boolean
retryLetterTopicEnable,
+ @JsonProperty("receiverQueueSize") Integer
receiverQueueSize,
+
@JsonProperty("maxTotalReceiverQueueSizeAcrossPartitions") Integer
maxTotalReceiverQueueSizeAcrossPartitions,
+ @JsonProperty("autoUpdatePartitions") Boolean
autoUpdatePartitions,
+ @JsonProperty("autoUpdatePartitionsInterval") Duration
autoUpdatePartitionsInterval,
+ @JsonProperty("cryptoKeyReader") CryptoKeyReader
cryptoKeyReader,
+ @JsonProperty("cryptoFailureAction")
ConsumerCryptoFailureAction cryptoFailureAction,
+ @JsonProperty("maxPendingChunkedMessage") Integer
maxPendingChunkedMessage,
+ @JsonProperty("autoAckOldestChunkedMessageOnQueueFull")
Boolean autoAckOldestChunkedMessageOnQueueFull,
+ @JsonProperty("expireTimeOfIncompleteChunkedMessage")
Duration expireTimeOfIncompleteChunkedMessage) {
+
+ }
+
+}
diff --git
a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageReaderSpecMixin.java
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageReaderSpecMixin.java
new file mode 100644
index 0000000..62a8f56
--- /dev/null
+++
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageReaderSpecMixin.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.jackson;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Range;
+import
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageReaderSpec;
+
+@JsonDeserialize(as = ImmutableReactiveMessageReaderSpec.class)
+abstract class ImmutableReactiveMessageReaderSpecMixin {
+
+ @SuppressWarnings("unused")
+ @JsonCreator
+ ImmutableReactiveMessageReaderSpecMixin(@JsonProperty("topicNames")
List<String> topicNames,
+ @JsonProperty("readerName") String readerName,
@JsonProperty("subscriptionName") String subscriptionName,
+ @JsonProperty("generatedSubscriptionNamePrefix") String
generatedSubscriptionNamePrefix,
+ @JsonProperty("receiverQueueSize") Integer
receiverQueueSize,
+ @JsonProperty("readCompacted") Boolean readCompacted,
+ @JsonProperty("keyHashRanges") List<Range>
keyHashRanges,
+ @JsonProperty("cryptoKeyReader") CryptoKeyReader
cryptoKeyReader,
+ @JsonProperty("cryptoFailureAction")
ConsumerCryptoFailureAction cryptoFailureAction) {
+ }
+
+}
diff --git
a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageSenderSpecMixin.java
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageSenderSpecMixin.java
new file mode 100644
index 0000000..63543ad
--- /dev/null
+++
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageSenderSpecMixin.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.jackson;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Set;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerAccessMode;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+import
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageSenderSpec;
+
+@JsonDeserialize(as = ImmutableReactiveMessageSenderSpec.class)
+abstract class ImmutableReactiveMessageSenderSpecMixin {
+
+ @SuppressWarnings("unused")
+ @JsonCreator
+ ImmutableReactiveMessageSenderSpecMixin(@JsonProperty("topicName")
String topicName,
+ @JsonProperty("producerName") String producerName,
@JsonProperty("sendTimeout") Duration sendTimeout,
+ @JsonProperty("maxPendingMessages") Integer
maxPendingMessages,
+ @JsonProperty("maxPendingMessagesAcrossPartitions")
Integer maxPendingMessagesAcrossPartitions,
+ @JsonProperty("messageRoutingMode") MessageRoutingMode
messageRoutingMode,
+ @JsonProperty("hashingScheme") HashingScheme
hashingScheme,
+ @JsonProperty("cryptoFailureAction")
ProducerCryptoFailureAction cryptoFailureAction,
+ @JsonProperty("messageRouter") MessageRouter
messageRouter,
+ @JsonProperty("batchingMaxPublishDelay") Duration
batchingMaxPublishDelay,
+
@JsonProperty("roundRobinRouterBatchingPartitionSwitchFrequency") Integer
roundRobinRouterBatchingPartitionSwitchFrequency,
+ @JsonProperty("batchingMaxMessages") Integer
batchingMaxMessages,
+ @JsonProperty("batchingMaxBytes") Integer
batchingMaxBytes,
+ @JsonProperty("batchingEnabled") Boolean
batchingEnabled,
+ @JsonProperty("batcherBuilder") BatcherBuilder
batcherBuilder,
+ @JsonProperty("chunkingEnabled") Boolean
chunkingEnabled,
+ @JsonProperty("cryptoKeyReader") CryptoKeyReader
cryptoKeyReader,
+ @JsonProperty("encryptionKeys") Set<String>
encryptionKeys,
+ @JsonProperty("compressionType") CompressionType
compressionType,
+ @JsonProperty("initialSequenceId") Long
initialSequenceId,
+ @JsonProperty("autoUpdatePartitions") Boolean
autoUpdatePartitions,
+ @JsonProperty("autoUpdatePartitionsInterval") Duration
autoUpdatePartitionsInterval,
+ @JsonProperty("multiSchema") Boolean multiSchema,
@JsonProperty("accessMode") ProducerAccessMode accessMode,
+ @JsonProperty("lazyStartPartitionedProducers") Boolean
lazyStartPartitionedProducers,
+ @JsonProperty("properties") Map<String, String>
properties) {
+
+ }
+
+}
diff --git a/settings.gradle
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageConsumerSpecMixin.java
similarity index 66%
copy from settings.gradle
copy to
pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageConsumerSpecMixin.java
index 221fd1c..157e65a 100644
--- a/settings.gradle
+++
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageConsumerSpecMixin.java
@@ -14,8 +14,12 @@
* limitations under the License.
*/
-rootProject.name = 'pulsar-client-reactive'
-include 'pulsar-client-reactive-api'
-include 'pulsar-client-reactive-adapter'
-include 'pulsar-client-reactive-producer-cache-caffeine'
+package org.apache.pulsar.reactive.client.jackson;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import
org.apache.pulsar.reactive.client.api.MutableReactiveMessageConsumerSpec;
+
+@JsonDeserialize(as = MutableReactiveMessageConsumerSpec.class)
+class MutableReactiveMessageConsumerSpecMixin {
+
+}
diff --git a/settings.gradle
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageReaderSpecMixin.java
similarity index 66%
copy from settings.gradle
copy to
pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageReaderSpecMixin.java
index 221fd1c..ba2180c 100644
--- a/settings.gradle
+++
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageReaderSpecMixin.java
@@ -14,8 +14,12 @@
* limitations under the License.
*/
-rootProject.name = 'pulsar-client-reactive'
-include 'pulsar-client-reactive-api'
-include 'pulsar-client-reactive-adapter'
-include 'pulsar-client-reactive-producer-cache-caffeine'
+package org.apache.pulsar.reactive.client.jackson;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.pulsar.reactive.client.api.MutableReactiveMessageReaderSpec;
+
+@JsonDeserialize(as = MutableReactiveMessageReaderSpec.class)
+class MutableReactiveMessageReaderSpecMixin {
+
+}
diff --git a/settings.gradle
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageSenderSpecMixin.java
similarity index 66%
copy from settings.gradle
copy to
pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageSenderSpecMixin.java
index 221fd1c..1244298 100644
--- a/settings.gradle
+++
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageSenderSpecMixin.java
@@ -14,8 +14,12 @@
* limitations under the License.
*/
-rootProject.name = 'pulsar-client-reactive'
-include 'pulsar-client-reactive-api'
-include 'pulsar-client-reactive-adapter'
-include 'pulsar-client-reactive-producer-cache-caffeine'
+package org.apache.pulsar.reactive.client.jackson;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec;
+
+@JsonDeserialize(as = MutableReactiveMessageSenderSpec.class)
+class MutableReactiveMessageSenderSpecMixin {
+
+}
diff --git
a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModule.java
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModule.java
new file mode 100644
index 0000000..4021027
--- /dev/null
+++
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModule.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.jackson;
+
+import java.io.IOException;
+import java.time.Duration;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.datatype.jsr310.deser.DurationDeserializer;
+import com.fasterxml.jackson.datatype.jsr310.ser.DurationSerializer;
+import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Range;
+import
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec;
+import
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageReaderSpec;
+import
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageSenderSpec;
+import
org.apache.pulsar.reactive.client.api.MutableReactiveMessageConsumerSpec;
+import org.apache.pulsar.reactive.client.api.MutableReactiveMessageReaderSpec;
+import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
+import reactor.core.scheduler.Scheduler;
+
+public class PulsarReactiveClientModule extends SimpleModule {
+
+ public PulsarReactiveClientModule() {
+ super();
+ addDeserializer(KeySharedPolicy.class, new
KeySharedPolicyDeserializer());
+ addSerializer(KeySharedPolicy.class, new
KeySharedPolicySerializer());
+ addDeserializer(Scheduler.class, new SchedulerDeserializer());
+ addSerializer(Scheduler.class, new SchedulerSerializer());
+ addDeserializer(DeadLetterPolicy.class, new
DeadLetterPolicyDeserializer());
+ addDeserializer(CryptoKeyReader.class, new
ClassDeserializer<>());
+ addSerializer(CryptoKeyReader.class, new ClassSerializer<>());
+ addDeserializer(Range.class, new RangeDeserializer());
+ addDeserializer(MessageRouter.class, new ClassDeserializer<>());
+ addSerializer(MessageRouter.class, new ClassSerializer<>());
+ addDeserializer(BatcherBuilder.class, new
ClassDeserializer<>());
+ addDeserializer(Duration.class, DurationDeserializer.INSTANCE);
+ addSerializer(Duration.class, DurationSerializer.INSTANCE);
+ setMixInAnnotation(ReactiveMessageConsumerSpec.class,
ImmutableReactiveMessageConsumerSpecMixin.class);
+ setMixInAnnotation(ImmutableReactiveMessageConsumerSpec.class,
ImmutableReactiveMessageConsumerSpecMixin.class);
+ setMixInAnnotation(MutableReactiveMessageConsumerSpec.class,
MutableReactiveMessageConsumerSpecMixin.class);
+ setMixInAnnotation(ReactiveMessageReaderSpec.class,
ImmutableReactiveMessageReaderSpecMixin.class);
+ setMixInAnnotation(ImmutableReactiveMessageReaderSpec.class,
ImmutableReactiveMessageReaderSpecMixin.class);
+ setMixInAnnotation(MutableReactiveMessageReaderSpec.class,
MutableReactiveMessageReaderSpecMixin.class);
+ setMixInAnnotation(ReactiveMessageSenderSpec.class,
ImmutableReactiveMessageSenderSpecMixin.class);
+ setMixInAnnotation(ImmutableReactiveMessageSenderSpec.class,
ImmutableReactiveMessageSenderSpecMixin.class);
+ setMixInAnnotation(MutableReactiveMessageSenderSpec.class,
MutableReactiveMessageSenderSpecMixin.class);
+ }
+
+ public static class KeySharedPolicyDeserializer extends
JsonDeserializer<KeySharedPolicy> {
+
+ @Override
+ public KeySharedPolicy deserialize(JsonParser p,
DeserializationContext context) throws IOException {
+ return
ConverterUtils.toKeySharedPolicy(p.getValueAsString());
+ }
+
+ }
+
+ public static class KeySharedPolicySerializer extends
JsonSerializer<KeySharedPolicy> {
+
+ @Override
+ public void serialize(KeySharedPolicy keySharedPolicy,
JsonGenerator gen, SerializerProvider serializers)
+ throws IOException {
+ if (keySharedPolicy instanceof
KeySharedPolicy.KeySharedPolicySticky) {
+ gen.writeString("STICKY");
+ }
+ else if (keySharedPolicy instanceof
KeySharedPolicy.KeySharedPolicyAutoSplit) {
+ gen.writeString("AUTO_SPLIT");
+ }
+ else {
+
gen.writeString(keySharedPolicy.getClass().getName());
+ }
+ }
+
+ }
+
+ public static class SchedulerDeserializer extends
JsonDeserializer<Scheduler> {
+
+ @Override
+ public Scheduler deserialize(JsonParser p,
DeserializationContext context) throws IOException {
+ return ConverterUtils.toScheduler(p.getValueAsString());
+ }
+
+ }
+
+ public static class SchedulerSerializer extends
JsonSerializer<Scheduler> {
+
+ @Override
+ public void serialize(Scheduler scheduler, JsonGenerator gen,
SerializerProvider serializers)
+ throws IOException {
+ switch (scheduler.getClass().getName()) {
+ case
"reactor.core.scheduler.BoundedElasticScheduler":
+ gen.writeString("boundedElastic");
+ break;
+ case "reactor.core.scheduler.ParallelScheduler":
+ gen.writeString("parallel");
+ break;
+ case "reactor.core.scheduler.SingleScheduler":
+ gen.writeString("single");
+ break;
+ case
"reactor.core.scheduler.ImmediateScheduler":
+ gen.writeString("immediate");
+ break;
+ case
"reactor.core.scheduler.Schedulers$CachedScheduler":
+
gen.writeString(scheduler.toString().substring("Schedulers.".length(),
+
scheduler.toString().length() - "()".length()));
+ break;
+ default:
+
gen.writeString(scheduler.getClass().getName());
+ }
+ }
+
+ }
+
+ public static class DeadLetterPolicyDeserializer extends
JsonDeserializer<DeadLetterPolicy> {
+
+ @Override
+ public DeadLetterPolicy deserialize(JsonParser p,
DeserializationContext context) throws IOException {
+ DeadLetterPolicyConf deadLetterPolicyConf =
p.readValueAs(DeadLetterPolicyConf.class);
+ return
ConverterUtils.toDeadLetterPolicy(deadLetterPolicyConf);
+ }
+
+ }
+
+ public static class RangeDeserializer extends JsonDeserializer<Range> {
+
+ @Override
+ public Range deserialize(JsonParser p, DeserializationContext
context) throws IOException {
+ RangeConf rangeConf = p.readValueAs(RangeConf.class);
+ return ConverterUtils.toRange(rangeConf);
+ }
+
+ }
+
+ public static class ClassSerializer<T> extends JsonSerializer<T> {
+
+ @Override
+ public void serialize(T value, JsonGenerator gen,
SerializerProvider serializers) throws IOException {
+ gen.writeStartObject();
+ gen.writeStringField("className",
value.getClass().getName());
+ gen.writeEndObject();
+ }
+
+ }
+
+ public static class ClassDeserializer<T> extends JsonDeserializer<T> {
+
+ @Override
+ public T deserialize(JsonParser p, DeserializationContext
context) throws IOException {
+ ClassConf classConf = p.readValueAs(ClassConf.class);
+ return ConverterUtils.toClass(classConf);
+ }
+
+ }
+
+}
diff --git a/settings.gradle
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/RangeConf.java
similarity index 65%
copy from settings.gradle
copy to
pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/RangeConf.java
index 221fd1c..db32526 100644
--- a/settings.gradle
+++
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/RangeConf.java
@@ -14,8 +14,29 @@
* limitations under the License.
*/
-rootProject.name = 'pulsar-client-reactive'
-include 'pulsar-client-reactive-api'
-include 'pulsar-client-reactive-adapter'
-include 'pulsar-client-reactive-producer-cache-caffeine'
+package org.apache.pulsar.reactive.client.jackson;
+@SuppressWarnings("unused")
+class RangeConf {
+
+ private int start;
+
+ private int end;
+
+ int getStart() {
+ return this.start;
+ }
+
+ void setStart(int start) {
+ this.start = start;
+ }
+
+ int getEnd() {
+ return this.end;
+ }
+
+ void setEnd(int end) {
+ this.end = end;
+ }
+
+}
diff --git
a/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java
b/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java
new file mode 100644
index 0000000..d36bea8
--- /dev/null
+++
b/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java
@@ -0,0 +1,554 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.jackson;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerAccessMode;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.Range;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.KeyBasedBatcherBuilder;
+import
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec;
+import
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageReaderSpec;
+import
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageSenderSpec;
+import
org.apache.pulsar.reactive.client.api.MutableReactiveMessageConsumerSpec;
+import org.apache.pulsar.reactive.client.api.MutableReactiveMessageReaderSpec;
+import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import reactor.core.Disposable;
+import reactor.core.scheduler.Scheduler;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class PulsarReactiveClientModuleTest {
+
+ private static final ObjectMapper MAPPER = new
ObjectMapper().registerModule(new PulsarReactiveClientModule())
+
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
+ @ParameterizedTest
+ @ValueSource(classes = { ReactiveMessageConsumerSpec.class,
ImmutableReactiveMessageConsumerSpec.class,
+ MutableReactiveMessageConsumerSpec.class })
+ void shouldSerDeserReactiveMessageConsumerSpec(Class<? extends
ReactiveMessageConsumerSpec> klass)
+ throws Exception {
+ // @formatter:off
+ String content = ("{"
+ + "'topicNames': ['my-topic'],"
+ + "'topicsPattern': 'my-topic-*',"
+ + "'topicsPatternSubscriptionMode':
'PersistentOnly',"
+ + "'topicsPatternAutoDiscoveryPeriod': 30,"
+ + "'subscriptionName': 'my-sub',"
+ + "'subscriptionMode': 'Durable',"
+ + "'subscriptionType': 'Exclusive',"
+ + "'keySharedPolicy': 'STICKY',"
+ + "'replicateSubscriptionState': true,"
+ + "'subscriptionProperties': {'my-key':
'my-value'},"
+ + "'consumerName': 'my-consumer',"
+ + "'properties': {'my-key': 'my-value'},"
+ + "'priorityLevel': 42,"
+ + "'readCompacted': true,"
+ + "'batchIndexAckEnabled': true,"
+ + "'ackTimeout': 30,"
+ + "'ackTimeoutTickTime': 30,"
+ + "'acknowledgementsGroupTime': 30,"
+ + "'acknowledgeAsynchronously': true,"
+ + "'acknowledgeScheduler': 'boundedElastic',"
+ + "'negativeAckRedeliveryDelay': 30,"
+ + "'deadLetterPolicy': {"
+ + " 'maxRedeliverCount': 1,"
+ + " 'retryLetterTopic': 'my-retry-topic',"
+ + " 'deadLetterTopic': 'my-dlq',"
+ + " 'initialSubscriptionName': 'my-dlq-sub'"
+ + "},"
+ + "'retryLetterTopicEnable': true,"
+ + "'receiverQueueSize': 42,"
+ + "'maxTotalReceiverQueueSizeAcrossPartitions':
42,"
+ + "'autoUpdatePartitions': true,"
+ + "'autoUpdatePartitionsInterval': 30,"
+ + "'cryptoKeyReader': {"
+ + " 'className':
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader',"
+ + " 'args': {'dummy': 'my-dummy'}"
+ + "},"
+ + "'cryptoFailureAction': 'FAIL',"
+ + "'maxPendingChunkedMessage': 42,"
+ + "'autoAckOldestChunkedMessageOnQueueFull':
true,"
+ + "'expireTimeOfIncompleteChunkedMessage': 30"
+ + "}").replaceAll("'", "\"");
+ // @formatter:on
+
+ ReactiveMessageConsumerSpec spec = MAPPER.readValue(content,
klass);
+
+ assertThat(spec.getTopicNames()).containsExactly("my-topic");
+
assertThat(spec.getTopicsPattern().pattern()).isEqualTo("my-topic-*");
+
assertThat(spec.getTopicsPatternSubscriptionMode()).isEqualTo(RegexSubscriptionMode.PersistentOnly);
+
assertThat(spec.getTopicsPatternAutoDiscoveryPeriod()).hasMillis(30_000);
+ assertThat(spec.getSubscriptionName()).isEqualTo("my-sub");
+
assertThat(spec.getSubscriptionMode()).isEqualTo(SubscriptionMode.Durable);
+
assertThat(spec.getSubscriptionType()).isEqualTo(SubscriptionType.Exclusive);
+
assertThat(spec.getKeySharedPolicy()).isInstanceOf(KeySharedPolicy.KeySharedPolicySticky.class);
+ assertThat(spec.getReplicateSubscriptionState()).isTrue();
+
assertThat(spec.getSubscriptionProperties()).containsOnlyKeys("my-key");
+
assertThat(spec.getSubscriptionProperties()).containsEntry("my-key",
"my-value");
+ assertThat(spec.getConsumerName()).isEqualTo("my-consumer");
+ assertThat(spec.getProperties()).containsOnlyKeys("my-key");
+ assertThat(spec.getProperties()).containsEntry("my-key",
"my-value");
+ assertThat(spec.getPriorityLevel()).isEqualTo(42);
+ assertThat(spec.getReadCompacted()).isTrue();
+ assertThat(spec.getBatchIndexAckEnabled()).isTrue();
+ assertThat(spec.getAckTimeout()).hasMillis(30_000);
+ assertThat(spec.getAckTimeoutTickTime()).hasMillis(30_000);
+
assertThat(spec.getAcknowledgementsGroupTime()).hasMillis(30_000);
+ assertThat(spec.getAcknowledgeAsynchronously()).isTrue();
+
assertThat(spec.getAcknowledgeScheduler().toString()).isEqualTo("Schedulers.boundedElastic()");
+
assertThat(spec.getNegativeAckRedeliveryDelay()).hasMillis(30_000);
+
assertThat(spec.getDeadLetterPolicy().getMaxRedeliverCount()).isEqualTo(1);
+
assertThat(spec.getDeadLetterPolicy().getDeadLetterTopic()).isEqualTo("my-dlq");
+
assertThat(spec.getDeadLetterPolicy().getRetryLetterTopic()).isEqualTo("my-retry-topic");
+
assertThat(spec.getDeadLetterPolicy().getInitialSubscriptionName()).isEqualTo("my-dlq-sub");
+ assertThat(spec.getRetryLetterTopicEnable()).isTrue();
+ assertThat(spec.getReceiverQueueSize()).isEqualTo(42);
+
assertThat(spec.getMaxTotalReceiverQueueSizeAcrossPartitions()).isEqualTo(42);
+ assertThat(spec.getAutoUpdatePartitions()).isTrue();
+
assertThat(spec.getAutoUpdatePartitionsInterval()).hasMillis(30_000);
+
assertThat(spec.getCryptoKeyReader()).isInstanceOf(TestCryptoKeyReader.class);
+ Map<String, Object> params = ((TestCryptoKeyReader)
spec.getCryptoKeyReader()).params;
+ assertThat(params).containsOnlyKeys("dummy");
+ assertThat(params).containsEntry("dummy", "my-dummy");
+
assertThat(spec.getCryptoFailureAction()).isEqualTo(ConsumerCryptoFailureAction.FAIL);
+ assertThat(spec.getMaxPendingChunkedMessage()).isEqualTo(42);
+
assertThat(spec.getAutoAckOldestChunkedMessageOnQueueFull()).isTrue();
+
assertThat(spec.getExpireTimeOfIncompleteChunkedMessage()).hasMillis(30_000);
+
+ String json =
MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(spec);
+
+ // @formatter:off
+ String expected = ("{\n"
+ + " 'topicNames' : [ 'my-topic' ],\n"
+ + " 'topicsPattern' : 'my-topic-*',\n"
+ + " 'topicsPatternSubscriptionMode' :
'PersistentOnly',\n"
+ + " 'topicsPatternAutoDiscoveryPeriod' :
30.000000000,\n"
+ + " 'subscriptionName' : 'my-sub',\n"
+ + " 'subscriptionMode' : 'Durable',\n"
+ + " 'subscriptionType' : 'Exclusive',\n"
+ + " 'keySharedPolicy' : 'STICKY',\n"
+ + " 'replicateSubscriptionState' : true,\n"
+ + " 'subscriptionProperties' : {\n"
+ + " 'my-key' : 'my-value'\n"
+ + " },\n"
+ + " 'consumerName' : 'my-consumer',\n"
+ + " 'properties' : {\n"
+ + " 'my-key' : 'my-value'\n"
+ + " },\n"
+ + " 'priorityLevel' : 42,\n"
+ + " 'readCompacted' : true,\n"
+ + " 'batchIndexAckEnabled' : true,\n"
+ + " 'ackTimeout' : 30.000000000,\n"
+ + " 'ackTimeoutTickTime' : 30.000000000,\n"
+ + " 'acknowledgementsGroupTime' :
30.000000000,\n"
+ + " 'acknowledgeAsynchronously' : true,\n"
+ + " 'acknowledgeScheduler' :
'boundedElastic',\n"
+ + " 'negativeAckRedeliveryDelay' :
30.000000000,\n"
+ + " 'deadLetterPolicy' : {\n"
+ + " 'maxRedeliverCount' : 1,\n"
+ + " 'retryLetterTopic' : 'my-retry-topic',\n"
+ + " 'deadLetterTopic' : 'my-dlq',\n"
+ + " 'initialSubscriptionName' :
'my-dlq-sub'\n"
+ + " },\n"
+ + " 'retryLetterTopicEnable' : true,\n"
+ + " 'receiverQueueSize' : 42,\n"
+ + "
'maxTotalReceiverQueueSizeAcrossPartitions' : 42,\n"
+ + " 'autoUpdatePartitions' : true,\n"
+ + " 'autoUpdatePartitionsInterval' :
30.000000000,\n"
+ + " 'cryptoKeyReader' : {\n"
+ + " 'className' :
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'\n"
+ + " },\n"
+ + " 'cryptoFailureAction' : 'FAIL',\n"
+ + " 'maxPendingChunkedMessage' : 42,\n"
+ + " 'autoAckOldestChunkedMessageOnQueueFull' :
true,\n"
+ + " 'expireTimeOfIncompleteChunkedMessage' :
30.000000000\n"
+ + "}").replaceAll("'", "\"");
+ // @formatter:on
+
+ assertThat(json).isEqualTo(expected);
+ }
+
+ @ParameterizedTest
+ @ValueSource(classes = {
+ // MutableReactiveMessageConsumerSpec.class,
+ ReactiveMessageConsumerSpec.class,
ImmutableReactiveMessageConsumerSpec.class })
+ void shouldSerDeserEmptyReactiveMessageConsumerSpec(Class<? extends
ReactiveMessageConsumerSpec> klass)
+ throws Exception {
+ String content = "{}";
+ ReactiveMessageConsumerSpec spec = MAPPER.readValue(content,
klass);
+ String json = MAPPER.writeValueAsString(spec);
+ assertThat(json).isEqualTo(content);
+ }
+
+ @ParameterizedTest
+ @ValueSource(classes = { ReactiveMessageReaderSpec.class,
ImmutableReactiveMessageReaderSpec.class,
+ MutableReactiveMessageReaderSpec.class })
+ void shouldSerDeserReactiveMessageReaderSpec(Class<? extends
ReactiveMessageReaderSpec> klass) throws Exception {
+ // @formatter:off
+ String content = ("{"
+ + "'topicNames': ['my-topic'],"
+ + "'readerName': 'my-reader',"
+ + "'subscriptionName': 'my-sub',"
+ + "'generatedSubscriptionNamePrefix':
'my-prefix-',"
+ + "'receiverQueueSize': 42,"
+ + "'readCompacted': true,"
+ + "'keyHashRanges': [{"
+ + " 'start': 42,"
+ + " 'end': 43"
+ + "}],"
+ + "'cryptoKeyReader': {"
+ + " 'className':
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader',"
+ + " 'args': {'dummy': 'my-dummy'}"
+ + "},"
+ + "'cryptoFailureAction': 'FAIL'"
+ + "}").replaceAll("'", "\"");
+ // @formatter:on
+
+ ReactiveMessageReaderSpec spec = MAPPER.readValue(content,
klass);
+
+ assertThat(spec.getTopicNames()).containsExactly("my-topic");
+ assertThat(spec.getReaderName()).isEqualTo("my-reader");
+ assertThat(spec.getSubscriptionName()).isEqualTo("my-sub");
+
assertThat(spec.getGeneratedSubscriptionNamePrefix()).isEqualTo("my-prefix-");
+ assertThat(spec.getReceiverQueueSize()).isEqualTo(42);
+ assertThat(spec.getReadCompacted()).isTrue();
+
assertThat(spec.getCryptoKeyReader()).isInstanceOf(TestCryptoKeyReader.class);
+ assertThat(spec.getKeyHashRanges()).containsExactly(new
Range(42, 43));
+
assertThat(spec.getCryptoKeyReader()).isInstanceOf(TestCryptoKeyReader.class);
+ Map<String, Object> params = ((TestCryptoKeyReader)
spec.getCryptoKeyReader()).params;
+ assertThat(params).containsOnlyKeys("dummy");
+ assertThat(params).containsEntry("dummy", "my-dummy");
+
assertThat(spec.getCryptoFailureAction()).isEqualTo(ConsumerCryptoFailureAction.FAIL);
+
+ String json =
MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(spec);
+
+ // @formatter:off
+ String expected = ("{\n"
+ + " 'topicNames' : [ 'my-topic' ],\n"
+ + " 'readerName' : 'my-reader',\n"
+ + " 'subscriptionName' : 'my-sub',\n"
+ + " 'generatedSubscriptionNamePrefix' :
'my-prefix-',\n"
+ + " 'receiverQueueSize' : 42,\n"
+ + " 'readCompacted' : true,\n"
+ + " 'keyHashRanges' : [ {\n"
+ + " 'start' : 42,\n"
+ + " 'end' : 43\n"
+ + " } ],\n"
+ + " 'cryptoKeyReader' : {\n"
+ + " 'className' :
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'\n"
+ + " },\n"
+ + " 'cryptoFailureAction' : 'FAIL'\n"
+ + "}").replaceAll("'", "\"");
+ // @formatter:on
+
+ assertThat(json).isEqualTo(expected);
+ }
+
+ @ParameterizedTest
+ @ValueSource(classes = { MutableReactiveMessageSenderSpec.class,
ReactiveMessageSenderSpec.class,
+ ImmutableReactiveMessageSenderSpec.class })
+ void shouldSerDeserEmptyReactiveMessageSenderSpec(Class<? extends
ReactiveMessageSenderSpec> klass)
+ throws Exception {
+ String content = "{}";
+ ReactiveMessageSenderSpec spec = MAPPER.readValue(content,
klass);
+ String json = MAPPER.writeValueAsString(spec);
+ assertThat(json).isEqualTo(content);
+ }
+
+ @ParameterizedTest
+ @ValueSource(classes = {
+ // MutableReactiveMessageReaderSpec.class,
+ ReactiveMessageReaderSpec.class,
ImmutableReactiveMessageReaderSpec.class })
+ void shouldSerDeserEmptyReactiveMessageReaderSpec(Class<? extends
ReactiveMessageReaderSpec> klass)
+ throws Exception {
+ String content = "{}";
+ ReactiveMessageReaderSpec spec = MAPPER.readValue(content,
klass);
+ String json = MAPPER.writeValueAsString(spec);
+ assertThat(json).isEqualTo(content);
+ }
+
+ @ParameterizedTest
+ @ValueSource(classes = { ReactiveMessageSenderSpec.class,
ImmutableReactiveMessageSenderSpec.class,
+ MutableReactiveMessageSenderSpec.class })
+ void shouldSerDeserReactiveMessageSenderSpec(Class<? extends
ReactiveMessageSenderSpec> klass) throws Exception {
+ // @formatter:off
+ String content = ("{"
+ + "'topicName': 'my-topic',"
+ + "'producerName': 'my-producer',"
+ + "'sendTimeout': 30,"
+ + "'maxPendingMessages': 42,"
+ + "'maxPendingMessagesAcrossPartitions': 42,"
+ + "'messageRoutingMode': 'SinglePartition',"
+ + "'hashingScheme': 'JavaStringHash',"
+ + "'cryptoFailureAction': 'FAIL',"
+ + "'messageRouter': {"
+ + " 'className':
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestMessageRouter',"
+ + " 'args': {'dummy': 'my-dummy'}"
+ + "},"
+ + "'batchingMaxPublishDelay': 30,"
+ +
"'roundRobinRouterBatchingPartitionSwitchFrequency': 42,"
+ + "'batchingMaxMessages': 42,"
+ + "'batchingMaxBytes': 42,"
+ + "'batchingEnabled': true,"
+ + "'batcherBuilder': {"
+ + " 'className':
'org.apache.pulsar.client.impl.KeyBasedBatcherBuilder'"
+ + "},"
+ + "'chunkingEnabled': true,"
+ + "'cryptoKeyReader': {"
+ + " 'className':
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader',"
+ + " 'args': {'dummy': 'my-dummy'}"
+ + "},"
+ + "'encryptionKeys': ['my-encryption-key'],"
+ + "'compressionType': 'LZ4',"
+ + "'initialSequenceId': 42,"
+ + "'autoUpdatePartitions': true,"
+ + "'autoUpdatePartitionsInterval': 30,"
+ + "'multiSchema': true,"
+ + "'accessMode': 'Shared',"
+ + "'lazyStartPartitionedProducers': true,"
+ + "'properties' : {"
+ + " 'my-key' : 'my-value'"
+ + "}"
+ + "}").replaceAll("'", "\"");
+ // @formatter:on
+
+ ReactiveMessageSenderSpec spec = MAPPER.readValue(content,
klass);
+
+ assertThat(spec.getTopicName()).isEqualTo("my-topic");
+ assertThat(spec.getProducerName()).isEqualTo("my-producer");
+ assertThat(spec.getSendTimeout()).hasMillis(30_000);
+ assertThat(spec.getMaxPendingMessages()).isEqualTo(42);
+
assertThat(spec.getMaxPendingMessagesAcrossPartitions()).isEqualTo(42);
+
assertThat(spec.getMessageRoutingMode()).isEqualTo(MessageRoutingMode.SinglePartition);
+
assertThat(spec.getHashingScheme()).isEqualTo(HashingScheme.JavaStringHash);
+
assertThat(spec.getCryptoFailureAction()).isEqualTo(ProducerCryptoFailureAction.FAIL);
+
assertThat(spec.getMessageRouter()).isInstanceOf(TestMessageRouter.class);
+ Map<String, Object> params = ((TestMessageRouter)
spec.getMessageRouter()).params;
+ assertThat(params).containsOnlyKeys("dummy");
+ assertThat(params).containsEntry("dummy", "my-dummy");
+ assertThat(spec.getBatchingMaxPublishDelay()).hasMillis(30_000);
+
assertThat(spec.getRoundRobinRouterBatchingPartitionSwitchFrequency()).isEqualTo(42);
+ assertThat(spec.getBatchingMaxMessages()).isEqualTo(42);
+ assertThat(spec.getBatchingMaxBytes()).isEqualTo(42);
+ assertThat(spec.getBatchingEnabled()).isTrue();
+
assertThat(spec.getBatcherBuilder()).isInstanceOf(KeyBasedBatcherBuilder.class);
+ assertThat(spec.getChunkingEnabled()).isTrue();
+
assertThat(spec.getCryptoKeyReader()).isInstanceOf(TestCryptoKeyReader.class);
+ params = ((TestCryptoKeyReader)
spec.getCryptoKeyReader()).params;
+ assertThat(params).containsOnlyKeys("dummy");
+ assertThat(params).containsEntry("dummy", "my-dummy");
+
assertThat(spec.getEncryptionKeys()).containsExactly("my-encryption-key");
+
assertThat(spec.getCompressionType()).isEqualTo(CompressionType.LZ4);
+ assertThat(spec.getInitialSequenceId()).isEqualTo(42);
+ assertThat(spec.getAutoUpdatePartitions()).isTrue();
+
assertThat(spec.getAutoUpdatePartitionsInterval()).hasMillis(30_000);
+ assertThat(spec.getMultiSchema()).isTrue();
+
assertThat(spec.getAccessMode()).isEqualTo(ProducerAccessMode.Shared);
+ assertThat(spec.getLazyStartPartitionedProducers()).isTrue();
+ assertThat(spec.getProperties()).containsEntry("my-key",
"my-value");
+
+ String json =
MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(spec);
+
+ // @formatter:off
+ String expected = ("{\n"
+ + " 'topicName' : 'my-topic',\n"
+ + " 'producerName' : 'my-producer',\n"
+ + " 'sendTimeout' : 30.000000000,\n"
+ + " 'maxPendingMessages' : 42,\n"
+ + " 'maxPendingMessagesAcrossPartitions' :
42,\n"
+ + " 'messageRoutingMode' :
'SinglePartition',\n"
+ + " 'hashingScheme' : 'JavaStringHash',\n"
+ + " 'cryptoFailureAction' : 'FAIL',\n"
+ + " 'messageRouter' : {\n"
+ + " 'className' :
'org.apache.pulsar.reactive.client.jackson"
+ +
".PulsarReactiveClientModuleTest$TestMessageRouter'\n"
+ + " },\n"
+ + " 'batchingMaxPublishDelay' :
30.000000000,\n"
+ + "
'roundRobinRouterBatchingPartitionSwitchFrequency' : 42,\n"
+ + " 'batchingMaxMessages' : 42,\n"
+ + " 'batchingMaxBytes' : 42,\n"
+ + " 'batchingEnabled' : true,\n"
+ + " 'batcherBuilder' : { },\n"
+ + " 'chunkingEnabled' : true,\n"
+ + " 'cryptoKeyReader' : {\n"
+ + " 'className' :
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'\n"
+ + " },\n"
+ + " 'encryptionKeys' : [ 'my-encryption-key'
],\n"
+ + " 'compressionType' : 'LZ4',\n"
+ + " 'initialSequenceId' : 42,\n"
+ + " 'autoUpdatePartitions' : true,\n"
+ + " 'autoUpdatePartitionsInterval' :
30.000000000,\n"
+ + " 'multiSchema' : true,\n"
+ + " 'accessMode' : 'Shared',\n"
+ + " 'lazyStartPartitionedProducers' : true,\n"
+ + " 'properties' : {\n"
+ + " 'my-key' : 'my-value'\n"
+ + " }\n"
+ + "}").replaceAll("'", "\"");
+ // @formatter:on
+
+ assertThat(json).isEqualTo(expected);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = { "AUTO_SPLIT", "STICKY" })
+ void shouldSerDeserKeySharedPolicy(String keySharedPolicy) throws
Exception {
+ String content = (String.format("\"%s\"", keySharedPolicy));
+ KeySharedPolicy policy = MAPPER.readValue(content,
KeySharedPolicy.class);
+ String json = MAPPER.writeValueAsString(policy);
+ assertThat(json).isEqualTo(content);
+ }
+
+ @Test
+ void shouldSerializeCustomKeySharedPolicy() throws Exception {
+ String json = MAPPER.writeValueAsString(new
TestKeySharedPolicy());
+ String expected =
"\"org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestKeySharedPolicy\"";
+ assertThat(json).isEqualTo(expected);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = { "parallel", "elastic", "boundedElastic",
"immediate", "single" })
+ void shouldSerDeserScheduler(String scheduler) throws Exception {
+ String content = (String.format("\"%s\"", scheduler));
+ Scheduler policy = MAPPER.readValue(content, Scheduler.class);
+ String json = MAPPER.writeValueAsString(policy);
+ assertThat(json).isEqualTo(content);
+ }
+
+ @Test
+ void shouldSerializeCustomScheduler() throws Exception {
+ String json = MAPPER.writeValueAsString(new TestScheduler());
+ String expected =
"\"org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestScheduler\"";
+ assertThat(json).isEqualTo(expected);
+ }
+
+ @Test
+ void shouldSerDeserDeadLetterPolicy() throws Exception {
+ // @formatter:off
+ String content = ("{\n"
+ + " 'maxRedeliverCount' : 0,\n"
+ + " 'retryLetterTopic' : 'my-retry-topic',\n"
+ + " 'deadLetterTopic' : 'my-dlq',\n"
+ + " 'initialSubscriptionName' : 'my-dlq-sub'\n"
+ + "}").replaceAll("'", "\"");
+ // @formatter:on
+ DeadLetterPolicy policy = MAPPER.readValue(content,
DeadLetterPolicy.class);
+ String json =
MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(policy);
+ assertThat(json).isEqualTo(content);
+ }
+
+ @Test
+ void shouldSerDeserEmptyDeadLetterPolicy() throws Exception {
+ DeadLetterPolicy policy = MAPPER.readValue("{}",
DeadLetterPolicy.class);
+ String json = MAPPER.writeValueAsString(policy);
+ assertThat(json).isEqualTo("{\"maxRedeliverCount\":0}");
+ }
+
+ @Test
+ void shouldSerDeserCryptoKeyReader() throws Exception {
+ // @formatter:off
+ String content = ("{"
+ + " 'className':
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader',"
+ + " 'args': {'dummy': 'my-dummy'}"
+ + "}").replaceAll("'", "\"");
+ // @formatter:on
+ CryptoKeyReader cryptoKeyReader = MAPPER.readValue(content,
CryptoKeyReader.class);
+ String json = MAPPER.writeValueAsString(cryptoKeyReader);
+ String expected =
("{'className':'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'}")
+ .replaceAll("'", "\"");
+ assertThat(json).isEqualTo(expected);
+ }
+
+ public static class TestScheduler implements Scheduler {
+
+ @Override
+ public Disposable schedule(Runnable task) {
+ return null;
+ }
+
+ @Override
+ public Worker createWorker() {
+ return null;
+ }
+
+ }
+
+ public static class TestKeySharedPolicy extends KeySharedPolicy {
+
+ @Override
+ public void validate() {
+ }
+
+ }
+
+ static class TestCryptoKeyReader implements CryptoKeyReader {
+
+ private final Map<String, Object> params;
+
+ // CHECKSTYLE:OFF
+ public TestCryptoKeyReader(Map<String, Object> params) {
+ this.params = params;
+ }
+ // CHECKSTYLE:ON
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName,
Map<String, String> metadata) {
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName,
Map<String, String> metadata) {
+ return null;
+ }
+
+ }
+
+ static class TestMessageRouter implements MessageRouter {
+
+ private final Map<String, Object> params;
+
+ // CHECKSTYLE:OFF
+ public TestMessageRouter(Map<String, Object> params) {
+ this.params = params;
+ }
+ // CHECKSTYLE:ON
+
+ }
+
+}
diff --git a/settings.gradle b/settings.gradle
index 221fd1c..52a1478 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -18,4 +18,5 @@ rootProject.name = 'pulsar-client-reactive'
include 'pulsar-client-reactive-api'
include 'pulsar-client-reactive-adapter'
include 'pulsar-client-reactive-producer-cache-caffeine'
+include 'pulsar-client-reactive-jackson'