This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 03ec01d14d4 [fix][client] Move MessageIdAdv to the pulsar-common module (#20139)
03ec01d14d4 is described below

commit 03ec01d14d418233ce2fba2da7f429665df90de5
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Thu Apr 20 17:49:27 2023 +0800

    [fix][client] Move MessageIdAdv to the pulsar-common module (#20139)
    
    (cherry picked from commit 99a68e40c9dcc0771ddfd73321cc967be6433801)
---
 .../apache/pulsar/client/api/TopicMessageId.java   | 82 +---------------------
 .../PulsarClientImplementationBinding.java         |  3 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  2 +-
 .../apache/pulsar/client/impl/MessageIdImpl.java   |  4 +-
 .../PulsarClientImplementationBindingImpl.java     | 18 ++++-
 .../pulsar/client/impl/TopicMessageIdImpl.java     | 77 ++++++++++++++++++--
 .../pulsar/client/impl/TopicMessageImpl.java       |  3 +-
 .../pulsar/client/impl/MessageIdCompareToTest.java |  5 --
 .../pulsar/client/impl/TopicMessageIdImplTest.java | 12 ++--
 .../org/apache/pulsar/client/api/MessageIdAdv.java |  0
 .../org/apache/pulsar/client/api/package-info.java | 22 ++++++
 .../io/kafka/connect/KafkaConnectSinkTest.java     |  4 +-
 .../apache/pulsar/websocket/ConsumerHandler.java   |  6 +-
 13 files changed, 133 insertions(+), 105 deletions(-)

diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
index b70267bb0fb..4d02a7f4096 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
-import java.util.BitSet;
+import org.apache.pulsar.client.internal.DefaultImplementation;
 
 /**
  * The MessageId used for a consumer that subscribes multiple topics or 
partitioned topics.
@@ -45,84 +45,6 @@ public interface TopicMessageId extends MessageId {
         if (messageId instanceof TopicMessageId) {
             return (TopicMessageId) messageId;
         }
-        return new Impl(topic, messageId);
-    }
-
-    /**
-     * The simplest implementation of a TopicMessageId interface.
-     */
-    class Impl implements MessageIdAdv, TopicMessageId {
-        private final String topic;
-        private final MessageIdAdv messageId;
-
-        public Impl(String topic, MessageId messageId) {
-            this.topic = topic;
-            this.messageId = (MessageIdAdv) messageId;
-        }
-
-        @Override
-        public byte[] toByteArray() {
-            return messageId.toByteArray();
-        }
-
-        @Override
-        public String getOwnerTopic() {
-            return topic;
-        }
-
-        @Override
-        public long getLedgerId() {
-            return messageId.getLedgerId();
-        }
-
-        @Override
-        public long getEntryId() {
-            return messageId.getEntryId();
-        }
-
-        @Override
-        public int getPartitionIndex() {
-            return messageId.getPartitionIndex();
-        }
-
-        @Override
-        public int getBatchIndex() {
-            return messageId.getBatchIndex();
-        }
-
-        @Override
-        public int getBatchSize() {
-            return messageId.getBatchSize();
-        }
-
-        @Override
-        public BitSet getAckSet() {
-            return messageId.getAckSet();
-        }
-
-        @Override
-        public MessageIdAdv getFirstChunkMessageId() {
-            return messageId.getFirstChunkMessageId();
-        }
-
-        @Override
-        public int compareTo(MessageId o) {
-            return messageId.compareTo(o);
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            return messageId.equals(obj);
-        }
-
-        @Override
-        public int hashCode() {
-            return messageId.hashCode();
-        }
-
-        @Override
-        public String toString() {
-            return messageId.toString();
-        }
+        return 
DefaultImplementation.getDefaultImplementation().newTopicMessageId(topic, 
messageId);
     }
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
index 875a7930235..8fd05bff265 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessagePayloadFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
@@ -252,4 +253,6 @@ public interface PulsarClientImplementationBinding {
 
     SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, 
long timestamp,
                                  Map<String, String> propertiesValue);
+
+    TopicMessageId newTopicMessageId(String topic, MessageId messageId);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index cc016093196..199e8a9ae71 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2345,7 +2345,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     @Override
     public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
         return getLastMessageIdAsync()
-                .thenApply(msgId -> 
Collections.singletonList(TopicMessageId.create(topic, msgId)));
+                .thenApply(msgId -> Collections.singletonList(new 
TopicMessageIdImpl(topic, (MessageIdAdv) msgId)));
     }
 
     public CompletableFuture<GetLastMessageIdResponse> 
internalGetLastMessageIdAsync() {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 83ee7625783..8cffba44dc5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -128,7 +128,7 @@ public class MessageIdImpl implements MessageIdAdv {
             throw new IOException(e);
         }
 
-        MessageId messageId;
+        MessageIdAdv messageId;
         if (idData.hasBatchIndex()) {
             if (idData.hasBatchSize()) {
                 messageId = new BatchMessageIdImpl(idData.getLedgerId(), 
idData.getEntryId(), idData.getPartition(),
@@ -143,7 +143,7 @@ public class MessageIdImpl implements MessageIdAdv {
         }
         if (idData.getPartition() > -1 && topicName != null) {
             messageId = new TopicMessageIdImpl(
-                    topicName.getPartition(idData.getPartition()).toString(), 
topicName.toString(), messageId);
+                    topicName.getPartition(idData.getPartition()).toString(), 
messageId);
         }
 
         return messageId;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
index 1b069c5172d..346eb20ef4c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.impl;
 
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
@@ -35,9 +34,11 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.MessagePayloadFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
@@ -387,4 +388,19 @@ public final class PulsarClientImplementationBindingImpl 
implements PulsarClient
                                         Map<String, String> propertiesValue) {
         return new SchemaInfoImpl(name, schema, type, timestamp, 
propertiesValue);
     }
+
+    @Override
+    public TopicMessageId newTopicMessageId(String topic, MessageId messageId) 
{
+        final MessageIdAdv messageIdAdv;
+        if (messageId instanceof MessageIdAdv) {
+            messageIdAdv = (MessageIdAdv) messageId;
+        } else {
+            try {
+                messageIdAdv = (MessageIdAdv) 
MessageId.fromByteArray(messageId.toByteArray());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return new TopicMessageIdImpl(topic, messageIdAdv);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index 189dc1c6083..00fe12b62b1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -18,15 +18,27 @@
  */
 package org.apache.pulsar.client.impl;
 
+import java.util.BitSet;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.TopicMessageId;
 
-public class TopicMessageIdImpl extends TopicMessageId.Impl {
+public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId {
 
-    private final String topicName;
+    private final String ownerTopic;
+    private final MessageIdAdv msgId;
+    private final String topicName; // it's never used
 
+    public TopicMessageIdImpl(String topic, MessageIdAdv msgId) {
+        this.ownerTopic = topic;
+        this.msgId = msgId;
+        this.topicName = "";
+    }
+
+    @Deprecated
     public TopicMessageIdImpl(String topicPartitionName, String topicName, 
MessageId messageId) {
-        super(topicPartitionName, messageId);
+        this.msgId = (MessageIdAdv) messageId;
+        this.ownerTopic = topicPartitionName;
         this.topicName = topicName;
     }
 
@@ -55,11 +67,66 @@ public class TopicMessageIdImpl extends TopicMessageId.Impl 
{
 
     @Override
     public boolean equals(Object obj) {
-        return super.equals(obj);
+        return msgId.equals(obj);
     }
 
     @Override
     public int hashCode() {
-        return super.hashCode();
+        return msgId.hashCode();
+    }
+
+    @Override
+    public int compareTo(MessageId o) {
+        return msgId.compareTo(o);
+    }
+
+    @Override
+    public byte[] toByteArray() {
+        return msgId.toByteArray();
+    }
+
+    @Override
+    public String getOwnerTopic() {
+        return ownerTopic;
+    }
+
+    @Override
+    public long getLedgerId() {
+        return msgId.getLedgerId();
+    }
+
+    @Override
+    public long getEntryId() {
+        return msgId.getEntryId();
+    }
+
+    @Override
+    public int getPartitionIndex() {
+        return msgId.getPartitionIndex();
+    }
+
+    @Override
+    public int getBatchIndex() {
+        return msgId.getBatchIndex();
+    }
+
+    @Override
+    public int getBatchSize() {
+        return msgId.getBatchSize();
+    }
+
+    @Override
+    public BitSet getAckSet() {
+        return msgId.getAckSet();
+    }
+
+    @Override
+    public MessageIdAdv getFirstChunkMessageId() {
+        return msgId.getFirstChunkMessageId();
+    }
+
+    @Override
+    public String toString() {
+        return msgId.toString();
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index d24ecbd6aa9..1b6cba2f723 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Optional;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.EncryptionContext;
 
@@ -42,7 +43,7 @@ public class TopicMessageImpl<T> implements Message<T> {
         this.receivedByconsumer = receivedByConsumer;
 
         this.msg = msg;
-        this.messageId = new TopicMessageIdImpl(topicPartitionName, 
topicPartitionName, msg.getMessageId());
+        this.messageId = new TopicMessageIdImpl(topicPartitionName, 
(MessageIdAdv) msg.getMessageId());
     }
 
     /**
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
index 4f0eca6ea4a..fd81e9d5790 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java
@@ -148,15 +148,12 @@ public class MessageIdCompareToTest  {
         MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567);
         TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
             "test-topic-partition-0",
-            "test-topic",
             new BatchMessageIdImpl(123L, 345L, 566, 789));
         TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
             "test-topic-partition-0",
-            "test-topic",
             new BatchMessageIdImpl(123L, 345L, 567, 789));
         TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl(
             "test-topic-partition-0",
-            "test-topic",
             new BatchMessageIdImpl(messageIdImpl));
         assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to 
be greater than");
         assertTrue(messageIdImpl.compareTo(topicMessageId2) < 0, "Expected to 
be less than");
@@ -173,11 +170,9 @@ public class MessageIdCompareToTest  {
         BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 
567, -1);
         TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
             "test-topic-partition-0",
-            "test-topic",
             new MessageIdImpl(123L, 345L, 566));
         TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
             "test-topic-partition-0",
-            "test-topic",
             new MessageIdImpl(123L, 345L, 567));
         assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to 
be greater than");
         assertTrue(messageIdImpl1.compareTo(topicMessageId2) > 0, "Expected to 
be greater than");
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java
index d2e2ce9c15c..daf49f0e775 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java
@@ -28,9 +28,9 @@ public class TopicMessageIdImplTest {
     public void hashCodeTest() {
         MessageIdImpl msgId1 = new MessageIdImpl(0, 0, 0);
         MessageIdImpl msgId2 = new BatchMessageIdImpl(1, 1, 1, 1);
-        TopicMessageIdImpl topicMsgId1 = new 
TopicMessageIdImpl("topic-partition-1", "topic", msgId1);
-        TopicMessageIdImpl topic2MsgId1 = new 
TopicMessageIdImpl("topic2-partition-1", "topic2", msgId1);
-        TopicMessageIdImpl topicMsgId2 = new 
TopicMessageIdImpl("topic-partition-2", "topic", msgId2);
+        TopicMessageIdImpl topicMsgId1 = new 
TopicMessageIdImpl("topic-partition-1", msgId1);
+        TopicMessageIdImpl topic2MsgId1 = new 
TopicMessageIdImpl("topic2-partition-1", msgId1);
+        TopicMessageIdImpl topicMsgId2 = new 
TopicMessageIdImpl("topic-partition-2", msgId2);
 
         assertEquals(topicMsgId1.hashCode(), topicMsgId1.hashCode());
         assertEquals(topic2MsgId1.hashCode(), topic2MsgId1.hashCode());
@@ -43,9 +43,9 @@ public class TopicMessageIdImplTest {
     public void equalsTest() {
         MessageIdImpl msgId1 = new MessageIdImpl(0, 0, 0);
         MessageIdImpl msgId2 = new BatchMessageIdImpl(1, 1, 1, 1);
-        TopicMessageIdImpl topicMsgId1 = new 
TopicMessageIdImpl("topic-partition-1", "topic", msgId1);
-        TopicMessageIdImpl topic2MsgId1 = new 
TopicMessageIdImpl("topic2-partition-1", "topic2", msgId1);
-        TopicMessageIdImpl topicMsgId2 = new 
TopicMessageIdImpl("topic-partition-2", "topic", msgId2);
+        TopicMessageIdImpl topicMsgId1 = new 
TopicMessageIdImpl("topic-partition-1", msgId1);
+        TopicMessageIdImpl topic2MsgId1 = new 
TopicMessageIdImpl("topic2-partition-1", msgId1);
+        TopicMessageIdImpl topicMsgId2 = new 
TopicMessageIdImpl("topic-partition-2", msgId2);
 
         assertEquals(topicMsgId1, topicMsgId1);
         assertEquals(topicMsgId1, topic2MsgId1);
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
 b/pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
similarity index 100%
rename from 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
rename to 
pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/client/api/package-info.java 
b/pulsar-common/src/main/java/org/apache/pulsar/client/api/package-info.java
new file mode 100644
index 00000000000..3f6d1d56e10
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Additional helper classes to the pulsar-client-api module.
+ */
+package org.apache.pulsar.client.api;
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 5410e0bb8d6..1100b13b425 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -1572,7 +1572,7 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase {
         assertNull(ref);
 
         ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage(
-                        new TopicMessageIdImpl("topic-0", "topic", new 
MessageIdImpl(ledgerId, entryId, 0))
+                        new TopicMessageIdImpl("topic-0", new 
MessageIdImpl(ledgerId, entryId, 0))
         );
         assertNull(ref);
 
@@ -1584,7 +1584,7 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase {
         assertEquals(ref.getBatchIdx(), batchIdx);
 
         ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage(
-                new TopicMessageIdImpl("topic-0", "topic", new 
BatchMessageIdImpl(ledgerId, entryId, 0, batchIdx))
+                new TopicMessageIdImpl("topic-0", new 
BatchMessageIdImpl(ledgerId, entryId, 0, batchIdx))
         );
 
         assertEquals(ref.getLedgerId(), ledgerId);
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 579b4233399..c988fd1e70c 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
@@ -45,6 +46,7 @@ import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.websocket.data.ConsumerCommand;
@@ -293,8 +295,8 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
 
     private void handleAck(ConsumerCommand command) throws IOException {
         // We should have received an ack
-        TopicMessageId msgId = TopicMessageId.create(topic.toString(),
-                
MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)));
+        TopicMessageId msgId = new TopicMessageIdImpl(topic.toString(),
+                (MessageIdAdv) 
MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)));
         if (log.isDebugEnabled()) {
             log.debug("[{}/{}] Received ack request of message {} from {} ", 
consumer.getTopic(),
                     subscription, msgId, 
getRemote().getInetSocketAddress().toString());

Reply via email to