This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new f4e2344 [TABLE SERVICE] start table service as an extra component of bookie
f4e2344 is described below
commit f4e234439ef45bdedb244bc2375e2f9d0426c20c
Author: Sijie Guo <[email protected]>
AuthorDate: Tue May 22 11:43:31 2018 -0700
[TABLE SERVICE] start table service as an extra component of bookie
Descriptions of the changes in this PR:
*Motivation*
The table service is built on top of bookkeeper ledgers. It extends the
bookies' functionality, so it is much more convenient to start the service as
an additional component of a bookie, just as `AutoRecovery` is started
alongside a bookie.
*Solution*
- include `stream/server` module as part of bookkeeper server/all binary
distributions
- abstract `StorageServer` to allow controlling whether to start bookie or
not
- provide a ServerLifecycleComponent of `StorageServer`, so it can be used
as an extra component of bookie
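
The idea behind the bullets above can be sketched in isolation. This is a
minimal illustration only, not the code in this commit: `LifecycleSketch`,
`Component`, `ComponentStack`, `named` and the component names are all
hypothetical stand-ins, assumed here to show how one flag can decide whether
the stack starts its own stats provider and bookie or only the table service.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

    /** Hypothetical stand-in for a lifecycle component (not the BookKeeper interface). */
    public interface Component {
        String name();
        void start();
        void stop();
    }

    /** Starts components in insertion order and stops them in reverse order. */
    public static final class ComponentStack implements Component {
        private final String name;
        private final List<Component> components = new ArrayList<>();

        public ComponentStack(String name) {
            this.name = name;
        }

        public ComponentStack add(Component component) {
            components.add(component);
            return this;
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public void start() {
            for (Component c : components) {
                c.start();
            }
        }

        @Override
        public void stop() {
            for (int i = components.size() - 1; i >= 0; i--) {
                components.get(i).stop();
            }
        }
    }

    /** Creates a component that only records its start/stop events. */
    public static Component named(String name, List<String> events) {
        return new Component() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public void start() {
                events.add("start " + name);
            }

            @Override
            public void stop() {
                events.add("stop " + name);
            }
        };
    }

    /**
     * Builds the stack. When startBookie is false, the caller (for example a
     * bookie that loads this stack as an extra component) is assumed to
     * already run the stats provider and the bookie itself.
     */
    public static ComponentStack buildServer(boolean startBookie, List<String> events) {
        ComponentStack stack = new ComponentStack("storage-server");
        if (startBookie) {
            stack.add(named("stats-provider", events));
            stack.add(named("bookie", events));
        }
        stack.add(named("table-service", events));
        return stack;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        ComponentStack embedded = buildServer(false, events);
        embedded.start();
        embedded.stop();
        System.out.println(events);
    }
}
```

Building the stack without starting it leaves the caller in control of the
lifecycle, which is what allows a host process to start and stop it together
with its other components.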
*Tests*
- improve the integration tests to start table service as part of bookie
containers
- move `LocationClientTest` to container based integration tests
Master Issue: #1205
Author: Sijie Guo <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>
This closes #1422 from sijie/start_table_service
---
bookkeeper-dist/all/pom.xml | 7 ++
bookkeeper-dist/server/pom.xml | 7 ++
bookkeeper-dist/src/assemble/bin-all.xml | 3 +
bookkeeper-dist/src/assemble/bin-server.xml | 4 +
.../src/main/resources/LICENSE-all.bin.txt | 47 +++++++-
.../src/main/resources/LICENSE-server.bin.txt | 44 +++++++-
.../google-auth-library-credentials-0.4.0/LICENSE | 28 +++++
.../src/main/resources/deps/protobuf-3.0.0/LICENSE | 32 ++++++
.../src/main/resources/deps/protobuf-3.3.1/LICENSE | 32 ++++++
conf/bk_server.conf | 28 ++++-
pom.xml | 8 +-
.../bookkeeper/stream/cluster/StreamCluster.java | 38 ++-----
.../bookkeeper/stream/server/StorageServer.java | 118 ++++++++++++++-------
.../server/StreamStorageLifecycleComponent.java | 64 +++++++++++
.../stream/server/conf/BookieConfiguration.java | 4 +-
.../server/conf/StorageServerConfiguration.java | 11 ++
.../stream/server/service/BookieWatchService.java | 95 +++++++++++++++++
.../server/service/CuratorProviderService.java | 3 +
.../server/service/DLNamespaceProviderService.java | 1 +
.../service/RegistrationServiceProvider.java | 2 +
.../storage/api/cluster/ClusterInitializer.java | 47 ++++++++
.../storage/api/cluster/ClusterMetadataStore.java | 9 +-
stream/storage/impl/pom.xml | 10 --
.../impl/cluster/InMemClusterMetadataStore.java | 6 +-
.../storage/impl/cluster/ZkClusterInitializer.java | 95 +++++++++++++++++
.../impl/cluster/ZkClusterMetadataStore.java | 5 +-
.../cluster/ClusterControllerLeaderImplTest.java | 3 +-
.../tests/containers/BookieContainer.java | 34 ++++--
.../cluster/BookKeeperClusterTestBase.java | 23 +++-
.../integration/stream}/LocationClientTest.java | 26 +++--
.../integration/stream/StreamClusterTestBase.java | 73 +++++++++++++
.../tests/integration/topologies/BKCluster.java | 77 ++++++++++++--
.../integration/topologies/BKClusterSpec.java | 67 ++++++++++++
33 files changed, 924 insertions(+), 127 deletions(-)
diff --git a/bookkeeper-dist/all/pom.xml b/bookkeeper-dist/all/pom.xml
index ef25654..eb69948 100644
--- a/bookkeeper-dist/all/pom.xml
+++ b/bookkeeper-dist/all/pom.xml
@@ -92,6 +92,13 @@
<version>${project.version}</version>
</dependency>
+ <!-- stream.storage -->
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>stream-storage-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- bookkeeper benchmark -->
<dependency>
<groupId>org.apache.bookkeeper</groupId>
diff --git a/bookkeeper-dist/server/pom.xml b/bookkeeper-dist/server/pom.xml
index 5ae8fa1..7b00dbe 100644
--- a/bookkeeper-dist/server/pom.xml
+++ b/bookkeeper-dist/server/pom.xml
@@ -76,6 +76,13 @@
<version>${project.version}</version>
</dependency>
+ <!-- stream.storage -->
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>stream-storage-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- slf4j binding -->
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/bookkeeper-dist/src/assemble/bin-all.xml b/bookkeeper-dist/src/assemble/bin-all.xml
index fd45d67..4a7c4dc 100644
--- a/bookkeeper-dist/src/assemble/bin-all.xml
+++ b/bookkeeper-dist/src/assemble/bin-all.xml
@@ -54,11 +54,14 @@
<directory>../src/main/resources/deps</directory>
<outputDirectory>/deps</outputDirectory>
<includes>
+ <include>google-auth-library-credentials-0.4.0/LICENSE</include>
<include>javax.servlet-api-3.1.0/CDDL+GPL-1.1</include>
<include>jsr-305/LICENSE</include>
<include>netty-3.10.1.Final/*</include>
<include>netty-4.1.12.Final/*</include>
<include>paranamer-2.8/LICENSE.txt</include>
+ <include>protobuf-3.0.0/LICENSE</include>
+ <include>protobuf-3.3.1/LICENSE</include>
<include>protobuf-3.4.0/LICENSE</include>
<include>scala-library-2.11.7/LICENSE.md</include>
<include>scala-parser-combinators_2.11-1.0.4/LICENSE.md</include>
diff --git a/bookkeeper-dist/src/assemble/bin-server.xml b/bookkeeper-dist/src/assemble/bin-server.xml
index 7bbd3b2..ce5a646 100644
--- a/bookkeeper-dist/src/assemble/bin-server.xml
+++ b/bookkeeper-dist/src/assemble/bin-server.xml
@@ -49,8 +49,11 @@
<directory>../src/main/resources/deps</directory>
<outputDirectory>/deps</outputDirectory>
<includes>
+ <include>google-auth-library-credentials-0.4.0/LICENSE</include>
<include>javax.servlet-api-3.1.0/CDDL+GPL-1.1</include>
<include>netty-4.1.12.Final/*</include>
+ <include>protobuf-3.0.0/LICENSE</include>
+ <include>protobuf-3.3.1/LICENSE</include>
<include>protobuf-3.4.0/LICENSE</include>
<include>slf4j-1.7.25/LICENSE.txt</include>
</includes>
@@ -85,6 +88,7 @@
<!-- Include 'groupId' in the dependencies Jar names to better identify
the provenance of the jar -->
<outputFileNameMapping>${artifact.groupId}-${artifact.artifactId}-${artifact.version}${dashClassifier?}.${artifact.extension}</outputFileNameMapping>
<excludes>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
<!-- All these dependencies are already included in netty-all -->
<exclude>io.netty:netty-buffer</exclude>
<exclude>io.netty:netty-codec</exclude>
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
index 7f523db..289916d 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
@@ -276,6 +276,25 @@ Apache Software License, Version 2.
- lib/net.jpountz.lz4-lz4-1.3.0.jar [38]
- lib/org.codehaus.jackson-jackson-core-asl-1.9.11.jar [39]
- lib/org.codehaus.jackson-jackson-mapper-asl-1.9.11.jar [40]
+- lib/com.google.api.grpc-proto-google-common-protos-0.1.9.jar [41]
+- lib/com.google.code.gson-gson-2.7.jar [42]
+- lib/com.google.instrumentation-instrumentation-api-0.4.3.jar [43]
+- lib/com.squareup.okhttp-okhttp-2.5.0.jar [44]
+- lib/com.squareup.okio-okio-1.6.0.jar [45]
+- lib/io.grpc-grpc-all-1.5.0.jar [46]
+- lib/io.grpc-grpc-auth-1.5.0.jar [46]
+- lib/io.grpc-grpc-context-1.5.0.jar [46]
+- lib/io.grpc-grpc-core-1.5.0.jar [46]
+- lib/io.grpc-grpc-netty-1.5.0.jar [46]
+- lib/io.grpc-grpc-okhttp-1.5.0.jar [46]
+- lib/io.grpc-grpc-protobuf-1.5.0.jar [46]
+- lib/io.grpc-grpc-protobuf-lite-1.5.0.jar [46]
+- lib/io.grpc-grpc-protobuf-nano-1.5.0.jar [46]
+- lib/io.grpc-grpc-stub-1.5.0.jar [46]
+- lib/org.apache.curator-curator-client-4.0.1.jar [47]
+- lib/org.apache.curator-curator-framework-4.0.1.jar [47]
+- lib/org.apache.curator-curator-recipes-4.0.1.jar [47]
+- lib/org.inferred-freebuilder-1.14.9.jar [48]
[1] Source available at https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
[2] Source available at https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -316,6 +335,15 @@ Apache Software License, Version 2.
[38] Source available at https://github.com/lz4/lz4-java/tree/1.3.0
[39] Source available at https://github.com/codehaus/jackson/tree/1.9
[40] Source available at https://github.com/codehaus/jackson/tree/1.9
+[41] Source available at https://github.com/googleapis/googleapis
+[42] Source available at https://github.com/google/gson/tree/gson-parent-2.7
+[43] Source available at https://github.com/census-instrumentation/opencensus-java/tree/v0.4.3
+[44] Source available at https://github.com/square/okhttp/tree/parent-2.5.0
+[45] Source available at https://github.com/square/okio/tree/okio-parent-1.6.0
+[46] Source available at https://github.com/grpc/grpc-java/tree/v1.5.0
+[47] Source available at https://github.com/apache/curator/tree/apache-curator-4.0.1
+[48] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
+
------------------------------------------------------------------------------------
lib/io.netty-netty-3.10.1.Final.jar contains the extensions to Java Collections Framework which has
@@ -459,10 +487,20 @@ Bundled as lib/com.google.code.findbugs-jsr305-3.0.2.jar
Source available at https://storage.googleapis.com/google-code-archive-source/v2/code.google.com/jsr-305/source-archive.zip
------------------------------------------------------------------------------------
This product bundles Google Protocal Buffers, which is available under a "3-clause BSD"
-license. For details, see deps/protobuf-3.4.0/LICENSE.
+license.
Bundled as lib/com.google.protobuf-protobuf-java-3.4.0.jar
Source available at https://github.com/google/protobuf/tree/v3.4.0
+For details, see deps/protobuf-3.4.0/LICENSE.
+
+Bundled as lib/com.google.protobuf.nano-protobuf-javanano-3.0.0-alpha-5.jar
+Source available at https://github.com/google/protobuf/tree/3.0.0-pre
+For details, see deps/protobuf-3.0.0/LICENSE.
+
+Bundled as com.google.protobuf-protobuf-java-util-3.3.1.jar
+Source available at https://github.com/google/protobuf/tree/v3.3.1
+For details, see deps/protobuf-3.3.1/LICENSE.
+
------------------------------------------------------------------------------------
This product bundles Paranamer, which is available under a "3-clause BSD"
license.
For details, see deps/paranamer-2.8/LICENSE.txt.
@@ -501,4 +539,11 @@ Bundled as
- lib/org.slf4j-slf4j-api-1.7.25.jar
- lib/org.slf4j-slf4j-log4j12-1.7.25.jar
Source available at https://github.com/qos-ch/slf4j/tree/v_1.7.25
+------------------------------------------------------------------------------------
+This product bundles the Google Auth Library, which is available under a "3-clause BSD"
+license. For details, see deps/google-auth-library-credentials-0.4.0/LICENSE
+
+Bundled as
+ - lib/com.google.auth-google-auth-library-credentials-0.4.0.jar
+Source available at https://github.com/google/google-auth-library-java/tree/0.4.0
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
index 1f437ab..168f8fd 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
@@ -241,6 +241,25 @@ Apache Software License, Version 2.
- lib/net.jpountz.lz4-lz4-1.3.0.jar [25]
- lib/org.codehaus.jackson-jackson-core-asl-1.9.11.jar [26]
- lib/org.codehaus.jackson-jackson-mapper-asl-1.9.11.jar [27]
+- lib/com.google.api.grpc-proto-google-common-protos-0.1.9.jar [28]
+- lib/com.google.code.gson-gson-2.7.jar [29]
+- lib/com.google.instrumentation-instrumentation-api-0.4.3.jar [30]
+- lib/com.squareup.okhttp-okhttp-2.5.0.jar [31]
+- lib/com.squareup.okio-okio-1.6.0.jar [32]
+- lib/io.grpc-grpc-all-1.5.0.jar [33]
+- lib/io.grpc-grpc-auth-1.5.0.jar [33]
+- lib/io.grpc-grpc-context-1.5.0.jar [33]
+- lib/io.grpc-grpc-core-1.5.0.jar [33]
+- lib/io.grpc-grpc-netty-1.5.0.jar [33]
+- lib/io.grpc-grpc-okhttp-1.5.0.jar [33]
+- lib/io.grpc-grpc-protobuf-1.5.0.jar [33]
+- lib/io.grpc-grpc-protobuf-lite-1.5.0.jar [33]
+- lib/io.grpc-grpc-protobuf-nano-1.5.0.jar [33]
+- lib/io.grpc-grpc-stub-1.5.0.jar [33]
+- lib/org.apache.curator-curator-client-4.0.1.jar [34]
+- lib/org.apache.curator-curator-framework-4.0.1.jar [34]
+- lib/org.apache.curator-curator-recipes-4.0.1.jar [34]
+- lib/org.inferred-freebuilder-1.14.9.jar [35]
[1] Source available at https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
[2] Source available at https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -269,6 +288,14 @@ Apache Software License, Version 2.
[25] Source available at https://github.com/lz4/lz4-java/tree/1.3.0
[26] Source available at https://github.com/codehaus/jackson/tree/1.9
[27] Source available at https://github.com/codehaus/jackson/tree/1.9
+[28] Source available at https://github.com/googleapis/googleapis
+[29] Source available at https://github.com/google/gson/tree/gson-parent-2.7
+[30] Source available at https://github.com/census-instrumentation/opencensus-java/tree/v0.4.3
+[31] Source available at https://github.com/square/okhttp/tree/parent-2.5.0
+[32] Source available at https://github.com/square/okio/tree/okio-parent-1.6.0
+[33] Source available at https://github.com/grpc/grpc-java/tree/v1.5.0
+[34] Source available at https://github.com/apache/curator/tree/apache-curator-4.0.1
+[35] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
------------------------------------------------------------------------------------
lib/io.netty-netty-all-4.1.12.Final.jar bundles some 3rd party dependencies
@@ -372,10 +399,19 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
------------------------------------------------------------------------------------
This product bundles Google Protocal Buffers, which is available under a "3-clause BSD"
-license. For details, see deps/protobuf-3.4.0/LICENSE.
+license.
Bundled as lib/com.google.protobuf-protobuf-java-3.4.0.jar
Source available at https://github.com/google/protobuf/tree/v3.4.0
+For details, see deps/protobuf-3.4.0/LICENSE.
+
+Bundled as lib/com.google.protobuf.nano-protobuf-javanano-3.0.0-alpha-5.jar
+Source available at https://github.com/google/protobuf/tree/3.0.0-pre
+For details, see deps/protobuf-3.0.0/LICENSE.
+
+Bundled as com.google.protobuf-protobuf-java-util-3.3.1.jar
+Source available at https://github.com/google/protobuf/tree/v3.3.1
+For details, see deps/protobuf-3.3.1/LICENSE.
------------------------------------------------------------------------------------
This product bundles the JCP Standard Java Servlet API, which is available under a
CDDL 1.1 license. For details, see deps/javax.servlet-api-3.1.0/CDDL+GPL-1.1.
@@ -390,4 +426,10 @@ Bundled as
- lib/org.slf4j-slf4j-api-1.7.25.jar
- lib/org.slf4j-slf4j-log4j12-1.7.25.jar
Source available at https://github.com/qos-ch/slf4j/tree/v_1.7.25
+------------------------------------------------------------------------------------
+This product bundles the Google Auth Library, which is available under a "3-clause BSD"
+license. For details, see deps/google-auth-library-credentials-0.4.0/LICENSE
+Bundled as
+ - lib/com.google.auth-google-auth-library-credentials-0.4.0.jar
+Source available at https://github.com/google/google-auth-library-java/tree/0.4.0
diff --git a/bookkeeper-dist/src/main/resources/deps/google-auth-library-credentials-0.4.0/LICENSE b/bookkeeper-dist/src/main/resources/deps/google-auth-library-credentials-0.4.0/LICENSE
new file mode 100644
index 0000000..12edf23
--- /dev/null
+++ b/bookkeeper-dist/src/main/resources/deps/google-auth-library-credentials-0.4.0/LICENSE
@@ -0,0 +1,28 @@
+Copyright 2014, Google Inc. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/bookkeeper-dist/src/main/resources/deps/protobuf-3.0.0/LICENSE b/bookkeeper-dist/src/main/resources/deps/protobuf-3.0.0/LICENSE
new file mode 100644
index 0000000..2dcab42
--- /dev/null
+++ b/bookkeeper-dist/src/main/resources/deps/protobuf-3.0.0/LICENSE
@@ -0,0 +1,32 @@
+Copyright 2014, Google Inc. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+Code generated by the Protocol Buffer compiler is owned by the owner
+of the input file used when generating it. This code is not
+standalone and requires a support library to be linked with it. This
+support library is itself covered by the above license.
diff --git a/bookkeeper-dist/src/main/resources/deps/protobuf-3.3.1/LICENSE b/bookkeeper-dist/src/main/resources/deps/protobuf-3.3.1/LICENSE
new file mode 100644
index 0000000..2dcab42
--- /dev/null
+++ b/bookkeeper-dist/src/main/resources/deps/protobuf-3.3.1/LICENSE
@@ -0,0 +1,32 @@
+Copyright 2014, Google Inc. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+Code generated by the Protocol Buffer compiler is owned by the owner
+of the input file used when generating it. This code is not
+standalone and requires a support library to be linked with it. This
+support library is itself covered by the above license.
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 1be3fe3..104a3d1 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -92,7 +92,10 @@ bookiePort=3181
# Configure a list of server components to enable and load on a bookie server.
# This provides the plugin run extra services along with a bookie server.
#
-# extraServerComponents=
+# NOTE: if bookie fails to load any of extra components configured below, bookie will continue
+# function by ignoring the components configured below.
+# extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
+extraServerComponents=
#############################################################################
## Thread settings
@@ -888,3 +891,26 @@ zkEnableSecurity=false
# The time to backoff when replication worker encounters exceptions on replicating a ledger, in milliseconds.
# rwRereplicateBackoffMs=5000
+
+
+##################################################################
+##################################################################
+# Settings below are used by stream/table service
+##################################################################
+##################################################################
+
+### Grpc Server ###
+
+# the grpc server port to listen on. default is 4181
+storageserver.grpc.port=4181
+
+### Storage ###
+
+# local storage directories for storing table ranges data (e.g. rocksdb sst files)
+storage.range.store.dirs=data/bookkeeper/ranges
+
+# whether the storage server capable of serving readonly tables. default is false.
+storage.serve.readonly.tables=false
+
+# the cluster controller schedule interval, in milliseconds. default is 30 seconds.
+storage.cluster.controller.schedule.interval.ms=30000
diff --git a/pom.xml b/pom.xml
index fa25247..a33fb7f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
<curator.version>4.0.1</curator.version>
<dropwizard.version>3.1.0</dropwizard.version>
<finagle.version>6.44.0</finagle.version>
- <freebuilder.version>1.12.3</freebuilder.version>
+ <freebuilder.version>1.14.9</freebuilder.version>
<google.code.version>3.0.2</google.code.version>
<google.errorprone.version>2.1.2</google.errorprone.version>
<grpc.version>1.5.0</grpc.version>
@@ -362,6 +362,12 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>${grpc.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.errorprone</groupId>
+ <artifactId>error_prone_annotations</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- rocksdb dependencies -->
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index 63e2679..11bc8af 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.net.BindException;
import java.net.URI;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -43,20 +42,15 @@ import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.NamespaceProperties;
-import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.bookkeeper.stream.server.StorageServer;
import org.apache.bookkeeper.stream.storage.StorageConstants;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
-import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LocalDLMEmulator;
-import org.apache.zookeeper.KeeperException;
/**
* A Cluster that runs a few storage nodes.
@@ -141,28 +135,9 @@ public class StreamCluster
}
private void initializeCluster() throws Exception {
- try (CuratorFramework client = CuratorFrameworkFactory.newClient(
- zkEnsemble,
- new ExponentialBackoffRetry(100, Integer.MAX_VALUE, 10000)
- )) {
- client.start();
-
- ZkClusterMetadataStore store = new ZkClusterMetadataStore(client, zkEnsemble, ZK_METADATA_ROOT_PATH);
-
- ClusterMetadata metadata;
- try {
- metadata = store.getClusterMetadata();
- log.info("Loaded cluster metadata : \n{}", metadata);
- } catch (StorageRuntimeException sre) {
- if (sre.getCause() instanceof KeeperException.NoNodeException) {
- log.info("Initializing the stream cluster.");
- store.initializeCluster(spec.numServers() * 2);
- log.info("Successfully initialized the stream cluster : \n{}", store.getClusterMetadata());
- } else {
- throw sre;
- }
- }
- }
+ new ZkClusterInitializer(zkEnsemble).initializeCluster(
+ URI.create("zk://" + zkEnsemble),
+ spec.numServers() * 2);
// format the bookkeeper cluster
MetadataDrivers.runFunctionWithMetadataBookieDriver(newBookieConfiguration(zkEnsemble), driver -> {
@@ -210,11 +185,10 @@ public class StreamCluster
log.info("Attempting to start storage server at (bookie port = {}, grpc port = {})"
+ " : bkDir = {}, rangesStoreDir = {}, serveReadOnlyTables = {}",
bookiePort, grpcPort, bkDir, rangesStoreDir, spec.serveReadOnlyTable);
- server = StorageServer.startStorageServer(
+ server = StorageServer.buildStorageServer(
serverConf,
grpcPort,
- spec.numServers() * 2,
- Optional.empty());
+ spec.numServers() * 2);
server.start();
log.info("Started storage server at (bookie port = {}, grpc port = {})",
bookiePort, grpcPort);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index d5ca397..e0b87ef 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -13,6 +13,7 @@
*/
package org.apache.bookkeeper.stream.server;
+import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH;
import com.beust.jcommander.JCommander;
@@ -21,15 +22,16 @@ import java.io.File;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.component.ComponentStarter;
import org.apache.bookkeeper.common.component.LifecycleComponent;
import org.apache.bookkeeper.common.component.LifecycleComponentStack;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog.DLCheckpointStore;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
@@ -37,6 +39,7 @@ import org.apache.bookkeeper.stream.server.conf.DLConfiguration;
import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
import org.apache.bookkeeper.stream.server.grpc.GrpcServerSpec;
import org.apache.bookkeeper.stream.server.service.BookieService;
+import org.apache.bookkeeper.stream.server.service.BookieWatchService;
import org.apache.bookkeeper.stream.server.service.ClusterControllerService;
import org.apache.bookkeeper.stream.server.service.CuratorProviderService;
import org.apache.bookkeeper.stream.server.service.DLNamespaceProviderService;
@@ -58,6 +61,7 @@ import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.distributedlog.DistributedLogConfiguration;
/**
* A storage server is a server that run storage service and serving rpc requests.
@@ -71,7 +75,7 @@ public class StorageServer {
private String serverConfigFile;
@Parameter(names = {"-p", "--port"}, description = "Port to listen on for gPRC server")
- private int port = 3182;
+ private int port = 4181;
@Parameter(names = {"-h", "--help"}, description = "Show this help message")
private boolean help = false;
@@ -137,11 +141,10 @@ public class StorageServer {
LifecycleComponent storageServer;
try {
- storageServer = startStorageServer(
+ storageServer = buildStorageServer(
conf,
grpcPort,
- 1024,
- Optional.empty());
+ 1024);
} catch (ConfigurationException e) {
log.error("Invalid storage configuration", e);
return ExitCode.INVALID_CONF.code();
@@ -164,11 +167,23 @@ public class StorageServer {
return ExitCode.OK.code();
}
- public static LifecycleComponent startStorageServer(CompositeConfiguration conf,
+ public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
+ int grpcPort,
+ int numStorageContainers)
+ throws UnknownHostException, ConfigurationException {
+ return buildStorageServer(conf, grpcPort, numStorageContainers, true, NullStatsLogger.INSTANCE);
+ }
+
+ public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
int grpcPort,
int numStorageContainers,
- Optional<String> instanceName)
+ boolean startBookieAndStartProvider,
+ StatsLogger externalStatsLogger)
throws ConfigurationException, UnknownHostException {
+
+ LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder()
+ .withName("storage-server");
+
BookieConfiguration bkConf = BookieConfiguration.of(conf);
bkConf.validate();
@@ -188,42 +203,47 @@ public class StorageServer {
StorageResources storageResources = StorageResources.create();
// Create the stats provider
- StatsProviderService statsProviderService = new StatsProviderService(bkConf);
- StatsLogger rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");
+ StatsLogger rootStatsLogger;
+ if (startBookieAndStartProvider) {
+ StatsProviderService statsProviderService = new StatsProviderService(bkConf);
+ rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");
+ serverBuilder.addComponent(statsProviderService);
+ } else {
+ rootStatsLogger = checkNotNull(externalStatsLogger,
+ "External stats logger is not provided while not starting stats provider");
+ }
// Create the bookie service
- BookieService bookieService = new BookieService(bkConf,
rootStatsLogger);
+ ServerConfiguration bkServerConf;
+ if (startBookieAndStartProvider) {
+ BookieService bookieService = new BookieService(bkConf,
rootStatsLogger);
+ serverBuilder.addComponent(bookieService);
+ bkServerConf = bookieService.serverConf();
+ } else {
+ bkServerConf = new ServerConfiguration();
+ bkServerConf.loadConf(bkConf.getUnderlyingConf());
+ }
+
+ // Create the bookie watch service
+ BookieWatchService bkWatchService;
+ {
+ DistributedLogConfiguration dlogConf = new
DistributedLogConfiguration();
+ bkWatchService = new BookieWatchService(
+ dlogConf.getEnsembleSize(),
+ bkConf,
+ NullStatsLogger.INSTANCE);
+ }
// Create the curator provider service
CuratorProviderService curatorProviderService = new
CuratorProviderService(
- bookieService.serverConf(), dlConf,
rootStatsLogger.scope("curator"));
+ bkServerConf, dlConf, rootStatsLogger.scope("curator"));
// Create the distributedlog namespace service
DLNamespaceProviderService dlNamespaceProvider = new
DLNamespaceProviderService(
- bookieService.serverConf(),
+ bkServerConf,
dlConf,
rootStatsLogger.scope("dlog"));
- // Create a registration service provider
- RegistrationServiceProvider regService = new
RegistrationServiceProvider(
- bookieService.serverConf(),
- dlConf,
- rootStatsLogger.scope("registration").scope("provider"));
-
- // Create a cluster controller service
- ClusterControllerService clusterControllerService = new
ClusterControllerService(
- storageConf,
- () -> new ClusterControllerImpl(
- new ZkClusterMetadataStore(
- curatorProviderService.get(),
-
ZKMetadataDriverBase.resolveZkServers(bookieService.serverConf()),
- ZK_METADATA_ROOT_PATH),
- regService.get(),
- new DefaultStorageContainerController(),
- new
ZkClusterControllerLeaderSelector(curatorProviderService.get(),
ZK_METADATA_ROOT_PATH),
- storageConf),
- rootStatsLogger.scope("cluster_controller"));
-
// Create range (stream) store
RangeStoreBuilder rangeStoreBuilder = RangeStoreBuilder.newBuilder()
.withStatsLogger(rootStatsLogger.scope("storage"))
@@ -241,7 +261,7 @@ public class StorageServer {
storageConf,
new ZkClusterMetadataStore(
curatorProviderService.get(),
-
ZKMetadataDriverBase.resolveZkServers(bookieService.serverConf()),
+ ZKMetadataDriverBase.resolveZkServers(bkServerConf),
ZK_METADATA_ROOT_PATH),
registry,
rootStatsLogger.scope("sc").scope("manager")))
@@ -267,26 +287,44 @@ public class StorageServer {
GrpcService grpcService = new GrpcService(
serverConf, serverSpec, rpcStatsLogger);
+ // Create a registration service provider
+ RegistrationServiceProvider regService = new
RegistrationServiceProvider(
+ bkServerConf,
+ dlConf,
+ rootStatsLogger.scope("registration").scope("provider"));
+
// Create a registration state service only when service is ready.
RegistrationStateService regStateService = new
RegistrationStateService(
myEndpoint,
- bookieService.serverConf(),
+ bkServerConf,
bkConf,
regService,
rootStatsLogger.scope("registration"));
+ // Create a cluster controller service
+ ClusterControllerService clusterControllerService = new
ClusterControllerService(
+ storageConf,
+ () -> new ClusterControllerImpl(
+ new ZkClusterMetadataStore(
+ curatorProviderService.get(),
+ ZKMetadataDriverBase.resolveZkServers(bkServerConf),
+ ZK_METADATA_ROOT_PATH),
+ regService.get(),
+ new DefaultStorageContainerController(),
+ new
ZkClusterControllerLeaderSelector(curatorProviderService.get(),
ZK_METADATA_ROOT_PATH),
+ storageConf),
+ rootStatsLogger.scope("cluster_controller"));
+
// Create all the service stack
- return LifecycleComponentStack.newBuilder()
- .withName("storage-server")
- .addComponent(statsProviderService) // stats provider
- .addComponent(bookieService) // bookie server
+ return serverBuilder
+ .addComponent(bkWatchService) // service that watches
bookies
.addComponent(curatorProviderService) // service that provides
curator client
.addComponent(dlNamespaceProvider) // service that provides
dl namespace
- .addComponent(regService) // service that provides
registration client
- .addComponent(clusterControllerService) // service that run
cluster controller service
.addComponent(storageService) // range (stream) store
.addComponent(grpcService) // range (stream) server
(gRPC)
+ .addComponent(regService) // service that provides
registration client
.addComponent(regStateService) // service that manages
server state
+ .addComponent(clusterControllerService) // service that runs the
cluster controller
.build();
}
diff --git
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
new file mode 100644
index 0000000..5e6b0b5
--- /dev/null
+++
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.server;
+
+import java.net.UnknownHostException;
+import org.apache.bookkeeper.common.component.LifecycleComponent;
+import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
+import org.apache.bookkeeper.server.conf.BookieConfiguration;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
+import org.apache.commons.configuration.ConfigurationException;
+
+/**
+ * This is a {@link ServerLifecycleComponent} that allows running the stream
storage component as part of the bookie server.
+ */
+public class StreamStorageLifecycleComponent extends ServerLifecycleComponent {
+
+ private final LifecycleComponent streamStorage;
+
+ public StreamStorageLifecycleComponent(BookieConfiguration conf,
StatsLogger statsLogger)
+ throws UnknownHostException, ConfigurationException {
+ super("stream-storage", conf, statsLogger);
+
+ StorageServerConfiguration ssConf =
StorageServerConfiguration.of(conf.getUnderlyingConf());
+
+ this.streamStorage = StorageServer.buildStorageServer(
+ conf.getUnderlyingConf(),
+ ssConf.getGrpcPort(),
+ 1024, /* indicator */
+ false,
+ statsLogger.scope("stream"));
+ }
+
+ @Override
+ protected void doStart() {
+ this.streamStorage.start();
+ }
+
+ @Override
+ protected void doStop() {
+ this.streamStorage.stop();
+ }
+
+ @Override
+ protected void doClose() {
+ this.streamStorage.close();
+ }
+}
diff --git
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java
index b6803f2..e3c24c5 100644
---
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java
+++
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java
@@ -21,13 +21,11 @@ import
org.apache.commons.configuration.CompositeConfiguration;
*/
public class BookieConfiguration extends ComponentConfiguration {
- private static final String COMPONENT_PREFIX = "bookie" + DELIMITER;
-
public static BookieConfiguration of(CompositeConfiguration conf) {
return new BookieConfiguration(conf);
}
protected BookieConfiguration(CompositeConfiguration conf) {
- super(conf, COMPONENT_PREFIX);
+ super(conf, "");
}
}
diff --git
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
index a0ef963..bc300c9 100644
---
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
+++
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
@@ -24,6 +24,8 @@ public class StorageServerConfiguration extends
ComponentConfiguration {
private static final String COMPONENT_PREFIX = "storageserver" + DELIMITER;
+ private static final String GRPC_PORT = "grpc.port";
+
public static StorageServerConfiguration of(CompositeConfiguration conf) {
return new StorageServerConfiguration(conf);
}
@@ -31,4 +33,13 @@ public class StorageServerConfiguration extends
ComponentConfiguration {
private StorageServerConfiguration(CompositeConfiguration conf) {
super(conf, COMPONENT_PREFIX);
}
+
+ /**
+ * Returns the grpc port that serves requests coming into the stream
storage server.
+ *
+ * @return grpc port
+ */
+ public int getGrpcPort() {
+ return getInt(GRPC_PORT, 4181);
+ }
}
diff --git
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/BookieWatchService.java
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/BookieWatchService.java
new file mode 100644
index 0000000..cfac0da
--- /dev/null
+++
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/BookieWatchService.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.server.service;
+
+import com.google.common.base.Stopwatch;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
+
+/**
+ * A service that watches bookies and waits for a minimum number of bookies to
be alive.
+ */
+@Slf4j
+public class BookieWatchService
+ extends AbstractLifecycleComponent<BookieConfiguration> {
+
+ private final int minNumBookies;
+
+ public BookieWatchService(int minNumBookies,
+ BookieConfiguration conf,
+ StatsLogger statsLogger) {
+ super("bookie-watcher", conf, statsLogger);
+ this.minNumBookies = minNumBookies;
+ }
+
+ @Override
+ protected void doStart() {
+ ClientConfiguration clientConf = new ClientConfiguration();
+ clientConf.loadConf(conf.getUnderlyingConf());
+
+ @Cleanup("shutdown") ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
+
+ try {
+ MetadataDrivers.runFunctionWithMetadataClientDriver(clientConf,
clientDriver -> {
+ try {
+ waitingForNumBookies(clientDriver.getRegistrationClient(),
minNumBookies);
+ } catch (Exception e) {
+ log.error("Encountered exceptions while waiting for {} bookies to
be alive", minNumBookies, e);
+ throw new RuntimeException("Encountered exceptions while
waiting for "
+ + minNumBookies + " bookies to be alive", e);
+ }
+ return (Void) null;
+ }, executorService);
+ } catch (MetadataException | ExecutionException e) {
+ throw new RuntimeException("Failed to start bookie watch service",
e);
+ }
+ }
+
+ private static void waitingForNumBookies(RegistrationClient client, int
minNumBookies) throws Exception {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ Set<BookieSocketAddress> bookies =
FutureUtils.result(client.getWritableBookies()).getValue();
+ while (bookies.size() < minNumBookies) {
+ log.info("Only {} bookies are live after {} seconds; waiting another second",
+ bookies.size(), stopwatch.elapsed(TimeUnit.SECONDS));
+ TimeUnit.SECONDS.sleep(1);
+ bookies =
FutureUtils.result(client.getWritableBookies()).getValue();
+ }
+ }
+
+ @Override
+ protected void doStop() {}
+
+ @Override
+ protected void doClose() {}
+
+}
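
The wait loop in `BookieWatchService` is a plain poll-and-sleep pattern. The following is a minimal, self-contained sketch of that pattern only; the class `WaitForCountSketch`, the method `waitForAtLeast`, and the `IntSupplier` standing in for the registration client are assumptions made for illustration and are not part of this patch.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

class WaitForCountSketch {

    // Polls `current` until it reports at least `min`, sleeping `pollMillis`
    // between polls. Returns how many times the loop had to sleep.
    static int waitForAtLeast(IntSupplier current, int min, long pollMillis)
            throws InterruptedException {
        int sleeps = 0;
        while (current.getAsInt() < min) {
            TimeUnit.MILLISECONDS.sleep(pollMillis);
            sleeps++;
        }
        return sleeps;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for "number of writable bookies": grows by one per poll.
        AtomicInteger live = new AtomicInteger();
        int sleeps = waitForAtLeast(live::incrementAndGet, 3, 10);
        System.out.println("slept " + sleeps + " times before 3 were live");
    }
}
```

Like the service in the patch, this sketch has no upper bound on the wait; a caller that needs one would have to add a deadline check inside the loop.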
diff --git
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
index 7d404f0..9bd6ab4 100644
---
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
+++
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.stream.server.service;
import java.io.IOException;
import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
@@ -33,6 +34,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* A service to provide a curator client.
*/
+@Slf4j
public class CuratorProviderService
extends AbstractLifecycleComponent<DLConfiguration>
implements Supplier<CuratorFramework> {
@@ -57,6 +59,7 @@ public class CuratorProviderService
@Override
protected void doStart() {
curatorClient.start();
+ log.info("Provided curator clients to zookeeper {}.", zkServers);
}
@Override
diff --git
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
index e0359ce..4550df5 100644
---
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
+++
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
@@ -69,6 +69,7 @@ public class DLNamespaceProviderService
}
private final ServerConfiguration bkServerConf;
+ @Getter
private final DistributedLogConfiguration dlConf;
private final DynamicDistributedLogConfiguration dlDynConf;
@Getter
diff --git
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
index 8db59fd..f1cda91 100644
---
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
+++
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
@@ -105,6 +105,8 @@ public class RegistrationServiceProvider
}
}
+
+
@Override
protected void doStop() {
}
diff --git
a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
new file mode 100644
index 0000000..61e5fa3
--- /dev/null
+++
b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.api.cluster;
+
+import java.net.URI;
+
+/**
+ * Initializes cluster metadata.
+ */
+public interface ClusterInitializer {
+
+ /**
+ * Retrieves whether the initializer thinks that it can initialize the
metadata service
+ * specified by the given {@code metadataServiceUri}. Typically the
implementations will
+ * return <tt>true</tt> if they understand the subprotocol specified in
the URI and
+ * <tt>false</tt> if they do not.
+ *
+ * @param metadataServiceUri the metadata service uri
+ * @return <tt>true</tt> if the implementation understands the given URI;
<tt>false</tt> otherwise.
+ */
+ boolean acceptsURI(URI metadataServiceUri);
+
+ /**
+ * Create a new cluster under metadata service specified by {@code
metadataServiceUri}.
+ *
+ * @param metadataServiceUri metadata service uri
+ * @param numStorageContainers number of storage containers
+ */
+ void initializeCluster(URI metadataServiceUri, int numStorageContainers);
+
+}
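
The `acceptsURI` contract described in the Javadoc above suggests a caller that picks the first initializer willing to handle a given URI. The sketch below illustrates that idea under stated assumptions: `Initializer` is a local stand-in that mirrors the two interface methods, while `select` and `hasZkScheme` are hypothetical helpers that do not appear in this patch.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class InitializerSelectionSketch {

    // Local stand-in mirroring the two methods of ClusterInitializer.
    interface Initializer {
        boolean acceptsURI(URI metadataServiceUri);

        void initializeCluster(URI metadataServiceUri, int numStorageContainers);
    }

    // Null-safe scheme check: URI.getScheme() returns null for a URI
    // that has no scheme, such as a bare path.
    static boolean hasZkScheme(URI uri) {
        String scheme = uri.getScheme();
        return scheme != null && scheme.toLowerCase().startsWith("zk");
    }

    // Hypothetical helper: the first candidate that accepts the URI, if any.
    static Optional<Initializer> select(List<Initializer> candidates, URI uri) {
        return candidates.stream().filter(c -> c.acceptsURI(uri)).findFirst();
    }

    public static void main(String[] args) {
        Initializer zk = new Initializer() {
            @Override
            public boolean acceptsURI(URI metadataServiceUri) {
                return hasZkScheme(metadataServiceUri);
            }

            @Override
            public void initializeCluster(URI metadataServiceUri, int numStorageContainers) {
                System.out.println("would initialize " + numStorageContainers
                    + " storage containers at " + metadataServiceUri);
            }
        };

        URI uri = URI.create("zk://localhost:2181/ledgers");
        select(Arrays.asList(zk), uri).ifPresent(i -> i.initializeCluster(uri, 1024));
    }
}
```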
diff --git
a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
index 935552a..d46f633 100644
---
a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
+++
b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.stream.storage.api.cluster;
+import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
@@ -29,12 +30,18 @@ import
org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
*/
public interface ClusterMetadataStore extends AutoCloseable {
+
+ default void initializeCluster(int numStorageContainers) {
+ initializeCluster(numStorageContainers, Optional.empty());
+ }
+
/**
* Initialize the cluster metadata with the provided
<i>numStorageContainers</i>.
*
* @param numStorageContainers number of storage containers.
+ * @param segmentStorePath segment store path
*/
- void initializeCluster(int numStorageContainers);
+ void initializeCluster(int numStorageContainers, Optional<String>
segmentStorePath);
/**
* Get the current cluster assignment data.
diff --git a/stream/storage/impl/pom.xml b/stream/storage/impl/pom.xml
index a75403c..03870c2 100644
--- a/stream/storage/impl/pom.xml
+++ b/stream/storage/impl/pom.xml
@@ -27,11 +27,6 @@
<artifactId>stream-storage-service-impl</artifactId>
<name>Apache BookKeeper :: Stream Storage :: Storage :: Impl</name>
- <properties>
- <!-- dependencies -->
- <helix-core.version>0.6.7</helix-core.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
@@ -53,11 +48,6 @@
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.helix</groupId>
- <artifactId>helix-core</artifactId>
- <version>${helix-core.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.distributedlog</groupId>
<artifactId>distributedlog-core</artifactId>
<version>${project.parent.version}</version>
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
index da74867..4313ce1 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
@@ -20,11 +20,10 @@ package org.apache.bookkeeper.stream.storage.impl.cluster;
import com.google.common.collect.Maps;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
-import lombok.AccessLevel;
import lombok.Data;
-import lombok.Getter;
import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
@@ -55,7 +54,8 @@ public class InMemClusterMetadataStore implements
ClusterMetadataStore {
}
@Override
- public synchronized void initializeCluster(int numStorageContainers) {
+ public synchronized void initializeCluster(int numStorageContainers,
+ Optional<String>
segmentStorePath) {
this.metadata = ClusterMetadata.newBuilder()
.setNumStorageContainers(numStorageContainers)
.build();
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
new file mode 100644
index 0000000..832f15d
--- /dev/null
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static
org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH;
+
+import com.google.common.base.Strings;
+import java.net.URI;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterInitializer;
+import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * ZooKeeper Based Cluster Initializer.
+ */
+@Slf4j
+public class ZkClusterInitializer implements ClusterInitializer {
+
+ private final String zkExternalConnectString;
+
+ public ZkClusterInitializer(String zkServers) {
+ this.zkExternalConnectString = zkServers;
+ }
+
+ @Override
+ public boolean acceptsURI(URI metadataServiceUri) {
+ return metadataServiceUri.getScheme().toLowerCase().startsWith("zk");
+ }
+
+ @Override
+ public void initializeCluster(URI metadataServiceUri, int
numStorageContainers) {
+ String zkInternalConnectString =
ZKMetadataDriverBase.getZKServersFromServiceUri(metadataServiceUri);
+ // 1) `zkExternalConnectString` holds the public endpoints that this
tool connects to.
+ // It allows the tool to run outside of the cluster, which is useful
in dockerized environments.
+ // 2) `zkInternalConnectString` holds the internal endpoints that the
services connect to.
+ // It is used by dlog to bind a namespace.
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(
+ zkExternalConnectString,
+ new ExponentialBackoffRetry(100, Integer.MAX_VALUE, 10000)
+ )) {
+ client.start();
+
+ ZkClusterMetadataStore store =
+ new ZkClusterMetadataStore(client, zkInternalConnectString,
ZK_METADATA_ROOT_PATH);
+
+ ClusterMetadata metadata;
+ try {
+ metadata = store.getClusterMetadata();
+ log.info("Loaded cluster metadata : \n{}", metadata);
+ } catch (StorageRuntimeException sre) {
+ if (sre.getCause() instanceof KeeperException.NoNodeException)
{
+ String ledgersPath = metadataServiceUri.getPath();
+ Optional<String> segmentStorePath;
+ if (Strings.isNullOrEmpty(ledgersPath) ||
"/".equals(ledgersPath)) {
+ segmentStorePath = Optional.empty();
+ } else {
+ segmentStorePath = Optional.of(ledgersPath);
+ }
+
+ log.info("Initializing the stream cluster with {} storage
containers with segment store path {}.",
+ numStorageContainers, segmentStorePath);
+
+ store.initializeCluster(numStorageContainers,
segmentStorePath);
+ log.info("Successfully initialized the stream cluster :
\n{}", store.getClusterMetadata());
+ } else {
+ throw sre;
+ }
+ }
+ }
+
+ }
+}
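
The segment store path in `ZkClusterInitializer` is derived from the path component of the metadata service URI. Below is a minimal standalone sketch of that derivation, assuming an empty or root path means "no explicit segment store path"; the class and method names are hypothetical. It compares strings with `equals()`, because `==` on strings compares references and is not reliable for a value returned by `URI.getPath()`.

```java
import java.net.URI;
import java.util.Optional;

class SegmentStorePathSketch {

    // Maps the URI path to an optional segment store path:
    // null, "" and "/" mean that no explicit path was given.
    static Optional<String> segmentStorePath(URI metadataServiceUri) {
        String path = metadataServiceUri.getPath();
        if (path == null || path.isEmpty() || "/".equals(path)) {
            return Optional.empty();
        }
        return Optional.of(path);
    }

    public static void main(String[] args) {
        System.out.println(segmentStorePath(URI.create("zk://zk1:2181/ledgers")));
        System.out.println(segmentStorePath(URI.create("zk://zk1:2181/")));
        System.out.println(segmentStorePath(URI.create("zk://zk1:2181")));
    }
}
```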
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
index bcf70c3..59af968 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
@@ -29,6 +29,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
@@ -92,7 +93,7 @@ public class ZkClusterMetadataStore implements
ClusterMetadataStore {
}
@Override
- public void initializeCluster(int numStorageContainers) {
+ public void initializeCluster(int numStorageContainers, Optional<String>
segmentStorePath) {
ClusterMetadata metadata = ClusterMetadata.newBuilder()
.setNumStorageContainers(numStorageContainers)
.build();
@@ -101,7 +102,7 @@ public class ZkClusterMetadataStore implements
ClusterMetadataStore {
try {
// we are using dlog for the storage backend, so we need to
initialize the dlog namespace
BKDLConfig dlogConfig = new BKDLConfig(
- zkServers, getSegmentsRootPath(zkRootPath));
+ zkServers,
segmentStorePath.orElse(getSegmentsRootPath(zkRootPath)));
DLMetadata dlogMetadata = DLMetadata.create(dlogConfig);
client.transaction()
diff --git
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
index 4d2828b..73d6e17 100644
---
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
+++
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.when;
import java.time.Duration;
import java.util.Collections;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -82,7 +83,7 @@ public class ClusterControllerLeaderImplTest {
ClusterMetadataStore originalStore = metadataStore;
this.metadataStore = new ClusterMetadataStore() {
@Override
- public void initializeCluster(int numStorageContainers) {
+ public void initializeCluster(int numStorageContainers,
Optional<String> segmentStorePath) {
originalStore.initializeCluster(numStorageContainers);
}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java
index 874ec2a..d70b4f2 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java
@@ -20,11 +20,14 @@ package org.apache.bookkeeper.tests.containers;
import static java.time.temporal.ChronoUnit.SECONDS;
+import com.google.common.base.Strings;
import java.net.URI;
import java.time.Duration;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.tests.DockerUtils;
import org.apache.bookkeeper.tests.containers.wait.HttpWaitStrategy;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
/**
* Test Container for Bookies.
@@ -40,13 +43,16 @@ public class BookieContainer<SELF extends
BookieContainer<SELF>> extends ChaosCo
private final String hostname;
private final String metadataServiceUri;
+ private final String extraServerComponents;
public BookieContainer(String clusterName,
String hostname,
- String metadataServiceUri) {
+ String metadataServiceUri,
+ String extraServerComponents) {
super(clusterName, IMAGE_NAME);
this.hostname = hostname;
this.metadataServiceUri = metadataServiceUri;
+ this.extraServerComponents = extraServerComponents;
}
@Override
@@ -54,6 +60,14 @@ public class BookieContainer<SELF extends
BookieContainer<SELF>> extends ChaosCo
return clusterName + "-" + hostname;
}
+ public String getExternalGrpcEndpointStr() {
+ return getContainerIpAddress() + ":" + getMappedPort(BOOKIE_GRPC_PORT);
+ }
+
+ public String getInternalGrpcEndpointStr() {
+ return DockerUtils.getContainerIP(dockerClient, containerId) + ":" +
BOOKIE_GRPC_PORT;
+ }
+
@Override
protected void configure() {
addExposedPorts(
@@ -65,20 +79,28 @@ public class BookieContainer<SELF extends
BookieContainer<SELF>> extends ChaosCo
addEnv("BK_httpServerPort", "" + BOOKIE_HTTP_PORT);
addEnv("BK_metadataServiceUri", metadataServiceUri);
addEnv("BK_useHostNameAsBookieID", "true");
+ addEnv("BK_extraServerComponents", extraServerComponents);
if (metadataServiceUri.toLowerCase().startsWith("zk")) {
URI uri = URI.create(metadataServiceUri);
addEnv("BK_zkServers", uri.getAuthority());
addEnv("BK_zkLedgersRootPath", uri.getPath());
}
+ // grpc port
+ addEnv("BK_storageserver.grpc.port", "" + BOOKIE_GRPC_PORT);
}
@Override
public void start() {
- this.waitStrategy = new HttpWaitStrategy()
- .forPath("/heartbeat")
- .forStatusCode(200)
- .forPort(BOOKIE_HTTP_PORT)
- .withStartupTimeout(Duration.of(60, SECONDS));
+ if (Strings.isNullOrEmpty(extraServerComponents)) {
+ this.waitStrategy = new HttpWaitStrategy()
+ .forPath("/heartbeat")
+ .forStatusCode(200)
+ .forPort(BOOKIE_HTTP_PORT)
+ .withStartupTimeout(Duration.of(60, SECONDS));
+ } else {
+ this.waitStrategy = new HostPortWaitStrategy()
+ .withStartupTimeout(Duration.of(300, SECONDS));
+ }
this.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(hostname);
createContainerCmd.withName(getContainerName());
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
index 36d5d86..ebc0c87 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
@@ -33,6 +33,7 @@ import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.tests.integration.topologies.BKCluster;
+import org.apache.bookkeeper.tests.integration.topologies.BKClusterSpec;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -50,15 +51,31 @@ public abstract class BookKeeperClusterTestBase {
@BeforeClass
public static void setupCluster() throws Exception {
- bkCluster = new BKCluster(RandomStringUtils.randomAlphabetic(8), 0);
+ BKClusterSpec spec = BKClusterSpec.builder()
+ .clusterName(RandomStringUtils.randomAlphabetic(8))
+ .numBookies(0)
+ .build();
+
+ setupCluster(spec);
+ }
+
+ protected static void setupCluster(BKClusterSpec spec) throws Exception {
+ log.info("Setting up cluster {} with {} bookies : extraServerComponents = {}",
+ spec.clusterName(), spec.numBookies(), spec.extraServerComponents());
+
+ bkCluster = BKCluster.forSpec(spec);
bkCluster.start();
+ log.info("Cluster {} is setup", spec.clusterName());
+
metadataServiceUri = URI.create(bkCluster.getExternalServiceUri());
ClientConfiguration conf = new ClientConfiguration()
.setMetadataServiceUri(metadataServiceUri.toString());
executor = Executors.newSingleThreadScheduledExecutor();
metadataClientDriver = MetadataDrivers.getClientDriver(metadataServiceUri);
metadataClientDriver.initialize(conf, executor, NullStatsLogger.INSTANCE, Optional.empty());
+
+ log.info("Initialized metadata client driver : {}", metadataServiceUri);
}
@AfterClass
@@ -74,7 +91,7 @@ public abstract class BookKeeperClusterTestBase {
}
}
- private boolean findIfBookieRegistered(String bookieName) throws Exception {
+ private static boolean findIfBookieRegistered(String bookieName) throws Exception {
Set<BookieSocketAddress> bookies =
FutureUtils.result(metadataClientDriver.getRegistrationClient().getWritableBookies()).getValue();
Optional<BookieSocketAddress> registered =
@@ -82,7 +99,7 @@ public abstract class BookKeeperClusterTestBase {
return registered.isPresent();
}
- protected void waitUntilBookieUnregistered(String bookieName) throws Exception {
+ protected static void waitUntilBookieUnregistered(String bookieName) throws Exception {
Stopwatch sw = Stopwatch.createStarted();
while (findIfBookieRegistered(bookieName)) {
TimeUnit.MILLISECONDS.sleep(1000);
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/LocationClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
similarity index 83%
rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/LocationClientTest.java
rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
index bf31321..42b428e 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/LocationClientTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
import static org.junit.Assert.assertEquals;
@@ -33,25 +33,27 @@ import org.apache.bookkeeper.common.util.Revisioned;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
/**
* Integration test for location test.
*/
@Slf4j
-public class LocationClientTest extends StorageServerTestBase {
+public class LocationClientTest extends StreamClusterTestBase {
private OrderedScheduler scheduler;
private LocationClient client;
- @Override
- protected void doSetup() throws Exception {
+ @Before
+ public void setup() {
scheduler = OrderedScheduler.newSchedulerBuilder()
.name("location-client-test")
.numThreads(1)
.build();
StorageClientSettings settings = StorageClientSettings.newBuilder()
- .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
+ .addEndpoints(getExsternalStreamEndpoints().toArray(new Endpoint[getNumBookies()]))
.usePlaintext(true)
.build();
client = new LocationClientImpl(
@@ -59,8 +61,8 @@ public class LocationClientTest extends StorageServerTestBase {
scheduler);
}
- @Override
- protected void doTeardown() throws Exception {
+ @After
+ public void teardown() {
if (null != client) {
client.close();
}
@@ -80,13 +82,15 @@ public class LocationClientTest extends StorageServerTestBase {
assertEquals(StatusCode.SUCCESS, oneResponse.getStatusCode());
Endpoint endpoint = oneResponse.getEndpoint().getRwEndpoint();
- log.info("Current cluster endpoints = {}", cluster.getRpcEndpoints());
+ log.info("Current cluster endpoints = {}", getInternalStreamEndpoints());
log.info("Response : rw endpoint = {}", endpoint);
- assertTrue(cluster.getRpcEndpoints().contains(endpoint));
+ assertTrue(getInternalStreamEndpoints().contains(endpoint));
assertEquals(1, oneResponse.getEndpoint().getRoEndpointCount());
endpoint = oneResponse.getEndpoint().getRoEndpoint(0);
log.info("Response : ro endpoint = {}", endpoint);
- assertTrue(cluster.getRpcEndpoints().contains(endpoint));
+ assertTrue(getInternalStreamEndpoints().contains(endpoint));
}
+
+
}
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
new file mode 100644
index 0000000..b86cc72
--- /dev/null
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.integration.stream;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.utils.NetUtils;
+import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.apache.bookkeeper.tests.integration.cluster.BookKeeperClusterTestBase;
+import org.apache.bookkeeper.tests.integration.topologies.BKClusterSpec;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Similar as {@link org.apache.bookkeeper.tests.integration.cluster.BookKeeperClusterTestBase},
+ * but enabled stream storage for testing stream storage related features.
+ */
+@Slf4j
+public abstract class StreamClusterTestBase extends BookKeeperClusterTestBase {
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ BKClusterSpec spec = BKClusterSpec.builder()
+ .clusterName(RandomStringUtils.randomAlphabetic(8))
+ .numBookies(3)
+ .extraServerComponents("org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent")
+ .build();
+ BookKeeperClusterTestBase.setupCluster(spec);
+ }
+
+ @AfterClass
+ public static void teardownCluster() {
+ BookKeeperClusterTestBase.teardownCluster();
+ }
+
+ protected static int getNumBookies() {
+ return bkCluster.getBookieContainers().size();
+ }
+
+ protected static List<Endpoint> getExsternalStreamEndpoints() {
+ return bkCluster.getBookieContainers().values().stream()
+ .map(container -> NetUtils.parseEndpoint(container.getExternalGrpcEndpointStr()))
+ .collect(Collectors.toList());
+ }
+
+ protected static List<Endpoint> getInternalStreamEndpoints() {
+ return bkCluster.getBookieContainers().values().stream()
+ .map(container -> NetUtils.parseEndpoint(container.getInternalGrpcEndpointStr()))
+ .collect(Collectors.toList());
+ }
+
+
+}
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
index c1e9e84..80ae906 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
@@ -18,9 +18,11 @@
package org.apache.bookkeeper.tests.integration.topologies;
+import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -29,6 +31,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.bookkeeper.tests.containers.BookieContainer;
import org.apache.bookkeeper.tests.containers.MetadataStoreContainer;
import org.apache.bookkeeper.tests.containers.ZKContainer;
@@ -40,21 +44,37 @@ import org.testcontainers.containers.Network;
@Slf4j
public class BKCluster {
+ /**
+ * BookKeeper Cluster Spec.
+ *
+ * @param spec bookkeeper cluster spec.
+ * @return the built bookkeeper cluster
+ */
+ public static BKCluster forSpec(BKClusterSpec spec) {
+ return new BKCluster(spec);
+ }
+
+ private final BKClusterSpec spec;
@Getter
private final String clusterName;
private final Network network;
private final MetadataStoreContainer metadataContainer;
private final Map<String, BookieContainer> bookieContainers;
private final int numBookies;
+ private final String extraServerComponents;
+ private volatile boolean enableContainerLog;
- public BKCluster(String clusterName, int numBookies) {
- this.clusterName = clusterName;
+ private BKCluster(BKClusterSpec spec) {
+ this.spec = spec;
+ this.clusterName = spec.clusterName();
this.network = Network.newNetwork();
this.metadataContainer = (MetadataStoreContainer) new ZKContainer(clusterName)
.withNetwork(network)
.withNetworkAliases(ZKContainer.HOST_NAME);
this.bookieContainers = Maps.newTreeMap();
- this.numBookies = numBookies;
+ this.numBookies = spec.numBookies();
+ this.extraServerComponents = spec.extraServerComponents();
+ this.enableContainerLog = spec.enableContainerLog();
}
public String getExternalServiceUri() {
@@ -67,10 +87,28 @@ public class BKCluster {
public void start() throws Exception {
// start the metadata store
+ if (enableContainerLog) {
+ this.metadataContainer.tailContainerLog();
+ }
this.metadataContainer.start();
+ log.info("Successfully started metadata store container.");
// init a new cluster
initNewCluster(metadataContainer.getExternalServiceUri());
+ log.info("Successfully initialized metadata service uri : {}",
+ metadataContainer.getExternalServiceUri());
+
+ if (!Strings.isNullOrEmpty(extraServerComponents)) {
+ int numStorageContainers = numBookies > 0 ? 2 * numBookies : 8;
+ // initialize the stream storage.
+ new ZkClusterInitializer(
+ ZKMetadataDriverBase.getZKServersFromServiceUri(URI.create(metadataContainer.getExternalServiceUri()))
+ ).initializeCluster(
+ URI.create(metadataContainer.getInternalServiceUri()),
+ numStorageContainers);
+ log.info("Successfully initialized stream storage metadata with {} storage containers",
+ numStorageContainers);
+ }
// create bookies
createBookies("bookie", numBookies);
@@ -123,20 +161,36 @@ public class BKCluster {
return container;
}
- public synchronized BookieContainer createBookie(String bookieName) {
- BookieContainer container = getBookie(bookieName);
- if (null == container) {
- container = (BookieContainer) new BookieContainer(clusterName, bookieName, ZKContainer.SERVICE_URI)
- .withNetwork(network)
- .withNetworkAliases(bookieName);
+ public BookieContainer createBookie(String bookieName) {
+ boolean shouldStart = false;
+ BookieContainer container;
+ synchronized (this) {
+ container = getBookie(bookieName);
+ if (null == container) {
+ shouldStart = true;
+ log.info("Creating bookie {}", bookieName);
+ container = (BookieContainer) new BookieContainer(
+ clusterName, bookieName, ZKContainer.SERVICE_URI, extraServerComponents
+ ).withNetwork(network).withNetworkAliases(bookieName);
+ if (enableContainerLog) {
+ container.tailContainerLog();
+ }
+
+ log.info("Created bookie {}", bookieName);
+ bookieContainers.put(bookieName, container);
+ }
+ }
+
+ if (shouldStart) {
+ log.info("Starting bookie {}", bookieName);
container.start();
- bookieContainers.put(bookieName, container);
}
return container;
}
- public synchronized Map<String, BookieContainer> createBookies(String bookieNamePrefix, int numBookies)
+ public Map<String, BookieContainer> createBookies(String bookieNamePrefix, int numBookies)
throws Exception {
+ log.info("Creating {} bookies with bookie name prefix '{}'", numBookies, bookieNamePrefix);
List<CompletableFuture<Void>> startFutures = Lists.newArrayListWithExpectedSize(numBookies);
Map<String, BookieContainer> containers = Maps.newHashMap();
for (int i = 0; i < numBookies; i++) {
@@ -144,6 +198,7 @@ public class BKCluster {
startFutures.add(
CompletableFuture.runAsync(() -> {
String bookieName = String.format("%s-%03d", bookieNamePrefix, idx);
+ log.info("Starting bookie {} at cluster {}", bookieName, clusterName);
BookieContainer container = createBookie(bookieName);
synchronized (containers) {
containers.put(bookieName, container);
diff --git
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKClusterSpec.java
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKClusterSpec.java
new file mode 100644
index 0000000..e919d1a
--- /dev/null
+++
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKClusterSpec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.integration.topologies;
+
+import lombok.Builder;
+import lombok.Builder.Default;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+/**
+ * Spec to build {@link BKCluster}.
+ */
+@Builder
+@Accessors(fluent = true)
+@Getter
+@Setter
+public class BKClusterSpec {
+
+ /**
+ * Returns the cluster name.
+ *
+ * @return the cluster name.
+ */
+ String clusterName;
+
+ /**
+ * Returns the number of bookies.
+ *
+ * @return the number of bookies;
+ */
+ @Default
+ int numBookies = 0;
+
+ /**
+ * Returns the extra server components.
+ *
+ * @return the extra server components.
+ */
+ @Default
+ String extraServerComponents = "";
+
+ /**
+ * Returns the flag whether to enable/disable container log.
+ *
+ * @return the flag whether to enable/disable container log.
+ */
+ @Default
+ boolean enableContainerLog = false;
+
+}
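
Note on the reworked BKCluster.createBookie above: it registers the container while holding the lock and calls start() only after releasing it, so slow container startups can overlap when createBookies runs them in parallel. A minimal, self-contained sketch of that pattern follows. It does not use Testcontainers; the class RegisterThenStartSketch and its FakeContainer are hypothetical stand-ins invented for illustration, not part of this commit.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RegisterThenStartSketch {

    /** Hypothetical stand-in for a container; it only counts start() calls. */
    static final class FakeContainer {
        final String name;
        final AtomicInteger starts = new AtomicInteger();

        FakeContainer(String name) {
            this.name = name;
        }

        void start() {
            starts.incrementAndGet();
        }
    }

    private final Map<String, FakeContainer> containers = new TreeMap<>();

    /** Look up or register under the lock; start outside the lock. */
    FakeContainer create(String name) {
        boolean shouldStart = false;
        FakeContainer container;
        synchronized (this) {
            container = containers.get(name);
            if (container == null) {
                container = new FakeContainer(name);
                containers.put(name, container);
                shouldStart = true;
            }
        }
        // Only the caller that registered the container starts it.
        if (shouldStart) {
            container.start();
        }
        return container;
    }

    synchronized int size() {
        return containers.size();
    }

    public static void main(String[] args) {
        RegisterThenStartSketch cluster = new RegisterThenStartSketch();
        FakeContainer first = cluster.create("bookie-000");
        FakeContainer again = cluster.create("bookie-000");
        System.out.println(first == again);     // prints "true"
        System.out.println(first.starts.get()); // prints "1"
    }
}
```

One consequence of this shape, at least in the sketch: a second caller asking for the same name can receive the container before the first caller's start() has returned, so callers that need a running container have to wait for readiness separately.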