This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e5e503320d909bf1e20c5e2c320c297224a05c3a Author: Anurag reddy K <[email protected]> AuthorDate: Tue Dec 24 12:11:42 2024 +0530 [fix][client] Make DeadLetterPolicy & KeySharedPolicy serializable (#23718) Co-authored-by: anurag.reddy <[email protected]> (cherry picked from commit 14129e352e4bb4e06cbf2c6c581edcdbaf9f6775) --- .../pulsar/broker/service/BatchMessageTest.java | 5 ++- .../apache/pulsar/client/api/DeadLetterPolicy.java | 4 +- .../apache/pulsar/client/api/KeySharedPolicy.java | 5 ++- .../java/org/apache/pulsar/client/api/Range.java | 4 +- .../impl/conf/ConsumerConfigurationData.java | 4 +- .../client/impl/conf/ReaderConfigurationData.java | 2 +- .../impl/conf/ConsumerConfigurationDataTest.java | 50 ++++++++++++++++++++++ 7 files changed, 66 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 2fd288239e3..e5f9e43b8bb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -991,8 +991,8 @@ public class BatchMessageTest extends BrokerTestBase { int numMsgs = 1000; int batchMessages = 10; - final String topicName = "persistent://prop/ns-abc/testRetrieveSequenceIdSpecify-" + UUID.randomUUID(); - final String subscriptionName = "sub-1"; + final String topicName = "persistent://prop/ns-abc/testBatchMessageDispatchingAccordingToPermits-" + UUID.randomUUID(); + final String subscriptionName = "bmdap-sub-1"; ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Shared) @@ -1017,6 +1017,7 @@ public class BatchMessageTest extends BrokerTestBase { producer.close(); consumer1.close(); + consumer2.close(); } @Test(dataProvider="testSubTypeAndEnableBatch") diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java index f0b75ff57dd..c2a172666b0 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import java.io.Serializable; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -36,7 +37,8 @@ import org.apache.pulsar.common.classification.InterfaceStability; @AllArgsConstructor @InterfaceAudience.Public @InterfaceStability.Stable -public class DeadLetterPolicy { +public class DeadLetterPolicy implements Serializable { + private static final long serialVersionUID = 1L; /** * Maximum number of times that a message will be redelivered before being sent to the dead letter queue. diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeySharedPolicy.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeySharedPolicy.java index f5bc5b846b6..ccaed04d75d 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeySharedPolicy.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeySharedPolicy.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -29,7 +30,7 @@ import org.apache.pulsar.common.classification.InterfaceStability; */ @InterfaceAudience.Public @InterfaceStability.Stable -public abstract class KeySharedPolicy { +public abstract class KeySharedPolicy implements Serializable { protected KeySharedMode keySharedMode; @@ -82,6 +83,7 @@ public abstract class KeySharedPolicy { * for message, the cursor will rewind. */ public static class KeySharedPolicySticky extends KeySharedPolicy { + private static final long serialVersionUID = 1L; protected final List<Range> ranges; @@ -129,6 +131,7 @@ public abstract class KeySharedPolicy { * Auto split hash range key shared policy. */ public static class KeySharedPolicyAutoSplit extends KeySharedPolicy { + private static final long serialVersionUID = 1L; KeySharedPolicyAutoSplit() { this.keySharedMode = KeySharedMode.AUTO_SPLIT; diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java index 3db225330d0..14d9eec0e1b 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import java.io.Serializable; import java.util.Objects; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -27,7 +28,8 @@ import org.apache.pulsar.common.classification.InterfaceStability; */ @InterfaceAudience.Public @InterfaceStability.Stable -public class Range implements Comparable<Range> { +public class Range implements Comparable<Range>, Serializable { + private static final long serialVersionUID = 1L; private final int start; private final int end; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 6e884ba2791..cd82b54618f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -359,7 +359,7 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable { + "When specifying the dead letter policy while not specifying `ackTimeoutMillis`, you can set the" + " ack timeout to 30000 millisecond." ) - private transient DeadLetterPolicy deadLetterPolicy; + private DeadLetterPolicy deadLetterPolicy; private boolean retryEnable = false; @@ -388,7 +388,7 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable { private boolean resetIncludeHead = false; @JsonIgnore - private transient KeySharedPolicy keySharedPolicy; + private KeySharedPolicy keySharedPolicy; private boolean batchIndexAckEnabled = false; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java index 73d97f1f336..cd5aa4c12f5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java @@ -144,7 +144,7 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable { ) private boolean resetIncludeHead = false; - private transient List<Range> keyHashRanges; + private List<Range> keyHashRanges; private boolean poolMessages = false; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java index f47f83bcbce..291583c3067 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java @@ -19,7 +19,18 @@ package org.apache.pulsar.client.impl.conf; import static org.assertj.core.api.Assertions.assertThat; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Collections; import java.util.regex.Pattern; + +import lombok.Cleanup; +import org.apache.pulsar.client.api.DeadLetterPolicy; +import org.apache.pulsar.client.api.SubscriptionType; +import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -45,4 +56,43 @@ public class ConsumerConfigurationDataTest { assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(expectedPriority); } + + @Test + public void testSerializable() throws Exception { + ConsumerConfigurationData<String> consumerConfigurationData = new ConsumerConfigurationData<>(); + consumerConfigurationData.setPriorityLevel(1); + consumerConfigurationData.setSubscriptionName("my-sub"); + consumerConfigurationData.setSubscriptionType(SubscriptionType.Shared); + consumerConfigurationData.setReceiverQueueSize(100); + consumerConfigurationData.setAckTimeoutMillis(1000); + consumerConfigurationData.setTopicNames(Collections.singleton("my-topic")); + + DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder() + .maxRedeliverCount(10) + .retryLetterTopic("retry-topic") + .deadLetterTopic("dead-topic") + .build(); + consumerConfigurationData.setDeadLetterPolicy(deadLetterPolicy); + + @Cleanup + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + @Cleanup + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(consumerConfigurationData); + byte[] serialized = bos.toByteArray(); + + // Deserialize + @Cleanup + ByteArrayInputStream bis = new ByteArrayInputStream(serialized); + @Cleanup + ObjectInputStream ois = new ObjectInputStream(bis); + Object object = ois.readObject(); + + Assert.assertEquals(object.getClass(), ConsumerConfigurationData.class); + Assert.assertEquals(object, consumerConfigurationData); + + DeadLetterPolicy deserialisedDeadLetterPolicy = ((ConsumerConfigurationData<?>) object).getDeadLetterPolicy(); + Assert.assertNotNull(deserialisedDeadLetterPolicy); + Assert.assertEquals(deserialisedDeadLetterPolicy, deadLetterPolicy); + } }
