sijie commented on a change in pull request #8618:
URL: https://github.com/apache/pulsar/pull/8618#discussion_r541122916
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
##########
@@ -23,7 +23,9 @@
import com.google.common.base.Charsets;
import java.time.Clock;
import java.util.Arrays;
+import java.util.HashSet;
Review comment:
I don't think we need these imports here, correct?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
##########
@@ -61,10 +61,12 @@ public void findMessages(final long timestamp,
AsyncCallbacks.FindEntryCallback
}
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
entry -> {
- MessageImpl msg = null;
+ MessageImpl<byte[]> msg = null;
try {
- msg = MessageImpl.deserialize(entry.getDataBuffer());
- return msg.getPublishTime() < timestamp;
+ msg =
MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
+ return msg.getBrokerEntryMetadata() != null
Review comment:
Can we add a method in `MessageImpl` to compare the timestamps?
```
boolean publishedEarlierThan(long timestamp)
```
Like what you did at
https://github.com/apache/pulsar/pull/8618/files#diff-955419b4b0ad976e96f9f7595989e79c391109aeaa304bd286a80fc6eb9360c7R299
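A minimal sketch of what such a helper might look like. `TimestampedMessage` is a hypothetical stand-in for `MessageImpl`, and the rule that the broker timestamp wins when broker entry metadata is present is an assumption based on the truncated hunk above, not something the diff confirms:
```java
import java.util.OptionalLong;

// Hypothetical stand-in for MessageImpl; only the two timestamps matter here.
final class TimestampedMessage {
    private final long publishTime;             // producer-side publish time, in ms
    private final OptionalLong brokerTimestamp; // set only when broker entry metadata exists

    TimestampedMessage(long publishTime, OptionalLong brokerTimestamp) {
        this.publishTime = publishTime;
        this.brokerTimestamp = brokerTimestamp;
    }

    // Assumption: prefer the broker timestamp, fall back to the publish time.
    boolean publishedEarlierThan(long timestamp) {
        return brokerTimestamp.orElse(publishTime) < timestamp;
    }
}
```
With something like this, the lambda in `findMessages` could reduce to a single `msg.publishedEarlierThan(timestamp)` call.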
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -35,6 +37,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.pulsar.common.protocol.Commands;
Review comment:
I don't think we need this import.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2145,15 +2155,21 @@ public void terminateFailed(ManagedLedgerException
exception, Object ctx) {
}
public boolean isOldestMessageExpired(ManagedCursor cursor, long
messageTTLInSeconds) {
- MessageImpl msg = null;
+ MessageImpl<byte[]> msg = null;
Entry entry = null;
boolean isOldestMessageExpired = false;
try {
entry = cursor.getNthEntry(1, IndividualDeletedEntries.Include);
if (entry != null) {
- msg = MessageImpl.deserialize(entry.getDataBuffer());
- isOldestMessageExpired = messageTTLInSeconds != 0 &&
System.currentTimeMillis() > (msg.getPublishTime()
- + TimeUnit.SECONDS.toMillis((long)
(messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD)));
+ msg =
MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
Review comment:
Can we use `MessageImpl.isExpired`?
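For reference, a rough sketch of the check being consolidated, mirroring the condition removed in the hunk above. `ExpiryCheck` and its parameter names are hypothetical; the threshold is passed in because the value of `MESSAGE_EXPIRY_THRESHOLD` is not shown in this diff, and the current time is a parameter only to keep the sketch testable:
```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper; in the PR this logic would presumably live on MessageImpl.
final class ExpiryCheck {
    private ExpiryCheck() {}

    // timestampMillis: the timestamp to judge by (assumed: broker timestamp if present, else publish time).
    // threshold: multiplier applied to the TTL (MESSAGE_EXPIRY_THRESHOLD in the diff).
    static boolean isExpired(int ttlSeconds, double threshold, long timestampMillis, long nowMillis) {
        if (ttlSeconds == 0) {
            return false; // a TTL of 0 disables expiry, as in the original condition
        }
        long allowedAgeMillis = TimeUnit.SECONDS.toMillis((long) (ttlSeconds * threshold));
        return nowMillis > timestampMillis + allowedAgeMillis;
    }
}
```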
##########
File path:
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -1920,6 +1929,72 @@ private static ByteBufPair
serializeCommandSendWithSize(BaseCommand.Builder cmdB
return command;
}
+ public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload,
+
Set<BrokerEntryMetadataInterceptor> interceptors) {
+ // | BROKER_ENTRY_METADATA_MAGIC_NUMBER | BROKER_ENTRY_METADATA_SIZE
| BROKER_ENTRY_METADATA |
+ // | 2 bytes | 4 bytes
| BROKER_ENTRY_METADATA_SIZE bytes |
+
+ PulsarApi.BrokerEntryMetadata.Builder brokerMetadataBuilder =
PulsarApi.BrokerEntryMetadata.newBuilder();
+ for (BrokerEntryMetadataInterceptor interceptor : interceptors) {
+ interceptor.intercept(brokerMetadataBuilder);
+ }
+ PulsarApi.BrokerEntryMetadata brokerEntryMetadata =
brokerMetadataBuilder.build();
+ int brokerMetaSize = brokerEntryMetadata.getSerializedSize();
+ ByteBuf brokerMeta =
+ PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6,
brokerMetaSize + 6);
+ brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+ brokerMeta.writeInt(brokerMetaSize);
+ ByteBufCodedOutputStream outStream =
ByteBufCodedOutputStream.get(brokerMeta);
+ try {
+ brokerEntryMetadata.writeTo(outStream);
+ } catch (IOException e) {
+ // This is in-memory serialization, should not fail
+ throw new RuntimeException(e);
+ }
+ outStream.recycle();
+
+ CompositeByteBuf compositeByteBuf =
PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+ compositeByteBuf.addComponents(true, brokerMeta, headerAndPayload);
+ return compositeByteBuf;
+ }
+
+ public static ByteBuf skipBrokerEntryMetadataIfExist(ByteBuf
headerAndPayloadWithBrokerEntryMetadata) {
+ int readerIndex =
headerAndPayloadWithBrokerEntryMetadata.readerIndex();
+ if (headerAndPayloadWithBrokerEntryMetadata.readShort() ==
magicBrokerEntryMetadata) {
+ int brokerEntryMetadataSize =
headerAndPayloadWithBrokerEntryMetadata.readInt();
+
headerAndPayloadWithBrokerEntryMetadata.readerIndex(headerAndPayloadWithBrokerEntryMetadata.readerIndex()
+ + brokerEntryMetadataSize);
+ } else {
+ headerAndPayloadWithBrokerEntryMetadata.readerIndex(readerIndex);
+ }
+ return headerAndPayloadWithBrokerEntryMetadata;
+ }
+
+ public static PulsarApi.BrokerEntryMetadata
parseBrokerEntryMetadataIfExist(
+ ByteBuf headerAndPayloadWithBrokerEntryMetadata) {
+ int readerIndex =
headerAndPayloadWithBrokerEntryMetadata.readerIndex();
+ if (headerAndPayloadWithBrokerEntryMetadata.readShort() ==
magicBrokerEntryMetadata) {
+ int brokerEntryMetadataSize =
headerAndPayloadWithBrokerEntryMetadata.readInt();
+ int writerIndex =
headerAndPayloadWithBrokerEntryMetadata.writerIndex();
+
headerAndPayloadWithBrokerEntryMetadata.writerIndex(headerAndPayloadWithBrokerEntryMetadata.readerIndex()
+ + brokerEntryMetadataSize);
+ ByteBufCodedInputStream brokerEntryMetadataInputStream =
+
ByteBufCodedInputStream.get(headerAndPayloadWithBrokerEntryMetadata);
+ PulsarApi.BrokerEntryMetadata.Builder builder =
PulsarApi.BrokerEntryMetadata.newBuilder();
+ try {
+ builder.mergeFrom(brokerEntryMetadataInputStream,
null).build();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
Review comment:
Same question here - we should let the caller catch the exception.
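Roughly what I have in mind, sketched with `java.nio.ByteBuffer` instead of Netty so it stands alone. `BrokerEntryMetadataFrame`, `readIfPresent`, and the `MAGIC` value are placeholders for illustration (the real constant is `Commands.magicBrokerEntryMetadata`), and the raw bytes stand in for the parsed protobuf message:
```java
import java.io.IOException;
import java.nio.ByteBuffer;

final class BrokerEntryMetadataFrame {
    // Placeholder value for illustration only.
    static final short MAGIC = 0x0e02;

    private BrokerEntryMetadataFrame() {}

    // Returns the raw metadata bytes, or null when the buffer does not start with the magic number.
    // A malformed frame surfaces as a checked IOException, so the caller decides how to react.
    static byte[] readIfPresent(ByteBuffer buf) throws IOException {
        int start = buf.position();
        if (buf.remaining() < Short.BYTES || buf.getShort() != MAGIC) {
            buf.position(start); // no metadata: restore the reader position
            return null;
        }
        if (buf.remaining() < Integer.BYTES) {
            buf.position(start);
            throw new IOException("Truncated broker entry metadata header");
        }
        int size = buf.getInt();
        if (size < 0 || size > buf.remaining()) {
            buf.position(start);
            throw new IOException("Invalid broker entry metadata size: " + size);
        }
        byte[] metadata = new byte[size];
        buf.get(metadata); // leaves the buffer positioned at the headers and payload
        return metadata;
    }
}
```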
##########
File path:
pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataUtils.java
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.intercept;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A tool class for loading BrokerEntryMetadataInterceptor classes.
+ */
+public class BrokerEntryMetadataUtils {
+
+ private static final Logger log =
LoggerFactory.getLogger(BrokerEntryMetadataUtils.class);
+
+ public static Set<BrokerEntryMetadataInterceptor>
loadBrokerEntryMetadataInterceptors(
+ Set<String> interceptorNames, ClassLoader classLoader) {
+ Set<BrokerEntryMetadataInterceptor> interceptors = new HashSet<>();
+ if (interceptorNames != null && interceptorNames.size() > 0) {
+ for (String interceptorName : interceptorNames) {
+ try {
+ Class<BrokerEntryMetadataInterceptor> clz =
(Class<BrokerEntryMetadataInterceptor>) ClassLoaderUtils
+ .loadClass(interceptorName, classLoader);
+ try {
+ interceptors.add(clz.newInstance());
+ } catch (InstantiationException | IllegalAccessException
e) {
+ log.error("Create new BrokerEntryMetadataInterceptor
instance for {} failed.",
Review comment:
I think we should throw a RuntimeException if we fail to load a broker
entry metadata interceptor.
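A sketch of the fail-fast variant, using plain reflection so it is self-contained. `EntryInterceptor`, `NoOpInterceptor`, and `InterceptorLoader` are hypothetical stand-ins for `BrokerEntryMetadataInterceptor`, one implementation of it, and `BrokerEntryMetadataUtils`; the real code goes through `ClassLoaderUtils.loadClass`:
```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for BrokerEntryMetadataInterceptor.
interface EntryInterceptor {}

// Hypothetical implementation, present only so the loader has something to load.
final class NoOpInterceptor implements EntryInterceptor {}

final class InterceptorLoader {
    private InterceptorLoader() {}

    // Fails fast: any class that cannot be loaded or instantiated aborts the whole load.
    static Set<EntryInterceptor> load(Set<String> classNames, ClassLoader classLoader) {
        Set<EntryInterceptor> interceptors = new LinkedHashSet<>();
        if (classNames == null) {
            return interceptors;
        }
        for (String name : classNames) {
            try {
                Class<? extends EntryInterceptor> clz =
                        Class.forName(name, true, classLoader).asSubclass(EntryInterceptor.class);
                interceptors.add(clz.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException | ClassCastException e) {
                throw new RuntimeException(
                        "Failed to load broker entry metadata interceptor " + name, e);
            }
        }
        return interceptors;
    }
}
```
That way a misconfigured interceptor name stops the broker at startup instead of silently running without it.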
##########
File path:
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -1920,6 +1929,72 @@ private static ByteBufPair
serializeCommandSendWithSize(BaseCommand.Builder cmdB
return command;
}
+ public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload,
+
Set<BrokerEntryMetadataInterceptor> interceptors) {
+ // | BROKER_ENTRY_METADATA_MAGIC_NUMBER | BROKER_ENTRY_METADATA_SIZE
| BROKER_ENTRY_METADATA |
+ // | 2 bytes | 4 bytes
| BROKER_ENTRY_METADATA_SIZE bytes |
+
+ PulsarApi.BrokerEntryMetadata.Builder brokerMetadataBuilder =
PulsarApi.BrokerEntryMetadata.newBuilder();
+ for (BrokerEntryMetadataInterceptor interceptor : interceptors) {
+ interceptor.intercept(brokerMetadataBuilder);
+ }
+ PulsarApi.BrokerEntryMetadata brokerEntryMetadata =
brokerMetadataBuilder.build();
+ int brokerMetaSize = brokerEntryMetadata.getSerializedSize();
+ ByteBuf brokerMeta =
+ PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6,
brokerMetaSize + 6);
+ brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+ brokerMeta.writeInt(brokerMetaSize);
+ ByteBufCodedOutputStream outStream =
ByteBufCodedOutputStream.get(brokerMeta);
+ try {
+ brokerEntryMetadata.writeTo(outStream);
+ } catch (IOException e) {
+ // This is in-memory serialization, should not fail
+ throw new RuntimeException(e);
Review comment:
If we fail to serialize the entry metadata into a ByteBuf, we should throw
an exception. The caller of this method should catch this exception and fail
the write request.
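Something along these lines, sketched with `java.nio.ByteBuffer` and `CompletableFuture` so it runs on its own. `BrokerEntryMetadataWriter`, `MetadataSerializer`, `prepareWrite`, and the `MAGIC` value are all hypothetical; in the broker the failure would go to the publish callback rather than a future:
```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

final class BrokerEntryMetadataWriter {
    // Placeholder value; the real constant is Commands.magicBrokerEntryMetadata.
    static final short MAGIC = 0x0e02;

    // Hypothetical abstraction standing in for BrokerEntryMetadata.writeTo(...).
    interface MetadataSerializer {
        byte[] serialize() throws IOException;
    }

    private BrokerEntryMetadataWriter() {}

    // Layout: | magic (2 bytes) | size (4 bytes) | metadata (size bytes) | headers and payload |
    static ByteBuffer addBrokerEntryMetadata(ByteBuffer headersAndPayload, MetadataSerializer serializer)
            throws IOException {
        byte[] metadata = serializer.serialize(); // propagates instead of wrapping in RuntimeException
        ByteBuffer out = ByteBuffer.allocate(
                Short.BYTES + Integer.BYTES + metadata.length + headersAndPayload.remaining());
        out.putShort(MAGIC).putInt(metadata.length).put(metadata).put(headersAndPayload.duplicate());
        out.flip();
        return out;
    }

    // Caller side: a serialization failure fails this write request only.
    static CompletableFuture<ByteBuffer> prepareWrite(ByteBuffer headersAndPayload,
                                                      MetadataSerializer serializer) {
        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
        try {
            result.complete(addBrokerEntryMetadata(headersAndPayload, serializer));
        } catch (IOException e) {
            result.completeExceptionally(e);
        }
        return result;
    }
}
```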
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
##########
@@ -90,6 +91,11 @@ public void expireMessages(int messageTTLInSeconds) {
}
}
+ public boolean isExpired(int messageTTLInSeconds, long
brokerTimestampForMessage) {
Review comment:
We don't need this method anymore, correct?
##########
File path:
pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java
##########
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.intercept;
+
+import org.apache.pulsar.common.api.proto.PulsarApi;
+
+/**
+ * A plugin interface that allows you to intercept the client requests to
+ * the Pulsar brokers and add timestamp from broker side metadata for each
entry.
+ */
+public class AppendBrokerTimestampMetadataInterceptor implements
BrokerEntryMetadataInterceptor{
Review comment:
```suggestion
public class AppendBrokerTimestampMetadataInterceptor implements
BrokerEntryMetadataInterceptor {
```