BewareMyPower commented on code in PR #24522: URL: https://github.com/apache/pulsar/pull/24522#discussion_r2210058349
########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java: ########## @@ -420,6 +420,13 @@ void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, L */ long getOffloadedSize(); + /** + * Reset Exception before write to null. + */ + default void resetExceptionBeforeWrite() { Review Comment: This method is confusing to managed ledger implementation that how should it implement it. Could it be better to introduce `fence` and `unfence` mechanisms to managed ledger and call them in the same method of `PersistentTopic`? Besides, I think there might be some other solutions to avoid introducing a confusing default method to the managed ledger: 1. Maintain the exception in `PersistentTopic` in `addFailed` or `fence()` and fail the `asyncAddEntry` if the exception is not null. The challenge is that though `internalAsyncAddEntry` is synchronized, it's executed in the internal executor, so `asyncAddEntry` cannot synchronize the fence operation. 2. Maintain the exception in managed ledger interceptor. Just some thoughts, I didn't try these solutions at the moment. ########## pulsar-broker/src/test/java/org/apache/pulsar/broker/PublishWithMLPayloadProcessorTest.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Slf4j +public class PublishWithMLPayloadProcessorTest extends ProducerConsumerBase { + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + conf.setBrokerEntryPayloadProcessors( + Collections.singleton("org.apache.pulsar.broker.ManagedLedgerPayloadProcessor0")); Review Comment: ```suggestion conf.setBrokerEntryPayloadProcessors(Set.of(ManagedLedgerPayloadProcessor0.class.getName())); ``` Be IDE friendly that IDE can search the reference of `ManagedLedgerPayloadProcessor0` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org