This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ab9384b02f2c8b46a4c84121f3c709d8cbad2201 Author: coderzc <[email protected]> AuthorDate: Tue Aug 29 09:42:21 2023 +0800 Revert "[fix][broker] Fix get topic policies as null during clean cache (#20763)" This reverts commit 111c14d16156fd7a364a53c1bedf23b34da9526c. --- .../pulsar/compaction/TwoPhaseCompactor.java | 1 - .../apache/pulsar/compaction/CompactorTest.java | 95 +++++----------------- 2 files changed, 22 insertions(+), 74 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index d71227d94f5..21ccb281f05 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -283,7 +283,6 @@ public class TwoPhaseCompactor extends Compactor { promise.complete(null); } }); - return; } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index 5f83ef97d32..e86be6a4db8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -19,10 +19,13 @@ package org.apache.pulsar.compaction; import static org.apache.pulsar.client.impl.RawReaderTest.extractKey; + import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import io.netty.buffer.ByteBuf; + import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; @@ -30,30 +33,23 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; 
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import lombok.Cleanup; + import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.RawMessageImpl; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.protocol.Commands; -import org.awaitility.Awaitility; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -69,6 +65,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { protected Compactor compactor; + @BeforeMethod @Override public void setup() throws Exception { @@ -104,17 +101,16 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { return compactor; } - private List<String> compactAndVerify(String topic, Map<String, byte[]> expected, boolean checkMetrics) - throws Exception { + private List<String> compactAndVerify(String topic, Map<String, byte[]> expected, boolean checkMetrics) throws Exception { long compactedLedgerId = compact(topic); LedgerHandle ledger = bk.openLedger(compactedLedgerId, - Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, - 
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD); + Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, + Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD); Assert.assertEquals(ledger.getLastAddConfirmed() + 1, // 0..lac - expected.size(), - "Should have as many entries as there is keys"); + expected.size(), + "Should have as many entries as there is keys"); List<String> keys = new ArrayList<>(); Enumeration<LedgerEntry> entries = ledger.readEntries(0, ledger.getLastAddConfirmed()); @@ -128,7 +124,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { byte[] bytes = new byte[payload.readableBytes()]; payload.readBytes(bytes); Assert.assertEquals(bytes, expected.remove(key), - "Compacted version should match expected version"); + "Compacted version should match expected version"); m.close(); } if (checkMetrics) { @@ -152,18 +148,17 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { final int numMessages = 1000; final int maxKeys = 10; - @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Map<String, byte[]> expected = new HashMap<>(); Random r = new Random(0); for (int j = 0; j < numMessages; j++) { int keyIndex = r.nextInt(maxKeys); - String key = "key" + keyIndex; + String key = "key"+keyIndex; byte[] data = ("my-message-" + key + "-" + j).getBytes(); producer.newMessage() .key(key) @@ -178,11 +173,10 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { public void testCompactAddCompact() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + 
.create(); Map<String, byte[]> expected = new HashMap<>(); @@ -216,11 +210,10 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { public void testCompactedInOrder() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.newMessage() .key("c") @@ -263,50 +256,6 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60); } - @Test - public void testCompactedWithConcurrentSend() throws Exception { - String topic = "persistent://my-property/use/my-ns/testCompactedWithConcurrentSend"; - - @Cleanup - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); - - BookKeeper bk = pulsar.getBookKeeperClientFactory().create( - this.conf, null, null, Optional.empty(), null); - Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); - - CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { - for (int i = 0; i < 100; i++) { - try { - producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i).getBytes()).send(); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } - } - }); - - PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); - CompactedTopic compactedTopic = persistentTopic.getCompactedTopic(); - - Awaitility.await().untilAsserted(() -> { - long compactedLedgerId = compactor.compact(topic).join(); - Thread.sleep(300); - Optional<CompactedTopicContext> compactedTopicContext = persistentTopic.getCompactedTopicContext(); - 
Assert.assertTrue(compactedTopicContext.isPresent()); - Assert.assertEquals(compactedTopicContext.get().ledger.getId(), compactedLedgerId); - }); - - Position lastCompactedPosition = compactedTopic.getCompactionHorizon().get(); - Entry lastCompactedEntry = compactedTopic.readLastEntryOfCompactedLedger().get(); - - Assert.assertTrue(PositionImpl.get(lastCompactedPosition.getLedgerId(), lastCompactedPosition.getEntryId()) - .compareTo(lastCompactedEntry.getLedgerId(), lastCompactedEntry.getEntryId()) >= 0); - - future.join(); - } - public ByteBuf extractPayload(RawMessage m) throws Exception { ByteBuf payloadAndMetadata = m.getHeadersAndPayload(); Commands.skipChecksumIfPresent(payloadAndMetadata);
