This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d309b2c Refactored to reflect changes in BK for OrderedExecutor (#1489) d309b2c is described below commit d309b2cb2f766151dc64861ba15e6dc71fbe8d8f Author: Matteo Merli <mme...@apache.org> AuthorDate: Tue Apr 3 14:48:46 2018 -0700 Refactored to reflect changes in BK for OrderedExecutor (#1489) * Refactored to reflect changes in BK for OrderedExecutor * Reverted back part of last change * Fixed test --- .../bookkeeper/mledger/impl/EntryCacheImpl.java | 4 +- .../bookkeeper/mledger/impl/EntryCacheManager.java | 2 +- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 6 ++- .../mledger/impl/ManagedLedgerFactoryImpl.java | 21 +++++----- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 45 ++++++++++---------- .../mledger/impl/MetaStoreImplZookeeper.java | 36 +++++++++------- .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 4 +- .../bookkeeper/mledger/impl/OpReadEntry.java | 6 +-- .../mledger/impl/EntryCacheManagerTest.java | 6 +-- .../mledger/impl/ManagedLedgerMBeanTest.java | 8 ++-- .../org/apache/pulsar/broker/PulsarService.java | 9 ++-- .../pulsar/broker/namespace/NamespaceService.java | 4 +- .../pulsar/broker/service/BrokerService.java | 20 ++++----- .../apache/pulsar/broker/service/ServerCnx.java | 2 +- .../service/nonpersistent/NonPersistentTopic.java | 9 ++-- .../service/persistent/MessageDeduplication.java | 2 +- .../PersistentDispatcherMultipleConsumers.java | 2 +- .../broker/auth/SameThreadOrderedSafeExecutor.java | 14 +++---- .../broker/cache/ResourceQuotaCacheTest.java | 10 +---- .../broker/namespace/OwnershipCacheTest.java | 11 ++--- .../apache/pulsar/client/impl/ConsumerImpl.java | 2 +- .../discovery/service/BrokerDiscoveryProvider.java | 1 - .../service/web/ZookeeperCacheLoader.java | 10 +---- .../proxy/server/util/ZookeeperCacheLoader.java | 10 +---- .../pulsar/zookeeper/GlobalZooKeeperCache.java | 6 +-- .../pulsar/zookeeper/LocalZooKeeperCache.java | 9 ++-- .../apache/pulsar/zookeeper/ZooKeeperCache.java | 49 ++++++++++------------ .../zookeeper/ZookeeperBkClientFactoryImpl.java | 12 +++--- .../pulsar/zookeeper/ZookeeperCacheTest.java | 12 +++--- 29 files changed, 152 insertions(+), 180 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java index f267a6c..60ba634 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java @@ -188,7 +188,7 @@ public class EntryCacheImpl implements EntryCache { manager.mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength()); ml.mbean.addReadEntriesSample(1, returnEntry.getLength()); - ml.getExecutor().submitOrdered(ml.getName(), safeRun(() -> { + ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> { callback.readEntryComplete(returnEntry, obj); })); } else { @@ -254,7 +254,7 @@ public class EntryCacheImpl implements EntryCache { checkNotNull(ml.getName()); checkNotNull(ml.getExecutor()); - ml.getExecutor().submitOrdered(ml.getName(), safeRun(() -> { + ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> { // We got the entries, we need to transform them to a List<> type long totalSize = 0; final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java index 7faa18c..262cbeb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java @@ -109,7 +109,7 @@ public class EntryCacheManager { // Trigger a single eviction in background. While the eviction is running we stop inserting entries in the cache if (currentSize > evictionTriggerThreshold && evictionInProgress.compareAndSet(false, true)) { - mlFactory.executor.execute(safeRun(() -> { + mlFactory.scheduledExecutor.execute(safeRun(() -> { // Trigger a new cache eviction cycle to bring the used memory below the cacheEvictionWatermark // percentage limit long sizeToEvict = currentSize - (long) (maxSize * cacheEvictionWatermak); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 29fe0a7..a2be71d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -37,6 +37,7 @@ import com.google.common.collect.Sets; import com.google.common.collect.TreeRangeSet; import com.google.common.util.concurrent.RateLimiter; import com.google.protobuf.InvalidProtocolBufferException; + import java.util.ArrayDeque; import java.util.Collections; import java.util.List; @@ -51,6 +52,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; + import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; import org.apache.bookkeeper.client.BKException; @@ -847,7 +849,7 @@ public class ManagedCursorImpl implements ManagedCursor { final PositionImpl newPosition = (PositionImpl) newPos; // order trim and reset operations on a ledger - ledger.getExecutor().submitOrdered(ledger.getName(), safeRun(() -> { + ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> { if (ledger.isValidPosition(newPosition) || newPosition.equals(PositionImpl.earliest) || newPosition.equals(PositionImpl.latest)) { internalResetCursor(newPosition, callback); @@ -1923,7 +1925,7 @@ public class ManagedCursorImpl implements ManagedCursor { bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), config.getMetadataWriteQuorumSize(), config.getMetadataAckQuorumSize(), config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { - ledger.getExecutor().submit(safeRun(() -> { + ledger.getExecutor().execute(safeRun(() -> { ledger.mbean.endCursorLedgerCreateOp(); if (rc != BKException.Code.OK) { log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name, diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 4c0d3d7..d148d1d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -23,7 +23,7 @@ import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLed import com.google.common.base.Predicates; import com.google.common.collect.Maps; -import io.netty.util.concurrent.DefaultThreadFactory; + import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -31,12 +31,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; + import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -63,7 +63,6 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange; import org.apache.bookkeeper.mledger.util.Futures; -import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.pulsar.common.util.DateFormatter; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; @@ -77,9 +76,9 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private final boolean isBookkeeperManaged; private final ZooKeeper zookeeper; private final ManagedLedgerFactoryConfig config; - protected final ScheduledExecutorService executor = Executors.newScheduledThreadPool(16, - new DefaultThreadFactory("bookkeeper-ml")); - private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(16) + protected final OrderedScheduler scheduledExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(16) + .name("bookkeeper-ml-scheduler").build(); + private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(16) .name("bookkeeper-ml-workers").build(); protected final ManagedLedgerFactoryMBeanImpl mbean; @@ -122,7 +121,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { this.config = config; this.mbean = new ManagedLedgerFactoryMBeanImpl(this); this.entryCacheManager = new EntryCacheManager(this); - this.statsTask = executor.scheduleAtFixedRate(() -> refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS); + this.statsTask = scheduledExecutor.scheduleAtFixedRate(() -> refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS); } public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper) throws Exception { @@ -138,7 +137,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { this.config = config; this.mbean = new ManagedLedgerFactoryMBeanImpl(this); this.entryCacheManager = new EntryCacheManager(this); - this.statsTask = executor.scheduleAtFixedRate(() -> refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS); + this.statsTask = scheduledExecutor.scheduleAtFixedRate(() -> refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS); } private synchronized void refreshStats() { @@ -232,7 +231,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { ledgers.computeIfAbsent(name, (mlName) -> { // Create the managed ledger CompletableFuture<ManagedLedgerImpl> future = new CompletableFuture<>(); - final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this, bookKeeper, store, config, executor, + final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this, bookKeeper, store, config, scheduledExecutor, orderedExecutor, name); newledger.initialize(new ManagedLedgerInitializeLedgerCallback() { @Override @@ -305,7 +304,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { } } - executor.shutdown(); + scheduledExecutor.shutdown(); orderedExecutor.shutdown(); entryCacheManager.clear(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 34283ed..4aceb6c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -22,18 +22,26 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.lang.Math.min; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import com.google.common.collect.BoundType; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import com.google.common.collect.Range; +import com.google.common.util.concurrent.RateLimiter; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.Queue; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -45,6 +53,7 @@ import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -77,23 +86,13 @@ import org.apache.bookkeeper.mledger.util.CallbackMutex; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.bookkeeper.mledger.util.Pair; import org.apache.pulsar.common.api.Commands; -import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.BoundType; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Queues; -import com.google.common.collect.Range; -import com.google.common.util.concurrent.RateLimiter; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final static long MegaByte = 1024 * 1024; @@ -190,8 +189,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state"); private volatile State state = null; - private final ScheduledExecutorService scheduledExecutor; - private final OrderedScheduler executor; + private final OrderedScheduler scheduledExecutor; + private final OrderedExecutor executor; final ManagedLedgerFactoryImpl factory; protected final ManagedLedgerMBeanImpl mbean; @@ -204,7 +203,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // ////////////////////////////////////////////////////////////////////// public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, - ManagedLedgerConfig config, ScheduledExecutorService scheduledExecutor, OrderedScheduler orderedExecutor, + ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, OrderedExecutor orderedExecutor, final String name) { this.factory = factory; this.bookKeeper = bookKeeper; @@ -250,7 +249,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (ledgers.size() > 0) { final long id = ledgers.lastKey(); OpenCallback opencb = (rc, lh, ctx1) -> { - executor.submitOrdered(name, safeRun(() -> { + executor.executeOrdered(name, safeRun(() -> { mbean.endDataLedgerOpenOp(); if (log.isDebugEnabled()) { log.debug("[{}] Opened ledger {}: ", name, id, BKException.getMessage(rc)); @@ -338,7 +337,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { mbean.startDataLedgerCreateOp(); bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { - executor.submitOrdered(name, safeRun(() -> { + executor.executeOrdered(name, safeRun(() -> { mbean.endDataLedgerCreateOp(); if (rc != BKException.Code.OK) { callback.initializeFailed(createManagedLedgerException(rc)); @@ -1319,7 +1318,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { mbean.startDataLedgerOpenOp(); bookKeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(), (int rc, LedgerHandle lh, Object ctx) -> { - executor.submit(safeRun(() -> { + executor.executeOrdered(name, safeRun(() -> { mbean.endDataLedgerOpenOp(); if (rc != BKException.Code.OK) { // Remove the ledger future from cache to give chance to reopen it later @@ -1484,12 +1483,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { break; } - executor.submit(safeRun(() -> waitingCursor.notifyEntriesAvailable())); + executor.execute(safeRun(() -> waitingCursor.notifyEntriesAvailable())); } } private void trimConsumedLedgersInBackground() { - executor.submitOrdered(name, safeRun(() -> { + executor.executeOrdered(name, safeRun(() -> { internalTrimConsumedLedgers(); })); } @@ -2113,11 +2112,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { return ledgers; } - ScheduledExecutorService getScheduledExecutor() { + OrderedScheduler getScheduledExecutor() { return scheduledExecutor; } - OrderedScheduler getExecutor() { + OrderedExecutor getExecutor() { return executor; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java index 7a76975..31705c1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java @@ -20,11 +20,16 @@ package org.apache.bookkeeper.mledger.impl; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import com.google.common.base.Charsets; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.TextFormat; +import com.google.protobuf.TextFormat.ParseException; + import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; -import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; @@ -40,11 +45,6 @@ import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Charsets; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.TextFormat; -import com.google.protobuf.TextFormat.ParseException; - @SuppressWarnings("checkstyle:javadoctype") public class MetaStoreImplZookeeper implements MetaStore { @@ -55,7 +55,7 @@ public class MetaStoreImplZookeeper implements MetaStore { private static final String prefix = prefixName + "/"; private final ZooKeeper zk; - private final OrderedScheduler executor; + private final OrderedExecutor executor; private static class ZKStat implements Stat { private final int version; @@ -90,7 +90,7 @@ public class MetaStoreImplZookeeper implements MetaStore { } } - public MetaStoreImplZookeeper(ZooKeeper zk, OrderedScheduler executor) + public MetaStoreImplZookeeper(ZooKeeper zk, OrderedExecutor executor) throws Exception { this.zk = zk; this.executor = executor; @@ -130,7 +130,8 @@ public class MetaStoreImplZookeeper implements MetaStore { @Override public void getManagedLedgerInfo(final String ledgerName, final MetaStoreCallback<ManagedLedgerInfo> callback) { // Try to get the content or create an empty node - zk.getData(prefix + ledgerName, false, (rc, path, ctx, readData, stat) -> executor.submit(safeRun(() -> { + zk.getData(prefix + ledgerName, false, + (rc, path, ctx, readData, stat) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (rc == Code.OK.intValue()) { try { ManagedLedgerInfo info = parseManagedLedgerInfo(readData); @@ -171,7 +172,7 @@ public class MetaStoreImplZookeeper implements MetaStore { byte[] serializedMlInfo = mlInfo.toByteArray(); // Binary format zk.setData(prefix + ledgerName, serializedMlInfo, zkStat.getVersion(), - (rc, path, zkCtx, stat1) -> executor.submit(safeRun(() -> { + (rc, path, zkCtx, stat1) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] UpdateLedgersIdsCallback.processResult rc={} newVersion={}", ledgerName, Code.get(rc), stat != null ? stat.getVersion() : "null"); @@ -195,7 +196,8 @@ public class MetaStoreImplZookeeper implements MetaStore { if (log.isDebugEnabled()) { log.debug("[{}] Get cursors list", ledgerName); } - zk.getChildren(prefix + ledgerName, false, (rc, path, ctx, children, stat) -> executor.submit(safeRun(() -> { + zk.getChildren(prefix + ledgerName, false, + (rc, path, ctx, children, stat) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] getConsumers complete rc={} children={}", ledgerName, Code.get(rc), children); } @@ -219,7 +221,7 @@ public class MetaStoreImplZookeeper implements MetaStore { log.debug("Reading from {}", path); } - zk.getData(path, false, (rc, path1, ctx, data, stat) -> executor.submit(safeRun(() -> { + zk.getData(path, false, (rc, path1, ctx, data, stat) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (rc != Code.OK.intValue()) { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } else { @@ -251,7 +253,7 @@ public class MetaStoreImplZookeeper implements MetaStore { log.debug("[{}] Creating consumer {} on meta-data store with {}", ledgerName, cursorName, info); } zk.create(path, content, Acl, CreateMode.PERSISTENT, - (rc, path1, ctx, name) -> executor.submit(safeRun(() -> { + (rc, path1, ctx, name) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (rc != Code.OK.intValue()) { log.warn("[{}] Error creating cosumer {} node on meta-data store with {}: ", ledgerName, cursorName, info, Code.get(rc)); @@ -269,7 +271,8 @@ public class MetaStoreImplZookeeper implements MetaStore { if (log.isDebugEnabled()) { log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info); } - zk.setData(path, content, zkStat.getVersion(), (rc, path1, ctx, stat1) -> executor.submit(safeRun(() -> { + zk.setData(path, content, zkStat.getVersion(), + (rc, path1, ctx, stat1) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (rc == Code.BADVERSION.intValue()) { callback.operationFailed(new BadVersionException(KeeperException.create(Code.get(rc)))); } else if (rc != Code.OK.intValue()) { @@ -285,7 +288,8 @@ public class MetaStoreImplZookeeper implements MetaStore { public void asyncRemoveCursor(final String ledgerName, final String consumerName, final MetaStoreCallback<Void> callback) { log.info("[{}] Remove consumer={}", ledgerName, consumerName); - zk.delete(prefix + ledgerName + "/" + consumerName, -1, (rc, path, ctx) -> executor.submit(safeRun(() -> { + zk.delete(prefix + ledgerName + "/" + consumerName, -1, + (rc, path, ctx) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] [{}] zk delete done. rc={}", ledgerName, consumerName, Code.get(rc)); } @@ -300,7 +304,7 @@ public class MetaStoreImplZookeeper implements MetaStore { @Override public void removeManagedLedger(String ledgerName, MetaStoreCallback<Void> callback) { log.info("[{}] Remove ManagedLedger", ledgerName); - zk.delete(prefix + ledgerName, -1, (rc, path, ctx) -> executor.submit(safeRun(() -> { + zk.delete(prefix + ledgerName, -1, (rc, path, ctx) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] zk delete done. rc={}", ledgerName, Code.get(rc)); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 85c4da6..f4daec5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -123,14 +123,14 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { // be marked as failed. ml.mbean.recordAddEntryError(); - ml.getExecutor().submitOrdered(ml.getName(), SafeRun.safeRun(() -> { + ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(() -> { // Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock // from a BK callback. ml.ledgerClosed(lh); })); } else { // Trigger addComplete callback in a thread hashed on the managed ledger name - ml.getExecutor().submitOrdered(ml.getName(), this); + ml.getExecutor().executeOrdered(ml.getName(), this); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 31d1c79..6b3a03a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -83,7 +83,7 @@ class OpReadEntry implements ReadEntriesCallback { if (!entries.isEmpty()) { // There were already some entries that were read before, we can return them - cursor.ledger.getExecutor().submit(safeRun(() -> { + cursor.ledger.getExecutor().execute(safeRun(() -> { callback.readEntriesComplete(entries, ctx); recycle(); })); @@ -131,7 +131,7 @@ class OpReadEntry implements ReadEntriesCallback { } // Schedule next read in a different thread - cursor.ledger.getExecutor().submit(safeRun(() -> { + cursor.ledger.getExecutor().execute(safeRun(() -> { readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition); cursor.ledger.asyncReadEntries(OpReadEntry.this); })); @@ -139,7 +139,7 @@ class OpReadEntry implements ReadEntriesCallback { // The reading was already completed, release resources and trigger callback cursor.readOperationCompleted(); - cursor.ledger.getExecutor().submit(safeRun(() -> { + cursor.ledger.getExecutor().execute(safeRun(() -> { callback.readEntriesComplete(entries, ctx); recycle(); })); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index 107abcf..47c515d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -24,9 +24,9 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; @@ -41,7 +41,7 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase { @BeforeClass void setup() throws Exception { - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).build(); ml1 = mock(ManagedLedgerImpl.class); when(ml1.getScheduledExecutor()).thenReturn(executor); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java index c3e75c2..ebaa1fd 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java @@ -65,7 +65,7 @@ public class ManagedLedgerMBeanTest extends MockedBookKeeperTestCase { mbean.addLedgerSwitchLatencySample(1, TimeUnit.SECONDS); // Simulate stats getting update from different thread - factory.executor.submit(() -> { + factory.scheduledExecutor.submit(() -> { mbean.refreshStats(1, TimeUnit.SECONDS); }).get(); @@ -90,7 +90,7 @@ public class ManagedLedgerMBeanTest extends MockedBookKeeperTestCase { ledger.addEntry(new byte[600]); cursor.markDelete(p1); - factory.executor.submit(() -> { + factory.scheduledExecutor.submit(() -> { mbean.refreshStats(1, TimeUnit.SECONDS); }).get(); @@ -109,7 +109,7 @@ public class ManagedLedgerMBeanTest extends MockedBookKeeperTestCase { mbean.recordAddEntryError(); mbean.recordReadEntriesError(); - factory.executor.submit(() -> { + factory.scheduledExecutor.submit(() -> { mbean.refreshStats(1, TimeUnit.SECONDS); }).get(); @@ -119,7 +119,7 @@ public class ManagedLedgerMBeanTest extends MockedBookKeeperTestCase { List<Entry> entries = cursor.readEntries(100); assertEquals(entries.size(), 1); - factory.executor.submit(() -> { + factory.scheduledExecutor.submit(() -> { mbean.refreshStats(1, TimeUnit.SECONDS); }).get(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 58010e8..47753e6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -38,6 +38,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; @@ -106,8 +107,8 @@ public class PulsarService implements AutoCloseable { new DefaultThreadFactory("pulsar")); private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10, new DefaultThreadFactory("zk-cache-callback")); - private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(8) - .name("pulsar-ordered").build(); + private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(8).name("pulsar-ordered") + .build(); private final ScheduledExecutorService loadManagerExecutor; private ScheduledFuture<?> loadReportTask = null; private ScheduledFuture<?> loadSheddingTask = null; @@ -445,7 +446,7 @@ public class PulsarService implements AutoCloseable { LOG.info("starting configuration cache service"); - this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor(), this.cacheExecutor); + this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor()); this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(), (int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(), getOrderedExecutor(), this.cacheExecutor); @@ -611,7 +612,7 @@ public class PulsarService implements AutoCloseable { return loadManagerExecutor; } - public OrderedScheduler getOrderedExecutor() { + public OrderedExecutor getOrderedExecutor() { return orderedExecutor; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index fe30713..2668f95 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -636,8 +636,8 @@ public class NamespaceService { if (t != null) { // retry several times on BadVersion if ((t instanceof ServerMetadataException) && (counter.decrementAndGet() >= 0)) { - pulsar.getOrderedExecutor().submit( - () -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture)); + pulsar.getOrderedExecutor() + .execute(() -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture)); } else { // Retry enough, or meet other exception String msg2 = format(" %s not success update nsBundles, counter %d, reason %s", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index e1a0d3f..6c2e011 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -476,7 +476,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies replicationFuture.exceptionally((ex) -> { log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex); nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> { - pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(ex); }); @@ -579,7 +579,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies // namespace is being unloaded String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); log.warn(msg); - pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); return; } @@ -618,7 +618,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies }); } catch (NamingException e) { log.warn("Failed to create topic {}-{}", topic, e.getMessage()); - pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(e); } } @@ -626,7 +626,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies @Override public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { log.warn("Failed to create topic {}", topic, exception); - pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(new PersistenceException(exception)); } }, null); @@ -635,7 +635,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception); // remove topic from topics-map in different thread to avoid possible deadlock if // createPersistentTopic-thread only tries to handle this future-result - pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(exception); return null; }); @@ -644,7 +644,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName topicName) { CompletableFuture<ManagedLedgerConfig> future = new CompletableFuture<>(); // Execute in background thread, since getting the policies might block if the z-node wasn't already cached - pulsar.getOrderedExecutor().submitOrdered(topicName, safeRun(() -> { + pulsar.getOrderedExecutor().executeOrdered(topicName, safeRun(() -> { NamespaceName namespace = topicName.getNamespaceObject(); ServiceConfiguration serviceConfig = pulsar.getConfiguration(); @@ -1104,7 +1104,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies } private void updateTopicMessageDispatchRate() { - this.pulsar().getExecutor().submit(() -> { + this.pulsar().getExecutor().execute(() -> { // update message-rate for each topic topics.forEach((name, topicFuture) -> { if (topicFuture.isDone()) { @@ -1150,7 +1150,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies } private void updateManagedLedgerConfig() { - this.pulsar().getExecutor().submit(() -> { + this.pulsar().getExecutor().execute(() -> { // update managed-ledger config of each topic topics.forEach((name, topicFuture) -> { if (topicFuture.isDone()) { @@ -1403,7 +1403,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies // block dispatcher with higher unack-msg when it reaches broker-unack msg limit log.info("[{}] Starting blocking dispatchers with unacked msgs {} due to reached max broker limit {}", maxUnackedMessages, maxUnackedMsgsPerDispatcher); - executor().submit(() -> blockDispatchersWithLargeUnAckMessages()); + executor().execute(() -> blockDispatchersWithLargeUnAckMessages()); } else if (blockedDispatcherOnHighUnackedMsgs.get() && unAckedMessages < maxUnackedMessages / 2) { // unblock broker-dispatching if received enough acked messages back if (blockedDispatcherOnHighUnackedMsgs.compareAndSet(true, false)) { @@ -1457,7 +1457,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies try { dispatcherList.forEach(dispatcher -> { dispatcher.unBlockDispatcherOnUnackedMsgs(); - executor().submit(() -> dispatcher.readMoreEntries()); + executor().execute(() -> dispatcher.readMoreEntries()); log.info("[{}] Dispatcher is unblocked", dispatcher.getName()); blockedDispatchers.remove(dispatcher); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index bada69c..c052e4c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -955,7 +955,7 @@ public class ServerCnx extends PulsarHandler { if (nonPersistentPendingMessages > MaxNonPersistentPendingMessages) { final long producerId = send.getProducerId(); final long sequenceId = send.getSequenceId(); - service.getTopicOrderedExecutor().submitOrdered(producer.getTopic().getName(), SafeRun.safeRun(() -> { + service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), SafeRun.safeRun(() -> { ctx.writeAndFlush(Commands.newSendReceipt(producerId, sequenceId, -1, -1), ctx.voidPromise()); })); producer.recordMessageDrop(send.getNumMessages()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index fc6a920..9eb7ce7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -27,8 +27,10 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; + import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.FastThreadLocal; + import java.util.Collections; import java.util.List; import java.util.Map; @@ -39,6 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; @@ -190,7 +193,7 @@ public class NonPersistentTopic implements Topic { // retain data for sub/replication because io-thread will release actual payload data.retain(2); - this.executor.submitOrdered(topic, SafeRun.safeRun(() -> { + this.executor.executeOrdered(topic, SafeRun.safeRun(() -> { subscriptions.forEach((name, subscription) -> { ByteBuf duplicateBuffer = data.retainedDuplicate(); Entry entry = create(0L, 0L, duplicateBuffer); @@ -210,7 +213,7 @@ public class NonPersistentTopic implements Topic { } })); - this.executor.submitOrdered(topic, SafeRun.safeRun(() -> { + this.executor.executeOrdered(topic, SafeRun.safeRun(() -> { replicators.forEach((name, replicator) -> { ByteBuf duplicateBuffer = data.retainedDuplicate(); Entry entry = create(0L, 0L, duplicateBuffer); @@ -487,7 +490,7 @@ public class NonPersistentTopic implements Topic { FutureUtil.waitForAll(futures).thenRun(() -> { log.info("[{}] Topic closed", topic); - brokerService.pulsar().getExecutor().submit(() -> brokerService.removeTopicFromCache(topic)); + brokerService.pulsar().getExecutor().execute(() -> brokerService.removeTopicFromCache(topic)); closeFuture.complete(null); }).exceptionally(exception -> { log.error("[{}] Error closing topic", topic, exception); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index e7e50b8..443d29b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -152,7 +152,7 @@ public class MessageDeduplication { if (managedCursor.hasMoreEntries()) { // Read next batch of entries - pulsar.getExecutor().submit(() -> replayCursor(future)); + pulsar.getExecutor().execute(() -> replayCursor(future)); } else { // Done replaying future.complete(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 2eb83ae..17c5db7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -589,7 +589,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu // unblock dispatcher if it acks back enough messages if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) { log.info("[{}] Dispatcher is unblocked", name); - topic.getBrokerService().executor().submit(() -> readMoreEntries()); + topic.getBrokerService().executor().execute(() -> readMoreEntries()); } } // increment broker-level count diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java index 98cafc5..2909857 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java @@ -18,30 +18,30 @@ */ package org.apache.pulsar.broker.auth; -import org.apache.bookkeeper.common.util.OrderedScheduler; +import io.netty.util.concurrent.DefaultThreadFactory; + +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.SafeRunnable; import org.apache.bookkeeper.stats.NullStatsLogger; -import io.netty.util.concurrent.DefaultThreadFactory; - -public class SameThreadOrderedSafeExecutor extends OrderedScheduler { +public class SameThreadOrderedSafeExecutor extends OrderedExecutor { public SameThreadOrderedSafeExecutor() { super("same-thread-executor", 1, new DefaultThreadFactory("test"), NullStatsLogger.INSTANCE, false, 100000, 10); } @Override - public void submit(SafeRunnable r) { + public void execute(Runnable r) { r.run(); } @Override - public void submitOrdered(int orderingKey, SafeRunnable r) { + public void executeOrdered(int orderingKey, SafeRunnable r) { r.run(); } @Override - public void submitOrdered(long orderingKey, SafeRunnable r) { + public void executeOrdered(long orderingKey, SafeRunnable r) { r.run(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java index 9fd2fb7..2fe63e8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java @@ -25,8 +25,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import com.google.common.hash.Hashing; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.pulsar.broker.PulsarService; @@ -43,8 +42,6 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.hash.Hashing; - public class ResourceQuotaCacheTest { private PulsarService pulsar; @@ -52,14 +49,12 @@ public class ResourceQuotaCacheTest { private LocalZooKeeperCacheService localCache; private NamespaceBundleFactory bundleFactory; private OrderedScheduler executor; - private ScheduledExecutorService scheduledExecutor; @BeforeMethod public void setup() throws Exception { pulsar = mock(PulsarService.class); executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build(); - scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); - zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor); + zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor); localCache = new LocalZooKeeperCacheService(zkCache, null); // set mock pulsar localzkcache @@ -77,7 +72,6 @@ public class ResourceQuotaCacheTest { @AfterMethod public void teardown() { executor.shutdown(); - scheduledExecutor.shutdown(); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java index e8d4bb9..dfca1b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java @@ -33,10 +33,10 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.hash.Hashing; + import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.common.util.OrderedScheduler; @@ -58,8 +58,6 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.hash.Hashing; - public class OwnershipCacheTest { private PulsarService pulsar; @@ -71,7 +69,6 @@ public class OwnershipCacheTest { private NamespaceService nsService; private BrokerService brokerService; private OrderedScheduler executor; - private ScheduledExecutorService scheduledExecutor; @BeforeMethod public void setup() throws Exception { @@ -80,8 +77,7 @@ public class OwnershipCacheTest { pulsar = mock(PulsarService.class); config = mock(ServiceConfiguration.class); executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build(); - scheduledExecutor = Executors.newScheduledThreadPool(2); - zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor); + zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor); localCache = spy(new LocalZooKeeperCacheService(zkCache, null)); ZooKeeperDataCache<LocalPolicies> poilciesCache = mock(ZooKeeperDataCache.class); when(pulsar.getLocalZkCacheService()).thenReturn(localCache); @@ -106,7 +102,6 @@ public class OwnershipCacheTest { @AfterMethod public void teardown() throws Exception { executor.shutdown(); - scheduledExecutor.shutdown(); } @Test diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index c468298..2cbd642 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -655,7 +655,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return; } - listenerExecutor.submit(() -> { + listenerExecutor.execute(() -> { if (isActive) { consumerEventListener.becameActive(this, partitionIndex); } else { diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java index a6e3cbc..9a4b9aa 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java @@ -31,7 +31,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.common.util.OrderedScheduler; -import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.common.naming.TopicName; diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java index 30f75be..1371d05 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java @@ -22,8 +22,6 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -37,8 +35,6 @@ import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.util.concurrent.DefaultThreadFactory; - /** * Connects with ZooKeeper and sets watch to listen changes for active broker list. * @@ -55,8 +51,6 @@ public class ZookeeperCacheLoader implements Closeable { private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(8) .name("pulsar-discovery-ordered-cache").build(); - private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(8, - new DefaultThreadFactory("pulsar-discovery-cache")); public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers"; @@ -74,8 +68,7 @@ public class ZookeeperCacheLoader implements Closeable { log.error("Shutting down ZK sessions: {}", exitCode); }); - this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor, - executor); + this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor); localZkConnectionSvc.start(exitCode -> { try { localZkCache.getZooKeeper().close(); @@ -115,7 +108,6 @@ public class ZookeeperCacheLoader implements Closeable { @Override public void close() { orderedExecutor.shutdown(); - executor.shutdown(); } private void updateBrokerList(Set<String> brokerNodes) throws Exception { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java index 0ba1ea6..27d5a9b 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java @@ -22,8 +22,6 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -37,8 +35,6 @@ import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.util.concurrent.DefaultThreadFactory; - /** * Connects with ZooKeeper and sets watch to listen changes for active broker list. * @@ -56,8 +52,6 @@ public class ZookeeperCacheLoader implements Closeable { private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(8) .name("pulsar-discovery-ordered-cache").build(); - private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(8, - new DefaultThreadFactory("pulsar-discovery-cache")); public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers"; @@ -75,8 +69,7 @@ public class ZookeeperCacheLoader implements Closeable { log.error("Shutting down ZK sessions: {}", exitCode); }); - this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor, - executor); + this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor); localZkConnectionSvc.start(exitCode -> { try { localZkCache.getZooKeeper().close(); @@ -121,7 +114,6 @@ public class ZookeeperCacheLoader implements Closeable { @Override public void close() { orderedExecutor.shutdown(); - executor.shutdown(); } private void updateBrokerList(Set<String> brokerNodes) throws Exception { diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java index bd75e82..b59cde5 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java @@ -28,7 +28,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooKeeper; @@ -51,8 +51,8 @@ public class GlobalZooKeeperCache extends ZooKeeperCache implements Closeable { private final ScheduledExecutorService scheduledExecutor; public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int zkSessionTimeoutMillis, - String globalZkConnect, OrderedScheduler orderedExecutor, ScheduledExecutorService scheduledExecutor) { - super(null, orderedExecutor, scheduledExecutor); + String globalZkConnect, OrderedExecutor orderedExecutor, ScheduledExecutorService scheduledExecutor) { + super(null, orderedExecutor); this.zlClientFactory = zkClientFactory; this.zkSessionTimeoutMillis = zkSessionTimeoutMillis; this.globalZkConnect = globalZkConnect; diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java index 23547dd..945b58e 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java @@ -18,9 +18,7 @@ */ package org.apache.pulsar.zookeeper; -import java.util.concurrent.ScheduledExecutorService; - -import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; @@ -37,9 +35,8 @@ public class LocalZooKeeperCache extends ZooKeeperCache { private static final Logger LOG = LoggerFactory.getLogger(LocalZooKeeperCache.class); - public LocalZooKeeperCache(final ZooKeeper zk, final OrderedScheduler executor, - ScheduledExecutorService scheduledExecutor) { - super(zk, executor, scheduledExecutor); + public LocalZooKeeperCache(final ZooKeeper zk, final OrderedExecutor executor) { + super(zk, executor); } @Override diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java index b2715b3..e2e46aa 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java @@ -20,6 +20,12 @@ package org.apache.pulsar.zookeeper; import static com.google.common.base.Preconditions.checkNotNull; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Sets; + import java.nio.file.Paths; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Map.Entry; @@ -29,14 +35,12 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.util.SafeRunnable; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -49,14 +53,6 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.benmanes.caffeine.cache.AsyncLoadingCache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.collect.Sets; - -import io.netty.util.concurrent.DefaultThreadFactory; - /** * Per ZK client ZooKeeper cache supporting ZNode data and children list caches. A cache entry is identified, accessed * and invalidated by the ZNode path. For the data cache, ZNode data parsing is done at request time with the given @@ -84,19 +80,18 @@ public abstract class ZooKeeperCache implements Watcher { protected final AsyncLoadingCache<String, Entry<Object, Stat>> dataCache; protected final Cache<String, Set<String>> childrenCache; protected final Cache<String, Boolean> existsCache; - private final OrderedScheduler executor; - private final ScheduledExecutorService scheduledExecutor; - private boolean shouldShutdownExecutor = false; + private final OrderedExecutor executor; + private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build(); + private boolean shouldShutdownExecutor; public static final int cacheTimeOutInSec = 30; protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null); - public ZooKeeperCache(ZooKeeper zkSession, OrderedScheduler executor, ScheduledExecutorService scheduledExecutor) { + public ZooKeeperCache(ZooKeeper zkSession, OrderedExecutor executor) { checkNotNull(executor); - checkNotNull(scheduledExecutor); this.executor = executor; - this.scheduledExecutor = scheduledExecutor; this.zkSession.set(zkSession); + this.shouldShutdownExecutor = false; this.dataCache = Caffeine.newBuilder().expireAfterAccess(1, TimeUnit.HOURS) .buildAsync((key, executor1) -> null); @@ -106,8 +101,7 @@ public abstract class ZooKeeperCache implements Watcher { } public ZooKeeperCache(ZooKeeper zkSession) { - this(zkSession, OrderedScheduler.newSchedulerBuilder().numThreads(1).name("zk-cache-executor").build(), - Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("zk-cache-callback-executor"))); + this(zkSession, OrderedExecutor.newBuilder().name("zk-cache-callback-executor").build()); this.shouldShutdownExecutor = true; } @@ -132,7 +126,7 @@ public abstract class ZooKeeperCache implements Watcher { LOG.debug("Submitting reload cache task to the executor for path: {}, updater: {}", path, updater); } try { - executor.submitOrdered(path, new SafeRunnable() { + executor.executeOrdered(path, new SafeRunnable() { @Override public void safeRun() { updater.reloadCache(path); @@ -181,7 +175,7 @@ public abstract class ZooKeeperCache implements Watcher { } public void asyncInvalidate(String path) { - scheduledExecutor.submit(() -> invalidate(path)); + backgroundExecutor.execute(() -> invalidate(path)); } public void invalidate(final String path) { @@ -322,20 +316,20 @@ public abstract class ZooKeeperCache implements Watcher { // Broker doesn't restart on global-zk session lost: so handling unexpected exception try { this.zkSession.get().getData(path, watcher, (rc, path1, ctx, content, stat) -> { - Executor exec = scheduledExecutor != null ? scheduledExecutor : executor; if (rc == Code.OK.intValue()) { try { T obj = deserializer.deserialize(path, content); // avoid using the zk-client thread to process the result - exec.execute(() -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat))); + executor.execute( + () -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat))); } catch (Exception e) { - exec.execute(() -> zkFuture.completeExceptionally(e)); + executor.execute(() -> zkFuture.completeExceptionally(e)); } } else if (rc == Code.NONODE.intValue()) { // Return null values for missing z-nodes, as this is not "exceptional" condition - exec.execute(() -> zkFuture.complete(null)); + executor.execute(() -> zkFuture.complete(null)); } else { - exec.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc))); + executor.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc))); } }, null); } catch (Exception e) { @@ -430,7 +424,8 @@ public abstract class ZooKeeperCache implements Watcher { public void stop() { if (shouldShutdownExecutor) { this.executor.shutdown(); - this.scheduledExecutor.shutdown(); } + + this.backgroundExecutor.shutdown(); } } diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java index c0f5299..d18aea9 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java @@ -23,21 +23,21 @@ import static org.apache.bookkeeper.util.SafeRunnable.safeRun; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import org.apache.bookkeeper.common.util.OrderedScheduler; +import lombok.extern.slf4j.Slf4j; + +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; -import lombok.extern.slf4j.Slf4j; - @Slf4j public class ZookeeperBkClientFactoryImpl implements ZooKeeperClientFactory { - private final OrderedScheduler executor; + private final OrderedExecutor executor; - public ZookeeperBkClientFactoryImpl(OrderedScheduler executor) { + public ZookeeperBkClientFactoryImpl(OrderedExecutor executor) { this.executor = executor; } @@ -45,7 +45,7 @@ public class ZookeeperBkClientFactoryImpl implements ZooKeeperClientFactory { public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType, int zkSessionTimeoutMillis) { CompletableFuture<ZooKeeper> future = new CompletableFuture<>(); - executor.submit(safeRun(() -> { + executor.execute(safeRun(() -> { try { ZooKeeper zk = ZooKeeperClient.newBuilder().connectString(serverList) .sessionTimeoutMs(zkSessionTimeoutMillis) diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java index e9bad89..af00abc 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java @@ -87,7 +87,7 @@ public class ZookeeperCacheTest { @Test(timeOut = 10000) void testSimpleCache() throws Exception { - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor); ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) { @Override public String deserialize(String key, byte[] content) throws Exception { @@ -127,7 +127,7 @@ public class ZookeeperCacheTest { void testChildrenCache() throws Exception { zkClient.create("/test", new byte[0], null, null); - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor); ZooKeeperChildrenCache cache = new ZooKeeperChildrenCache(zkCacheService, "/test"); // Create callback counter @@ -180,7 +180,7 @@ public class ZookeeperCacheTest { // Check existence after creation of the node zkClient.create("/test", new byte[0], null, null); Thread.sleep(20); - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor); boolean exists = zkCacheService.exists("/test"); Assert.assertTrue(exists, "/test should exists in the cache"); @@ -197,7 +197,7 @@ public class ZookeeperCacheTest { zkClient.create("/test/c1", new byte[0], null, null); zkClient.create("/test/c2", new byte[0], null, null); Thread.sleep(20); - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor); boolean exists = zkCacheService.exists("/test"); Assert.assertTrue(exists, "/test should exists in the cache"); @@ -339,7 +339,7 @@ public class ZookeeperCacheTest { // add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle // callback-result process MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100); - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor); ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) { @Override public String deserialize(String key, byte[] content) throws Exception { @@ -387,7 +387,7 @@ public class ZookeeperCacheTest { // add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle // callback-result process MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100); - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor); final AtomicInteger count = new AtomicInteger(0); ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) { -- To stop receiving notification emails like this one, please contact mme...@apache.org.