This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e5e503320d909bf1e20c5e2c320c297224a05c3a
Author: Anurag reddy K <[email protected]>
AuthorDate: Tue Dec 24 12:11:42 2024 +0530

    [fix][client] Make DeadLetterPolicy & KeySharedPolicy serializable (#23718)
    
    Co-authored-by: anurag.reddy <[email protected]>
    (cherry picked from commit 14129e352e4bb4e06cbf2c6c581edcdbaf9f6775)
---
 .../pulsar/broker/service/BatchMessageTest.java    |  5 ++-
 .../apache/pulsar/client/api/DeadLetterPolicy.java |  4 +-
 .../apache/pulsar/client/api/KeySharedPolicy.java  |  5 ++-
 .../java/org/apache/pulsar/client/api/Range.java   |  4 +-
 .../impl/conf/ConsumerConfigurationData.java       |  4 +-
 .../client/impl/conf/ReaderConfigurationData.java  |  2 +-
 .../impl/conf/ConsumerConfigurationDataTest.java   | 50 ++++++++++++++++++++++
 7 files changed, 66 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 2fd288239e3..e5f9e43b8bb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -991,8 +991,8 @@ public class BatchMessageTest extends BrokerTestBase {
 
         int numMsgs = 1000;
         int batchMessages = 10;
-        final String topicName = 
"persistent://prop/ns-abc/testRetrieveSequenceIdSpecify-" + UUID.randomUUID();
-        final String subscriptionName = "sub-1";
+        final String topicName = 
"persistent://prop/ns-abc/testBatchMessageDispatchingAccordingToPermits-" + 
UUID.randomUUID();
+        final String subscriptionName = "bmdap-sub-1";
 
         ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
                 
.subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Shared)
@@ -1017,6 +1017,7 @@ public class BatchMessageTest extends BrokerTestBase {
 
         producer.close();
         consumer1.close();
+        consumer2.close();
     }
 
     @Test(dataProvider="testSubTypeAndEnableBatch")
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java
index f0b75ff57dd..c2a172666b0 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import java.io.Serializable;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -36,7 +37,8 @@ import 
org.apache.pulsar.common.classification.InterfaceStability;
 @AllArgsConstructor
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class DeadLetterPolicy {
+public class DeadLetterPolicy implements Serializable {
+    private static final long serialVersionUID = 1L;
 
     /**
      * Maximum number of times that a message will be redelivered before being 
sent to the dead letter queue.
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeySharedPolicy.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeySharedPolicy.java
index f5bc5b846b6..ccaed04d75d 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeySharedPolicy.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeySharedPolicy.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -29,7 +30,7 @@ import 
org.apache.pulsar.common.classification.InterfaceStability;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public abstract class KeySharedPolicy {
+public abstract class KeySharedPolicy implements Serializable {
 
     protected KeySharedMode keySharedMode;
 
@@ -82,6 +83,7 @@ public abstract class KeySharedPolicy {
      * for message, the cursor will rewind.
      */
     public static class KeySharedPolicySticky extends KeySharedPolicy {
+        private static final long serialVersionUID = 1L;
 
         protected final List<Range> ranges;
 
@@ -129,6 +131,7 @@ public abstract class KeySharedPolicy {
      * Auto split hash range key shared policy.
      */
     public static class KeySharedPolicyAutoSplit extends KeySharedPolicy {
+        private static final long serialVersionUID = 1L;
 
         KeySharedPolicyAutoSplit() {
             this.keySharedMode = KeySharedMode.AUTO_SPLIT;
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java
index 3db225330d0..14d9eec0e1b 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import java.io.Serializable;
 import java.util.Objects;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
@@ -27,7 +28,8 @@ import 
org.apache.pulsar.common.classification.InterfaceStability;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class Range implements Comparable<Range> {
+public class Range implements Comparable<Range>, Serializable {
+    private static final long serialVersionUID = 1L;
 
     private final int start;
     private final int end;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 6e884ba2791..cd82b54618f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -359,7 +359,7 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
                     + "When specifying the dead letter policy while not 
specifying `ackTimeoutMillis`, you can set the"
                     + " ack timeout to 30000 millisecond."
     )
-    private transient DeadLetterPolicy deadLetterPolicy;
+    private DeadLetterPolicy deadLetterPolicy;
 
     private boolean retryEnable = false;
 
@@ -388,7 +388,7 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
     private boolean resetIncludeHead = false;
 
     @JsonIgnore
-    private transient KeySharedPolicy keySharedPolicy;
+    private KeySharedPolicy keySharedPolicy;
 
     private boolean batchIndexAckEnabled = false;
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index 73d97f1f336..cd5aa4c12f5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -144,7 +144,7 @@ public class ReaderConfigurationData<T> implements 
Serializable, Cloneable {
     )
     private boolean resetIncludeHead = false;
 
-    private transient List<Range> keyHashRanges;
+    private List<Range> keyHashRanges;
 
     private boolean poolMessages = false;
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java
index f47f83bcbce..291583c3067 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java
@@ -19,7 +19,18 @@
 package org.apache.pulsar.client.impl.conf;
 
 import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
 import java.util.regex.Pattern;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -45,4 +56,43 @@ public class ConsumerConfigurationDataTest {
 
         
assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(expectedPriority);
     }
+
+    @Test
+    public void testSerializable() throws Exception {
+        ConsumerConfigurationData<String> consumerConfigurationData = new 
ConsumerConfigurationData<>();
+        consumerConfigurationData.setPriorityLevel(1);
+        consumerConfigurationData.setSubscriptionName("my-sub");
+        consumerConfigurationData.setSubscriptionType(SubscriptionType.Shared);
+        consumerConfigurationData.setReceiverQueueSize(100);
+        consumerConfigurationData.setAckTimeoutMillis(1000);
+        
consumerConfigurationData.setTopicNames(Collections.singleton("my-topic"));
+
+        DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()
+                .maxRedeliverCount(10)
+                .retryLetterTopic("retry-topic")
+                .deadLetterTopic("dead-topic")
+                .build();
+        consumerConfigurationData.setDeadLetterPolicy(deadLetterPolicy);
+
+        @Cleanup
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        @Cleanup
+        ObjectOutputStream oos = new ObjectOutputStream(bos);
+        oos.writeObject(consumerConfigurationData);
+        byte[] serialized = bos.toByteArray();
+
+        // Deserialize
+        @Cleanup
+        ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
+        @Cleanup
+        ObjectInputStream ois = new ObjectInputStream(bis);
+        Object object = ois.readObject();
+
+        Assert.assertEquals(object.getClass(), 
ConsumerConfigurationData.class);
+        Assert.assertEquals(object, consumerConfigurationData);
+
+        DeadLetterPolicy deserialisedDeadLetterPolicy = 
((ConsumerConfigurationData<?>) object).getDeadLetterPolicy();
+        Assert.assertNotNull(deserialisedDeadLetterPolicy);
+        Assert.assertEquals(deserialisedDeadLetterPolicy, deadLetterPolicy);
+    }
 }

Reply via email to