This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch fix/endpoint-discovery-impl
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/fix/endpoint-discovery-impl by 
this push:
     new 0f855c9  Integration with the components stack
0f855c9 is described below

commit 0f855c917830bffdda9ebe572f42400c916156ec
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Dec 12 16:36:45 2019 +0100

    Integration with the components stack
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  | 37 ++++++----------------
 .../bookkeeper/bookie/BookieStateManager.java      | 18 +++++++----
 .../apache/bookkeeper/bookie/ReadOnlyBookie.java   |  9 ++++--
 .../org/apache/bookkeeper/proto/BookieServer.java  | 15 +++++----
 .../java/org/apache/bookkeeper/server/Main.java    | 20 +++++++++++-
 .../bookkeeper/server/service/BookieService.java   |  7 ++--
 .../resolver/BKRegistrationNameResolverTest.java   |  3 +-
 7 files changed, 62 insertions(+), 47 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 6125970..0c3535a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -57,6 +57,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
@@ -73,6 +74,7 @@ import org.apache.bookkeeper.bookie.stats.BookieStats;
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
 import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
@@ -123,6 +125,7 @@ public class Bookie extends BookieCriticalThread {
     static final long METAENTRY_ID_LEDGER_EXPLICITLAC  = -0x8000;
 
     private final LedgerDirsManager ledgerDirsManager;
+    protected final Supplier<BookieServiceInfo> bookieServiceInfoProvider;
     private final LedgerDirsManager indexDirsManager;
     LedgerDirsMonitor dirsMonitor;
 
@@ -605,7 +608,7 @@ public class Bookie extends BookieCriticalThread {
 
     public Bookie(ServerConfiguration conf)
             throws IOException, InterruptedException, BookieException {
-        this(conf, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
+        this(conf, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT, 
() -> BookieServiceInfo.EMPTY);
     }
 
     private static LedgerStorage buildLedgerStorage(ServerConfiguration conf) 
throws IOException {
@@ -667,10 +670,12 @@ public class Bookie extends BookieCriticalThread {
 
         return ledgerStorage;
     }
-
-    public Bookie(ServerConfiguration conf, StatsLogger statsLogger, 
ByteBufAllocator allocator)
+        
+    public Bookie(ServerConfiguration conf, StatsLogger statsLogger,
+            ByteBufAllocator allocator, Supplier<BookieServiceInfo> 
bookieServiceInfoProvider)
             throws IOException, InterruptedException, BookieException {
         super("Bookie-" + conf.getBookiePort());
+        this.bookieServiceInfoProvider = bookieServiceInfoProvider;
         this.statsLogger = statsLogger;
         this.conf = conf;
         this.journalDirectories = Lists.newArrayList();
@@ -791,7 +796,8 @@ public class Bookie extends BookieCriticalThread {
     }
 
     StateManager initializeStateManager() throws IOException {
-        return new BookieStateManager(conf, statsLogger, metadataDriver, 
ledgerDirsManager);
+        return new BookieStateManager(conf, statsLogger, metadataDriver,
+                ledgerDirsManager, bookieServiceInfoProvider);
     }
 
     void readJournal() throws IOException, BookieException {
@@ -1578,29 +1584,6 @@ public class Bookie extends BookieCriticalThread {
     }
 
     /**
-     * @param args
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public static void main(String[] args)
-            throws IOException, InterruptedException, BookieException {
-        Bookie b = new Bookie(new ServerConfiguration());
-        b.start();
-        CounterCallback cb = new CounterCallback();
-        long start = System.currentTimeMillis();
-        for (int i = 0; i < 100000; i++) {
-            ByteBuf buff = Unpooled.buffer(1024);
-            buff.writeLong(1);
-            buff.writeLong(i);
-            cb.incCount();
-            b.addEntry(buff, false /* ackBeforeSync */, cb, null, new byte[0]);
-        }
-        cb.waitZero();
-        long end = System.currentTimeMillis();
-        System.out.println("Took " + (end - start) + "ms");
-    }
-
-    /**
      * Returns exit code - cause of failure.
      *
      * @return {@link ExitCode}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
index 56f513f..fbd043c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
@@ -62,7 +62,7 @@ import org.slf4j.LoggerFactory;
 public class BookieStateManager implements StateManager {
     private static final Logger LOG = 
LoggerFactory.getLogger(BookieStateManager.class);
     private final ServerConfiguration conf;
-    private final BookieServiceInfo bookieServiceInfo = 
BookieServiceInfo.EMPTY;
+    private final Supplier<BookieServiceInfo> bookieServiceInfoProvider;
     private final List<File> statusDirs;
 
     // use an executor to execute the state changes task
@@ -92,7 +92,8 @@ public class BookieStateManager implements StateManager {
     public BookieStateManager(ServerConfiguration conf,
                               StatsLogger statsLogger,
                               MetadataBookieDriver metadataDriver,
-                              LedgerDirsManager ledgerDirsManager) throws 
IOException {
+                              LedgerDirsManager ledgerDirsManager,
+                              Supplier<BookieServiceInfo> 
bookieServiceInfoProvider) throws IOException {
         this(
             conf,
             statsLogger,
@@ -104,18 +105,21 @@ public class BookieStateManager implements StateManager {
                 } catch (UnknownHostException e) {
                     throw new UncheckedIOException("Failed to resolve bookie 
id", e);
                 }
-            });
+            },
+            bookieServiceInfoProvider);
     }
     public BookieStateManager(ServerConfiguration conf,
                               StatsLogger statsLogger,
                               Supplier<RegistrationManager> rm,
                               List<File> statusDirs,
-                              Supplier<String> bookieIdSupplier) throws 
IOException {
+                              Supplier<String> bookieIdSupplier,
+                              Supplier<BookieServiceInfo> 
bookieServiceInfoProvider) throws IOException {
         this.conf = conf;
         this.rm = rm;
         this.statusDirs = statusDirs;
         // ZK ephemeral node for this Bookie.
         this.bookieId = bookieIdSupplier.get();
+        this.bookieServiceInfoProvider = bookieServiceInfoProvider;
         // 1 : up, 0 : readonly, -1 : unregistered
         this.serverStatusGauge = new Gauge<Number>() {
             @Override
@@ -145,7 +149,7 @@ public class BookieStateManager implements StateManager {
     BookieStateManager(ServerConfiguration conf, MetadataBookieDriver 
metadataDriver) throws IOException {
         this(conf, NullStatsLogger.INSTANCE, metadataDriver, new 
LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()),
-                NullStatsLogger.INSTANCE));
+                NullStatsLogger.INSTANCE), () -> BookieServiceInfo.EMPTY);
     }
 
     @Override
@@ -265,7 +269,7 @@ public class BookieStateManager implements StateManager {
 
         rmRegistered.set(false);
         try {
-            rm.get().registerBookie(bookieId, isReadOnly, bookieServiceInfo);
+            rm.get().registerBookie(bookieId, isReadOnly, 
bookieServiceInfoProvider.get());
             rmRegistered.set(true);
         } catch (BookieException e) {
             throw new IOException(e);
@@ -335,7 +339,7 @@ public class BookieStateManager implements StateManager {
             return;
         }
         try {
-            rm.get().registerBookie(bookieId, true, bookieServiceInfo);
+            rm.get().registerBookie(bookieId, true, 
bookieServiceInfoProvider.get());
         } catch (BookieException e) {
             LOG.error("Error in transition to ReadOnly Mode."
                     + " Shutting down", e);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
index cb6e9f0..519f667 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
@@ -24,7 +24,9 @@ package org.apache.bookkeeper.bookie;
 import io.netty.buffer.ByteBufAllocator;
 
 import java.io.IOException;
+import java.util.function.Supplier;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -40,9 +42,10 @@ public class ReadOnlyBookie extends Bookie {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ReadOnlyBookie.class);
 
-    public ReadOnlyBookie(ServerConfiguration conf, StatsLogger statsLogger, 
ByteBufAllocator allocator)
+    public ReadOnlyBookie(ServerConfiguration conf, StatsLogger statsLogger,
+            ByteBufAllocator allocator, Supplier<BookieServiceInfo> 
bookieServiceInfoProvider)
             throws IOException, KeeperException, InterruptedException, 
BookieException {
-        super(conf, statsLogger, allocator);
+        super(conf, statsLogger, allocator, bookieServiceInfoProvider);
         if (conf.isReadOnlyModeEnabled()) {
             stateManager.forceToReadOnly();
         } else {
@@ -55,7 +58,7 @@ public class ReadOnlyBookie extends Bookie {
 
     @Override
     StateManager initializeStateManager() throws IOException {
-        return new BookieStateManager(conf, statsLogger, metadataDriver, 
getLedgerDirsManager()) {
+        return new BookieStateManager(conf, statsLogger, metadataDriver, 
getLedgerDirsManager(), bookieServiceInfoProvider) {
 
             @Override
             public void doTransitionToWritableMode() {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index 14d6393..1819e47 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -33,6 +33,7 @@ import java.net.UnknownHostException;
 import java.security.AccessControlException;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieCriticalThread;
@@ -42,6 +43,7 @@ import org.apache.bookkeeper.bookie.ReadOnlyBookie;
 import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
 import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import 
org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
@@ -82,10 +84,10 @@ public class BookieServer {
     public BookieServer(ServerConfiguration conf) throws IOException,
             KeeperException, InterruptedException, BookieException,
             UnavailableException, CompatibilityException, SecurityException {
-        this(conf, NullStatsLogger.INSTANCE);
+        this(conf, NullStatsLogger.INSTANCE, () -> BookieServiceInfo.EMPTY);
     }
 
-    public BookieServer(ServerConfiguration conf, StatsLogger statsLogger)
+    public BookieServer(ServerConfiguration conf, StatsLogger statsLogger, 
Supplier<BookieServiceInfo> bookieServiceInfoProvider)
             throws IOException, KeeperException, InterruptedException,
             BookieException, UnavailableException, CompatibilityException, 
SecurityException {
         this.conf = conf;
@@ -102,7 +104,7 @@ public class BookieServer {
         this.statsLogger = statsLogger;
         this.nettyServer = new BookieNettyServer(this.conf, null, allocator);
         try {
-            this.bookie = newBookie(conf, allocator);
+            this.bookie = newBookie(conf, allocator, 
bookieServiceInfoProvider);
         } catch (IOException | KeeperException | InterruptedException | 
BookieException e) {
             // interrupted on constructing a bookie
             this.nettyServer.shutdown();
@@ -129,11 +131,12 @@ public class BookieServer {
         this.uncaughtExceptionHandler = exceptionHandler;
     }
 
-    protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator 
allocator)
+    protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator 
allocator,
+            Supplier<BookieServiceInfo> bookieServiceInfoProvider)
         throws IOException, KeeperException, InterruptedException, 
BookieException {
         return conf.isForceReadOnlyBookie()
-            ? new ReadOnlyBookie(conf, statsLogger.scope(BOOKIE_SCOPE), 
allocator)
-            : new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE), allocator);
+            ? new ReadOnlyBookie(conf, statsLogger.scope(BOOKIE_SCOPE), 
allocator, bookieServiceInfoProvider)
+            : new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE), allocator, 
bookieServiceInfoProvider);
     }
 
     public void start() throws InterruptedException {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
index ec163f8..8bbd492 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
@@ -24,8 +24,12 @@ import static 
org.apache.bookkeeper.server.component.ServerLifecycleComponent.lo
 import java.io.File;
 import java.net.MalformedURLException;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.ExitCode;
 import org.apache.bookkeeper.bookie.ScrubberStats;
@@ -34,6 +38,7 @@ import 
org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponentStack;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.UncheckedConfigurationException;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
 import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
 import org.apache.bookkeeper.server.conf.BookieConfiguration;
 import org.apache.bookkeeper.server.http.BKHttpServiceProvider;
@@ -297,9 +302,22 @@ public class Main {
         serverBuilder.addComponent(statsProviderService);
         log.info("Load lifecycle component : {}", 
StatsProviderService.class.getName());
 
+        final Map<String, String> allBookieServicesInfo = new HashMap<>();
+        final Supplier<BookieServiceInfo> bookieServiceInfoProvider = () -> 
new BookieServiceInfo() {
+            @Override
+            public Iterator<String> keys() {
+                return allBookieServicesInfo.keySet().iterator();
+            }
+
+            @Override
+            public String get(String key, String defaultValue) {
+                return allBookieServicesInfo.getOrDefault(key, defaultValue);
+            }
+        };
+        
         // 2. build bookie server
         BookieService bookieService =
-            new BookieService(conf, rootStatsLogger);
+            new BookieService(conf, rootStatsLogger, 
bookieServiceInfoProvider);
 
         serverBuilder.addComponent(bookieService);
         log.info("Load lifecycle component : {}", 
BookieService.class.getName());
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
index 645da03..eea5f0f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
@@ -20,6 +20,8 @@ package org.apache.bookkeeper.server.service;
 
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.function.Supplier;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
 import org.apache.bookkeeper.server.conf.BookieConfiguration;
@@ -35,10 +37,11 @@ public class BookieService extends ServerLifecycleComponent 
{
     private final BookieServer server;
 
     public BookieService(BookieConfiguration conf,
-                         StatsLogger statsLogger)
+                         StatsLogger statsLogger,
+                         Supplier<BookieServiceInfo> bookieServiceInfoProvider)
             throws Exception {
         super(NAME, conf, statsLogger);
-        this.server = new BookieServer(conf.getServerConf(), statsLogger);
+        this.server = new BookieServer(conf.getServerConf(), statsLogger, 
bookieServiceInfoProvider);
     }
 
     @Override
diff --git 
a/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverTest.java
 
b/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverTest.java
index 726c8b0..31bdac0 100644
--- 
a/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverTest.java
+++ 
b/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
 import org.apache.bookkeeper.meta.MetadataBookieDriver;
 import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -102,7 +103,7 @@ public class BKRegistrationNameResolverTest extends 
BookKeeperClusterTestCase {
             InetSocketAddress address = new InetSocketAddress("127.0.0.1", 
3181 + i);
             addressSet.add(address);
             bookieDriver.getRegistrationManager().registerBookie(
-                "127.0.0.1:" + (3181 + i), false
+                "127.0.0.1:" + (3181 + i), false, BookieServiceInfo.EMPTY
             );
         }
 

Reply via email to