This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new ae66435  Add module for Jackson (#9)
ae66435 is described below

commit ae664352afc6936ede65dee0f901713f788a6207
Author: Christophe Bornet <[email protected]>
AuthorDate: Mon Sep 26 11:18:42 2022 +0200

    Add module for Jackson (#9)
---
 checkstyle/checkstyle.xml                          |   1 +
 gradle/libs.versions.toml                          |   3 +
 .../api/ImmutableReactiveMessageConsumerSpec.java  |  47 ++
 .../api/ImmutableReactiveMessageReaderSpec.java    |  15 +
 .../api/ImmutableReactiveMessageSenderSpec.java    |  38 ++
 pulsar-client-reactive-jackson/build.gradle        |  17 +
 .../pulsar/reactive/client/jackson/ClassConf.java  |  31 +-
 .../reactive/client/jackson/ConverterUtils.java    | 131 +++++
 .../client/jackson/DeadLetterPolicyConf.java       |  62 +++
 .../ImmutableReactiveMessageConsumerSpecMixin.java |  76 +++
 .../ImmutableReactiveMessageReaderSpecMixin.java   |  44 ++
 .../ImmutableReactiveMessageSenderSpecMixin.java   |  68 +++
 .../MutableReactiveMessageConsumerSpecMixin.java   |  12 +-
 .../MutableReactiveMessageReaderSpecMixin.java     |  12 +-
 .../MutableReactiveMessageSenderSpecMixin.java     |  12 +-
 .../client/jackson/PulsarReactiveClientModule.java | 182 +++++++
 .../pulsar/reactive/client/jackson/RangeConf.java  |  29 +-
 .../jackson/PulsarReactiveClientModuleTest.java    | 554 +++++++++++++++++++++
 settings.gradle                                    |   1 +
 19 files changed, 1315 insertions(+), 20 deletions(-)

diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 1ad50d8..7534cfd 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -3,6 +3,7 @@
                "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
                "https://checkstyle.org/dtds/configuration_1_3.dtd";>
 <module name="com.puppycrawl.tools.checkstyle.Checker">
+       <module name="SuppressWithPlainTextCommentFilter"/>
        <module name="io.spring.javaformat.checkstyle.SpringChecks">
                <property name="excludes" 
value="com.puppycrawl.tools.checkstyle.checks.javadoc.JavadocPackageCheck" />
        </module>
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index d4defec..dd0a6f8 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -8,6 +8,7 @@ assertj = "3.23.1"
 testcontainers = "1.17.3"
 jctools = "3.3.0"
 caffeine = "2.9.3"
+jackson = "2.13.4"
 checkstyle = '8.45.1'
 spring-javaformat = '0.0.34'
 licenser = "0.6.1"
@@ -27,6 +28,8 @@ slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = 
"slf4j" }
 testcontainers-pulsar = { module = "org.testcontainers:pulsar", version.ref = 
"testcontainers" }
 jctools-core = { module = "org.jctools:jctools-core", version.ref = "jctools" }
 caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version.ref = 
"caffeine" }
+jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", 
version.ref = "jackson" }
+jackson-datatype-jsr310 = { module = 
"com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = 
"jackson" }
 spring-javaformat-checkstyle = { module = 
"io.spring.javaformat:spring-javaformat-checkstyle", version.ref = 
"spring-javaformat" }
 spring-javaformat-gradle-plugin = { module = 
"io.spring.javaformat:spring-javaformat-gradle-plugin", version.ref = 
"spring-javaformat" }
 licenser = { module = "gradle.plugin.org.cadixdev.gradle:licenser", 
version.ref = "licenser" }
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java
index 96da0c3..4f4878e 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageConsumerSpec.java
@@ -168,6 +168,53 @@ public class ImmutableReactiveMessageConsumerSpec 
implements ReactiveMessageCons
                this.expireTimeOfIncompleteChunkedMessage = 
consumerSpec.getExpireTimeOfIncompleteChunkedMessage();
        }
 
+       public ImmutableReactiveMessageConsumerSpec(List<String> topicNames, 
Pattern topicsPattern,
+                       RegexSubscriptionMode topicsPatternSubscriptionMode, 
Duration topicsPatternAutoDiscoveryPeriod,
+                       String subscriptionName, SubscriptionMode 
subscriptionMode, SubscriptionType subscriptionType,
+                       KeySharedPolicy keySharedPolicy, Boolean 
replicateSubscriptionState,
+                       Map<String, String> subscriptionProperties, String 
consumerName, Map<String, String> properties,
+                       Integer priorityLevel, Boolean readCompacted, Boolean 
batchIndexAckEnabled, Duration ackTimeout,
+                       Duration ackTimeoutTickTime, Duration 
acknowledgementsGroupTime, Boolean acknowledgeAsynchronously,
+                       Scheduler acknowledgeScheduler, Duration 
negativeAckRedeliveryDelay, DeadLetterPolicy deadLetterPolicy,
+                       Boolean retryLetterTopicEnable, Integer 
receiverQueueSize,
+                       Integer maxTotalReceiverQueueSizeAcrossPartitions, 
Boolean autoUpdatePartitions,
+                       Duration autoUpdatePartitionsInterval, CryptoKeyReader 
cryptoKeyReader,
+                       ConsumerCryptoFailureAction cryptoFailureAction, 
Integer maxPendingChunkedMessage,
+                       Boolean autoAckOldestChunkedMessageOnQueueFull, 
Duration expireTimeOfIncompleteChunkedMessage) {
+               this.topicNames = topicNames;
+               this.topicsPattern = topicsPattern;
+               this.topicsPatternSubscriptionMode = 
topicsPatternSubscriptionMode;
+               this.topicsPatternAutoDiscoveryPeriod = 
topicsPatternAutoDiscoveryPeriod;
+               this.subscriptionName = subscriptionName;
+               this.subscriptionMode = subscriptionMode;
+               this.subscriptionType = subscriptionType;
+               this.keySharedPolicy = keySharedPolicy;
+               this.replicateSubscriptionState = replicateSubscriptionState;
+               this.subscriptionProperties = subscriptionProperties;
+               this.consumerName = consumerName;
+               this.properties = properties;
+               this.priorityLevel = priorityLevel;
+               this.readCompacted = readCompacted;
+               this.batchIndexAckEnabled = batchIndexAckEnabled;
+               this.ackTimeout = ackTimeout;
+               this.ackTimeoutTickTime = ackTimeoutTickTime;
+               this.acknowledgementsGroupTime = acknowledgementsGroupTime;
+               this.acknowledgeAsynchronously = acknowledgeAsynchronously;
+               this.acknowledgeScheduler = acknowledgeScheduler;
+               this.negativeAckRedeliveryDelay = negativeAckRedeliveryDelay;
+               this.deadLetterPolicy = deadLetterPolicy;
+               this.retryLetterTopicEnable = retryLetterTopicEnable;
+               this.receiverQueueSize = receiverQueueSize;
+               this.maxTotalReceiverQueueSizeAcrossPartitions = 
maxTotalReceiverQueueSizeAcrossPartitions;
+               this.autoUpdatePartitions = autoUpdatePartitions;
+               this.autoUpdatePartitionsInterval = 
autoUpdatePartitionsInterval;
+               this.cryptoKeyReader = cryptoKeyReader;
+               this.cryptoFailureAction = cryptoFailureAction;
+               this.maxPendingChunkedMessage = maxPendingChunkedMessage;
+               this.autoAckOldestChunkedMessageOnQueueFull = 
autoAckOldestChunkedMessageOnQueueFull;
+               this.expireTimeOfIncompleteChunkedMessage = 
expireTimeOfIncompleteChunkedMessage;
+       }
+
        public List<String> getTopicNames() {
                return this.topicNames;
        }
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageReaderSpec.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageReaderSpec.java
index 6d75992..b89536a 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageReaderSpec.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageReaderSpec.java
@@ -44,6 +44,21 @@ public class ImmutableReactiveMessageReaderSpec implements 
ReactiveMessageReader
 
        private final ConsumerCryptoFailureAction cryptoFailureAction;
 
+       public ImmutableReactiveMessageReaderSpec(List<String> topicNames, 
String readerName, String subscriptionName,
+                       String generatedSubscriptionNamePrefix, Integer 
receiverQueueSize, Boolean readCompacted,
+                       List<Range> keyHashRanges, CryptoKeyReader 
cryptoKeyReader,
+                       ConsumerCryptoFailureAction cryptoFailureAction) {
+               this.topicNames = topicNames;
+               this.readerName = readerName;
+               this.subscriptionName = subscriptionName;
+               this.generatedSubscriptionNamePrefix = 
generatedSubscriptionNamePrefix;
+               this.receiverQueueSize = receiverQueueSize;
+               this.readCompacted = readCompacted;
+               this.keyHashRanges = keyHashRanges;
+               this.cryptoKeyReader = cryptoKeyReader;
+               this.cryptoFailureAction = cryptoFailureAction;
+       }
+
        public ImmutableReactiveMessageReaderSpec(ReactiveMessageReaderSpec 
readerSpec) {
                this.topicNames = (readerSpec.getTopicNames() != null && 
!readerSpec.getTopicNames().isEmpty())
                                ? Collections.unmodifiableList(new 
ArrayList<>(readerSpec.getTopicNames())) : null;
diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageSenderSpec.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageSenderSpec.java
index d70b847..8b01a2d 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageSenderSpec.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ImmutableReactiveMessageSenderSpec.java
@@ -86,6 +86,44 @@ public class ImmutableReactiveMessageSenderSpec implements 
ReactiveMessageSender
 
        private final Map<String, String> properties;
 
+       public ImmutableReactiveMessageSenderSpec(String topicName, String 
producerName, Duration sendTimeout,
+                       Integer maxPendingMessages, Integer 
maxPendingMessagesAcrossPartitions,
+                       MessageRoutingMode messageRoutingMode, HashingScheme 
hashingScheme,
+                       ProducerCryptoFailureAction cryptoFailureAction, 
MessageRouter messageRouter,
+                       Duration batchingMaxPublishDelay, Integer 
roundRobinRouterBatchingPartitionSwitchFrequency,
+                       Integer batchingMaxMessages, Integer batchingMaxBytes, 
Boolean batchingEnabled,
+                       BatcherBuilder batcherBuilder, Boolean chunkingEnabled, 
CryptoKeyReader cryptoKeyReader,
+                       Set<String> encryptionKeys, CompressionType 
compressionType, Long initialSequenceId,
+                       Boolean autoUpdatePartitions, Duration 
autoUpdatePartitionsInterval, Boolean multiSchema,
+                       ProducerAccessMode accessMode, Boolean 
lazyStartPartitionedProducers, Map<String, String> properties) {
+               this.topicName = topicName;
+               this.producerName = producerName;
+               this.sendTimeout = sendTimeout;
+               this.maxPendingMessages = maxPendingMessages;
+               this.maxPendingMessagesAcrossPartitions = 
maxPendingMessagesAcrossPartitions;
+               this.messageRoutingMode = messageRoutingMode;
+               this.hashingScheme = hashingScheme;
+               this.cryptoFailureAction = cryptoFailureAction;
+               this.messageRouter = messageRouter;
+               this.batchingMaxPublishDelay = batchingMaxPublishDelay;
+               this.roundRobinRouterBatchingPartitionSwitchFrequency = 
roundRobinRouterBatchingPartitionSwitchFrequency;
+               this.batchingMaxMessages = batchingMaxMessages;
+               this.batchingMaxBytes = batchingMaxBytes;
+               this.batchingEnabled = batchingEnabled;
+               this.batcherBuilder = batcherBuilder;
+               this.chunkingEnabled = chunkingEnabled;
+               this.cryptoKeyReader = cryptoKeyReader;
+               this.encryptionKeys = encryptionKeys;
+               this.compressionType = compressionType;
+               this.initialSequenceId = initialSequenceId;
+               this.autoUpdatePartitions = autoUpdatePartitions;
+               this.autoUpdatePartitionsInterval = 
autoUpdatePartitionsInterval;
+               this.multiSchema = multiSchema;
+               this.accessMode = accessMode;
+               this.lazyStartPartitionedProducers = 
lazyStartPartitionedProducers;
+               this.properties = properties;
+       }
+
        public ImmutableReactiveMessageSenderSpec(ReactiveMessageSenderSpec 
senderSpec) {
                this.topicName = senderSpec.getTopicName();
                this.producerName = senderSpec.getProducerName();
diff --git a/pulsar-client-reactive-jackson/build.gradle 
b/pulsar-client-reactive-jackson/build.gradle
new file mode 100644
index 0000000..538af59
--- /dev/null
+++ b/pulsar-client-reactive-jackson/build.gradle
@@ -0,0 +1,17 @@
+plugins {
+       id 'pulsar-client-reactive.codestyle-conventions'
+       id 'pulsar-client-reactive.library-conventions'
+       id 'pulsar-client-reactive.integration-test-conventions'
+}
+
+dependencies {
+       api project(':pulsar-client-reactive-api')
+       api libs.jackson.databind
+       api libs.jackson.datatype.jsr310
+
+       testImplementation libs.pulsar.client.shaded
+       testImplementation libs.junit.jupiter
+       testImplementation libs.assertj.core
+}
+
+description = "Jackson module for the reactive client configuration"
diff --git a/settings.gradle 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ClassConf.java
similarity index 58%
copy from settings.gradle
copy to 
pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ClassConf.java
index 221fd1c..a18c178 100644
--- a/settings.gradle
+++ 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ClassConf.java
@@ -14,8 +14,31 @@
  * limitations under the License.
  */
 
-rootProject.name = 'pulsar-client-reactive'
-include 'pulsar-client-reactive-api'
-include 'pulsar-client-reactive-adapter'
-include 'pulsar-client-reactive-producer-cache-caffeine'
+package org.apache.pulsar.reactive.client.jackson;
 
+import java.util.Map;
+
+@SuppressWarnings("unused")
+class ClassConf {
+
+       private String className;
+
+       private Map<String, Object> args;
+
+       String getClassName() {
+               return this.className;
+       }
+
+       void setClassName(String className) {
+               this.className = className;
+       }
+
+       Map<String, Object> getArgs() {
+               return this.args;
+       }
+
+       void setArgs(Map<String, Object> args) {
+               this.args = args;
+       }
+
+}
diff --git 
a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ConverterUtils.java
 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ConverterUtils.java
new file mode 100644
index 0000000..864d8a3
--- /dev/null
+++ 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ConverterUtils.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.jackson;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.Range;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+
+abstract class ConverterUtils {
+
+       static DeadLetterPolicy toDeadLetterPolicy(DeadLetterPolicyConf 
deadLetterPolicy) {
+               if (deadLetterPolicy == null) {
+                       return null;
+               }
+               return 
DeadLetterPolicy.builder().maxRedeliverCount(deadLetterPolicy.getMaxRedeliverCount())
+                               
.deadLetterTopic(deadLetterPolicy.getDeadLetterTopic())
+                               
.retryLetterTopic(deadLetterPolicy.getRetryLetterTopic())
+                               
.initialSubscriptionName(deadLetterPolicy.getInitialSubscriptionName()).build();
+       }
+
+       static <T> T toClass(ClassConf conf) {
+               if (conf == null) {
+                       return null;
+               }
+               Class<?> klass;
+               try {
+                       klass = loadClass(conf.getClassName(), 
Thread.currentThread().getContextClassLoader());
+               }
+               catch (ClassNotFoundException ex) {
+                       throw new RuntimeException(String.format("Failed to 
load class %s", conf.getClassName()));
+               }
+
+               try {
+                       if (conf.getArgs() != null) {
+                               Constructor<?> ctor = 
klass.getConstructor(Map.class);
+                               return (T) ctor.newInstance(conf.getArgs());
+                       }
+                       else {
+                               Constructor<?> ctor = klass.getConstructor();
+                               return (T) ctor.newInstance();
+                       }
+               }
+               catch (NoSuchMethodException ex) {
+                       throw new RuntimeException(
+                                       String.format("Class %s does not have a 
no-arg constructor or a constructor that accepts map",
+                                                       klass.getName()),
+                                       ex);
+               }
+               catch (IllegalAccessException | InstantiationException | 
InvocationTargetException ex) {
+                       throw new RuntimeException(String.format("Failed to 
create instance for %s", klass.getName()), ex);
+               }
+       }
+
+       static KeySharedPolicy toKeySharedPolicy(String keySharedPolicy) {
+               if (keySharedPolicy == null) {
+                       return null;
+               }
+               switch (keySharedPolicy) {
+                       case "AUTO_SPLIT":
+                               return KeySharedPolicy.autoSplitHashRange();
+                       case "STICKY":
+                               return KeySharedPolicy.stickyHashRange();
+                       default:
+                               throw new IllegalArgumentException("Unsupported 
keySharedPolicy: " + keySharedPolicy);
+               }
+       }
+
+       static Scheduler toScheduler(String scheduler) {
+               if (scheduler == null) {
+                       return null;
+               }
+               switch (scheduler) {
+                       case "parallel":
+                               return Schedulers.parallel();
+                       case "single":
+                               return Schedulers.single();
+                       case "boundedElastic":
+                               return Schedulers.boundedElastic();
+                       case "elastic":
+                               return Schedulers.elastic();
+                       case "immediate":
+                               return Schedulers.immediate();
+                       default:
+                               throw new IllegalArgumentException("Unsupported 
scheduler: " + scheduler);
+               }
+       }
+
+       static Range toRange(RangeConf rangeConf) {
+               if (rangeConf == null) {
+                       return null;
+               }
+               return new Range(rangeConf.getStart(), rangeConf.getEnd());
+       }
+
+       static Class<?> loadClass(String className, ClassLoader classLoader) 
throws ClassNotFoundException {
+               Class<?> objectClass;
+               try {
+                       objectClass = Class.forName(className);
+               }
+               catch (ClassNotFoundException | NoClassDefFoundError ex) {
+                       if (classLoader != null) {
+                               objectClass = classLoader.loadClass(className);
+                       }
+                       else {
+                               throw ex;
+                       }
+               }
+               return objectClass;
+       }
+
+}
diff --git 
a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/DeadLetterPolicyConf.java
 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/DeadLetterPolicyConf.java
new file mode 100644
index 0000000..1260b0b
--- /dev/null
+++ 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/DeadLetterPolicyConf.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.jackson;
+
+@SuppressWarnings("unused")
+class DeadLetterPolicyConf {
+
+       private int maxRedeliverCount;
+
+       private String retryLetterTopic;
+
+       private String deadLetterTopic;
+
+       private String initialSubscriptionName;
+
+       int getMaxRedeliverCount() {
+               return this.maxRedeliverCount;
+       }
+
+       void setMaxRedeliverCount(int maxRedeliverCount) {
+               this.maxRedeliverCount = maxRedeliverCount;
+       }
+
+       String getRetryLetterTopic() {
+               return this.retryLetterTopic;
+       }
+
+       void setRetryLetterTopic(String retryLetterTopic) {
+               this.retryLetterTopic = retryLetterTopic;
+       }
+
+       String getDeadLetterTopic() {
+               return this.deadLetterTopic;
+       }
+
+       void setDeadLetterTopic(String deadLetterTopic) {
+               this.deadLetterTopic = deadLetterTopic;
+       }
+
+       String getInitialSubscriptionName() {
+               return this.initialSubscriptionName;
+       }
+
+       void setInitialSubscriptionName(String initialSubscriptionName) {
+               this.initialSubscriptionName = initialSubscriptionName;
+       }
+
+}
diff --git 
a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageConsumerSpecMixin.java
 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageConsumerSpecMixin.java
new file mode 100644
index 0000000..878cca7
--- /dev/null
+++ 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageConsumerSpecMixin.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.jackson;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+import 
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec;
+import reactor.core.scheduler.Scheduler;
+
+@JsonDeserialize(as = ImmutableReactiveMessageConsumerSpec.class)
+abstract class ImmutableReactiveMessageConsumerSpecMixin {
+
+       @SuppressWarnings("unused")
+       @JsonCreator
+       ImmutableReactiveMessageConsumerSpecMixin(@JsonProperty("topicNames") 
List<String> topicNames,
+                       @JsonProperty("topicsPattern") Pattern topicsPattern,
+                       @JsonProperty("topicsPatternSubscriptionMode") 
RegexSubscriptionMode topicsPatternSubscriptionMode,
+                       @JsonProperty("topicsPatternAutoDiscoveryPeriod") 
Duration topicsPatternAutoDiscoveryPeriod,
+                       @JsonProperty("subscriptionName") String 
subscriptionName,
+                       @JsonProperty("subscriptionMode") SubscriptionMode 
subscriptionMode,
+                       @JsonProperty("subscriptionType") SubscriptionType 
subscriptionType,
+                       @JsonProperty("keySharedPolicy") KeySharedPolicy 
keySharedPolicy,
+                       @JsonProperty("replicateSubscriptionState") Boolean 
replicateSubscriptionState,
+                       @JsonProperty("subscriptionProperties") Map<String, 
String> subscriptionProperties,
+                       @JsonProperty("consumerName") String consumerName,
+                       @JsonProperty("properties") Map<String, String> 
properties,
+                       @JsonProperty("priorityLevel") Integer priorityLevel, 
@JsonProperty("readCompacted") Boolean readCompacted,
+                       @JsonProperty("batchIndexAckEnabled") Boolean 
batchIndexAckEnabled,
+                       @JsonProperty("ackTimeout") Duration ackTimeout,
+                       @JsonProperty("ackTimeoutTickTime") Duration 
ackTimeoutTickTime,
+                       @JsonProperty("acknowledgementsGroupTime") Duration 
acknowledgementsGroupTime,
+                       @JsonProperty("acknowledgeAsynchronously") Boolean 
acknowledgeAsynchronously,
+                       @JsonProperty("acknowledgeScheduler") Scheduler 
acknowledgeScheduler,
+                       @JsonProperty("negativeAckRedeliveryDelay") Duration 
negativeAckRedeliveryDelay,
+                       @JsonProperty("deadLetterPolicy") DeadLetterPolicy 
deadLetterPolicy,
+                       @JsonProperty("retryLetterTopicEnable") Boolean 
retryLetterTopicEnable,
+                       @JsonProperty("receiverQueueSize") Integer 
receiverQueueSize,
+                       
@JsonProperty("maxTotalReceiverQueueSizeAcrossPartitions") Integer 
maxTotalReceiverQueueSizeAcrossPartitions,
+                       @JsonProperty("autoUpdatePartitions") Boolean 
autoUpdatePartitions,
+                       @JsonProperty("autoUpdatePartitionsInterval") Duration 
autoUpdatePartitionsInterval,
+                       @JsonProperty("cryptoKeyReader") CryptoKeyReader 
cryptoKeyReader,
+                       @JsonProperty("cryptoFailureAction") 
ConsumerCryptoFailureAction cryptoFailureAction,
+                       @JsonProperty("maxPendingChunkedMessage") Integer 
maxPendingChunkedMessage,
+                       @JsonProperty("autoAckOldestChunkedMessageOnQueueFull") 
Boolean autoAckOldestChunkedMessageOnQueueFull,
+                       @JsonProperty("expireTimeOfIncompleteChunkedMessage") 
Duration expireTimeOfIncompleteChunkedMessage) {
+
+       }
+
+}
diff --git 
a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageReaderSpecMixin.java
 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageReaderSpecMixin.java
new file mode 100644
index 0000000..62a8f56
--- /dev/null
+++ 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageReaderSpecMixin.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.jackson;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Range;
+import 
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageReaderSpec;
+
+@JsonDeserialize(as = ImmutableReactiveMessageReaderSpec.class)
+abstract class ImmutableReactiveMessageReaderSpecMixin {
+
+       @SuppressWarnings("unused")
+       @JsonCreator
+       ImmutableReactiveMessageReaderSpecMixin(@JsonProperty("topicNames") 
List<String> topicNames,
+                       @JsonProperty("readerName") String readerName, 
@JsonProperty("subscriptionName") String subscriptionName,
+                       @JsonProperty("generatedSubscriptionNamePrefix") String 
generatedSubscriptionNamePrefix,
+                       @JsonProperty("receiverQueueSize") Integer 
receiverQueueSize,
+                       @JsonProperty("readCompacted") Boolean readCompacted,
+                       @JsonProperty("keyHashRanges") List<Range> 
keyHashRanges,
+                       @JsonProperty("cryptoKeyReader") CryptoKeyReader 
cryptoKeyReader,
+                       @JsonProperty("cryptoFailureAction") 
ConsumerCryptoFailureAction cryptoFailureAction) {
+       }
+
+}
diff --git 
a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageSenderSpecMixin.java
 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageSenderSpecMixin.java
new file mode 100644
index 0000000..63543ad
--- /dev/null
+++ 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/ImmutableReactiveMessageSenderSpecMixin.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.jackson;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Set;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerAccessMode;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+import 
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageSenderSpec;
+
+@JsonDeserialize(as = ImmutableReactiveMessageSenderSpec.class)
+abstract class ImmutableReactiveMessageSenderSpecMixin {
+
+       @SuppressWarnings("unused")
+       @JsonCreator
+       ImmutableReactiveMessageSenderSpecMixin(@JsonProperty("topicName") 
String topicName,
+                       @JsonProperty("producerName") String producerName, 
@JsonProperty("sendTimeout") Duration sendTimeout,
+                       @JsonProperty("maxPendingMessages") Integer 
maxPendingMessages,
+                       @JsonProperty("maxPendingMessagesAcrossPartitions") 
Integer maxPendingMessagesAcrossPartitions,
+                       @JsonProperty("messageRoutingMode") MessageRoutingMode 
messageRoutingMode,
+                       @JsonProperty("hashingScheme") HashingScheme 
hashingScheme,
+                       @JsonProperty("cryptoFailureAction") 
ProducerCryptoFailureAction cryptoFailureAction,
+                       @JsonProperty("messageRouter") MessageRouter 
messageRouter,
+                       @JsonProperty("batchingMaxPublishDelay") Duration 
batchingMaxPublishDelay,
+                       
@JsonProperty("roundRobinRouterBatchingPartitionSwitchFrequency") Integer 
roundRobinRouterBatchingPartitionSwitchFrequency,
+                       @JsonProperty("batchingMaxMessages") Integer 
batchingMaxMessages,
+                       @JsonProperty("batchingMaxBytes") Integer 
batchingMaxBytes,
+                       @JsonProperty("batchingEnabled") Boolean 
batchingEnabled,
+                       @JsonProperty("batcherBuilder") BatcherBuilder 
batcherBuilder,
+                       @JsonProperty("chunkingEnabled") Boolean 
chunkingEnabled,
+                       @JsonProperty("cryptoKeyReader") CryptoKeyReader 
cryptoKeyReader,
+                       @JsonProperty("encryptionKeys") Set<String> 
encryptionKeys,
+                       @JsonProperty("compressionType") CompressionType 
compressionType,
+                       @JsonProperty("initialSequenceId") Long 
initialSequenceId,
+                       @JsonProperty("autoUpdatePartitions") Boolean 
autoUpdatePartitions,
+                       @JsonProperty("autoUpdatePartitionsInterval") Duration 
autoUpdatePartitionsInterval,
+                       @JsonProperty("multiSchema") Boolean multiSchema, 
@JsonProperty("accessMode") ProducerAccessMode accessMode,
+                       @JsonProperty("lazyStartPartitionedProducers") Boolean 
lazyStartPartitionedProducers,
+                       @JsonProperty("properties") Map<String, String> 
properties) {
+
+       }
+
+}
diff --git a/settings.gradle 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageConsumerSpecMixin.java
similarity index 66%
copy from settings.gradle
copy to 
pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageConsumerSpecMixin.java
index 221fd1c..157e65a 100644
--- a/settings.gradle
+++ 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageConsumerSpecMixin.java
@@ -14,8 +14,12 @@
  * limitations under the License.
  */
 
-rootProject.name = 'pulsar-client-reactive'
-include 'pulsar-client-reactive-api'
-include 'pulsar-client-reactive-adapter'
-include 'pulsar-client-reactive-producer-cache-caffeine'
+package org.apache.pulsar.reactive.client.jackson;
 
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.pulsar.reactive.client.api.MutableReactiveMessageConsumerSpec;
+
+@JsonDeserialize(as = MutableReactiveMessageConsumerSpec.class)
+class MutableReactiveMessageConsumerSpecMixin {
+
+}
diff --git a/settings.gradle 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageReaderSpecMixin.java
similarity index 66%
copy from settings.gradle
copy to 
pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageReaderSpecMixin.java
index 221fd1c..ba2180c 100644
--- a/settings.gradle
+++ 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageReaderSpecMixin.java
@@ -14,8 +14,12 @@
  * limitations under the License.
  */
 
-rootProject.name = 'pulsar-client-reactive'
-include 'pulsar-client-reactive-api'
-include 'pulsar-client-reactive-adapter'
-include 'pulsar-client-reactive-producer-cache-caffeine'
+package org.apache.pulsar.reactive.client.jackson;
 
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.pulsar.reactive.client.api.MutableReactiveMessageReaderSpec;
+
+@JsonDeserialize(as = MutableReactiveMessageReaderSpec.class)
+class MutableReactiveMessageReaderSpecMixin {
+
+}
diff --git a/settings.gradle 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageSenderSpecMixin.java
similarity index 66%
copy from settings.gradle
copy to 
pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageSenderSpecMixin.java
index 221fd1c..1244298 100644
--- a/settings.gradle
+++ 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/MutableReactiveMessageSenderSpecMixin.java
@@ -14,8 +14,12 @@
  * limitations under the License.
  */
 
-rootProject.name = 'pulsar-client-reactive'
-include 'pulsar-client-reactive-api'
-include 'pulsar-client-reactive-adapter'
-include 'pulsar-client-reactive-producer-cache-caffeine'
+package org.apache.pulsar.reactive.client.jackson;
 
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec;
+
+@JsonDeserialize(as = MutableReactiveMessageSenderSpec.class)
+class MutableReactiveMessageSenderSpecMixin {
+
+}
diff --git 
a/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModule.java
 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModule.java
new file mode 100644
index 0000000..4021027
--- /dev/null
+++ 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModule.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.jackson;
+
+import java.io.IOException;
+import java.time.Duration;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.datatype.jsr310.deser.DurationDeserializer;
+import com.fasterxml.jackson.datatype.jsr310.ser.DurationSerializer;
+import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.Range;
+import 
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec;
+import 
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageReaderSpec;
+import 
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageSenderSpec;
+import 
org.apache.pulsar.reactive.client.api.MutableReactiveMessageConsumerSpec;
+import org.apache.pulsar.reactive.client.api.MutableReactiveMessageReaderSpec;
+import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
+import reactor.core.scheduler.Scheduler;
+
+public class PulsarReactiveClientModule extends SimpleModule {
+
+       public PulsarReactiveClientModule() {
+               super();
+               addDeserializer(KeySharedPolicy.class, new 
KeySharedPolicyDeserializer());
+               addSerializer(KeySharedPolicy.class, new 
KeySharedPolicySerializer());
+               addDeserializer(Scheduler.class, new SchedulerDeserializer());
+               addSerializer(Scheduler.class, new SchedulerSerializer());
+               addDeserializer(DeadLetterPolicy.class, new 
DeadLetterPolicyDeserializer());
+               addDeserializer(CryptoKeyReader.class, new 
ClassDeserializer<>());
+               addSerializer(CryptoKeyReader.class, new ClassSerializer<>());
+               addDeserializer(Range.class, new RangeDeserializer());
+               addDeserializer(MessageRouter.class, new ClassDeserializer<>());
+               addSerializer(MessageRouter.class, new ClassSerializer<>());
+               addDeserializer(BatcherBuilder.class, new 
ClassDeserializer<>());
+               addDeserializer(Duration.class, DurationDeserializer.INSTANCE);
+               addSerializer(Duration.class, DurationSerializer.INSTANCE);
+               setMixInAnnotation(ReactiveMessageConsumerSpec.class, 
ImmutableReactiveMessageConsumerSpecMixin.class);
+               setMixInAnnotation(ImmutableReactiveMessageConsumerSpec.class, 
ImmutableReactiveMessageConsumerSpecMixin.class);
+               setMixInAnnotation(MutableReactiveMessageConsumerSpec.class, 
MutableReactiveMessageConsumerSpecMixin.class);
+               setMixInAnnotation(ReactiveMessageReaderSpec.class, 
ImmutableReactiveMessageReaderSpecMixin.class);
+               setMixInAnnotation(ImmutableReactiveMessageReaderSpec.class, 
ImmutableReactiveMessageReaderSpecMixin.class);
+               setMixInAnnotation(MutableReactiveMessageReaderSpec.class, 
MutableReactiveMessageReaderSpecMixin.class);
+               setMixInAnnotation(ReactiveMessageSenderSpec.class, 
ImmutableReactiveMessageSenderSpecMixin.class);
+               setMixInAnnotation(ImmutableReactiveMessageSenderSpec.class, 
ImmutableReactiveMessageSenderSpecMixin.class);
+               setMixInAnnotation(MutableReactiveMessageSenderSpec.class, 
MutableReactiveMessageSenderSpecMixin.class);
+       }
+
+       public static class KeySharedPolicyDeserializer extends 
JsonDeserializer<KeySharedPolicy> {
+
+               @Override
+               public KeySharedPolicy deserialize(JsonParser p, 
DeserializationContext context) throws IOException {
+                       return 
ConverterUtils.toKeySharedPolicy(p.getValueAsString());
+               }
+
+       }
+
+       public static class KeySharedPolicySerializer extends 
JsonSerializer<KeySharedPolicy> {
+
+               @Override
+               public void serialize(KeySharedPolicy keySharedPolicy, 
JsonGenerator gen, SerializerProvider serializers)
+                               throws IOException {
+                       if (keySharedPolicy instanceof 
KeySharedPolicy.KeySharedPolicySticky) {
+                               gen.writeString("STICKY");
+                       }
+                       else if (keySharedPolicy instanceof 
KeySharedPolicy.KeySharedPolicyAutoSplit) {
+                               gen.writeString("AUTO_SPLIT");
+                       }
+                       else {
+                               
gen.writeString(keySharedPolicy.getClass().getName());
+                       }
+               }
+
+       }
+
+       public static class SchedulerDeserializer extends 
JsonDeserializer<Scheduler> {
+
+               @Override
+               public Scheduler deserialize(JsonParser p, 
DeserializationContext context) throws IOException {
+                       return ConverterUtils.toScheduler(p.getValueAsString());
+               }
+
+       }
+
+       public static class SchedulerSerializer extends 
JsonSerializer<Scheduler> {
+
+               @Override
+               public void serialize(Scheduler scheduler, JsonGenerator gen, 
SerializerProvider serializers)
+                               throws IOException {
+                       switch (scheduler.getClass().getName()) {
+                               case 
"reactor.core.scheduler.BoundedElasticScheduler":
+                                       gen.writeString("boundedElastic");
+                                       break;
+                               case "reactor.core.scheduler.ParallelScheduler":
+                                       gen.writeString("parallel");
+                                       break;
+                               case "reactor.core.scheduler.SingleScheduler":
+                                       gen.writeString("single");
+                                       break;
+                               case 
"reactor.core.scheduler.ImmediateScheduler":
+                                       gen.writeString("immediate");
+                                       break;
+                               case 
"reactor.core.scheduler.Schedulers$CachedScheduler":
+                                       
gen.writeString(scheduler.toString().substring("Schedulers.".length(),
+                                                       
scheduler.toString().length() - "()".length()));
+                                       break;
+                               default:
+                                       
gen.writeString(scheduler.getClass().getName());
+                       }
+               }
+
+       }
+
+       public static class DeadLetterPolicyDeserializer extends 
JsonDeserializer<DeadLetterPolicy> {
+
+               @Override
+               public DeadLetterPolicy deserialize(JsonParser p, 
DeserializationContext context) throws IOException {
+                       DeadLetterPolicyConf deadLetterPolicyConf = 
p.readValueAs(DeadLetterPolicyConf.class);
+                       return 
ConverterUtils.toDeadLetterPolicy(deadLetterPolicyConf);
+               }
+
+       }
+
+       public static class RangeDeserializer extends JsonDeserializer<Range> {
+
+               @Override
+               public Range deserialize(JsonParser p, DeserializationContext 
context) throws IOException {
+                       RangeConf rangeConf = p.readValueAs(RangeConf.class);
+                       return ConverterUtils.toRange(rangeConf);
+               }
+
+       }
+
+       public static class ClassSerializer<T> extends JsonSerializer<T> {
+
+               @Override
+               public void serialize(T value, JsonGenerator gen, 
SerializerProvider serializers) throws IOException {
+                       gen.writeStartObject();
+                       gen.writeStringField("className", 
value.getClass().getName());
+                       gen.writeEndObject();
+               }
+
+       }
+
+       public static class ClassDeserializer<T> extends JsonDeserializer<T> {
+
+               @Override
+               public T deserialize(JsonParser p, DeserializationContext 
context) throws IOException {
+                       ClassConf classConf = p.readValueAs(ClassConf.class);
+                       return ConverterUtils.toClass(classConf);
+               }
+
+       }
+
+}
diff --git a/settings.gradle 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/RangeConf.java
similarity index 65%
copy from settings.gradle
copy to 
pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/RangeConf.java
index 221fd1c..db32526 100644
--- a/settings.gradle
+++ 
b/pulsar-client-reactive-jackson/src/main/java/org/apache/pulsar/reactive/client/jackson/RangeConf.java
@@ -14,8 +14,29 @@
  * limitations under the License.
  */
 
-rootProject.name = 'pulsar-client-reactive'
-include 'pulsar-client-reactive-api'
-include 'pulsar-client-reactive-adapter'
-include 'pulsar-client-reactive-producer-cache-caffeine'
+package org.apache.pulsar.reactive.client.jackson;
 
+@SuppressWarnings("unused")
+class RangeConf {
+
+       private int start;
+
+       private int end;
+
+       int getStart() {
+               return this.start;
+       }
+
+       void setStart(int start) {
+               this.start = start;
+       }
+
+       int getEnd() {
+               return this.end;
+       }
+
+       void setEnd(int end) {
+               this.end = end;
+       }
+
+}
diff --git 
a/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java
 
b/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java
new file mode 100644
index 0000000..d36bea8
--- /dev/null
+++ 
b/pulsar-client-reactive-jackson/src/test/java/org/apache/pulsar/reactive/client/jackson/PulsarReactiveClientModuleTest.java
@@ -0,0 +1,554 @@
+/*
+ * Copyright 2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.reactive.client.jackson;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerAccessMode;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.Range;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.KeyBasedBatcherBuilder;
+import 
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec;
+import 
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageReaderSpec;
+import 
org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageSenderSpec;
+import 
org.apache.pulsar.reactive.client.api.MutableReactiveMessageConsumerSpec;
+import org.apache.pulsar.reactive.client.api.MutableReactiveMessageReaderSpec;
+import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderSpec;
+import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import reactor.core.Disposable;
+import reactor.core.scheduler.Scheduler;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class PulsarReactiveClientModuleTest {
+
+       private static final ObjectMapper MAPPER = new 
ObjectMapper().registerModule(new PulsarReactiveClientModule())
+                       
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
+       @ParameterizedTest
+       @ValueSource(classes = { ReactiveMessageConsumerSpec.class, 
ImmutableReactiveMessageConsumerSpec.class,
+                       MutableReactiveMessageConsumerSpec.class })
+       void shouldSerDeserReactiveMessageConsumerSpec(Class<? extends 
ReactiveMessageConsumerSpec> klass)
+                       throws Exception {
+               // @formatter:off
+               String content = ("{"
+                               + "'topicNames': ['my-topic'],"
+                               + "'topicsPattern': 'my-topic-*',"
+                               + "'topicsPatternSubscriptionMode': 
'PersistentOnly',"
+                               + "'topicsPatternAutoDiscoveryPeriod': 30,"
+                               + "'subscriptionName': 'my-sub',"
+                               + "'subscriptionMode': 'Durable',"
+                               + "'subscriptionType': 'Exclusive',"
+                               + "'keySharedPolicy': 'STICKY',"
+                               + "'replicateSubscriptionState': true,"
+                               + "'subscriptionProperties': {'my-key': 
'my-value'},"
+                               + "'consumerName': 'my-consumer',"
+                               + "'properties': {'my-key': 'my-value'},"
+                               + "'priorityLevel': 42,"
+                               + "'readCompacted': true,"
+                               + "'batchIndexAckEnabled': true,"
+                               + "'ackTimeout': 30,"
+                               + "'ackTimeoutTickTime': 30,"
+                               + "'acknowledgementsGroupTime': 30,"
+                               + "'acknowledgeAsynchronously': true,"
+                               + "'acknowledgeScheduler': 'boundedElastic',"
+                               + "'negativeAckRedeliveryDelay': 30,"
+                               + "'deadLetterPolicy': {"
+                               + "    'maxRedeliverCount': 1,"
+                               + "    'retryLetterTopic': 'my-retry-topic',"
+                               + "    'deadLetterTopic': 'my-dlq',"
+                               + "    'initialSubscriptionName': 'my-dlq-sub'"
+                               + "},"
+                               + "'retryLetterTopicEnable': true,"
+                               + "'receiverQueueSize': 42,"
+                               + "'maxTotalReceiverQueueSizeAcrossPartitions': 
42,"
+                               + "'autoUpdatePartitions': true,"
+                               + "'autoUpdatePartitionsInterval': 30,"
+                               + "'cryptoKeyReader': {"
+                               + "    'className': 
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader',"
+                               + "    'args': {'dummy': 'my-dummy'}"
+                               + "},"
+                               + "'cryptoFailureAction': 'FAIL',"
+                               + "'maxPendingChunkedMessage': 42,"
+                               + "'autoAckOldestChunkedMessageOnQueueFull': 
true,"
+                               + "'expireTimeOfIncompleteChunkedMessage': 30"
+                               + "}").replaceAll("'", "\"");
+               // @formatter:on
+
+               ReactiveMessageConsumerSpec spec = MAPPER.readValue(content, 
klass);
+
+               assertThat(spec.getTopicNames()).containsExactly("my-topic");
+               
assertThat(spec.getTopicsPattern().pattern()).isEqualTo("my-topic-*");
+               
assertThat(spec.getTopicsPatternSubscriptionMode()).isEqualTo(RegexSubscriptionMode.PersistentOnly);
+               
assertThat(spec.getTopicsPatternAutoDiscoveryPeriod()).hasMillis(30_000);
+               assertThat(spec.getSubscriptionName()).isEqualTo("my-sub");
+               
assertThat(spec.getSubscriptionMode()).isEqualTo(SubscriptionMode.Durable);
+               
assertThat(spec.getSubscriptionType()).isEqualTo(SubscriptionType.Exclusive);
+               
assertThat(spec.getKeySharedPolicy()).isInstanceOf(KeySharedPolicy.KeySharedPolicySticky.class);
+               assertThat(spec.getReplicateSubscriptionState()).isTrue();
+               
assertThat(spec.getSubscriptionProperties()).containsOnlyKeys("my-key");
+               
assertThat(spec.getSubscriptionProperties()).containsEntry("my-key", 
"my-value");
+               assertThat(spec.getConsumerName()).isEqualTo("my-consumer");
+               assertThat(spec.getProperties()).containsOnlyKeys("my-key");
+               assertThat(spec.getProperties()).containsEntry("my-key", 
"my-value");
+               assertThat(spec.getPriorityLevel()).isEqualTo(42);
+               assertThat(spec.getReadCompacted()).isTrue();
+               assertThat(spec.getBatchIndexAckEnabled()).isTrue();
+               assertThat(spec.getAckTimeout()).hasMillis(30_000);
+               assertThat(spec.getAckTimeoutTickTime()).hasMillis(30_000);
+               
assertThat(spec.getAcknowledgementsGroupTime()).hasMillis(30_000);
+               assertThat(spec.getAcknowledgeAsynchronously()).isTrue();
+               
assertThat(spec.getAcknowledgeScheduler().toString()).isEqualTo("Schedulers.boundedElastic()");
+               
assertThat(spec.getNegativeAckRedeliveryDelay()).hasMillis(30_000);
+               
assertThat(spec.getDeadLetterPolicy().getMaxRedeliverCount()).isEqualTo(1);
+               
assertThat(spec.getDeadLetterPolicy().getDeadLetterTopic()).isEqualTo("my-dlq");
+               
assertThat(spec.getDeadLetterPolicy().getRetryLetterTopic()).isEqualTo("my-retry-topic");
+               
assertThat(spec.getDeadLetterPolicy().getInitialSubscriptionName()).isEqualTo("my-dlq-sub");
+               assertThat(spec.getRetryLetterTopicEnable()).isTrue();
+               assertThat(spec.getReceiverQueueSize()).isEqualTo(42);
+               
assertThat(spec.getMaxTotalReceiverQueueSizeAcrossPartitions()).isEqualTo(42);
+               assertThat(spec.getAutoUpdatePartitions()).isTrue();
+               
assertThat(spec.getAutoUpdatePartitionsInterval()).hasMillis(30_000);
+               
assertThat(spec.getCryptoKeyReader()).isInstanceOf(TestCryptoKeyReader.class);
+               Map<String, Object> params = ((TestCryptoKeyReader) 
spec.getCryptoKeyReader()).params;
+               assertThat(params).containsOnlyKeys("dummy");
+               assertThat(params).containsEntry("dummy", "my-dummy");
+               
assertThat(spec.getCryptoFailureAction()).isEqualTo(ConsumerCryptoFailureAction.FAIL);
+               assertThat(spec.getMaxPendingChunkedMessage()).isEqualTo(42);
+               
assertThat(spec.getAutoAckOldestChunkedMessageOnQueueFull()).isTrue();
+               
assertThat(spec.getExpireTimeOfIncompleteChunkedMessage()).hasMillis(30_000);
+
+               String json = 
MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(spec);
+
+               // @formatter:off
+               String expected = ("{\n"
+                               + "  'topicNames' : [ 'my-topic' ],\n"
+                               + "  'topicsPattern' : 'my-topic-*',\n"
+                               + "  'topicsPatternSubscriptionMode' : 
'PersistentOnly',\n"
+                               + "  'topicsPatternAutoDiscoveryPeriod' : 
30.000000000,\n"
+                               + "  'subscriptionName' : 'my-sub',\n"
+                               + "  'subscriptionMode' : 'Durable',\n"
+                               + "  'subscriptionType' : 'Exclusive',\n"
+                               + "  'keySharedPolicy' : 'STICKY',\n"
+                               + "  'replicateSubscriptionState' : true,\n"
+                               + "  'subscriptionProperties' : {\n"
+                               + "    'my-key' : 'my-value'\n"
+                               + "  },\n"
+                               + "  'consumerName' : 'my-consumer',\n"
+                               + "  'properties' : {\n"
+                               + "    'my-key' : 'my-value'\n"
+                               + "  },\n"
+                               + "  'priorityLevel' : 42,\n"
+                               + "  'readCompacted' : true,\n"
+                               + "  'batchIndexAckEnabled' : true,\n"
+                               + "  'ackTimeout' : 30.000000000,\n"
+                               + "  'ackTimeoutTickTime' : 30.000000000,\n"
+                               + "  'acknowledgementsGroupTime' : 
30.000000000,\n"
+                               + "  'acknowledgeAsynchronously' : true,\n"
+                               + "  'acknowledgeScheduler' : 
'boundedElastic',\n"
+                               + "  'negativeAckRedeliveryDelay' : 
30.000000000,\n"
+                               + "  'deadLetterPolicy' : {\n"
+                               + "    'maxRedeliverCount' : 1,\n"
+                               + "    'retryLetterTopic' : 'my-retry-topic',\n"
+                               + "    'deadLetterTopic' : 'my-dlq',\n"
+                               + "    'initialSubscriptionName' : 
'my-dlq-sub'\n"
+                               + "  },\n"
+                               + "  'retryLetterTopicEnable' : true,\n"
+                               + "  'receiverQueueSize' : 42,\n"
+                               + "  
'maxTotalReceiverQueueSizeAcrossPartitions' : 42,\n"
+                               + "  'autoUpdatePartitions' : true,\n"
+                               + "  'autoUpdatePartitionsInterval' : 
30.000000000,\n"
+                               + "  'cryptoKeyReader' : {\n"
+                               + "    'className' : 
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'\n"
+                               + "  },\n"
+                               + "  'cryptoFailureAction' : 'FAIL',\n"
+                               + "  'maxPendingChunkedMessage' : 42,\n"
+                               + "  'autoAckOldestChunkedMessageOnQueueFull' : 
true,\n"
+                               + "  'expireTimeOfIncompleteChunkedMessage' : 
30.000000000\n"
+                               + "}").replaceAll("'", "\"");
+               // @formatter:on
+
+               assertThat(json).isEqualTo(expected);
+       }
+
+       @ParameterizedTest
+       @ValueSource(classes = {
+                       // MutableReactiveMessageConsumerSpec.class,
+                       ReactiveMessageConsumerSpec.class, 
ImmutableReactiveMessageConsumerSpec.class })
+       void shouldSerDeserEmptyReactiveMessageConsumerSpec(Class<? extends 
ReactiveMessageConsumerSpec> klass)
+                       throws Exception {
+               String content = "{}";
+               ReactiveMessageConsumerSpec spec = MAPPER.readValue(content, 
klass);
+               String json = MAPPER.writeValueAsString(spec);
+               assertThat(json).isEqualTo(content);
+       }
+
+       @ParameterizedTest
+       @ValueSource(classes = { ReactiveMessageReaderSpec.class, 
ImmutableReactiveMessageReaderSpec.class,
+                       MutableReactiveMessageReaderSpec.class })
+       void shouldSerDeserReactiveMessageReaderSpec(Class<? extends 
ReactiveMessageReaderSpec> klass) throws Exception {
+               // @formatter:off
+               String content = ("{"
+                               + "'topicNames': ['my-topic'],"
+                               + "'readerName': 'my-reader',"
+                               + "'subscriptionName': 'my-sub',"
+                               + "'generatedSubscriptionNamePrefix': 
'my-prefix-',"
+                               + "'receiverQueueSize': 42,"
+                               + "'readCompacted': true,"
+                               + "'keyHashRanges': [{"
+                               + "    'start': 42,"
+                               + "    'end': 43"
+                               + "}],"
+                               + "'cryptoKeyReader': {"
+                               + "    'className': 
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader',"
+                               + "    'args': {'dummy': 'my-dummy'}"
+                               + "},"
+                               + "'cryptoFailureAction': 'FAIL'"
+                               + "}").replaceAll("'", "\"");
+               // @formatter:on
+
+               ReactiveMessageReaderSpec spec = MAPPER.readValue(content, 
klass);
+
+               assertThat(spec.getTopicNames()).containsExactly("my-topic");
+               assertThat(spec.getReaderName()).isEqualTo("my-reader");
+               assertThat(spec.getSubscriptionName()).isEqualTo("my-sub");
+               
assertThat(spec.getGeneratedSubscriptionNamePrefix()).isEqualTo("my-prefix-");
+               assertThat(spec.getReceiverQueueSize()).isEqualTo(42);
+               assertThat(spec.getReadCompacted()).isTrue();
+               
assertThat(spec.getCryptoKeyReader()).isInstanceOf(TestCryptoKeyReader.class);
+               assertThat(spec.getKeyHashRanges()).containsExactly(new 
Range(42, 43));
+               
assertThat(spec.getCryptoKeyReader()).isInstanceOf(TestCryptoKeyReader.class);
+               Map<String, Object> params = ((TestCryptoKeyReader) 
spec.getCryptoKeyReader()).params;
+               assertThat(params).containsOnlyKeys("dummy");
+               assertThat(params).containsEntry("dummy", "my-dummy");
+               
assertThat(spec.getCryptoFailureAction()).isEqualTo(ConsumerCryptoFailureAction.FAIL);
+
+               String json = 
MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(spec);
+
+               // @formatter:off
+               String expected = ("{\n"
+                               + "  'topicNames' : [ 'my-topic' ],\n"
+                               + "  'readerName' : 'my-reader',\n"
+                               + "  'subscriptionName' : 'my-sub',\n"
+                               + "  'generatedSubscriptionNamePrefix' : 
'my-prefix-',\n"
+                               + "  'receiverQueueSize' : 42,\n"
+                               + "  'readCompacted' : true,\n"
+                               + "  'keyHashRanges' : [ {\n"
+                               + "    'start' : 42,\n"
+                               + "    'end' : 43\n"
+                               + "  } ],\n"
+                               + "  'cryptoKeyReader' : {\n"
+                               + "    'className' : 
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'\n"
+                               + "  },\n"
+                               + "  'cryptoFailureAction' : 'FAIL'\n"
+                               + "}").replaceAll("'", "\"");
+               // @formatter:on
+
+               assertThat(json).isEqualTo(expected);
+       }
+
+       @ParameterizedTest
+       @ValueSource(classes = { MutableReactiveMessageSenderSpec.class, 
ReactiveMessageSenderSpec.class,
+                       ImmutableReactiveMessageSenderSpec.class })
+       void shouldSerDeserEmptyReactiveMessageSenderSpec(Class<? extends 
ReactiveMessageSenderSpec> klass)
+                       throws Exception {
+               String content = "{}";
+               ReactiveMessageSenderSpec spec = MAPPER.readValue(content, 
klass);
+               String json = MAPPER.writeValueAsString(spec);
+               assertThat(json).isEqualTo(content);
+       }
+
+       @ParameterizedTest
+       @ValueSource(classes = {
+                       // MutableReactiveMessageReaderSpec.class,
+                       ReactiveMessageReaderSpec.class, 
ImmutableReactiveMessageReaderSpec.class })
+       void shouldSerDeserEmptyReactiveMessageReaderSpec(Class<? extends 
ReactiveMessageReaderSpec> klass)
+                       throws Exception {
+               String content = "{}";
+               ReactiveMessageReaderSpec spec = MAPPER.readValue(content, 
klass);
+               String json = MAPPER.writeValueAsString(spec);
+               assertThat(json).isEqualTo(content);
+       }
+
+       @ParameterizedTest
+       @ValueSource(classes = { ReactiveMessageSenderSpec.class, 
ImmutableReactiveMessageSenderSpec.class,
+                       MutableReactiveMessageSenderSpec.class })
+       void shouldSerDeserReactiveMessageSenderSpec(Class<? extends 
ReactiveMessageSenderSpec> klass) throws Exception {
+               // @formatter:off
+               String content = ("{"
+                               + "'topicName': 'my-topic',"
+                               + "'producerName': 'my-producer',"
+                               + "'sendTimeout': 30,"
+                               + "'maxPendingMessages': 42,"
+                               + "'maxPendingMessagesAcrossPartitions': 42,"
+                               + "'messageRoutingMode': 'SinglePartition',"
+                               + "'hashingScheme': 'JavaStringHash',"
+                               + "'cryptoFailureAction': 'FAIL',"
+                               + "'messageRouter': {"
+                               + "    'className': 
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestMessageRouter',"
+                               + "    'args': {'dummy': 'my-dummy'}"
+                               + "},"
+                               + "'batchingMaxPublishDelay': 30,"
+                               + 
"'roundRobinRouterBatchingPartitionSwitchFrequency': 42,"
+                               + "'batchingMaxMessages': 42,"
+                               + "'batchingMaxBytes': 42,"
+                               + "'batchingEnabled': true,"
+                               + "'batcherBuilder': {"
+                               + "    'className': 
'org.apache.pulsar.client.impl.KeyBasedBatcherBuilder'"
+                               + "},"
+                               + "'chunkingEnabled': true,"
+                               + "'cryptoKeyReader': {"
+                               + "    'className': 
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader',"
+                               + "    'args': {'dummy': 'my-dummy'}"
+                               + "},"
+                               + "'encryptionKeys': ['my-encryption-key'],"
+                               + "'compressionType': 'LZ4',"
+                               + "'initialSequenceId': 42,"
+                               + "'autoUpdatePartitions': true,"
+                               + "'autoUpdatePartitionsInterval': 30,"
+                               + "'multiSchema': true,"
+                               + "'accessMode': 'Shared',"
+                               + "'lazyStartPartitionedProducers': true,"
+                               + "'properties' : {"
+                               + "  'my-key' : 'my-value'"
+                               + "}"
+                               + "}").replaceAll("'", "\"");
+               // @formatter:on
+
+               ReactiveMessageSenderSpec spec = MAPPER.readValue(content, 
klass);
+
+               assertThat(spec.getTopicName()).isEqualTo("my-topic");
+               assertThat(spec.getProducerName()).isEqualTo("my-producer");
+               assertThat(spec.getSendTimeout()).hasMillis(30_000);
+               assertThat(spec.getMaxPendingMessages()).isEqualTo(42);
+               
assertThat(spec.getMaxPendingMessagesAcrossPartitions()).isEqualTo(42);
+               
assertThat(spec.getMessageRoutingMode()).isEqualTo(MessageRoutingMode.SinglePartition);
+               
assertThat(spec.getHashingScheme()).isEqualTo(HashingScheme.JavaStringHash);
+               
assertThat(spec.getCryptoFailureAction()).isEqualTo(ProducerCryptoFailureAction.FAIL);
+               
assertThat(spec.getMessageRouter()).isInstanceOf(TestMessageRouter.class);
+               Map<String, Object> params = ((TestMessageRouter) 
spec.getMessageRouter()).params;
+               assertThat(params).containsOnlyKeys("dummy");
+               assertThat(params).containsEntry("dummy", "my-dummy");
+               assertThat(spec.getBatchingMaxPublishDelay()).hasMillis(30_000);
+               
assertThat(spec.getRoundRobinRouterBatchingPartitionSwitchFrequency()).isEqualTo(42);
+               assertThat(spec.getBatchingMaxMessages()).isEqualTo(42);
+               assertThat(spec.getBatchingMaxBytes()).isEqualTo(42);
+               assertThat(spec.getBatchingEnabled()).isTrue();
+               
assertThat(spec.getBatcherBuilder()).isInstanceOf(KeyBasedBatcherBuilder.class);
+               assertThat(spec.getChunkingEnabled()).isTrue();
+               
assertThat(spec.getCryptoKeyReader()).isInstanceOf(TestCryptoKeyReader.class);
+               params = ((TestCryptoKeyReader) 
spec.getCryptoKeyReader()).params;
+               assertThat(params).containsOnlyKeys("dummy");
+               assertThat(params).containsEntry("dummy", "my-dummy");
+               
assertThat(spec.getEncryptionKeys()).containsExactly("my-encryption-key");
+               
assertThat(spec.getCompressionType()).isEqualTo(CompressionType.LZ4);
+               assertThat(spec.getInitialSequenceId()).isEqualTo(42);
+               assertThat(spec.getAutoUpdatePartitions()).isTrue();
+               
assertThat(spec.getAutoUpdatePartitionsInterval()).hasMillis(30_000);
+               assertThat(spec.getMultiSchema()).isTrue();
+               
assertThat(spec.getAccessMode()).isEqualTo(ProducerAccessMode.Shared);
+               assertThat(spec.getLazyStartPartitionedProducers()).isTrue();
+               assertThat(spec.getProperties()).containsEntry("my-key", 
"my-value");
+
+               String json = 
MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(spec);
+
+               // @formatter:off
+               String expected = ("{\n"
+                               + "  'topicName' : 'my-topic',\n"
+                               + "  'producerName' : 'my-producer',\n"
+                               + "  'sendTimeout' : 30.000000000,\n"
+                               + "  'maxPendingMessages' : 42,\n"
+                               + "  'maxPendingMessagesAcrossPartitions' : 
42,\n"
+                               + "  'messageRoutingMode' : 
'SinglePartition',\n"
+                               + "  'hashingScheme' : 'JavaStringHash',\n"
+                               + "  'cryptoFailureAction' : 'FAIL',\n"
+                               + "  'messageRouter' : {\n"
+                               + "    'className' : 
'org.apache.pulsar.reactive.client.jackson"
+                               + 
".PulsarReactiveClientModuleTest$TestMessageRouter'\n"
+                               + "  },\n"
+                               + "  'batchingMaxPublishDelay' : 
30.000000000,\n"
+                               + "  
'roundRobinRouterBatchingPartitionSwitchFrequency' : 42,\n"
+                               + "  'batchingMaxMessages' : 42,\n"
+                               + "  'batchingMaxBytes' : 42,\n"
+                               + "  'batchingEnabled' : true,\n"
+                               + "  'batcherBuilder' : { },\n"
+                               + "  'chunkingEnabled' : true,\n"
+                               + "  'cryptoKeyReader' : {\n"
+                               + "    'className' : 
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'\n"
+                               + "  },\n"
+                               + "  'encryptionKeys' : [ 'my-encryption-key' 
],\n"
+                               + "  'compressionType' : 'LZ4',\n"
+                               + "  'initialSequenceId' : 42,\n"
+                               + "  'autoUpdatePartitions' : true,\n"
+                               + "  'autoUpdatePartitionsInterval' : 
30.000000000,\n"
+                               + "  'multiSchema' : true,\n"
+                               + "  'accessMode' : 'Shared',\n"
+                               + "  'lazyStartPartitionedProducers' : true,\n"
+                               + "  'properties' : {\n"
+                               + "    'my-key' : 'my-value'\n"
+                               + "  }\n"
+                               + "}").replaceAll("'", "\"");
+               // @formatter:on
+
+               assertThat(json).isEqualTo(expected);
+       }
+
+       @ParameterizedTest
+       @ValueSource(strings = { "AUTO_SPLIT", "STICKY" })
+       void shouldSerDeserKeySharedPolicy(String keySharedPolicy) throws 
Exception {
+               String content = (String.format("\"%s\"", keySharedPolicy));
+               KeySharedPolicy policy = MAPPER.readValue(content, 
KeySharedPolicy.class);
+               String json = MAPPER.writeValueAsString(policy);
+               assertThat(json).isEqualTo(content);
+       }
+
+       @Test
+       void shouldSerializeCustomKeySharedPolicy() throws Exception {
+               String json = MAPPER.writeValueAsString(new 
TestKeySharedPolicy());
+               String expected = 
"\"org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestKeySharedPolicy\"";
+               assertThat(json).isEqualTo(expected);
+       }
+
+       @ParameterizedTest
+       @ValueSource(strings = { "parallel", "elastic", "boundedElastic", 
"immediate", "single" })
+       void shouldSerDeserScheduler(String scheduler) throws Exception {
+               String content = (String.format("\"%s\"", scheduler));
+               Scheduler policy = MAPPER.readValue(content, Scheduler.class);
+               String json = MAPPER.writeValueAsString(policy);
+               assertThat(json).isEqualTo(content);
+       }
+
+       @Test
+       void shouldSerializeCustomScheduler() throws Exception {
+               String json = MAPPER.writeValueAsString(new TestScheduler());
+               String expected = 
"\"org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestScheduler\"";
+               assertThat(json).isEqualTo(expected);
+       }
+
+       @Test
+       void shouldSerDeserDeadLetterPolicy() throws Exception {
+               // @formatter:off
+               String content = ("{\n"
+                               + "  'maxRedeliverCount' : 0,\n"
+                               + "  'retryLetterTopic' : 'my-retry-topic',\n"
+                               + "  'deadLetterTopic' : 'my-dlq',\n"
+                               + "  'initialSubscriptionName' : 'my-dlq-sub'\n"
+                               + "}").replaceAll("'", "\"");
+               // @formatter:on
+               DeadLetterPolicy policy = MAPPER.readValue(content, 
DeadLetterPolicy.class);
+               String json = 
MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(policy);
+               assertThat(json).isEqualTo(content);
+       }
+
+       @Test
+       void shouldSerDeserEmptyDeadLetterPolicy() throws Exception {
+               DeadLetterPolicy policy = MAPPER.readValue("{}", 
DeadLetterPolicy.class);
+               String json = MAPPER.writeValueAsString(policy);
+               assertThat(json).isEqualTo("{\"maxRedeliverCount\":0}");
+       }
+
+       @Test
+       void shouldSerDeserCryptoKeyReader() throws Exception {
+               // @formatter:off
+               String content = ("{"
+                               + "    'className': 
'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader',"
+                               + "    'args': {'dummy': 'my-dummy'}"
+                               + "}").replaceAll("'", "\"");
+               // @formatter:on
+               CryptoKeyReader cryptoKeyReader = MAPPER.readValue(content, 
CryptoKeyReader.class);
+               String json = MAPPER.writeValueAsString(cryptoKeyReader);
+               String expected = 
("{'className':'org.apache.pulsar.reactive.client.jackson.PulsarReactiveClientModuleTest$TestCryptoKeyReader'}")
+                               .replaceAll("'", "\"");
+               assertThat(json).isEqualTo(expected);
+       }
+
+       public static class TestScheduler implements Scheduler {
+
+               @Override
+               public Disposable schedule(Runnable task) {
+                       return null;
+               }
+
+               @Override
+               public Worker createWorker() {
+                       return null;
+               }
+
+       }
+
+       public static class TestKeySharedPolicy extends KeySharedPolicy {
+
+               @Override
+               public void validate() {
+               }
+
+       }
+
+       static class TestCryptoKeyReader implements CryptoKeyReader {
+
+               private final Map<String, Object> params;
+
+               // CHECKSTYLE:OFF
+               public TestCryptoKeyReader(Map<String, Object> params) {
+                       this.params = params;
+               }
+               // CHECKSTYLE:ON
+
+               @Override
+               public EncryptionKeyInfo getPublicKey(String keyName, 
Map<String, String> metadata) {
+                       return null;
+               }
+
+               @Override
+               public EncryptionKeyInfo getPrivateKey(String keyName, 
Map<String, String> metadata) {
+                       return null;
+               }
+
+       }
+
+       static class TestMessageRouter implements MessageRouter {
+
+               private final Map<String, Object> params;
+
+               // CHECKSTYLE:OFF
+               public TestMessageRouter(Map<String, Object> params) {
+                       this.params = params;
+               }
+               // CHECKSTYLE:ON
+
+       }
+
+}
diff --git a/settings.gradle b/settings.gradle
index 221fd1c..52a1478 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -18,4 +18,5 @@ rootProject.name = 'pulsar-client-reactive'
 include 'pulsar-client-reactive-api'
 include 'pulsar-client-reactive-adapter'
 include 'pulsar-client-reactive-producer-cache-caffeine'
+include 'pulsar-client-reactive-jackson'
 

Reply via email to