ivankelly closed pull request #1192: BP-29 (task 3): use metadata service uri for constructing registration client URL: https://github.com/apache/bookkeeper/pull/1192
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java new file mode 100644 index 000000000..938bf49f2 --- /dev/null +++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.common.testing; + +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.Sets; +import com.google.common.collect.Sets.SetView; +import java.util.Set; + +/** + * Assertion utils. + */ +public final class MoreAsserts { + + private MoreAsserts() {} + + public static <T> void assertSetEquals(Set<T> expected, Set<T> actual) { + SetView<T> diff = Sets.difference(expected, actual); + assertTrue( + "Expected set contains items not exist at actual set : " + diff.immutableCopy(), + diff.isEmpty()); + diff = Sets.difference(actual, expected); + assertTrue( + "Actual set contains items not exist at expected set : " + diff.immutableCopy(), + diff.isEmpty()); + } + +} diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java index 12b3a23af..4942e348b 100644 --- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java +++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java @@ -166,12 +166,7 @@ public MockExecutorController controlScheduleAtFixedRate(ScheduledExecutorServic Runnable task = invocationOnMock.getArgument(0); long value = invocationOnMock.getArgument(1); TimeUnit unit = invocationOnMock.getArgument(2); - DeferredTask deferredTask = executor.addDelayedTask(executor, unit.toMillis(value), task); - if (value <= 0) { - task.run(); - FutureUtils.complete(deferredTask.future, null); - } - return deferredTask; + return executor.addDelayedTask(executor, unit.toMillis(value), task); }; } @@ -192,7 +187,12 @@ private DeferredTask addDelayedTask( Runnable task) { checkArgument(delayTimeMs >= 0); DeferredTask deferredTask = new DeferredTask(task, delayTimeMs); - executor.deferredTasks.add(deferredTask); + if (delayTimeMs > 0) { + executor.deferredTasks.add(deferredTask); + } else { + task.run(); + FutureUtils.complete(deferredTask.future, null); + } return deferredTask; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index a94f20638..d6f88e9ec 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -32,6 +32,7 @@ import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; +import java.net.URI; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -58,16 +59,17 @@ import org.apache.bookkeeper.client.api.WriteFlag; import org.apache.bookkeeper.conf.AbstractConfiguration; import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.bookkeeper.discover.RegistrationClient; -import org.apache.bookkeeper.discover.ZKRegistrationClient; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.feature.SettableFeatureProvider; -import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory; import org.apache.bookkeeper.meta.CleanupLedgerManager; import org.apache.bookkeeper.meta.LedgerIdGenerator; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerManagerFactory; +import org.apache.bookkeeper.meta.MetadataClientDriver; +import org.apache.bookkeeper.meta.MetadataDrivers; +import org.apache.bookkeeper.meta.exceptions.MetadataException; +import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.DNSToSwitchMapping; import org.apache.bookkeeper.proto.BookieClient; @@ -102,7 +104,7 @@ static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class); - final RegistrationClient regClient; + final EventLoopGroup eventLoopGroup; // The stats logger for this client. @@ -142,6 +144,7 @@ // Features final Feature disableEnsembleChangeFeature; + final MetadataClientDriver metadataDriver; // Ledger manager responsible for how to store ledger meta data final LedgerManagerFactory ledgerManagerFactory; final LedgerManager ledgerManager; @@ -441,21 +444,21 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo this.disableEnsembleChangeFeature = this.featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName()); - // initialize registration client + // initialize metadata driver try { - Class<? extends RegistrationClient> regClientCls = conf.getRegistrationClientClass(); - this.regClient = ReflectionUtils.newInstance(regClientCls); - this.regClient.initialize( + this.metadataDriver = MetadataDrivers.getClientDriver( + URI.create(conf.getMetadataServiceUri())); + this.metadataDriver.initialize( conf, scheduler, statsLogger, java.util.Optional.ofNullable(zkc)); } catch (ConfigurationException ce) { - LOG.error("Failed to initialize registration client", ce); - throw new IOException("Failed to initialize registration client", ce); - } catch (BKException be) { - LOG.error("Failed to initialize registration client", be); - throw new IOException("Failed to initialize registration client", be); + LOG.error("Failed to initialize metadata client driver using invalid metadata service uri", ce); + throw new IOException("Failed to initialize metadata client driver", ce); + } catch (MetadataException me) { + LOG.error("Encountered metadata exceptions on initializing metadata client driver", me); + throw new IOException("Failed to initialize metadata client driver", me); } // initialize event loop group @@ -505,7 +508,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo this.bookieClient = new BookieClient(conf, this.eventLoopGroup, this.mainWorkerPool, scheduler, statsLogger); this.bookieWatcher = new BookieWatcher( - conf, this.placementPolicy, regClient, + conf, this.placementPolicy, metadataDriver.getRegistrationClient(), this.statsLogger.scope(WATCHER_SCOPE)); if (conf.getDiskWeightBasedPlacementEnabled()) { LOG.info("Weighted ledger placement enabled"); @@ -525,9 +528,9 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo // initialize ledger manager try { this.ledgerManagerFactory = - AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, regClient.getLayoutManager()); - } catch (IOException | InterruptedException e) { - throw e; + this.metadataDriver.getLedgerManagerFactory(); + } catch (MetadataException e) { + throw new IOException("Failed to initialize ledger manager factory", e); } this.ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager()); this.ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator(); @@ -549,7 +552,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo scheduler = null; requestTimer = null; reorderReadSequence = false; - regClient = null; + metadataDriver = null; readSpeculativeRequestPolicy = Optional.absent(); readLACSpeculativeRequestPolicy = Optional.absent(); placementPolicy = null; @@ -680,8 +683,8 @@ boolean isReorderReadSequence() { } @VisibleForTesting - public RegistrationClient getRegClient() { - return regClient; + public MetadataClientDriver getMetadataClientDriver() { + return metadataDriver; } /** @@ -731,7 +734,7 @@ public static DigestType fromApiDigestType(org.apache.bookkeeper.client.api.Dige } ZooKeeper getZkHandle() { - return ((ZKRegistrationClient) regClient).getZk(); + return ((ZKMetadataClientDriver) metadataDriver).getZk(); } protected ClientConfiguration getConf() { @@ -1450,7 +1453,6 @@ public void close() throws BKException, InterruptedException { // which will reject any incoming metadata requests. ledgerManager.close(); ledgerIdGenerator.close(); - ledgerManagerFactory.close(); } catch (IOException ie) { LOG.error("Failed to close ledger manager : ", ie); } @@ -1477,7 +1479,7 @@ public void close() throws BKException, InterruptedException { if (ownEventLoopGroup) { eventLoopGroup.shutdownGracefully(); } - this.regClient.close(); + this.metadataDriver.close(); } private void initOpLoggers(StatsLogger stats) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java index dcface641..a71d2b4a1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java @@ -228,7 +228,10 @@ public void close() throws InterruptedException, BKException { */ public void watchWritableBookiesChanged(final RegistrationListener listener) throws BKException { - bkc.regClient.watchWritableBookies(listener); + bkc + .getMetadataClientDriver() + .getRegistrationClient() + .watchWritableBookies(listener); } /** @@ -240,7 +243,10 @@ public void watchWritableBookiesChanged(final RegistrationListener listener) */ public void watchReadOnlyBookiesChanged(final RegistrationListener listener) throws BKException { - bkc.regClient.watchReadOnlyBookies(listener); + bkc + .getMetadataClientDriver() + .getRegistrationClient() + .watchReadOnlyBookies(listener); } /** @@ -1153,7 +1159,7 @@ public static boolean format(ServerConfiguration conf, } BookKeeper bkc = new BookKeeper(new ClientConfiguration(conf)); - bkc.ledgerManagerFactory.format(conf, bkc.regClient.getLayoutManager()); + bkc.ledgerManagerFactory.format(conf, bkc.getMetadataClientDriver().getLayoutManager()); return rm.format(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java index 7639703c1..112cd4821 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java @@ -233,7 +233,10 @@ public boolean completeUnlessQueued() { } public void start() { - this.bk.regClient.watchWritableBookies(bookies -> availableBookiesChanged(bookies.getValue())); + this.bk + .getMetadataClientDriver() + .getRegistrationClient() + .watchWritableBookies(bookies -> availableBookiesChanged(bookies.getValue())); scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java index 9c90c333e..68d27ad44 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java @@ -215,6 +215,8 @@ public String getMetadataServiceUri() throws ConfigurationException { ledgerManagerType = org.apache.bookkeeper.meta.FlatLedgerManagerFactory.NAME; } else if (factoryClass == LongHierarchicalLedgerManagerFactory.class) { ledgerManagerType = LongHierarchicalLedgerManagerFactory.NAME; + } else if (factoryClass == org.apache.bookkeeper.meta.MSLedgerManagerFactory.class) { + ledgerManagerType = org.apache.bookkeeper.meta.MSLedgerManagerFactory.NAME; } else { throw new IllegalArgumentException("Unknown zookeeper based ledger manager factory : " + factoryClass); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 51aa699b9..6511f929f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -1718,7 +1718,9 @@ public ClientConfiguration setNettyUsePooledBuffers(boolean enabled) { * * @param regClientClass * ClientClass + * @deprecated since 4.7.0 */ + @Deprecated public ClientConfiguration setRegistrationClientClass( Class<? extends RegistrationClient> regClientClass) { setProperty(REGISTRATION_CLIENT_CLASS, regClientClass); @@ -1729,7 +1731,9 @@ public ClientConfiguration setRegistrationClientClass( * Get Registration Client Class. * * @return registration manager class. + * @deprecated since 4.7.0 */ + @Deprecated public Class<? extends RegistrationClient> getRegistrationClientClass() throws ConfigurationException { return ReflectionUtils.getClass(this, REGISTRATION_CLIENT_CLASS, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java index 7b18325b1..b53fd2405 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java @@ -18,17 +18,11 @@ package org.apache.bookkeeper.discover; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate; import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; -import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.bookkeeper.meta.LayoutManager; import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.versioning.Versioned; /** @@ -47,22 +41,6 @@ } - /** - * Initialize the registration client with provided resources. - * - * <p>The existence of <i>zkSupplier</i> is for backward compatability. - * - * @param conf client configuration - * @param statsLogger stats logger - * @param optionalCtx optional context is passed for initialization. - * @return - */ - RegistrationClient initialize(ClientConfiguration conf, - ScheduledExecutorService scheduler, - StatsLogger statsLogger, - Optional<Object> optionalCtx) - throws BKException; - @Override void close(); @@ -116,11 +94,4 @@ RegistrationClient initialize(ClientConfiguration conf, */ void unwatchReadOnlyBookies(RegistrationListener listener); - /** - * Gets layout manager. - * - * @return the layout manager - */ - LayoutManager getLayoutManager(); - } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java index 237809a08..ff48a08b3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java @@ -18,14 +18,13 @@ package org.apache.bookkeeper.discover; +import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import java.io.IOException; import java.util.HashSet; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArraySet; @@ -33,33 +32,23 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import lombok.AccessLevel; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BKException.BKInterruptedException; import org.apache.bookkeeper.client.BKException.Code; import org.apache.bookkeeper.client.BKException.ZKException; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.common.util.SafeRunnable; -import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.bookkeeper.meta.LayoutManager; -import org.apache.bookkeeper.meta.ZkLayoutManager; import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.util.ZkUtils; import org.apache.bookkeeper.versioning.LongVersion; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Version.Occurred; import org.apache.bookkeeper.versioning.Versioned; -import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; -import org.apache.bookkeeper.zookeeper.ZooKeeperClient; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.ACL; /** * ZooKeeper based {@link RegistrationClient}. @@ -67,9 +56,9 @@ @Slf4j public class ZKRegistrationClient implements RegistrationClient { - private static final int ZK_CONNECT_BACKOFF_MS = 200; + static final int ZK_CONNECT_BACKOFF_MS = 200; - private class WatchTask + class WatchTask implements SafeRunnable, Watcher, BiConsumer<Versioned<Set<BookieSocketAddress>>, Throwable>, @@ -133,13 +122,15 @@ public void safeRun() { @Override public void accept(Versioned<Set<BookieSocketAddress>> bookieSet, Throwable throwable) { if (throwable != null) { - scheduleWatchTask(ZK_CONNECT_BACKOFF_MS); - firstRunFuture.completeExceptionally(throwable); + if (firstRunFuture.isDone()) { + scheduleWatchTask(ZK_CONNECT_BACKOFF_MS); + } else { + firstRunFuture.completeExceptionally(throwable); + } return; } - if (this.version.compare(bookieSet.getVersion()) == Occurred.BEFORE - || this.version.compare(bookieSet.getVersion()) == Occurred.CONCURRENTLY) { + if (this.version.compare(bookieSet.getVersion()) == Occurred.BEFORE) { this.version = bookieSet.getVersion(); this.bookies = bookieSet.getValue(); @@ -169,101 +160,45 @@ synchronized boolean isClosed() { @Override public synchronized void close() { - if (!closed) { + if (closed) { return; } + zk.removeWatches( + regPath, + this, + WatcherType.Children, + true, + (rc, path, ctx) -> {}, + null + ); closed = true; } } - private ClientConfiguration conf; - private ZooKeeper zk = null; - // whether the zk handle is one we created, or is owned by whoever - // instantiated us - private boolean ownZKHandle = false; - private ScheduledExecutorService scheduler; + private final ZooKeeper zk; + private final ScheduledExecutorService scheduler; + @Getter(AccessLevel.PACKAGE) private WatchTask watchWritableBookiesTask = null; + @Getter(AccessLevel.PACKAGE) private WatchTask watchReadOnlyBookiesTask = null; // registration paths - private String bookieRegistrationPath; - private String bookieReadonlyRegistrationPath; + private final String bookieRegistrationPath; + private final String bookieReadonlyRegistrationPath; - // layout manager - private List<ACL> acls; - private LayoutManager layoutManager; - - @Override - public RegistrationClient initialize(ClientConfiguration conf, - ScheduledExecutorService scheduler, - StatsLogger statsLogger, - Optional<Object> zkOptional) - throws BKException { - this.conf = conf; + public ZKRegistrationClient(ZooKeeper zk, + String ledgersRootPath, + ScheduledExecutorService scheduler) { + this.zk = zk; this.scheduler = scheduler; - this.bookieRegistrationPath = conf.getZkAvailableBookiesPath(); + this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE; this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY; - - this.acls = ZkUtils.getACLs(conf); - - // construct the zookeeper - if (zkOptional.isPresent() - && zkOptional.get() instanceof ZooKeeper) { - // if an external zookeeper is added, use the zookeeper instance - this.zk = (ZooKeeper) (zkOptional.get()); - this.ownZKHandle = false; - } else { - try { - this.zk = ZooKeeperClient.newBuilder() - .connectString(conf.getZkServers()) - .sessionTimeoutMs(conf.getZkTimeout()) - .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkTimeout(), - conf.getZkTimeout(), 0)) - .statsLogger(statsLogger) - .build(); - - if (null == zk.exists(bookieReadonlyRegistrationPath, false)) { - try { - zk.create(bookieReadonlyRegistrationPath, - new byte[0], - acls, - CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException e) { - // this node is just now created by someone. - } - } - } catch (IOException | KeeperException e) { - log.error("Failed to create zookeeper client to {}", conf.getZkServers(), e); - ZKException zke = new ZKException(); - zke.fillInStackTrace(); - throw zke; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new BKInterruptedException(); - } - this.ownZKHandle = true; - } - - // layout manager - this.layoutManager = new ZkLayoutManager( - zk, - conf.getZkLedgersRootPath(), - acls); - - return this; } @Override public void close() { - if (ownZKHandle && null != zk) { - try { - zk.close(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.warn("Interrupted on closing zookeeper client", e); - } - } + // no-op } public ZooKeeper getZk() { @@ -290,7 +225,7 @@ public ZooKeeper getZk() { return; } - Version version = new LongVersion(stat.getVersion()); + Version version = new LongVersion(stat.getCversion()); Set<BookieSocketAddress> bookies = convertToBookieAddresses(children); future.complete(new Versioned<>(bookies, version)); }, null); @@ -300,9 +235,17 @@ public ZooKeeper getZk() { @Override public synchronized CompletableFuture<Void> watchWritableBookies(RegistrationListener listener) { - CompletableFuture<Void> f = new CompletableFuture<>(); + CompletableFuture<Void> f; if (null == watchWritableBookiesTask) { + f = new CompletableFuture<>(); watchWritableBookiesTask = new WatchTask(bookieRegistrationPath, f); + f = f.whenComplete((value, cause) -> { + if (null != cause) { + unwatchWritableBookies(listener); + } + }); + } else { + f = watchWritableBookiesTask.firstRunFuture; } watchWritableBookiesTask.addListener(listener); @@ -327,9 +270,17 @@ public synchronized void unwatchWritableBookies(RegistrationListener listener) { @Override public synchronized CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener listener) { - CompletableFuture<Void> f = new CompletableFuture<>(); + CompletableFuture<Void> f; if (null == watchReadOnlyBookiesTask) { + f = new CompletableFuture<>(); watchReadOnlyBookiesTask = new WatchTask(bookieReadonlyRegistrationPath, f); + f = f.whenComplete((value, cause) -> { + if (null != cause) { + unwatchReadOnlyBookies(listener); + } + }); + } else { + f = watchReadOnlyBookiesTask.firstRunFuture; } watchReadOnlyBookiesTask.addListener(listener); @@ -371,14 +322,5 @@ public synchronized void unwatchReadOnlyBookies(RegistrationListener listener) { } return newBookieAddrs; } - @VisibleForTesting - public void setLayoutManager(LayoutManager layoutManager) { - this.layoutManager = layoutManager; - } - - @Override - public LayoutManager getLayoutManager() { - return layoutManager; - } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java index e1c5c0c81..7f2c2d96c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java @@ -33,7 +33,6 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -50,7 +49,6 @@ import org.apache.bookkeeper.client.BKException.BKInterruptedException; import org.apache.bookkeeper.client.BKException.MetaStoreException; import org.apache.bookkeeper.common.concurrent.FutureUtils; -import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory; import org.apache.bookkeeper.meta.LayoutManager; @@ -58,7 +56,6 @@ import org.apache.bookkeeper.meta.ZkLayoutManager; import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager; import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.bookkeeper.util.ZkUtils; @@ -551,8 +548,11 @@ public boolean nukeExistingCluster() throws Exception { String availableBookiesPath = conf.getZkAvailableBookiesPath(); boolean availableNodeExists = null != zk.exists(availableBookiesPath, false); - try (RegistrationClient regClient = new ZKRegistrationClient()) { - regClient.initialize(new ClientConfiguration(conf), null, NullStatsLogger.INSTANCE, Optional.empty()); + try (RegistrationClient regClient = new ZKRegistrationClient( + zk, + zkLedgersRootPath, + null + )) { if (availableNodeExists) { Collection<BookieSocketAddress> rwBookies = FutureUtils .result(regClient.getWritableBookies(), EXCEPTION_FUNC).getValue(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java index a175cc4f8..c2ff956d5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java @@ -88,6 +88,7 @@ private static final int MS_CONNECT_BACKOFF_MS = 200; public static final int CUR_VERSION = 1; + public static final String NAME = "ms"; public static final String TABLE_NAME = "LEDGER"; public static final String META_FIELD = ".META"; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java index 2fd302c09..b53836790 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java @@ -64,8 +64,7 @@ MetadataClientDriver initialize(ClientConfiguration conf, * * @return the registration client used for discovering registered bookies. */ - RegistrationClient getRegistrationClient() - throws MetadataException; + RegistrationClient getRegistrationClient(); /** * Return the ledger manager factory used for accessing ledger metadata. @@ -75,6 +74,13 @@ RegistrationClient getRegistrationClient() LedgerManagerFactory getLedgerManagerFactory() throws MetadataException; + /** + * Return the layout manager. + * + * @return the layout manager. + */ + LayoutManager getLayoutManager(); + @Override void close(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataDrivers.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataDrivers.java index 8a657f26f..d124700de 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataDrivers.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataDrivers.java @@ -167,19 +167,30 @@ private static void loadInitialBookieDrivers() { */ public static void registerClientDriver(String metadataBackendScheme, Class<? extends MetadataClientDriver> driver) { + registerClientDriver(metadataBackendScheme, driver, false); + } + + @VisibleForTesting + public static void registerClientDriver(String metadataBackendScheme, + Class<? extends MetadataClientDriver> driver, + boolean allowOverride) { if (!initialized) { initialize(); } String scheme = metadataBackendScheme.toLowerCase(); MetadataClientDriverInfo oldDriverInfo = clientDrivers.get(scheme); - if (null != oldDriverInfo) { + if (null != oldDriverInfo && !allowOverride) { return; } MetadataClientDriverInfo newDriverInfo = new MetadataClientDriverInfo(driver); oldDriverInfo = clientDrivers.putIfAbsent(scheme, newDriverInfo); if (null != oldDriverInfo) { log.debug("Metadata client driver for {} is already there.", scheme); + if (allowOverride) { + log.debug("Overriding client driver for {}", scheme); + clientDrivers.put(scheme, newDriverInfo); + } } } @@ -191,19 +202,30 @@ public static void registerClientDriver(String metadataBackendScheme, */ public static void registerBookieDriver(String metadataBackendScheme, Class<? extends MetadataBookieDriver> driver) { + registerBookieDriver(metadataBackendScheme, driver, false); + } + + @VisibleForTesting + public static void registerBookieDriver(String metadataBackendScheme, + Class<? extends MetadataBookieDriver> driver, + boolean allowOverride) { if (!initialized) { initialize(); } String scheme = metadataBackendScheme.toLowerCase(); MetadataBookieDriverInfo oldDriverInfo = bookieDrivers.get(scheme); - if (null != oldDriverInfo) { + if (null != oldDriverInfo && !allowOverride) { return; } MetadataBookieDriverInfo newDriverInfo = new MetadataBookieDriverInfo(driver); oldDriverInfo = bookieDrivers.putIfAbsent(scheme, newDriverInfo); if (null != oldDriverInfo) { log.debug("Metadata bookie driver for {} is already there.", scheme); + if (allowOverride) { + log.debug("Overriding bookie driver for {}", scheme); + bookieDrivers.put(scheme, newDriverInfo); + } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java index 3fec50cb7..9141c6ced 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java @@ -21,13 +21,11 @@ import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.discover.RegistrationClient; import org.apache.bookkeeper.discover.ZKRegistrationClient; import org.apache.bookkeeper.meta.MetadataClientDriver; import org.apache.bookkeeper.meta.MetadataDrivers; -import org.apache.bookkeeper.meta.exceptions.Code; import org.apache.bookkeeper.meta.exceptions.MetadataException; import org.apache.bookkeeper.stats.StatsLogger; @@ -66,21 +64,12 @@ public synchronized MetadataClientDriver initialize(ClientConfiguration conf, } @Override - public synchronized RegistrationClient getRegistrationClient() throws MetadataException { + public synchronized RegistrationClient getRegistrationClient() { if (null == regClient) { - regClient = new ZKRegistrationClient(); - try { - regClient.initialize( - clientConf, - scheduler, - statsLogger, - Optional.of(zk)); - } catch (BKException e) { - throw new MetadataException( - Code.METADATA_SERVICE_ERROR, - "Failed to initialize registration client", - e); - } + regClient = new ZKRegistrationClient( + zk, + ledgersRootPath, + scheduler); } return regClient; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java index f3946225d..98a018d28 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java @@ -21,11 +21,14 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; +import static org.apache.bookkeeper.util.BookKeeperConstants.EMPTY_BYTE_ARRAY; +import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; import java.io.IOException; import java.net.URI; import java.util.List; import java.util.Optional; +import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.conf.AbstractConfiguration; @@ -80,6 +83,9 @@ protected static String getZKServersFromServiceUri(URI uri) { case LongHierarchicalLedgerManagerFactory.NAME: ledgerManagerFactoryClass = LongHierarchicalLedgerManagerFactory.class; break; + case org.apache.bookkeeper.meta.MSLedgerManagerFactory.NAME: + ledgerManagerFactoryClass = org.apache.bookkeeper.meta.MSLedgerManagerFactory.class; + break; default: throw new IllegalArgumentException("Unknown ledger manager type found '" + schemeParts[1] + "' at uri : " + metadataServiceUri); @@ -97,6 +103,7 @@ protected static String getZKServersFromServiceUri(URI uri) { // zookeeper related variables protected List<ACL> acls; + @Getter protected ZooKeeper zk = null; // whether the zk handle is one we created, or is owned by whoever // instantiated us @@ -120,7 +127,7 @@ protected void initialize(AbstractConfiguration<?> conf, Optional<Object> optionalCtx) throws MetadataException { this.conf = conf; - String metadataServiceUriStr; + final String metadataServiceUriStr; try { metadataServiceUriStr = conf.getMetadataServiceUri(); } catch (ConfigurationException e) { @@ -128,15 +135,19 @@ protected void initialize(AbstractConfiguration<?> conf, throw new MetadataException( Code.INVALID_METADATA_SERVICE_URI, e); } + this.metadataServiceUri = URI.create(metadataServiceUriStr); ledgerManagerFactoryClass = resolveLedgerManagerFactory(metadataServiceUri); // get the initialize root path this.ledgersRootPath = metadataServiceUri.getPath(); - String bookieReadonlyRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE; + final String bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE; + final String bookieReadonlyRegistrationPath = bookieRegistrationPath + "/" + READONLY; // construct the zookeeper - String zkServers = getZKServersFromServiceUri(metadataServiceUri); + final String zkServers = getZKServersFromServiceUri(metadataServiceUri); + log.info("Initialize zookeeper metadata driver at metadata service uri {} :" + + " zkServers = {}, ledgersRootPath = {}.", metadataServiceUriStr, zkServers, ledgersRootPath); this.acls = ZkUtils.getACLs(conf); if (optionalCtx.isPresent() && optionalCtx.get() instanceof ZooKeeper) { @@ -156,7 +167,7 @@ protected void initialize(AbstractConfiguration<?> conf, if (null == zk.exists(bookieReadonlyRegistrationPath, false)) { try { zk.create(bookieReadonlyRegistrationPath, - new byte[0], + EMPTY_BYTE_ARRAY, acls, CreateMode.PERSISTENT); } catch (KeeperException.NodeExistsException e) { @@ -167,7 +178,8 @@ protected void initialize(AbstractConfiguration<?> conf, log.error("Failed to create zookeeper client to {}", zkServers, e); MetadataException me = new MetadataException( Code.METADATA_SERVICE_ERROR, - "Failed to create zookeeper client to " + zkServers); + "Failed to create zookeeper client to " + zkServers, + e); me.fillInStackTrace(); throw me; } @@ -181,6 +193,10 @@ protected void initialize(AbstractConfiguration<?> conf, acls); } + public LayoutManager getLayoutManager() { + return layoutManager; + } + @SneakyThrows public synchronized LedgerManagerFactory getLedgerManagerFactory() throws MetadataException { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index 75b29bad0..0c22a49f8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -141,7 +141,7 @@ private void initialize(ServerConfiguration conf, ZooKeeper zkc) LedgerManagerFactory ledgerManagerFactory = AbstractZkLedgerManagerFactory .newLedgerManagerFactory( conf, - bkc.getRegClient().getLayoutManager()); + bkc.getMetadataClientDriver().getLayoutManager()); ledgerManager = ledgerManagerFactory.newLedgerManager(); this.bookieLedgerIndexer = new BookieLedgerIndexer(ledgerManager); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java index 8969aa69b..276ed7120 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java @@ -135,7 +135,7 @@ public ReplicationWorker(final ZooKeeper zkc, LedgerManagerFactory mFactory = AbstractZkLedgerManagerFactory .newLedgerManagerFactory( this.conf, - bkc.getRegClient().getLayoutManager()); + bkc.getMetadataClientDriver().getLayoutManager()); this.underreplicationManager = mFactory .newLedgerUnderreplicationManager(); this.admin = new BookKeeperAdmin(bkc, statsLogger); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommand.java index 1ebfd88c5..b8f93a517 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommand.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommand.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.tools.cli.helpers; +import java.net.URI; import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -26,8 +27,9 @@ import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.discover.RegistrationClient; +import org.apache.bookkeeper.meta.MetadataClientDriver; +import org.apache.bookkeeper.meta.MetadataDrivers; import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.util.ReflectionUtils; /** * This is a mixin for commands that talks to discovery service. @@ -37,21 +39,16 @@ @Override public void run(ServerConfiguration conf) throws Exception { - // cast the server configuration to a client configuration object. - ClientConfiguration clientConf = new ClientConfiguration(conf); - run(clientConf); - } - - protected void run(ClientConfiguration conf) throws Exception { - Class<? extends RegistrationClient> regClientCls = conf.getRegistrationClientClass(); + URI metadataServiceUri = URI.create(conf.getMetadataServiceUri()); @Cleanup("shutdown") ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - try (RegistrationClient regClient = ReflectionUtils.newInstance(regClientCls)) { - regClient.initialize( - conf, + try (MetadataClientDriver driver = MetadataDrivers.getClientDriver(metadataServiceUri)) { + ClientConfiguration clientConf = new ClientConfiguration(conf); + driver.initialize( + clientConf, executor, NullStatsLogger.INSTANCE, Optional.empty()); - run(regClient); + run(driver.getRegistrationClient()); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java index 4981159c0..253f0d909 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java @@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener; import org.apache.bookkeeper.net.BookieSocketAddress; @@ -83,7 +84,7 @@ public BookieClient getBookieClient() { * in the other set before completing. */ private Future<?> waitForBookieInSet(BookieSocketAddress b, - boolean writable) { + boolean writable) throws Exception { log.info("Wait for {} to become {}", b, writable ? "writable" : "readonly"); @@ -103,9 +104,32 @@ public BookieClient getBookieClient() { } }; - regClient.watchWritableBookies(writableListener); - regClient.watchReadOnlyBookies(readOnlyListener); - return CompletableFuture.allOf(writableFuture, readOnlyFuture); + getMetadataClientDriver().getRegistrationClient().watchWritableBookies(writableListener); + getMetadataClientDriver().getRegistrationClient().watchReadOnlyBookies(readOnlyListener); + + if (writable) { + return writableFuture + .thenCompose(ignored -> getMetadataClientDriver().getRegistrationClient().getReadOnlyBookies()) + .thenCompose(readonlyBookies -> { + if (readonlyBookies.getValue().contains(b)) { + // if the bookie still shows up at readonly path, wait for it to disappear + return readOnlyFuture; + } else { + return FutureUtils.Void(); + } + }); + } else { + return readOnlyFuture + .thenCompose(ignored -> getMetadataClientDriver().getRegistrationClient().getWritableBookies()) + .thenCompose(writableBookies -> { + if (writableBookies.getValue().contains(b)) { + // if the bookie still shows up at writable path, wait for it to disappear + return writableFuture; + } else { + return FutureUtils.Void(); + } + }); + } } public TestStatsProvider getTestStatsProvider() { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java index 238992998..76a518021 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java @@ -46,6 +46,12 @@ import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory; import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.meta.LedgerManagerFactory; +import org.apache.bookkeeper.meta.MetadataDrivers; +import org.apache.bookkeeper.meta.exceptions.Code; +import org.apache.bookkeeper.meta.exceptions.MetadataException; +import org.apache.bookkeeper.meta.zk.ZKMetadataBookieDriver; +import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; @@ -58,6 +64,7 @@ import org.apache.bookkeeper.versioning.Version; import org.apache.zookeeper.AsyncCallback.VoidCallback; import org.apache.zookeeper.KeeperException; +import org.junit.After; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,15 +163,71 @@ public LedgerManager newLedgerManager() { } } + static class TestMetadataClientDriver extends ZKMetadataClientDriver { + + @Override + public synchronized LedgerManagerFactory getLedgerManagerFactory() throws MetadataException { + if (null == lmFactory) { + try { + lmFactory = new TestLedgerManagerFactory() + .initialize(conf, layoutManager, TestLedgerManagerFactory.CUR_VERSION); + } catch (IOException e) { + throw new MetadataException(Code.METADATA_SERVICE_ERROR, e); + } + } + return lmFactory; + } + } + + static class TestMetadataBookieDriver extends ZKMetadataBookieDriver { + + @Override + public synchronized LedgerManagerFactory getLedgerManagerFactory() throws MetadataException { + if (null == lmFactory) { + try { + lmFactory = new TestLedgerManagerFactory() + .initialize(conf, layoutManager, TestLedgerManagerFactory.CUR_VERSION); + } catch (IOException e) { + throw new MetadataException(Code.METADATA_SERVICE_ERROR, e); + } + } + return lmFactory; + } + } + final DigestType digestType; - public ParallelLedgerRecoveryTest() { + public ParallelLedgerRecoveryTest() throws Exception { super(3); + + this.digestType = DigestType.CRC32; + } + + @Override + protected void startBKCluster() throws Exception { + MetadataDrivers.registerClientDriver("zk", TestMetadataClientDriver.class, true); + MetadataDrivers.registerBookieDriver("zk", TestMetadataBookieDriver.class, true); + baseConf.setMetadataServiceUri( + "zk://" + zkUtil.getZooKeeperConnectString() + baseConf.getZkLedgersRootPath()); baseConf.setLedgerManagerFactoryClass(TestLedgerManagerFactory.class); + baseClientConf.setMetadataServiceUri( + "zk://" + zkUtil.getZooKeeperConnectString() + baseConf.getZkLedgersRootPath()); baseClientConf.setLedgerManagerFactoryClass(TestLedgerManagerFactory.class); baseClientConf.setReadEntryTimeout(60000); baseClientConf.setAddEntryTimeout(60000); - this.digestType = DigestType.CRC32; + + super.startBKCluster(); + } + + @After + @Override + public void tearDown() throws Exception { + try { + super.tearDown(); + } finally { + MetadataDrivers.registerClientDriver("zk", ZKMetadataClientDriver.class, true); + MetadataDrivers.registerBookieDriver("zk", ZKMetadataBookieDriver.class, true); + } } @Test diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java index 70b9cb3f2..0d25ccce7 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java @@ -39,6 +39,8 @@ "zk+flat://127.0.0.1/path/to/ledgers"; private static final String LONGHIERARCHICAL_METADATA_SERVICE_URI = "zk+longhierarchical://127.0.0.1/path/to/ledgers"; + private static final String MS_METADATA_SERVICE_URI = + "zk+ms://127.0.0.1/path/to/ledgers"; private AbstractConfiguration conf; @@ -95,6 +97,16 @@ public void testLongHierarchicalLedgerManagerUri() throws Exception { conf.getMetadataServiceUri()); } + @SuppressWarnings({ "unchecked", "deprecation" }) + @Test + public void testMsLedgerManagerUri() throws Exception { + conf.setLedgerManagerFactoryClass( + org.apache.bookkeeper.meta.MSLedgerManagerFactory.class); + assertEquals( + MS_METADATA_SERVICE_URI, + conf.getMetadataServiceUri()); + } + @SuppressWarnings({ "unchecked" }) @Test(expected = IllegalArgumentException.class) public void testUnknownZkLedgerManagerFactory() throws Exception { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java index 385d8028b..99336b478 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java @@ -18,56 +18,521 @@ */ package org.apache.bookkeeper.discover; +import static org.apache.bookkeeper.common.concurrent.FutureUtils.collect; +import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; +import static org.apache.bookkeeper.common.testing.MoreAsserts.assertSetEquals; +import static org.apache.bookkeeper.discover.ZKRegistrationClient.ZK_CONNECT_BACKOFF_MS; +import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; +import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import org.apache.bookkeeper.meta.LayoutManager; -import org.apache.bookkeeper.meta.LedgerLayout; +import com.google.common.collect.Lists; +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BKException.ZKException; +import org.apache.bookkeeper.common.testing.executors.MockExecutorController; +import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener; +import org.apache.bookkeeper.discover.ZKRegistrationClient.WatchTask; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.bookkeeper.versioning.LongVersion; +import org.apache.bookkeeper.versioning.Versioned; +import org.apache.bookkeeper.zookeeper.MockZooKeeperTestCase; +import org.apache.zookeeper.AsyncCallback.Children2Callback; +import org.apache.zookeeper.AsyncCallback.VoidCallback; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.Watcher.WatcherType; +import org.apache.zookeeper.data.Stat; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; /** * Unit test of {@link RegistrationClient}. */ -public class TestZkRegistrationClient { - private final LedgerLayout ledgerLayout; - private final LayoutManager layoutManager; - private final ZKRegistrationClient zkRegistrationClient; +@RunWith(PowerMockRunner.class) +@PrepareForTest({ ZKRegistrationClient.class, ZkUtils.class }) +@Slf4j +public class TestZkRegistrationClient extends MockZooKeeperTestCase { - public TestZkRegistrationClient() { - this.ledgerLayout = mock(LedgerLayout.class); - this.layoutManager = mock(LayoutManager.class); - this.zkRegistrationClient = new ZKRegistrationClient(); - zkRegistrationClient.setLayoutManager(layoutManager); + @Rule + public final TestName runtime = new TestName(); + + private String ledgersPath; + private String regPath; + private String regReadonlyPath; + private ZKRegistrationClient zkRegistrationClient; + private ScheduledExecutorService mockExecutor; + private MockExecutorController controller; + + @Before + public void setup() throws Exception { + super.setup(); + + this.ledgersPath = "/" + runtime.getMethodName(); + this.regPath = ledgersPath + "/" + AVAILABLE_NODE; + this.regReadonlyPath = regPath + "/" + READONLY; + this.mockExecutor = mock(ScheduledExecutorService.class); + this.controller = new MockExecutorController() + .controlExecute(mockExecutor) + .controlSubmit(mockExecutor) + .controlSchedule(mockExecutor) + .controlScheduleAtFixedRate(mockExecutor, 10); + this.zkRegistrationClient = new ZKRegistrationClient( + mockZk, + ledgersPath, + mockExecutor + ); + } + + @After + public void teardown() { + if (null != zkRegistrationClient) { + zkRegistrationClient.close(); + } + } + + private static Set<BookieSocketAddress> prepareNBookies(int num) { + Set<BookieSocketAddress> bookies = new HashSet<>(); + for (int i = 0; i < num; i++) { + bookies.add(new BookieSocketAddress("127.0.0.1", 3181 + i)); + } + return bookies; + } + + @Test + public void testGetWritableBookies() throws Exception { + Set<BookieSocketAddress> addresses = prepareNBookies(10); + List<String> children = Lists.newArrayList(); + for (BookieSocketAddress address : addresses) { + children.add(address.toString()); + } + Stat stat = mock(Stat.class); + when(stat.getCversion()).thenReturn(1234); + mockGetChildren( + regPath, false, + Code.OK.intValue(), children, stat); + + Versioned<Set<BookieSocketAddress>> result = + result(zkRegistrationClient.getWritableBookies()); + + assertEquals(new LongVersion(1234), result.getVersion()); + assertSetEquals( + addresses, result.getValue()); } @Test - public void testGetLayoutManager() throws Exception { - assertEquals(layoutManager, zkRegistrationClient.getLayoutManager()); + public void testGetReadOnlyBookies() throws Exception { + Set<BookieSocketAddress> addresses = prepareNBookies(10); + List<String> children = Lists.newArrayList(); + for (BookieSocketAddress address : addresses) { + children.add(address.toString()); + } + Stat stat = mock(Stat.class); + when(stat.getCversion()).thenReturn(1234); + mockGetChildren( + regReadonlyPath, false, + Code.OK.intValue(), children, stat); + + Versioned<Set<BookieSocketAddress>> result = + result(zkRegistrationClient.getReadOnlyBookies()); + + assertEquals(new LongVersion(1234), result.getVersion()); + assertSetEquals( + addresses, result.getValue()); } @Test - public void testReadLedgerLayout() throws Exception { - when(layoutManager.readLedgerLayout()).thenReturn(ledgerLayout); - assertEquals(ledgerLayout, zkRegistrationClient.getLayoutManager().readLedgerLayout()); + public void testGetWritableBookiesFailure() throws Exception { + mockGetChildren( + regPath, false, + Code.NONODE.intValue(), null, null); + + try { + result(zkRegistrationClient.getWritableBookies()); + fail("Should fail to get writable bookies"); + } catch (ZKException zke) { + // expected to throw zookeeper exception + } } @Test - public void testStoreLedgerLayout() throws Exception { - zkRegistrationClient.getLayoutManager().storeLedgerLayout(ledgerLayout); + public void testGetReadOnlyBookiesFailure() throws Exception { + mockGetChildren( + regReadonlyPath, false, + Code.NONODE.intValue(), null, null); - verify(layoutManager, times(1)) - .storeLedgerLayout(eq(ledgerLayout)); + try { + result(zkRegistrationClient.getReadOnlyBookies()); + fail("Should fail to get writable bookies"); + } catch (ZKException zke) { + // expected to throw zookeeper exception + } } @Test - public void testDeleteLedgerLayout() throws Exception { - zkRegistrationClient.getLayoutManager().deleteLedgerLayout(); + public void testWatchWritableBookiesSuccess() throws Exception { + testWatchBookiesSuccess(true); + } - verify(layoutManager, times(1)) - .deleteLedgerLayout(); + @Test + public void testWatchReadonlyBookiesSuccess() throws Exception { + testWatchBookiesSuccess(false); } + + @SuppressWarnings("unchecked") + private void testWatchBookiesSuccess(boolean isWritable) + throws Exception { + // + // 1. test watch bookies with a listener + // + + LinkedBlockingQueue<Versioned<Set<BookieSocketAddress>>> updates = + spy(new LinkedBlockingQueue<>()); + RegistrationListener listener = bookies -> { + try { + updates.put(bookies); + } catch (InterruptedException e) { + log.warn("Interrupted on enqueue bookie updates", e); + } + }; + + Set<BookieSocketAddress> addresses = prepareNBookies(10); + List<String> children = Lists.newArrayList(); + for (BookieSocketAddress address : addresses) { + children.add(address.toString()); + } + Stat stat = mock(Stat.class); + when(stat.getCversion()).thenReturn(1234); + + mockGetChildren( + isWritable ? regPath : regReadonlyPath, + true, + Code.OK.intValue(), children, stat); + + if (isWritable) { + result(zkRegistrationClient.watchWritableBookies(listener)); + } else { + result(zkRegistrationClient.watchReadOnlyBookies(listener)); + } + + Versioned<Set<BookieSocketAddress>> update = updates.take(); + verify(updates, times(1)).put(any(Versioned.class)); + assertEquals(new LongVersion(1234), update.getVersion()); + assertSetEquals( + addresses, update.getValue()); + + verify(mockZk, times(1)) + .getChildren(anyString(), any(Watcher.class), any(Children2Callback.class), any()); + + // + // 2. test watch bookies with a second listener. the second listener returns cached bookies + // without calling `getChildren` again + // + + // register another listener + LinkedBlockingQueue<Versioned<Set<BookieSocketAddress>>> secondUpdates = + spy(new LinkedBlockingQueue<>()); + RegistrationListener secondListener = bookies -> { + try { + secondUpdates.put(bookies); + } catch (InterruptedException e) { + log.warn("Interrupted on enqueue bookie updates", e); + } + }; + if (isWritable) { + result(zkRegistrationClient.watchWritableBookies(secondListener)); + } else { + result(zkRegistrationClient.watchReadOnlyBookies(secondListener)); + } + Versioned<Set<BookieSocketAddress>> secondListenerUpdate = secondUpdates.take(); + // first listener will not be notified with any update + verify(updates, times(1)).put(any(Versioned.class)); + // second listener will receive same update as the first listener received before + verify(secondUpdates, times(1)).put(any(Versioned.class)); + assertSame(update.getVersion(), secondListenerUpdate.getVersion()); + assertSame(update.getValue(), secondListenerUpdate.getValue()); + + // the second listener will return the cached value without issuing another getChildren call + verify(mockZk, times(1)) + .getChildren(anyString(), any(Watcher.class), any(Children2Callback.class), any()); + + // + // 3. simulate session expire, it will trigger watcher to refetch bookies again. + // but since there is no updates on bookies, the registered listeners will not be notified. + // + + notifyWatchedEvent( + EventType.None, + KeeperState.Expired, + isWritable ? regPath : regReadonlyPath); + + // if session expires, the watcher task will get into backoff state + controller.advance(Duration.ofMillis(ZK_CONNECT_BACKOFF_MS)); + + // the same updates returns, the getChildren calls increase to 2 + // but since there is no updates, so no notification is sent. + verify(mockZk, times(2)) + .getChildren(anyString(), any(Watcher.class), any(Children2Callback.class), any()); + assertNull(updates.poll()); + // both listener and secondListener will not receive any old update + verify(updates, times(1)).put(any(Versioned.class)); + verify(secondUpdates, times(1)).put(any(Versioned.class)); + + // + // 4. notify with new bookies. both listeners will be notified with new bookies. + // + + Set<BookieSocketAddress> newAddresses = prepareNBookies(20); + List<String> newChildren = Lists.newArrayList(); + for (BookieSocketAddress address : newAddresses) { + newChildren.add(address.toString()); + } + Stat newStat = mock(Stat.class); + when(newStat.getCversion()).thenReturn(1235); + + mockGetChildren( + isWritable ? regPath : regReadonlyPath, + true, + Code.OK.intValue(), newChildren, newStat); + + // trigger watcher + notifyWatchedEvent( + EventType.NodeChildrenChanged, + KeeperState.SyncConnected, + isWritable ? regPath : regReadonlyPath); + + update = updates.take(); + assertEquals(new LongVersion(1235), update.getVersion()); + assertSetEquals( + newAddresses, update.getValue()); + secondListenerUpdate = secondUpdates.take(); + assertSame(update.getVersion(), secondListenerUpdate.getVersion()); + assertSame(update.getValue(), secondListenerUpdate.getValue()); + + verify(mockZk, times(3)) + .getChildren(anyString(), any(Watcher.class), any(Children2Callback.class), any()); + verify(updates, times(2)).put(any(Versioned.class)); + verify(secondUpdates, times(2)).put(any(Versioned.class)); + + // + // 5. unwatch the second listener and notify with new bookies again. only first listener will + // be notified with new bookies. + // + + newAddresses = prepareNBookies(25); + newChildren.clear(); + newChildren = Lists.newArrayList(); + for (BookieSocketAddress address : newAddresses) { + newChildren.add(address.toString()); + } + newStat = mock(Stat.class); + when(newStat.getCversion()).thenReturn(1236); + + mockGetChildren( + isWritable ? regPath : regReadonlyPath, + true, + Code.OK.intValue(), newChildren, newStat); + + if (isWritable) { + assertEquals(2, zkRegistrationClient.getWatchWritableBookiesTask().getNumListeners()); + zkRegistrationClient.unwatchWritableBookies(secondListener); + assertEquals(1, zkRegistrationClient.getWatchWritableBookiesTask().getNumListeners()); + } else { + assertEquals(2, zkRegistrationClient.getWatchReadOnlyBookiesTask().getNumListeners()); + zkRegistrationClient.unwatchReadOnlyBookies(secondListener); + assertEquals(1, zkRegistrationClient.getWatchReadOnlyBookiesTask().getNumListeners()); + } + // the watch task will not be closed since there is still a listener + verify(mockZk, times(0)) + .removeWatches( + eq(isWritable ? regPath : regReadonlyPath), + same(isWritable ? zkRegistrationClient.getWatchWritableBookiesTask() + : zkRegistrationClient.getWatchReadOnlyBookiesTask()), + eq(WatcherType.Children), + eq(true), + any(VoidCallback.class), + any() + ); + + // trigger watcher + notifyWatchedEvent( + EventType.NodeChildrenChanged, + KeeperState.SyncConnected, + isWritable ? regPath : regReadonlyPath); + + update = updates.take(); + assertEquals(new LongVersion(1236), update.getVersion()); + assertSetEquals( + newAddresses, update.getValue()); + secondListenerUpdate = secondUpdates.poll(); + assertNull(secondListenerUpdate); + + verify(mockZk, times(4)) + .getChildren(anyString(), any(Watcher.class), any(Children2Callback.class), any()); + verify(updates, times(3)).put(any(Versioned.class)); + verify(secondUpdates, times(2)).put(any(Versioned.class)); + + // + // 6. unwatch the first listener. the watch task will be closed and zk watcher will be removed. + // + // + + WatchTask expectedWatcher; + if (isWritable) { + expectedWatcher = zkRegistrationClient.getWatchWritableBookiesTask(); + assertFalse(expectedWatcher.isClosed()); + zkRegistrationClient.unwatchWritableBookies(listener); + assertNull(zkRegistrationClient.getWatchWritableBookiesTask()); + } else { + expectedWatcher = zkRegistrationClient.getWatchReadOnlyBookiesTask(); + assertFalse(expectedWatcher.isClosed()); + zkRegistrationClient.unwatchReadOnlyBookies(listener); + assertNull(zkRegistrationClient.getWatchReadOnlyBookiesTask()); + } + // the watch task will not be closed since there is still a listener + assertTrue(expectedWatcher.isClosed()); + verify(mockZk, times(1)) + .removeWatches( + eq(isWritable ? regPath : regReadonlyPath), + same(expectedWatcher), + eq(WatcherType.Children), + eq(true), + any(VoidCallback.class), + any() + ); + } + + @Test + public void testWatchWritableBookiesTwice() throws Exception { + testWatchBookiesTwice(true); + } + + @Test + public void testWatchReadonlyBookiesTwice() throws Exception { + testWatchBookiesTwice(false); + } + + private void testWatchBookiesTwice(boolean isWritable) + throws Exception { + int zkCallbackDelayMs = 100; + + Set<BookieSocketAddress> addresses = prepareNBookies(10); + List<String> children = Lists.newArrayList(); + for (BookieSocketAddress address : addresses) { + children.add(address.toString()); + } + Stat stat = mock(Stat.class); + when(stat.getCversion()).thenReturn(1234); + + mockGetChildren( + isWritable ? regPath : regReadonlyPath, + true, + Code.OK.intValue(), children, stat, zkCallbackDelayMs); + + CompletableFuture<Versioned<Set<BookieSocketAddress>>> firstResult = new CompletableFuture<>(); + RegistrationListener firstListener = bookies -> firstResult.complete(bookies); + + CompletableFuture<Versioned<Set<BookieSocketAddress>>> secondResult = new CompletableFuture<>(); + RegistrationListener secondListener = bookies -> secondResult.complete(bookies); + + List<CompletableFuture<Void>> watchFutures = Lists.newArrayListWithExpectedSize(2); + if (isWritable) { + watchFutures.add(zkRegistrationClient.watchWritableBookies(firstListener)); + watchFutures.add(zkRegistrationClient.watchWritableBookies(secondListener)); + } else { + watchFutures.add(zkRegistrationClient.watchReadOnlyBookies(firstListener)); + watchFutures.add(zkRegistrationClient.watchReadOnlyBookies(secondListener)); + } + + // trigger zkCallbackExecutor to execute getChildren callback + zkCallbackController.advance(Duration.ofMillis(zkCallbackDelayMs)); + + result(collect(watchFutures)); + assertEquals(firstResult.get().getVersion(), secondResult.get().getVersion()); + assertSetEquals(firstResult.get().getValue(), secondResult.get().getValue()); + } + + @Test + public void testWatchWritableBookiesFailure() throws Exception { + testWatchBookiesFailure(true); + } + + @Test + public void testWatchReadonlyBookiesFailure() throws Exception { + testWatchBookiesFailure(false); + } + + private void testWatchBookiesFailure(boolean isWritable) + throws Exception { + int zkCallbackDelayMs = 100; + + mockGetChildren( + isWritable ? regPath : regReadonlyPath, + true, + Code.NONODE.intValue(), null, null, zkCallbackDelayMs); + + CompletableFuture<Versioned<Set<BookieSocketAddress>>> listenerResult = new CompletableFuture<>(); + RegistrationListener listener = bookies -> listenerResult.complete(bookies); + + CompletableFuture<Void> watchFuture; + + WatchTask watchTask; + if (isWritable) { + watchFuture = zkRegistrationClient.watchWritableBookies(listener); + watchTask = zkRegistrationClient.getWatchWritableBookiesTask(); + } else { + watchFuture = zkRegistrationClient.watchReadOnlyBookies(listener); + watchTask = zkRegistrationClient.getWatchReadOnlyBookiesTask(); + } + assertNotNull(watchTask); + assertEquals(1, watchTask.getNumListeners()); + + // trigger zkCallbackExecutor to execute getChildren callback + zkCallbackController.advance(Duration.ofMillis(zkCallbackDelayMs)); + + try { + result(watchFuture); + fail("Should fail to watch writable bookies if reg path doesn't exist"); + } catch (ZKException zke) { + // expected + } + assertEquals(0, watchTask.getNumListeners()); + assertTrue(watchTask.isClosed()); + if (isWritable) { + assertNull(zkRegistrationClient.getWatchWritableBookiesTask()); + } else { + assertNull(zkRegistrationClient.getWatchReadOnlyBookiesTask()); + } + } + } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java index deecc197a..7a28079b8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java @@ -95,7 +95,6 @@ public LedgerIdGenerator getLedgerIdGenerator() throws IOException { return Arrays.asList(new Object[][] { { FlatLedgerManagerFactory.class }, { HierarchicalLedgerManagerFactory.class }, - { LegacyHierarchicalLedgerManagerFactory.class }, { LongHierarchicalLedgerManagerFactory.class }, { MSLedgerManagerFactory.class }, }); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java index bd89fb3df..90207d88c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java @@ -67,7 +67,7 @@ public String getScheme() { } @Override - public RegistrationClient getRegistrationClient() throws MetadataException { + public RegistrationClient getRegistrationClient() { return mock(RegistrationClient.class); } @@ -76,6 +76,11 @@ public LedgerManagerFactory getLedgerManagerFactory() throws MetadataException { return mock(LedgerManagerFactory.class); } + @Override + public LayoutManager getLayoutManager() { + return mock(LayoutManager.class); + } + @Override public void close() { } @@ -97,7 +102,7 @@ public String getScheme() { } @Override - public RegistrationClient getRegistrationClient() throws MetadataException { + public RegistrationClient getRegistrationClient() { return mock(RegistrationClient.class); } @@ -106,6 +111,11 @@ public LedgerManagerFactory getLedgerManagerFactory() throws MetadataException { return mock(LedgerManagerFactory.class); } + @Override + public LayoutManager getLayoutManager() { + return mock(LayoutManager.class); + } + @Override public void close() { } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriverTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriverTest.java index 4cdf11d1e..626a05581 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriverTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriverTest.java @@ -20,8 +20,9 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -33,6 +34,7 @@ import org.apache.bookkeeper.discover.ZKRegistrationClient; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; +import org.apache.zookeeper.ZooKeeper; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -70,7 +72,8 @@ public void testGetRegClient() throws Exception { ZKRegistrationClient mockRegClient = PowerMockito.mock(ZKRegistrationClient.class); PowerMockito.whenNew(ZKRegistrationClient.class) - .withNoArguments() + .withParameterTypes(ZooKeeper.class, String.class, ScheduledExecutorService.class) + .withArguments(any(ZooKeeper.class), anyString(), any(ScheduledExecutorService.class)) .thenReturn(mockRegClient); RegistrationClient client = driver.getRegistrationClient(); @@ -78,13 +81,7 @@ public void testGetRegClient() throws Exception { assertSame(mockRegClient, driver.regClient); PowerMockito.verifyNew(ZKRegistrationClient.class, times(1)) - .withNoArguments(); - verify(mockRegClient, times(1)) - .initialize( - same(conf), - same(mockExecutor), - same(NullStatsLogger.INSTANCE), - eq(Optional.of(driver.zk))); + .withArguments(eq(mockZkc), eq(ledgersRootPath), eq(mockExecutor)); driver.close(); verify(mockRegClient, times(1)).close(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseStaticTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseStaticTest.java index 2ce4e34fd..0ed3f7d31 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseStaticTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseStaticTest.java @@ -82,6 +82,16 @@ public void testResolveLedgerManagerFactoryFlat() { ); } + @SuppressWarnings("deprecation") + @Test + public void testResolveLedgerManagerFactoryMs() { + assertEquals( + org.apache.bookkeeper.meta.MSLedgerManagerFactory.class, + ZKMetadataDriverBase.resolveLedgerManagerFactory( + URI.create("zk+ms://127.0.0.1/ledgers")) + ); + } + @Test public void testResolveLedgerManagerFactoryHierarchical() { assertEquals( diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java index f1ed0c025..7a477c0ac 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.meta.zk; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; +import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -77,7 +78,7 @@ public void testInitialize() throws Exception { driver.ledgersRootPath); assertTrue(driver.ownZKHandle); - String readonlyPath = "/path/to/ledgers/" + AVAILABLE_NODE; + String readonlyPath = "/path/to/ledgers/" + AVAILABLE_NODE + "/" + READONLY; assertSame(mockZkc, driver.zk); verifyStatic(ZooKeeperClient.class, times(1)); ZooKeeperClient.newBuilder(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java index 453769426..22d5f1b4f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java @@ -38,12 +38,14 @@ public abstract class ZKMetadataDriverTestBase { protected AbstractConfiguration<?> conf; + protected String ledgersRootPath; protected String metadataServiceUri; protected ZooKeeperClient.Builder mockZkBuilder; protected ZooKeeperClient mockZkc; public void setup(AbstractConfiguration<?> conf) throws Exception { - metadataServiceUri = "zk://127.0.0.1/path/to/ledgers"; + ledgersRootPath = "/path/to/ledgers"; + metadataServiceUri = "zk://127.0.0.1" + ledgersRootPath; this.conf = conf; conf.setMetadataServiceUri(metadataServiceUri); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java index eb282296c..4190bf5c4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java @@ -26,10 +26,15 @@ import com.google.common.collect.Maps; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.api.BKException.Code; +import org.apache.bookkeeper.common.testing.executors.MockExecutorController; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.zookeeper.AsyncCallback.Children2Callback; import org.apache.zookeeper.AsyncCallback.DataCallback; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.AsyncCallback.StringCallback; @@ -50,11 +55,20 @@ protected final ConcurrentMap<String, Set<Watcher>> watchers = Maps.newConcurrentMap(); protected ZooKeeper mockZk; + protected ScheduledExecutorService zkCallbackExecutor; + protected MockExecutorController zkCallbackController; protected void setup() throws Exception { this.mockZk = mock(ZooKeeper.class); PowerMockito.mockStatic(ZkUtils.class); + + this.zkCallbackExecutor = mock(ScheduledExecutorService.class); + this.zkCallbackController = new MockExecutorController() + .controlExecute(zkCallbackExecutor) + .controlSubmit(zkCallbackExecutor) + .controlSchedule(zkCallbackExecutor) + .controlScheduleAtFixedRate(zkCallbackExecutor, 10); } private void addWatcher(String path, Watcher watcher) { @@ -218,4 +232,45 @@ protected boolean notifyWatchedEvent(EventType eventType, return true; } + protected void mockGetChildren(String expectedPath, + boolean expectedWatcher, + int retCode, + List<String> retChildren, + Stat retStat) { + mockGetChildren( + expectedPath, expectedWatcher, retCode, retChildren, retStat, 0); + } + + protected void mockGetChildren(String expectedPath, + boolean expectedWatcher, + int retCode, + List<String> retChildren, + Stat retStat, + long delayMs) { + doAnswer(invocationOnMock -> { + String p = invocationOnMock.getArgument(0); + Watcher w = invocationOnMock.getArgument(1); + Children2Callback callback = invocationOnMock.getArgument(2); + Object ctx = invocationOnMock.getArgument(3); + + if (Code.OK == retCode) { + addWatcher(p, w); + } + + this.zkCallbackExecutor.schedule(() -> callback.processResult( + retCode, + p, + ctx, + retChildren, + retStat + ), delayMs, TimeUnit.MILLISECONDS); + return null; + + }).when(mockZk).getChildren( + eq(expectedPath), + expectedWatcher ? any(Watcher.class) : eq(null), + any(Children2Callback.class), + any()); + } + } diff --git a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTestBase.java b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTestBase.java index 1d269f5ec..7d551ed55 100644 --- a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTestBase.java +++ b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTestBase.java @@ -19,15 +19,19 @@ package org.apache.bookkeeper.tools.cli.helpers; import static org.mockito.Answers.CALLS_REAL_METHODS; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.spy; +import java.net.URI; import org.apache.bookkeeper.client.api.BookKeeper; import org.apache.bookkeeper.client.api.BookKeeperBuilder; import org.apache.bookkeeper.conf.AbstractConfiguration; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.meta.MetadataClientDriver; +import org.apache.bookkeeper.meta.MetadataDrivers; import org.junit.Before; import org.junit.runner.RunWith; import org.powermock.api.mockito.PowerMockito; @@ -38,12 +42,13 @@ * A test base for testing client commands. */ @RunWith(PowerMockRunner.class) -@PrepareForTest({ ClientCommand.class, BookKeeper.class }) +@PrepareForTest({ ClientCommand.class, BookKeeper.class, MetadataDrivers.class }) public abstract class ClientCommandTestBase extends CommandTestBase { protected ClientConfiguration clientConf; protected BookKeeperBuilder mockBkBuilder; protected BookKeeper mockBk; + protected MetadataClientDriver metadataClientDriver; @Before public void setup() throws Exception { @@ -60,6 +65,12 @@ public void setup() throws Exception { BookKeeper.class, "newBuilder", eq(clientConf)) .thenReturn(mockBkBuilder); when(mockBkBuilder.build()).thenReturn(mockBk); + + PowerMockito.mockStatic(MetadataDrivers.class); + this.metadataClientDriver = mock(MetadataClientDriver.class); + PowerMockito.when( + MetadataDrivers.getClientDriver(any(URI.class))) + .thenReturn(metadataClientDriver); } } diff --git a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTest.java b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTest.java index 1677309e1..f2c6dc6e7 100644 --- a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTest.java +++ b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTest.java @@ -24,17 +24,19 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import java.net.URI; import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import org.apache.bookkeeper.client.api.BookKeeper; import org.apache.bookkeeper.conf.AbstractConfiguration; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.discover.RegistrationClient; +import org.apache.bookkeeper.meta.MetadataClientDriver; +import org.apache.bookkeeper.meta.MetadataDrivers; import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.util.ReflectionUtils; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -46,19 +48,20 @@ * Unit test of {@link DiscoveryCommand}. */ @RunWith(PowerMockRunner.class) -@PrepareForTest({ DiscoveryCommand.class, ReflectionUtils.class }) +@PrepareForTest({ DiscoveryCommand.class, MetadataDrivers.class }) public class DiscoveryCommandTest { private DiscoveryCommand cmd; private ServerConfiguration serverConf; private ClientConfiguration clientConf; private RegistrationClient regClient; + private MetadataClientDriver clientDriver; private ScheduledExecutorService executor; @Before public void setup() throws Exception { PowerMockito.mockStatic(Executors.class); - PowerMockito.mockStatic(ReflectionUtils.class); + PowerMockito.mockStatic(MetadataDrivers.class); this.cmd = mock(DiscoveryCommand.class, CALLS_REAL_METHODS); @@ -74,7 +77,10 @@ public void setup() throws Exception { .thenReturn(executor); this.regClient = mock(RegistrationClient.class); - PowerMockito.when(ReflectionUtils.newInstance(any())) + this.clientDriver = mock(MetadataClientDriver.class); + PowerMockito.when(MetadataDrivers.getClientDriver(any(URI.class))) + .thenReturn(clientDriver); + when(clientDriver.getRegistrationClient()) .thenReturn(regClient); } @@ -82,9 +88,10 @@ public void setup() throws Exception { public void testRun() throws Exception { cmd.run(serverConf); verify(cmd, times(1)).run(eq(regClient)); - verify(regClient, times(1)) + verify(clientDriver, times(1)) .initialize(eq(clientConf), eq(executor), eq(NullStatsLogger.INSTANCE), eq(Optional.empty())); - verify(regClient, times(1)).close(); + verify(clientDriver, times(1)).getRegistrationClient(); + verify(clientDriver, times(1)).close(); verify(executor, times(1)).shutdown(); } diff --git a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTestBase.java b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTestBase.java index 7f3588ebc..a3dfb9d71 100644 --- a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTestBase.java +++ b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTestBase.java @@ -37,7 +37,7 @@ * A test base for discovery related commands. */ @RunWith(PowerMockRunner.class) -@PrepareForTest({ DiscoveryCommand.class, ReflectionUtils.class }) +@PrepareForTest({ DiscoveryCommand.class }) public abstract class DiscoveryCommandTestBase extends ClientCommandTestBase { protected RegistrationClient regClient; @@ -48,14 +48,13 @@ public void setup() throws Exception { super.setup(); PowerMockito.mockStatic(Executors.class); - PowerMockito.mockStatic(ReflectionUtils.class); this.executor = mock(ScheduledExecutorService.class); PowerMockito.when(Executors.newSingleThreadScheduledExecutor()) .thenReturn(executor); this.regClient = mock(RegistrationClient.class); - PowerMockito.when(ReflectionUtils.newInstance(any())) + PowerMockito.when(metadataClientDriver.getRegistrationClient()) .thenReturn(regClient); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
