This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch fix/endpoint-discovery-impl
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/fix/endpoint-discovery-impl by
this push:
new a492e10 Let component publish information
a492e10 is described below
commit a492e107aae3b6337beadf73c30fd7dd85cebe16
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Dec 12 19:03:52 2019 +0100
Let component publish information
---
.../component/AbstractLifecycleComponent.java | 6 ++++
.../common/component/LifecycleListener.java | 2 ++
...Listener.java => LifecycleListenerAdapter.java} | 32 ++++++++++++-----
.../org/apache/bookkeeper/proto/BookieServer.java | 6 ++++
.../java/org/apache/bookkeeper/server/Main.java | 41 +++++++++++++++-------
.../bookkeeper/server/service/BookieService.java | 3 ++
6 files changed, 69 insertions(+), 21 deletions(-)
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/AbstractLifecycleComponent.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/AbstractLifecycleComponent.java
index 5b6f19e..ea2440a 100644
---
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/AbstractLifecycleComponent.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/AbstractLifecycleComponent.java
@@ -73,6 +73,12 @@ public abstract class AbstractLifecycleComponent<ConfT
extends ComponentConfigur
listeners.remove(listener);
}
+ protected void publishEndpointInfo(String key, String value) {
+ listeners.forEach(l -> {
+ l.publishEndpointInfo(key, value);
+ });
+ }
+
@Override
public void start() {
if (!lifecycle.canMoveToStarted()) {
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleListener.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleListener.java
index 5056548..351be01 100644
---
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleListener.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleListener.java
@@ -31,6 +31,8 @@ public interface LifecycleListener {
void afterStop();
+ void publishEndpointInfo(String key, String value);
+
void beforeClose();
void afterClose();
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleListener.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleListenerAdapter.java
similarity index 61%
copy from
bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleListener.java
copy to
bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleListenerAdapter.java
index 5056548..bf99c7d 100644
---
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleListener.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/LifecycleListenerAdapter.java
@@ -19,20 +19,36 @@
package org.apache.bookkeeper.common.component;
/**
- * Listener listening of the lifecycle changes.
+ * Default implementation for Listener listening of the lifecycle changes.
*/
-public interface LifecycleListener {
+public class LifecycleListenerAdapter implements LifecycleListener {
- void beforeStart();
+ @Override
+ public void beforeStart() {
+ }
- void afterStart();
+ @Override
+ public void afterStart() {
+ }
- void beforeStop();
+ @Override
+ public void beforeStop() {
+ }
- void afterStop();
+ @Override
+ public void afterStop() {
+ }
- void beforeClose();
+ @Override
+ public void publishEndpointInfo(String key, String value) {
+ }
- void afterClose();
+ @Override
+ public void beforeClose() {
+ }
+
+ @Override
+ public void afterClose() {
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index 1819e47..22870cf 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -33,6 +33,7 @@ import java.net.UnknownHostException;
import java.security.AccessControlException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.apache.bookkeeper.bookie.Bookie;
@@ -118,6 +119,11 @@ public class BookieServer {
statsLogger.scope(SERVER_SCOPE), shFactory,
bookie.getAllocator());
this.nettyServer.setRequestProcessor(this.requestProcessor);
}
+
+ public void publishEndpointInfo(BiConsumer<String, String>
serviceInfoPublisher) {
+ serviceInfoPublisher.accept("bookie.binary.address",
+
"bk;//"+nettyServer.bookieAddress.getHostName()+":"+nettyServer.bookieAddress.getPort());
+ }
/**
* Currently the uncaught exception handler is used for DeathWatcher to
notify
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
index 8bbd492..0272eaa 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
@@ -24,10 +24,10 @@ import static
org.apache.bookkeeper.server.component.ServerLifecycleComponent.lo
import java.io.File;
import java.net.MalformedURLException;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
@@ -36,6 +36,8 @@ import org.apache.bookkeeper.bookie.ScrubberStats;
import org.apache.bookkeeper.common.component.ComponentStarter;
import org.apache.bookkeeper.common.component.LifecycleComponent;
import org.apache.bookkeeper.common.component.LifecycleComponentStack;
+import org.apache.bookkeeper.common.component.LifecycleListener;
+import org.apache.bookkeeper.common.component.LifecycleListenerAdapter;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.UncheckedConfigurationException;
import org.apache.bookkeeper.discover.BookieServiceInfo;
@@ -292,17 +294,8 @@ public class Main {
* @return lifecycle stack
*/
public static LifecycleComponentStack
buildBookieServer(BookieConfiguration conf) throws Exception {
- LifecycleComponentStack.Builder serverBuilder =
LifecycleComponentStack.newBuilder().withName("bookie-server");
- // 1. build stats provider
- StatsProviderService statsProviderService =
- new StatsProviderService(conf);
- StatsLogger rootStatsLogger =
statsProviderService.getStatsProvider().getStatsLogger("");
-
- serverBuilder.addComponent(statsProviderService);
- log.info("Load lifecycle component : {}",
StatsProviderService.class.getName());
-
- final Map<String, String> allBookieServicesInfo = new HashMap<>();
+ final Map<String, String> allBookieServicesInfo = new
ConcurrentHashMap<>();
final Supplier<BookieServiceInfo> bookieServiceInfoProvider = () ->
new BookieServiceInfo() {
@Override
public Iterator<String> keys() {
@@ -314,11 +307,32 @@ public class Main {
return allBookieServicesInfo.getOrDefault(key, defaultValue);
}
};
-
+ final LifecycleListener bookieInfoServiceListener = new
LifecycleListenerAdapter() {
+
+ @Override
+ public void publishEndpointInfo(String key, String value) {
+ log.info("Publishing endpointInfo {}={}", key, value);
+ allBookieServicesInfo.put(key, value);
+ }
+
+ };
+
+ LifecycleComponentStack.Builder serverBuilder =
LifecycleComponentStack.newBuilder().withName("bookie-server");
+
+ // 1. build stats provider
+ StatsProviderService statsProviderService =
+ new StatsProviderService(conf);
+ StatsLogger rootStatsLogger =
statsProviderService.getStatsProvider().getStatsLogger("");
+ statsProviderService.addLifecycleListener(bookieInfoServiceListener);
+
+ serverBuilder.addComponent(statsProviderService);
+ log.info("Load lifecycle component : {}",
StatsProviderService.class.getName());
+
// 2. build bookie server
BookieService bookieService =
new BookieService(conf, rootStatsLogger,
bookieServiceInfoProvider);
+ bookieService.addLifecycleListener(bookieInfoServiceListener);
serverBuilder.addComponent(bookieService);
log.info("Load lifecycle component : {}",
BookieService.class.getName());
@@ -347,7 +361,7 @@ public class Main {
.build();
HttpService httpService =
new HttpService(provider, conf, rootStatsLogger);
-
+ httpService.addLifecycleListener(bookieInfoServiceListener);
serverBuilder.addComponent(httpService);
log.info("Load lifecycle component : {}",
HttpService.class.getName());
}
@@ -361,6 +375,7 @@ public class Main {
conf,
rootStatsLogger);
for (ServerLifecycleComponent component : components) {
+ component.addLifecycleListener(bookieInfoServiceListener);
serverBuilder.addComponent(component);
log.info("Load lifecycle component : {}",
component.getClass().getName());
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
index eea5f0f..ef00973 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/BookieService.java
@@ -58,6 +58,9 @@ public class BookieService extends ServerLifecycleComponent {
protected void doStart() {
try {
this.server.start();
+ this.server.publishEndpointInfo((key, value) -> {
+ this.publishEndpointInfo(key, value);
+ });
} catch (InterruptedException exc) {
throw new RuntimeException("Failed to start bookie server", exc);
}