This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch fix/endpoint-discovery-impl
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/fix/endpoint-discovery-impl by
this push:
new 0f855c9 Integration with the components stack
0f855c9 is described below
commit 0f855c917830bffdda9ebe572f42400c916156ec
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Dec 12 16:36:45 2019 +0100
Integration with the components stack
---
.../java/org/apache/bookkeeper/bookie/Bookie.java | 37 ++++++----------------
.../bookkeeper/bookie/BookieStateManager.java | 18 +++++++----
.../apache/bookkeeper/bookie/ReadOnlyBookie.java | 9 ++++--
.../org/apache/bookkeeper/proto/BookieServer.java | 15 +++++----
.../java/org/apache/bookkeeper/server/Main.java | 20 +++++++++++-
.../bookkeeper/server/service/BookieService.java | 7 ++--
.../resolver/BKRegistrationNameResolverTest.java | 3 +-
7 files changed, 62 insertions(+), 47 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 6125970..0c3535a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -57,6 +57,7 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
@@ -73,6 +74,7 @@ import org.apache.bookkeeper.bookie.stats.BookieStats;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
@@ -123,6 +125,7 @@ public class Bookie extends BookieCriticalThread {
static final long METAENTRY_ID_LEDGER_EXPLICITLAC = -0x8000;
private final LedgerDirsManager ledgerDirsManager;
+ protected final Supplier<BookieServiceInfo> bookieServiceInfoProvider;
private final LedgerDirsManager indexDirsManager;
LedgerDirsMonitor dirsMonitor;
@@ -605,7 +608,7 @@ public class Bookie extends BookieCriticalThread {
public Bookie(ServerConfiguration conf)
throws IOException, InterruptedException, BookieException {
- this(conf, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
+ this(conf, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT,
() -> BookieServiceInfo.EMPTY);
}
private static LedgerStorage buildLedgerStorage(ServerConfiguration conf)
throws IOException {
@@ -667,10 +670,12 @@ public class Bookie extends BookieCriticalThread {
return ledgerStorage;
}
-
- public Bookie(ServerConfiguration conf, StatsLogger statsLogger, ByteBufAllocator allocator)
+
+ public Bookie(ServerConfiguration conf, StatsLogger statsLogger,
+ ByteBufAllocator allocator, Supplier<BookieServiceInfo> bookieServiceInfoProvider)
throws IOException, InterruptedException, BookieException {
super("Bookie-" + conf.getBookiePort());
+ this.bookieServiceInfoProvider = bookieServiceInfoProvider;
this.statsLogger = statsLogger;
this.conf = conf;
this.journalDirectories = Lists.newArrayList();
@@ -791,7 +796,8 @@ public class Bookie extends BookieCriticalThread {
}
StateManager initializeStateManager() throws IOException {
- return new BookieStateManager(conf, statsLogger, metadataDriver, ledgerDirsManager);
+ return new BookieStateManager(conf, statsLogger, metadataDriver,
+ ledgerDirsManager, bookieServiceInfoProvider);
}
void readJournal() throws IOException, BookieException {
@@ -1578,29 +1584,6 @@ public class Bookie extends BookieCriticalThread {
}
/**
- * @param args
- * @throws IOException
- * @throws InterruptedException
- */
- public static void main(String[] args)
- throws IOException, InterruptedException, BookieException {
- Bookie b = new Bookie(new ServerConfiguration());
- b.start();
- CounterCallback cb = new CounterCallback();
- long start = System.currentTimeMillis();
- for (int i = 0; i < 100000; i++) {
- ByteBuf buff = Unpooled.buffer(1024);
- buff.writeLong(1);
- buff.writeLong(i);
- cb.incCount();
- b.addEntry(buff, false /* ackBeforeSync */, cb, null, new byte[0]);
- }
- cb.waitZero();
- long end = System.currentTimeMillis();
- System.out.println("Took " + (end - start) + "ms");
- }
-
- /**
* Returns exit code - cause of failure.
*
* @return {@link ExitCode}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
index 56f513f..fbd043c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
@@ -62,7 +62,7 @@ import org.slf4j.LoggerFactory;
public class BookieStateManager implements StateManager {
private static final Logger LOG =
LoggerFactory.getLogger(BookieStateManager.class);
private final ServerConfiguration conf;
- private final BookieServiceInfo bookieServiceInfo = BookieServiceInfo.EMPTY;
+ private final Supplier<BookieServiceInfo> bookieServiceInfoProvider;
private final List<File> statusDirs;
// use an executor to execute the state changes task
@@ -92,7 +92,8 @@ public class BookieStateManager implements StateManager {
public BookieStateManager(ServerConfiguration conf,
StatsLogger statsLogger,
MetadataBookieDriver metadataDriver,
- LedgerDirsManager ledgerDirsManager) throws IOException {
+ LedgerDirsManager ledgerDirsManager,
+ Supplier<BookieServiceInfo> bookieServiceInfoProvider) throws IOException {
this(
conf,
statsLogger,
@@ -104,18 +105,21 @@ public class BookieStateManager implements StateManager {
} catch (UnknownHostException e) {
throw new UncheckedIOException("Failed to resolve bookie id", e);
}
- });
+ },
+ bookieServiceInfoProvider);
}
public BookieStateManager(ServerConfiguration conf,
StatsLogger statsLogger,
Supplier<RegistrationManager> rm,
List<File> statusDirs,
- Supplier<String> bookieIdSupplier) throws IOException {
+ Supplier<String> bookieIdSupplier,
+ Supplier<BookieServiceInfo> bookieServiceInfoProvider) throws IOException {
this.conf = conf;
this.rm = rm;
this.statusDirs = statusDirs;
// ZK ephemeral node for this Bookie.
this.bookieId = bookieIdSupplier.get();
+ this.bookieServiceInfoProvider = bookieServiceInfoProvider;
// 1 : up, 0 : readonly, -1 : unregistered
this.serverStatusGauge = new Gauge<Number>() {
@Override
@@ -145,7 +149,7 @@ public class BookieStateManager implements StateManager {
BookieStateManager(ServerConfiguration conf, MetadataBookieDriver
metadataDriver) throws IOException {
this(conf, NullStatsLogger.INSTANCE, metadataDriver, new
LedgerDirsManager(conf, conf.getLedgerDirs(),
new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()),
- NullStatsLogger.INSTANCE));
+ NullStatsLogger.INSTANCE), () -> BookieServiceInfo.EMPTY);
}
@Override
@@ -265,7 +269,7 @@ public class BookieStateManager implements StateManager {
rmRegistered.set(false);
try {
- rm.get().registerBookie(bookieId, isReadOnly, bookieServiceInfo);
+ rm.get().registerBookie(bookieId, isReadOnly, bookieServiceInfoProvider.get());
rmRegistered.set(true);
} catch (BookieException e) {
throw new IOException(e);
@@ -335,7 +339,7 @@ public class BookieStateManager implements StateManager {
return;
}
try {
- rm.get().registerBookie(bookieId, true, bookieServiceInfo);
+ rm.get().registerBookie(bookieId, true, bookieServiceInfoProvider.get());
} catch (BookieException e) {
LOG.error("Error in transition to ReadOnly Mode."
+ " Shutting down", e);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
index cb6e9f0..519f667 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
@@ -24,7 +24,9 @@ package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBufAllocator;
import java.io.IOException;
+import java.util.function.Supplier;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -40,9 +42,10 @@ public class ReadOnlyBookie extends Bookie {
private static final Logger LOG =
LoggerFactory.getLogger(ReadOnlyBookie.class);
- public ReadOnlyBookie(ServerConfiguration conf, StatsLogger statsLogger, ByteBufAllocator allocator)
+ public ReadOnlyBookie(ServerConfiguration conf, StatsLogger statsLogger,
+ ByteBufAllocator allocator, Supplier<BookieServiceInfo> bookieServiceInfoProvider)
throws IOException, KeeperException, InterruptedException,
BookieException {
- super(conf, statsLogger, allocator);
+ super(conf, statsLogger, allocator, bookieServiceInfoProvider);
if (conf.isReadOnlyModeEnabled()) {
stateManager.forceToReadOnly();
} else {
@@ -55,7 +58,7 @@ public class ReadOnlyBookie extends Bookie {
@Override
StateManager initializeStateManager() throws IOException {
- return new BookieStateManager(conf, statsLogger, metadataDriver, getLedgerDirsManager()) {
+ return new BookieStateManager(conf, statsLogger, metadataDriver, getLedgerDirsManager(), bookieServiceInfoProvider) {
@Override
public void doTransitionToWritableMode() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index 14d6393..1819e47 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -33,6 +33,7 @@ import java.net.UnknownHostException;
import java.security.AccessControlException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieCriticalThread;
@@ -42,6 +43,7 @@ import org.apache.bookkeeper.bookie.ReadOnlyBookie;
import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.processor.RequestProcessor;
import
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
@@ -82,10 +84,10 @@ public class BookieServer {
public BookieServer(ServerConfiguration conf) throws IOException,
KeeperException, InterruptedException, BookieException,
UnavailableException, CompatibilityException, SecurityException {
- this(conf, NullStatsLogger.INSTANCE);
+ this(conf, NullStatsLogger.INSTANCE, () -> BookieServiceInfo.EMPTY);
}
- public BookieServer(ServerConfiguration conf, StatsLogger statsLogger)
+ public BookieServer(ServerConfiguration conf, StatsLogger statsLogger, Supplier<BookieServiceInfo> bookieServiceInfoProvider)
throws IOException, KeeperException, InterruptedException,
BookieException, UnavailableException, CompatibilityException,
SecurityException {
this.conf = conf;
@@ -102,7 +104,7 @@ public class BookieServer {
this.statsLogger = statsLogger;
this.nettyServer = new BookieNettyServer(this.conf, null, allocator);
try {
- this.bookie = newBookie(conf, allocator);
+ this.bookie = newBookie(conf, allocator, bookieServiceInfoProvider);
} catch (IOException | KeeperException | InterruptedException |
BookieException e) {
// interrupted on constructing a bookie
this.nettyServer.shutdown();
@@ -129,11 +131,12 @@ public class BookieServer {
this.uncaughtExceptionHandler = exceptionHandler;
}
- protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator)
+ protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator,
+ Supplier<BookieServiceInfo> bookieServiceInfoProvider)
throws IOException, KeeperException, InterruptedException,
BookieException {
return conf.isForceReadOnlyBookie()
- ? new ReadOnlyBookie(conf, statsLogger.scope(BOOKIE_SCOPE), allocator)
- : new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE), allocator);
+ ? new ReadOnlyBookie(conf, statsLogger.scope(BOOKIE_SCOPE), allocator, bookieServiceInfoProvider)
+ : new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE), allocator, bookieServiceInfoProvider);
}
public void start() throws InterruptedException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
index ec163f8..8bbd492 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
@@ -24,8 +24,12 @@ import static org.apache.bookkeeper.server.component.ServerLifecycleComponent.lo
import java.io.File;
import java.net.MalformedURLException;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.ExitCode;
import org.apache.bookkeeper.bookie.ScrubberStats;
@@ -34,6 +38,7 @@ import org.apache.bookkeeper.common.component.LifecycleComponent;
import org.apache.bookkeeper.common.component.LifecycleComponentStack;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.UncheckedConfigurationException;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
import org.apache.bookkeeper.server.conf.BookieConfiguration;
import org.apache.bookkeeper.server.http.BKHttpServiceProvider;
@@ -297,9 +302,22 @@ public class Main {
serverBuilder.addComponent(statsProviderService);
log.info("Load lifecycle component : {}",
StatsProviderService.class.getName());
+ final Map<String, String> allBookieServicesInfo = new HashMap<>();
+ final Supplier<BookieServiceInfo> bookieServiceInfoProvider = () -> new BookieServiceInfo() {
+ @Override
+ public Iterator<String> keys() {
+ return allBookieServicesInfo.keySet().iterator();
+ }
+
+ @Override
+ public String get(String key, String defaultValue) {
+ return allBookieServicesInfo.getOrDefault(key, defaultValue);
+ }
+ };
+
// 2. build bookie server
BookieService bookieService =
- new BookieService(conf, rootStatsLogger);
+ new BookieService(conf, rootStatsLogger, bookieServiceInfoProvider);
serverBuilder.addComponent(bookieService);
log.info("Load lifecycle component : {}",
BookieService.class.getName());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
index 645da03..eea5f0f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
@@ -20,6 +20,8 @@ package org.apache.bookkeeper.server.service;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.function.Supplier;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
import org.apache.bookkeeper.server.conf.BookieConfiguration;
@@ -35,10 +37,11 @@ public class BookieService extends ServerLifecycleComponent {
private final BookieServer server;
public BookieService(BookieConfiguration conf,
- StatsLogger statsLogger)
+ StatsLogger statsLogger,
+ Supplier<BookieServiceInfo> bookieServiceInfoProvider)
throws Exception {
super(NAME, conf, statsLogger);
- this.server = new BookieServer(conf.getServerConf(), statsLogger);
+ this.server = new BookieServer(conf.getServerConf(), statsLogger, bookieServiceInfoProvider);
}
@Override
diff --git a/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverTest.java b/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverTest.java
index 726c8b0..31bdac0 100644
--- a/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverTest.java
+++ b/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -102,7 +103,7 @@ public class BKRegistrationNameResolverTest extends BookKeeperClusterTestCase {
InetSocketAddress address = new InetSocketAddress("127.0.0.1",
3181 + i);
addressSet.add(address);
bookieDriver.getRegistrationManager().registerBookie(
- "127.0.0.1:" + (3181 + i), false
+ "127.0.0.1:" + (3181 + i), false, BookieServiceInfo.EMPTY
);
}