This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 03ec01d14d4 [fix][client] Move MessageIdAdv to the pulsar-common module (#20139) 03ec01d14d4 is described below commit 03ec01d14d418233ce2fba2da7f429665df90de5 Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Thu Apr 20 17:49:27 2023 +0800 [fix][client] Move MessageIdAdv to the pulsar-common module (#20139) (cherry picked from commit 99a68e40c9dcc0771ddfd73321cc967be6433801) --- .../apache/pulsar/client/api/TopicMessageId.java | 82 +--------------------- .../PulsarClientImplementationBinding.java | 3 + .../apache/pulsar/client/impl/ConsumerImpl.java | 2 +- .../apache/pulsar/client/impl/MessageIdImpl.java | 4 +- .../PulsarClientImplementationBindingImpl.java | 18 ++++- .../pulsar/client/impl/TopicMessageIdImpl.java | 77 ++++++++++++++++++-- .../pulsar/client/impl/TopicMessageImpl.java | 3 +- .../pulsar/client/impl/MessageIdCompareToTest.java | 5 -- .../pulsar/client/impl/TopicMessageIdImplTest.java | 12 ++-- .../org/apache/pulsar/client/api/MessageIdAdv.java | 0 .../org/apache/pulsar/client/api/package-info.java | 22 ++++++ .../io/kafka/connect/KafkaConnectSinkTest.java | 4 +- .../apache/pulsar/websocket/ConsumerHandler.java | 6 +- 13 files changed, 133 insertions(+), 105 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java index b70267bb0fb..4d02a7f4096 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.client.api; -import java.util.BitSet; +import org.apache.pulsar.client.internal.DefaultImplementation; /** * The MessageId used for a consumer that subscribes multiple topics or partitioned topics. @@ -45,84 +45,6 @@ public interface TopicMessageId extends MessageId { if (messageId instanceof TopicMessageId) { return (TopicMessageId) messageId; } - return new Impl(topic, messageId); - } - - /** - * The simplest implementation of a TopicMessageId interface. - */ - class Impl implements MessageIdAdv, TopicMessageId { - private final String topic; - private final MessageIdAdv messageId; - - public Impl(String topic, MessageId messageId) { - this.topic = topic; - this.messageId = (MessageIdAdv) messageId; - } - - @Override - public byte[] toByteArray() { - return messageId.toByteArray(); - } - - @Override - public String getOwnerTopic() { - return topic; - } - - @Override - public long getLedgerId() { - return messageId.getLedgerId(); - } - - @Override - public long getEntryId() { - return messageId.getEntryId(); - } - - @Override - public int getPartitionIndex() { - return messageId.getPartitionIndex(); - } - - @Override - public int getBatchIndex() { - return messageId.getBatchIndex(); - } - - @Override - public int getBatchSize() { - return messageId.getBatchSize(); - } - - @Override - public BitSet getAckSet() { - return messageId.getAckSet(); - } - - @Override - public MessageIdAdv getFirstChunkMessageId() { - return messageId.getFirstChunkMessageId(); - } - - @Override - public int compareTo(MessageId o) { - return messageId.compareTo(o); - } - - @Override - public boolean equals(Object obj) { - return messageId.equals(obj); - } - - @Override - public int hashCode() { - return messageId.hashCode(); - } - - @Override - public String toString() { - return messageId.toString(); - } + return DefaultImplementation.getDefaultImplementation().newTopicMessageId(topic, messageId); } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java index 875a7930235..8fd05bff265 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java @@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessagePayloadFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.GenericSchema; import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; @@ -252,4 +253,6 @@ public interface PulsarClientImplementationBinding { SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, long timestamp, Map<String, String> propertiesValue); + + TopicMessageId newTopicMessageId(String topic, MessageId messageId); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index cc016093196..199e8a9ae71 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2345,7 +2345,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle @Override public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() { return getLastMessageIdAsync() - .thenApply(msgId -> Collections.singletonList(TopicMessageId.create(topic, msgId))); + .thenApply(msgId -> Collections.singletonList(new TopicMessageIdImpl(topic, (MessageIdAdv) msgId))); } public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 83ee7625783..8cffba44dc5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -128,7 +128,7 @@ public class MessageIdImpl implements MessageIdAdv { throw new IOException(e); } - MessageId messageId; + MessageIdAdv messageId; if (idData.hasBatchIndex()) { if (idData.hasBatchSize()) { messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), @@ -143,7 +143,7 @@ public class MessageIdImpl implements MessageIdAdv { } if (idData.getPartition() > -1 && topicName != null) { messageId = new TopicMessageIdImpl( - topicName.getPartition(idData.getPartition()).toString(), topicName.toString(), messageId); + topicName.getPartition(idData.getPartition()).toString(), messageId); } return messageId; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java index 1b069c5172d..346eb20ef4c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.client.impl; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; @@ -35,9 +34,11 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.MessagePayloadFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.GenericSchema; import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; @@ -387,4 +388,19 @@ public final class PulsarClientImplementationBindingImpl implements PulsarClient Map<String, String> propertiesValue) { return new SchemaInfoImpl(name, schema, type, timestamp, propertiesValue); } + + @Override + public TopicMessageId newTopicMessageId(String topic, MessageId messageId) { + final MessageIdAdv messageIdAdv; + if (messageId instanceof MessageIdAdv) { + messageIdAdv = (MessageIdAdv) messageId; + } else { + try { + messageIdAdv = (MessageIdAdv) MessageId.fromByteArray(messageId.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return new TopicMessageIdImpl(topic, messageIdAdv); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java index 189dc1c6083..00fe12b62b1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java @@ -18,15 +18,27 @@ */ package org.apache.pulsar.client.impl; +import java.util.BitSet; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.TopicMessageId; -public class TopicMessageIdImpl extends TopicMessageId.Impl { +public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId { - private final String topicName; + private final String ownerTopic; + private final MessageIdAdv msgId; + private final String topicName; // it's never used + public TopicMessageIdImpl(String topic, MessageIdAdv msgId) { + this.ownerTopic = topic; + this.msgId = msgId; + this.topicName = ""; + } + + @Deprecated public TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) { - super(topicPartitionName, messageId); + this.msgId = (MessageIdAdv) messageId; + this.ownerTopic = topicPartitionName; this.topicName = topicName; } @@ -55,11 +67,66 @@ public class TopicMessageIdImpl extends TopicMessageId.Impl { @Override public boolean equals(Object obj) { - return super.equals(obj); + return msgId.equals(obj); } @Override public int hashCode() { - return super.hashCode(); + return msgId.hashCode(); + } + + @Override + public int compareTo(MessageId o) { + return msgId.compareTo(o); + } + + @Override + public byte[] toByteArray() { + return msgId.toByteArray(); + } + + @Override + public String getOwnerTopic() { + return ownerTopic; + } + + @Override + public long getLedgerId() { + return msgId.getLedgerId(); + } + + @Override + public long getEntryId() { + return msgId.getEntryId(); + } + + @Override + public int getPartitionIndex() { + return msgId.getPartitionIndex(); + } + + @Override + public int getBatchIndex() { + return msgId.getBatchIndex(); + } + + @Override + public int getBatchSize() { + return msgId.getBatchSize(); + } + + @Override + public BitSet getAckSet() { + return msgId.getAckSet(); + } + + @Override + public MessageIdAdv getFirstChunkMessageId() { + return msgId.getFirstChunkMessageId(); + } + + @Override + public String toString() { + return msgId.toString(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index d24ecbd6aa9..1b6cba2f723 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Optional; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.EncryptionContext; @@ -42,7 +43,7 @@ public class TopicMessageImpl<T> implements Message<T> { this.receivedByconsumer = receivedByConsumer; this.msg = msg; - this.messageId = new TopicMessageIdImpl(topicPartitionName, topicPartitionName, msg.getMessageId()); + this.messageId = new TopicMessageIdImpl(topicPartitionName, (MessageIdAdv) msg.getMessageId()); } /** diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java index 4f0eca6ea4a..fd81e9d5790 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java @@ -148,15 +148,12 @@ public class MessageIdCompareToTest { MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567); TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( "test-topic-partition-0", - "test-topic", new BatchMessageIdImpl(123L, 345L, 566, 789)); TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( "test-topic-partition-0", - "test-topic", new BatchMessageIdImpl(123L, 345L, 567, 789)); TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl( "test-topic-partition-0", - "test-topic", new BatchMessageIdImpl(messageIdImpl)); assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than"); assertTrue(messageIdImpl.compareTo(topicMessageId2) < 0, "Expected to be less than"); @@ -173,11 +170,9 @@ public class MessageIdCompareToTest { BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 567, -1); TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( "test-topic-partition-0", - "test-topic", new MessageIdImpl(123L, 345L, 566)); TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( "test-topic-partition-0", - "test-topic", new MessageIdImpl(123L, 345L, 567)); assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to be greater than"); assertTrue(messageIdImpl1.compareTo(topicMessageId2) > 0, "Expected to be greater than"); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java index d2e2ce9c15c..daf49f0e775 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicMessageIdImplTest.java @@ -28,9 +28,9 @@ public class TopicMessageIdImplTest { public void hashCodeTest() { MessageIdImpl msgId1 = new MessageIdImpl(0, 0, 0); MessageIdImpl msgId2 = new BatchMessageIdImpl(1, 1, 1, 1); - TopicMessageIdImpl topicMsgId1 = new TopicMessageIdImpl("topic-partition-1", "topic", msgId1); - TopicMessageIdImpl topic2MsgId1 = new TopicMessageIdImpl("topic2-partition-1", "topic2", msgId1); - TopicMessageIdImpl topicMsgId2 = new TopicMessageIdImpl("topic-partition-2", "topic", msgId2); + TopicMessageIdImpl topicMsgId1 = new TopicMessageIdImpl("topic-partition-1", msgId1); + TopicMessageIdImpl topic2MsgId1 = new TopicMessageIdImpl("topic2-partition-1", msgId1); + TopicMessageIdImpl topicMsgId2 = new TopicMessageIdImpl("topic-partition-2", msgId2); assertEquals(topicMsgId1.hashCode(), topicMsgId1.hashCode()); assertEquals(topic2MsgId1.hashCode(), topic2MsgId1.hashCode()); @@ -43,9 +43,9 @@ public class TopicMessageIdImplTest { public void equalsTest() { MessageIdImpl msgId1 = new MessageIdImpl(0, 0, 0); MessageIdImpl msgId2 = new BatchMessageIdImpl(1, 1, 1, 1); - TopicMessageIdImpl topicMsgId1 = new TopicMessageIdImpl("topic-partition-1", "topic", msgId1); - TopicMessageIdImpl topic2MsgId1 = new TopicMessageIdImpl("topic2-partition-1", "topic2", msgId1); - TopicMessageIdImpl topicMsgId2 = new TopicMessageIdImpl("topic-partition-2", "topic", msgId2); + TopicMessageIdImpl topicMsgId1 = new TopicMessageIdImpl("topic-partition-1", msgId1); + TopicMessageIdImpl topic2MsgId1 = new TopicMessageIdImpl("topic2-partition-1", msgId1); + TopicMessageIdImpl topicMsgId2 = new TopicMessageIdImpl("topic-partition-2", msgId2); assertEquals(topicMsgId1, topicMsgId1); assertEquals(topicMsgId1, topic2MsgId1); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java b/pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java similarity index 100% rename from pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java rename to pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/api/package-info.java b/pulsar-common/src/main/java/org/apache/pulsar/client/api/package-info.java new file mode 100644 index 00000000000..3f6d1d56e10 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Additional helper classes to the pulsar-client-api module. + */ +package org.apache.pulsar.client.api; diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index 5410e0bb8d6..1100b13b425 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -1572,7 +1572,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase { assertNull(ref); ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage( - new TopicMessageIdImpl("topic-0", "topic", new MessageIdImpl(ledgerId, entryId, 0)) + new TopicMessageIdImpl("topic-0", new MessageIdImpl(ledgerId, entryId, 0)) ); assertNull(ref); @@ -1584,7 +1584,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase { assertEquals(ref.getBatchIdx(), batchIdx); ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage( - new TopicMessageIdImpl("topic-0", "topic", new BatchMessageIdImpl(ledgerId, entryId, 0, batchIdx)) + new TopicMessageIdImpl("topic-0", new BatchMessageIdImpl(ledgerId, entryId, 0, batchIdx)) ); assertEquals(ref.getLedgerId(), ledgerId); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index 579b4233399..c988fd1e70c 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException; @@ -45,6 +46,7 @@ import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.impl.ConsumerBuilderImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.websocket.data.ConsumerCommand; @@ -293,8 +295,8 @@ public class ConsumerHandler extends AbstractWebSocketHandler { private void handleAck(ConsumerCommand command) throws IOException { // We should have received an ack - TopicMessageId msgId = TopicMessageId.create(topic.toString(), - MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId))); + TopicMessageId msgId = new TopicMessageIdImpl(topic.toString(), + (MessageIdAdv) MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId))); if (log.isDebugEnabled()) { log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(), subscription, msgId, getRemote().getInetSocketAddress().toString());