This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5f7a6afa2e3b166faf9902fe97a82c7e6902ddaf Author: Heesung Sohn <[email protected]> AuthorDate: Tue Jun 14 13:31:24 2022 -0700 [improve][tests] improved flaky test runs (#16011) * [improve][tests] improved flaky test runs - improved PulsarFunctionTlsTests by reordering tearDown() logic - improved ManagedLedgerFactoryImpl.shutdown() by closing cacheEviction threads early - improved TestPulsarConnector memory consumption by removing unnecessary spy() - improved PulsarFunctionsTest run by using receive() instead of receive(30, TimeUnit.SECONDS); * Reverted PulsarFunctionsTest consumer.receive() change (cherry picked from commit b1b25ef15be4595e2284cd4bbd4b8cfe39ed0743) --- .../apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java | 6 +++--- .../org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java | 7 ++++--- .../src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java | 4 ++-- .../java/org/apache/pulsar/sql/presto/TestPulsarConnector.java | 4 ++-- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 5ba1dc14c44..d829efad87f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -257,7 +257,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { double evictionFrequency = Math.max(Math.min(config.getCacheEvictionFrequency(), 1000.0), 0.001); long waitTimeMillis = (long) (1000 / evictionFrequency); - while (true) { + while (!closed) { try { doCacheEviction(); @@ -509,6 +509,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { statsTask.cancel(true); flushCursorsTask.cancel(true); + cacheEvictionExecutor.shutdownNow(); List<String> ledgerNames = new ArrayList<>(this.ledgers.keySet()); List<CompletableFuture<Void>> futures = new ArrayList<>(ledgerNames.size()); @@ -589,7 +590,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { })); } })); - cacheEvictionExecutor.shutdownNow(); entryCacheManager.clear(); return FutureUtil.waitForAll(futures); } @@ -603,6 +603,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { statsTask.cancel(true); flushCursorsTask.cancel(true); + cacheEvictionExecutor.shutdownNow(); // take a snapshot of ledgers currently in the map to prevent race conditions List<CompletableFuture<ManagedLedgerImpl>> ledgers = new ArrayList<>(this.ledgers.values()); @@ -646,7 +647,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { } scheduledExecutor.shutdownNow(); - cacheEvictionExecutor.shutdownNow(); entryCacheManager.clear(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index 64e3b588236..844596743d0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -177,12 +177,13 @@ public class PulsarFunctionTlsTest { void tearDown() throws Exception { try { for (int i = 0; i < BROKER_COUNT; i++) { - if (pulsarServices[i] != null) { - pulsarServices[i].close(); - } if (pulsarAdmins[i] != null) { pulsarAdmins[i].close(); } + if (pulsarServices[i] != null) { + pulsarServices[i].close(); + } + } bkEnsemble.stop(); } finally { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index 15ee27dc3a5..9f6d9f65dbb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -171,9 +171,9 @@ public class PulsarFunctionTlsTest { log.info("--- Shutting down ---"); try { functionAdmin.close(); - bkEnsemble.stop(); - workerServer.stop(); functionsWorkerService.stop(); + workerServer.stop(); + bkEnsemble.stop(); } finally { if (tempDirectory != null) { tempDirectory.delete(); diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java index 7db32f59148..e7b19c8e5f5 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java @@ -699,10 +699,10 @@ public abstract class TestPulsarConnector { when(PulsarConnectorCache.instance.getManagedLedgerFactory()).thenReturn(managedLedgerFactory); for (Map.Entry<TopicName, PulsarSplit> split : splits.entrySet()) { - PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor( + PulsarRecordCursor pulsarRecordCursor = new PulsarRecordCursor( topicsToColumnHandles.get(split.getKey()), split.getValue(), pulsarConnectorConfig, managedLedgerFactory, new ManagedLedgerConfig(), - new PulsarConnectorMetricsTracker(new NullStatsProvider()),dispatchingRowDecoderFactory)); + new PulsarConnectorMetricsTracker(new NullStatsProvider()),dispatchingRowDecoderFactory); this.pulsarRecordCursors.put(split.getKey(), pulsarRecordCursor); } }
