This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2b75ca0e02c [improve] PIP-335: Pulsar with Oxia integration test
(#22045)
2b75ca0e02c is described below
commit 2b75ca0e02c10262813de509b96f5678faffc934
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Feb 9 10:50:33 2024 -0800
[improve] PIP-335: Pulsar with Oxia integration test (#22045)
---
.../docker-images/latest-version-image/Dockerfile | 2 +-
.../latest-version-image/scripts/init-cluster.sh | 38 ---
.../latest-version-image/scripts/run-bookie.sh | 1 -
.../latest-version-image/scripts/run-broker.sh | 1 -
.../scripts/run-functions-worker.sh | 1 -
.../latest-version-image/scripts/run-proxy.sh | 1 -
.../latest-version-image/scripts/run-websocket.sh | 1 -
.../containers/PulsarInitMetadataContainer.java | 76 +++++
.../tests/integration/oxia/OxiaContainer.java | 72 +++++
.../tests/integration/oxia/OxiaSmokeTest.java | 48 +++
.../integration/topologies/PulsarCluster.java | 330 ++++++++++++---------
.../integration/topologies/PulsarClusterSpec.java | 3 +
.../src/test/resources/pulsar-messaging.xml | 2 +
13 files changed, 397 insertions(+), 179 deletions(-)
diff --git a/tests/docker-images/latest-version-image/Dockerfile
b/tests/docker-images/latest-version-image/Dockerfile
index 4973bec0441..f019af5c926 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -50,7 +50,7 @@ COPY conf/supervisord.conf /etc/supervisord.conf
COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf
conf/functions_worker.conf \
conf/proxy.conf conf/websocket.conf /etc/supervisord/conf.d/
-COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
+COPY scripts/run-global-zk.sh scripts/run-local-zk.sh \
scripts/run-bookie.sh scripts/run-broker.sh
scripts/run-functions-worker.sh scripts/run-proxy.sh \
scripts/run-standalone.sh scripts/run-websocket.sh \
/pulsar/bin/
diff --git a/tests/docker-images/latest-version-image/scripts/init-cluster.sh
b/tests/docker-images/latest-version-image/scripts/init-cluster.sh
deleted file mode 100755
index 926845d5a77..00000000000
--- a/tests/docker-images/latest-version-image/scripts/init-cluster.sh
+++ /dev/null
@@ -1,38 +0,0 @@
-#!/usr/bin/env bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-set -x
-
-ZNODE="/initialized-$clusterName"
-
-bin/watch-znode.py -z $zkServers -p / -w
-
-bin/watch-znode.py -z $zkServers -p $ZNODE -e
-if [ $? != 0 ]; then
- echo Initializing cluster
- bin/apply-config-from-env.py conf/bookkeeper.conf &&
- bin/pulsar initialize-cluster-metadata --cluster $clusterName
--zookeeper $zkServers \
- --configuration-store $configurationStore --web-service-url
http://$pulsarNode:8080/ \
- --broker-service-url pulsar://$pulsarNode:6650/ &&
- bin/watch-znode.py -z $zkServers -p $ZNODE -c
- echo Initialized
-else
- echo Already Initialized
-fi
diff --git a/tests/docker-images/latest-version-image/scripts/run-bookie.sh
b/tests/docker-images/latest-version-image/scripts/run-bookie.sh
index 64466eb2d9a..e454e667645 100755
--- a/tests/docker-images/latest-version-image/scripts/run-bookie.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-bookie.sh
@@ -29,6 +29,5 @@ if [ -z "$NO_AUTOSTART" ]; then
sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/bookie.conf
fi
-bin/watch-znode.py -z $zkServers -p /initialized-$clusterName -w
exec /usr/bin/supervisord -c /etc/supervisord.conf
diff --git a/tests/docker-images/latest-version-image/scripts/run-broker.sh
b/tests/docker-images/latest-version-image/scripts/run-broker.sh
index 6ed5d60c39e..4f89f145f2b 100755
--- a/tests/docker-images/latest-version-image/scripts/run-broker.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-broker.sh
@@ -25,6 +25,5 @@ if [ -z "$NO_AUTOSTART" ]; then
sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/broker.conf
fi
-bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
exec /usr/bin/supervisord -c /etc/supervisord.conf
diff --git
a/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
b/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
index 3fadf960ee3..cd9d7593dbf 100755
--- a/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
@@ -26,6 +26,5 @@ if [ -z "$NO_AUTOSTART" ]; then
sed -i 's/autostart=.*/autostart=true/'
/etc/supervisord/conf.d/functions_worker.conf
fi
-bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
exec /usr/bin/supervisord -c /etc/supervisord.conf
diff --git a/tests/docker-images/latest-version-image/scripts/run-proxy.sh
b/tests/docker-images/latest-version-image/scripts/run-proxy.sh
index 4836a890bda..f44ed0bb658 100755
--- a/tests/docker-images/latest-version-image/scripts/run-proxy.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-proxy.sh
@@ -25,5 +25,4 @@ if [ -z "$NO_AUTOSTART" ]; then
sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/proxy.conf
fi
-bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
exec /usr/bin/supervisord -c /etc/supervisord.conf
diff --git a/tests/docker-images/latest-version-image/scripts/run-websocket.sh
b/tests/docker-images/latest-version-image/scripts/run-websocket.sh
index a49ee111768..34e4b9016af 100755
--- a/tests/docker-images/latest-version-image/scripts/run-websocket.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-websocket.sh
@@ -25,5 +25,4 @@ if [ -z "$NO_AUTOSTART" ]; then
sed -i 's/autostart=.*/autostart=true/'
/etc/supervisord/conf.d/websocket.conf
fi
-bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
exec /usr/bin/supervisord -c /etc/supervisord.conf
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarInitMetadataContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarInitMetadataContainer.java
new file mode 100644
index 00000000000..4251ed3bd57
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarInitMetadataContainer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.tests.integration.containers;
+
+import java.io.IOException;
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+
+/**
+ * Initialize the Pulsar metadata
+ */
+@Slf4j
+public class PulsarInitMetadataContainer extends
GenericContainer<PulsarInitMetadataContainer> {
+
+ public static final String NAME = "init-metadata";
+
+ private final String clusterName;
+ private final String metadataStoreUrl;
+ private final String configurationMetadataStoreUrl;
+ private final String brokerHostname;
+
+ public PulsarInitMetadataContainer(Network network,
+ String clusterName,
+ String metadataStoreUrl,
+ String configurationMetadataStoreUrl,
+ String brokerHostname) {
+ this.clusterName = clusterName;
+ this.metadataStoreUrl = metadataStoreUrl;
+ this.configurationMetadataStoreUrl = configurationMetadataStoreUrl;
+ this.brokerHostname = brokerHostname;
+ setDockerImageName(PulsarContainer.DEFAULT_IMAGE_NAME);
+ withNetwork(network);
+
+ setCommand("sleep 1000000");
+ }
+
+
+ public void initialize() throws Exception {
+ start();
+ ExecResult res = this.execInContainer(
+ "/pulsar/bin/pulsar", "initialize-cluster-metadata",
+ "--cluster", clusterName,
+ "--metadata-store", metadataStoreUrl,
+ "--configuration-metadata-store",
configurationMetadataStoreUrl,
+ "--web-service-url", "http://" + brokerHostname + ":8080/",
+ "--broker-service-url", "pulsar://" + brokerHostname + ":6650/"
+ );
+
+ if (res.getExitCode() == 0) {
+ log.info("Successfully initialized cluster");
+ } else {
+ log.warn("Failed to initialize Pulsar cluster. exit code: " +
res.getExitCode());
+ log.warn("STDOUT: " + res.getStdout());
+ log.warn("STDERR: " + res.getStderr());
+ throw new IOException("Failed to initialized Pulsar Cluster");
+ }
+ }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java
new file mode 100644
index 00000000000..18d2dd77b7d
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.tests.integration.oxia;
+
+import java.time.Duration;
+import org.apache.pulsar.tests.integration.containers.ChaosContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+public class OxiaContainer extends ChaosContainer<OxiaContainer> {
+
+ public static final String NAME = "oxia";
+
+ public static final int OXIA_PORT = 6648;
+ public static final int METRICS_PORT = 8080;
+ private static final int DEFAULT_SHARDS = 1;
+
+ private static final String DEFAULT_IMAGE_NAME = "streamnative/oxia:main";
+
+ public OxiaContainer(String clusterName) {
+ this(clusterName, DEFAULT_IMAGE_NAME, DEFAULT_SHARDS);
+ }
+
+ @SuppressWarnings("resource")
+ OxiaContainer(String clusterName, String imageName, int shards) {
+ super(clusterName, imageName);
+ if (shards <= 0) {
+ throw new IllegalArgumentException("shards must be greater than
zero");
+ }
+ addExposedPorts(OXIA_PORT, METRICS_PORT);
+ this.withCreateContainerCmdModifier(createContainerCmd -> {
+ createContainerCmd.withHostName("oxia");
+ createContainerCmd.withName(getContainerName());
+ });
+ setCommand("oxia", "standalone",
+ "--shards=" + shards,
+ "--wal-sync-data=false");
+ waitingFor(
+ Wait.forHttp("/metrics")
+ .forPort(METRICS_PORT)
+ .forStatusCode(200)
+ .withStartupTimeout(Duration.ofSeconds(30)));
+
+ PulsarContainer.configureLeaveContainerRunning(this);
+ }
+
+ public String getServiceAddress() {
+ return OxiaContainer.NAME + ":" + OXIA_PORT;
+ }
+
+ @Override
+ public String getContainerName() {
+ return clusterName + "-oxia";
+ }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaSmokeTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaSmokeTest.java
new file mode 100644
index 00000000000..d55c437b4f8
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaSmokeTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.oxia;
+
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.testng.annotations.Test;
+
+/**
+ * Test pulsar produce/consume semantics
+ */
+@Slf4j
+public class OxiaSmokeTest extends PulsarTestSuite {
+
+ protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+ String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder
specBuilder) {
+ specBuilder.enableOxia(true);
+ return specBuilder;
+ }
+
+ //
+ // Test Basic Publish & Consume Operations
+ //
+
+ @Test(dataProvider = "ServiceUrlAndTopics")
+ public void testPublishAndConsume(Supplier<String> serviceUrl, boolean
isPersistent) throws Exception {
+ super.testPublishAndConsume(serviceUrl.get(), isPersistent);
+ }
+
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 88ff778732e..bc9b1e267b9 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
+import lombok.Cleanup;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
@@ -44,9 +45,11 @@ import
org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.containers.CSContainer;
import org.apache.pulsar.tests.integration.containers.ProxyContainer;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import
org.apache.pulsar.tests.integration.containers.PulsarInitMetadataContainer;
import org.apache.pulsar.tests.integration.containers.WorkerContainer;
import org.apache.pulsar.tests.integration.containers.ZKContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.oxia.OxiaContainer;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
@@ -69,9 +72,12 @@ public class PulsarCluster {
* @return the built pulsar cluster
*/
public static PulsarCluster forSpec(PulsarClusterSpec spec) {
- CSContainer csContainer = new CSContainer(spec.clusterName)
- .withNetwork(Network.newNetwork())
- .withNetworkAliases(CSContainer.NAME);
+ CSContainer csContainer = null;
+ if (!spec.enableOxia) {
+ csContainer = new CSContainer(spec.clusterName)
+ .withNetwork(Network.newNetwork())
+ .withNetworkAliases(CSContainer.NAME);
+ }
return new PulsarCluster(spec, csContainer, false);
}
@@ -86,6 +92,8 @@ public class PulsarCluster {
private final String clusterName;
private final Network network;
private final ZKContainer zkContainer;
+
+ private final OxiaContainer oxiaContainer;
private final CSContainer csContainer;
private final boolean sharedCsContainer;
private final Map<String, BKContainer> bookieContainers;
@@ -95,22 +103,44 @@ public class PulsarCluster {
private Map<String, GenericContainer<?>> externalServices =
Collections.emptyMap();
private Map<String, Map<String, String>> externalServiceEnvs;
+ private final String metadataStoreUrl;
+ private final String configurationMetadataStoreUrl;
+
private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer,
boolean sharedCsContainer) {
this.spec = spec;
this.sharedCsContainer = sharedCsContainer;
this.clusterName = spec.clusterName();
- this.network = csContainer.getNetwork();
-
- this.zkContainer = new ZKContainer(clusterName);
- this.zkContainer
- .withNetwork(network)
- .withNetworkAliases(appendClusterName(ZKContainer.NAME))
- .withEnv("clusterName", clusterName)
- .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
- .withEnv("configurationStore", CSContainer.NAME + ":" + CS_PORT)
- .withEnv("forceSync", "no")
- .withEnv("pulsarNode", appendClusterName("pulsar-broker-0"));
+ if (csContainer != null ) {
+ this.network = csContainer.getNetwork();
+ } else {
+ this.network = Network.newNetwork();
+ }
+
+
+
+ if (spec.enableOxia) {
+ this.zkContainer = null;
+ this.oxiaContainer = new OxiaContainer(clusterName);
+ this.oxiaContainer
+ .withNetwork(network)
+ .withNetworkAliases(appendClusterName(OxiaContainer.NAME));
+ metadataStoreUrl = "oxia://" + oxiaContainer.getServiceAddress();
+ configurationMetadataStoreUrl = metadataStoreUrl;
+ } else {
+ this.oxiaContainer = null;
+ this.zkContainer = new ZKContainer(clusterName);
+ this.zkContainer
+ .withNetwork(network)
+ .withNetworkAliases(appendClusterName(ZKContainer.NAME))
+ .withEnv("clusterName", clusterName)
+ .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
+ .withEnv("configurationStore", CSContainer.NAME + ":" +
CS_PORT)
+ .withEnv("forceSync", "no")
+ .withEnv("pulsarNode",
appendClusterName("pulsar-broker-0"));
+ metadataStoreUrl = appendClusterName(ZKContainer.NAME);
+ configurationMetadataStoreUrl = CSContainer.NAME + ":" + CS_PORT;
+ }
this.csContainer = csContainer;
@@ -121,27 +151,29 @@ public class PulsarCluster {
this.proxyContainer = new
ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME,
spec.enableTls)
.withNetwork(network)
.withNetworkAliases(appendClusterName("pulsar-proxy"))
- .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
- .withEnv("zookeeperServers",
appendClusterName(ZKContainer.NAME))
- .withEnv("configurationStoreServers", CSContainer.NAME + ":" +
CS_PORT)
+ .withEnv("metadataStoreUrl", metadataStoreUrl)
+ .withEnv("configurationMetadataStoreUrl",
configurationMetadataStoreUrl)
.withEnv("clusterName", clusterName);
- // enable mTLS
+ // enable mTLS
if (spec.enableTls) {
proxyContainer
- .withEnv("webServicePortTls",
String.valueOf(BROKER_HTTPS_PORT))
- .withEnv("servicePortTls", String.valueOf(BROKER_PORT_TLS))
- .withEnv("forwardAuthorizationCredentials", "true")
- .withEnv("tlsRequireTrustedClientCertOnConnect", "true")
- .withEnv("tlsAllowInsecureConnection", "false")
- .withEnv("tlsCertificateFilePath",
"/pulsar/certificate-authority/server-keys/proxy.cert.pem")
- .withEnv("tlsKeyFilePath",
"/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem")
- .withEnv("tlsTrustCertsFilePath",
"/pulsar/certificate-authority/certs/ca.cert.pem")
- .withEnv("brokerClientAuthenticationPlugin",
AuthenticationTls.class.getName())
- .withEnv("brokerClientAuthenticationParameters",
String.format("tlsCertFile:%s,tlsKeyFile:%s",
"/pulsar/certificate-authority/client-keys/admin.cert.pem",
"/pulsar/certificate-authority/client-keys/admin.key-pk8.pem"))
- .withEnv("tlsEnabledWithBroker", "true")
- .withEnv("brokerClientTrustCertsFilePath",
"/pulsar/certificate-authority/certs/ca.cert.pem")
- .withEnv("brokerClientCertificateFilePath",
"/pulsar/certificate-authority/server-keys/proxy.cert.pem")
- .withEnv("brokerClientKeyFilePath",
"/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem");
+ .withEnv("webServicePortTls",
String.valueOf(BROKER_HTTPS_PORT))
+ .withEnv("servicePortTls", String.valueOf(BROKER_PORT_TLS))
+ .withEnv("forwardAuthorizationCredentials", "true")
+ .withEnv("tlsRequireTrustedClientCertOnConnect", "true")
+ .withEnv("tlsAllowInsecureConnection", "false")
+ .withEnv("tlsCertificateFilePath",
"/pulsar/certificate-authority/server-keys/proxy.cert.pem")
+ .withEnv("tlsKeyFilePath",
"/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem")
+ .withEnv("tlsTrustCertsFilePath",
"/pulsar/certificate-authority/certs/ca.cert.pem")
+ .withEnv("brokerClientAuthenticationPlugin",
AuthenticationTls.class.getName())
+ .withEnv("brokerClientAuthenticationParameters",
String.format("tlsCertFile:%s,tlsKeyFile:%s",
+
"/pulsar/certificate-authority/client-keys/admin.cert.pem",
+
"/pulsar/certificate-authority/client-keys/admin.key-pk8.pem"))
+ .withEnv("tlsEnabledWithBroker", "true")
+ .withEnv("brokerClientTrustCertsFilePath",
"/pulsar/certificate-authority/certs/ca.cert.pem")
+ .withEnv("brokerClientCertificateFilePath",
+
"/pulsar/certificate-authority/server-keys/proxy.cert.pem")
+ .withEnv("brokerClientKeyFilePath",
"/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem");
}
if (spec.proxyEnvs != null) {
@@ -157,7 +189,7 @@ public class PulsarCluster {
BKContainer bookieContainer = new BKContainer(clusterName,
name)
.withNetwork(network)
.withNetworkAliases(appendClusterName(name))
- .withEnv("zkServers",
appendClusterName(ZKContainer.NAME))
+ .withEnv("metadataServiceUri", "metadata-store:" +
metadataStoreUrl)
.withEnv("useHostNameAsBookieID", "true")
// Disable fsyncs for tests since they're slow
within the containers
.withEnv("journalSyncData", "false")
@@ -179,50 +211,56 @@ public class PulsarCluster {
// create brokers
brokerContainers.putAll(
- runNumContainers("broker", spec.numBrokers(), (name) -> {
- BrokerContainer brokerContainer = new
BrokerContainer(clusterName, appendClusterName(name), spec.enableTls)
- .withNetwork(network)
- .withNetworkAliases(appendClusterName(name))
- .withEnv("zkServers",
appendClusterName(ZKContainer.NAME))
- .withEnv("zookeeperServers",
appendClusterName(ZKContainer.NAME))
- .withEnv("configurationStoreServers", CSContainer.NAME
+ ":" + CS_PORT)
- .withEnv("clusterName", clusterName)
-
.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
- .withEnv("loadBalancerOverrideBrokerNicSpeedGbps", "1")
- // used in s3 tests
- .withEnv("AWS_ACCESS_KEY_ID",
"accesskey").withEnv("AWS_SECRET_KEY", "secretkey")
- .withEnv("maxMessageSize", "" + spec.maxMessageSize);
- if (spec.enableTls) {
- // enable mTLS
- brokerContainer
- .withEnv("webServicePortTls",
String.valueOf(BROKER_HTTPS_PORT))
- .withEnv("brokerServicePortTls",
String.valueOf(BROKER_PORT_TLS))
- .withEnv("authenticateOriginalAuthData", "true")
- .withEnv("tlsAllowInsecureConnection", "false")
- .withEnv("tlsRequireTrustedClientCertOnConnect",
"true")
- .withEnv("tlsTrustCertsFilePath",
"/pulsar/certificate-authority/certs/ca.cert.pem")
- .withEnv("tlsCertificateFilePath",
"/pulsar/certificate-authority/server-keys/broker.cert.pem")
- .withEnv("tlsKeyFilePath",
"/pulsar/certificate-authority/server-keys/broker.key-pk8.pem");
- }
- if (spec.queryLastMessage) {
-
brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
- brokerContainer.withEnv("bookkeeperUseV2WireProtocol",
"false");
- }
- if (spec.brokerEnvs != null) {
- brokerContainer.withEnv(spec.brokerEnvs);
- }
- if (spec.brokerMountFiles != null) {
-
spec.brokerMountFiles.forEach(brokerContainer::withFileSystemBind);
- }
- if (spec.brokerAdditionalPorts() != null) {
-
spec.brokerAdditionalPorts().forEach(brokerContainer::addExposedPort);
- }
- return brokerContainer;
- }
- ));
+ runNumContainers("broker", spec.numBrokers(), (name) -> {
+ BrokerContainer brokerContainer =
+ new BrokerContainer(clusterName,
appendClusterName(name), spec.enableTls)
+ .withNetwork(network)
+
.withNetworkAliases(appendClusterName(name))
+ .withEnv("metadataStoreUrl",
metadataStoreUrl)
+
.withEnv("configurationMetadataStoreUrl", configurationMetadataStoreUrl)
+ .withEnv("clusterName",
clusterName)
+
.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
+
.withEnv("loadBalancerOverrideBrokerNicSpeedGbps", "1")
+ // used in s3 tests
+ .withEnv("AWS_ACCESS_KEY_ID",
"accesskey").withEnv("AWS_SECRET_KEY",
+ "secretkey")
+ .withEnv("maxMessageSize", "" +
spec.maxMessageSize);
+ if (spec.enableTls) {
+ // enable mTLS
+ brokerContainer
+ .withEnv("webServicePortTls",
String.valueOf(BROKER_HTTPS_PORT))
+ .withEnv("brokerServicePortTls",
String.valueOf(BROKER_PORT_TLS))
+
.withEnv("authenticateOriginalAuthData", "true")
+ .withEnv("tlsAllowInsecureConnection",
"false")
+
.withEnv("tlsRequireTrustedClientCertOnConnect", "true")
+ .withEnv("tlsTrustCertsFilePath",
"/pulsar/certificate-authority/certs/ca"
+ + ".cert.pem")
+ .withEnv("tlsCertificateFilePath",
+
"/pulsar/certificate-authority/server-keys/broker.cert.pem")
+ .withEnv("tlsKeyFilePath",
+
"/pulsar/certificate-authority/server-keys/broker.key-pk8.pem");
+ }
+ if (spec.queryLastMessage) {
+
brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
+
brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false");
+ }
+ if (spec.brokerEnvs != null) {
+ brokerContainer.withEnv(spec.brokerEnvs);
+ }
+ if (spec.brokerMountFiles != null) {
+
spec.brokerMountFiles.forEach(brokerContainer::withFileSystemBind);
+ }
+ if (spec.brokerAdditionalPorts() != null) {
+
spec.brokerAdditionalPorts().forEach(brokerContainer::addExposedPort);
+ }
+ return brokerContainer;
+ }
+ ));
spec.classPathVolumeMounts.forEach((key, value) -> {
- zkContainer.withClasspathResourceMapping(key, value,
BindMode.READ_WRITE);
+ if (zkContainer != null) {
+ zkContainer.withClasspathResourceMapping(key, value,
BindMode.READ_WRITE);
+ }
proxyContainer.withClasspathResourceMapping(key, value,
BindMode.READ_WRITE);
bookieContainers.values().forEach(c ->
c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
@@ -278,20 +316,33 @@ public class PulsarCluster {
}
public void start() throws Exception {
- // start the local zookeeper
- zkContainer.start();
- log.info("Successfully started local zookeeper container.");
- // start the configuration store
- if (!sharedCsContainer) {
- csContainer.start();
- log.info("Successfully started configuration store container.");
+ if (!spec.enableOxia) {
+ // start the local zookeeper
+ zkContainer.start();
+ log.info("Successfully started local zookeeper container.");
+
+ // start the configuration store
+ if (!sharedCsContainer) {
+ csContainer.start();
+ log.info("Successfully started configuration store
container.");
+ }
+ } else {
+ oxiaContainer.start();
}
- // init the cluster
- zkContainer.execCmd(
- "bin/init-cluster.sh");
- log.info("Successfully initialized the cluster.");
+ {
+ // Run cluster metadata initialization
+ @Cleanup
+ PulsarInitMetadataContainer init = new PulsarInitMetadataContainer(
+ network,
+ clusterName,
+ metadataStoreUrl,
+ configurationMetadataStoreUrl,
+ appendClusterName("pulsar-broker-0")
+ );
+ init.initialize();
+ }
// start bookies
bookieContainers.values().forEach(BKContainer::start);
@@ -318,7 +369,8 @@ public class PulsarCluster {
serviceContainer.withNetwork(network);
serviceContainer.withNetworkAliases(service.getKey());
if (null != externalServiceEnvs && null !=
externalServiceEnvs.get(service.getKey())) {
- Map<String, String> env =
externalServiceEnvs.getOrDefault(service.getKey(), Collections.emptyMap());
+ Map<String, String> env =
+ externalServiceEnvs.getOrDefault(service.getKey(),
Collections.emptyMap());
serviceContainer.withEnv(env);
}
PulsarContainer.configureLeaveContainerRunning(serviceContainer);
@@ -390,6 +442,10 @@ public class PulsarCluster {
zkContainer.stop();
}
+ if (oxiaContainer != null) {
+ oxiaContainer.stop();
+ }
+
try {
network.close();
} catch (Exception e) {
@@ -403,7 +459,8 @@ public class PulsarCluster {
.forEach(GenericContainer::stop);
}
- public synchronized void setupFunctionWorkers(String suffix,
FunctionRuntimeType runtimeType, int numFunctionWorkers) {
+ public synchronized void setupFunctionWorkers(String suffix,
FunctionRuntimeType runtimeType,
+ int numFunctionWorkers) {
switch (runtimeType) {
case THREAD:
startFunctionWorkersWithThreadContainerFactory(suffix,
numFunctionWorkers);
@@ -418,23 +475,23 @@ public class PulsarCluster {
String serviceUrl = "pulsar://pulsar-broker-0:" +
PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" +
PulsarContainer.BROKER_HTTP_PORT;
workerContainers.putAll(runNumContainers(
- "functions-worker-process-" + suffix,
- numFunctionWorkers,
- (name) -> new WorkerContainer(clusterName, name)
- .withNetwork(network)
- .withNetworkAliases(name)
- // worker settings
- .withEnv("PF_workerId", name)
- .withEnv("PF_workerHostname", name)
- .withEnv("PF_workerPort", "" +
PulsarContainer.BROKER_HTTP_PORT)
- .withEnv("PF_pulsarFunctionsCluster", clusterName)
- .withEnv("PF_pulsarServiceUrl", serviceUrl)
- .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
- // script
- .withEnv("clusterName", clusterName)
- .withEnv("zookeeperServers", ZKContainer.NAME)
- // bookkeeper tools
- .withEnv("zkServers", ZKContainer.NAME)
+ "functions-worker-process-" + suffix,
+ numFunctionWorkers,
+ (name) -> new WorkerContainer(clusterName, name)
+ .withNetwork(network)
+ .withNetworkAliases(name)
+ // worker settings
+ .withEnv("PF_workerId", name)
+ .withEnv("PF_workerHostname", name)
+ .withEnv("PF_workerPort", "" +
PulsarContainer.BROKER_HTTP_PORT)
+ .withEnv("PF_pulsarFunctionsCluster", clusterName)
+ .withEnv("PF_pulsarServiceUrl", serviceUrl)
+ .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
+ // script
+ .withEnv("clusterName", clusterName)
+ .withEnv("zookeeperServers", ZKContainer.NAME)
+ // bookkeeper tools
+ .withEnv("zkServers", ZKContainer.NAME)
));
this.startWorkers();
}
@@ -443,25 +500,26 @@ public class PulsarCluster {
String serviceUrl = "pulsar://pulsar-broker-0:" +
PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" +
PulsarContainer.BROKER_HTTP_PORT;
workerContainers.putAll(runNumContainers(
- "functions-worker-thread-" + suffix,
- numFunctionWorkers,
- (name) -> new WorkerContainer(clusterName, name)
- .withNetwork(network)
- .withNetworkAliases(name)
- // worker settings
- .withEnv("PF_workerId", name)
- .withEnv("PF_workerHostname", name)
- .withEnv("PF_workerPort", "" +
PulsarContainer.BROKER_HTTP_PORT)
- .withEnv("PF_pulsarFunctionsCluster", clusterName)
- .withEnv("PF_pulsarServiceUrl", serviceUrl)
- .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
- .withEnv("PF_functionRuntimeFactoryClassName",
"org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory")
- .withEnv("PF_functionRuntimeFactoryConfigs_threadGroupName",
"pf-container-group")
- // script
- .withEnv("clusterName", clusterName)
- .withEnv("zookeeperServers", ZKContainer.NAME)
- // bookkeeper tools
- .withEnv("zkServers", ZKContainer.NAME)
+ "functions-worker-thread-" + suffix,
+ numFunctionWorkers,
+ (name) -> new WorkerContainer(clusterName, name)
+ .withNetwork(network)
+ .withNetworkAliases(name)
+ // worker settings
+ .withEnv("PF_workerId", name)
+ .withEnv("PF_workerHostname", name)
+ .withEnv("PF_workerPort", "" +
PulsarContainer.BROKER_HTTP_PORT)
+ .withEnv("PF_pulsarFunctionsCluster", clusterName)
+ .withEnv("PF_pulsarServiceUrl", serviceUrl)
+ .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
+ .withEnv("PF_functionRuntimeFactoryClassName",
+
"org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory")
+
.withEnv("PF_functionRuntimeFactoryConfigs_threadGroupName",
"pf-container-group")
+ // script
+ .withEnv("clusterName", clusterName)
+ .withEnv("zookeeperServers", ZKContainer.NAME)
+ // bookkeeper tools
+ .withEnv("zkServers", ZKContainer.NAME)
));
this.startWorkers();
}
@@ -502,9 +560,9 @@ public class PulsarCluster {
containers.forEach((name, container) -> {
PulsarContainer.configureLeaveContainerRunning(container);
container
- .withNetwork(network)
- .withNetworkAliases(name)
- .start();
+ .withNetwork(network)
+ .withNetworkAliases(name)
+ .start();
log.info("Successfully start container {}.", name);
});
}
@@ -576,15 +634,16 @@ public class PulsarCluster {
return zkContainer;
}
- public ContainerExecResult runAdminCommandOnAnyBroker(String...commands)
throws Exception {
+ public ContainerExecResult runAdminCommandOnAnyBroker(String... commands)
throws Exception {
return runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands);
}
- public ContainerExecResult
runPulsarBaseCommandOnAnyBroker(String...commands) throws Exception {
+ public ContainerExecResult runPulsarBaseCommandOnAnyBroker(String...
commands) throws Exception {
return runCommandOnAnyBrokerWithScript(PULSAR_COMMAND_SCRIPT,
commands);
}
- private ContainerExecResult runCommandOnAnyBrokerWithScript(String
scriptType, String...commands) throws Exception {
+ private ContainerExecResult runCommandOnAnyBrokerWithScript(String
scriptType, String... commands)
+ throws Exception {
BrokerContainer container = getAnyBroker();
String[] cmds = new String[commands.length + 1];
cmds[0] = scriptType;
@@ -618,8 +677,8 @@ public class PulsarCluster {
public ContainerExecResult createNamespace(String nsName) throws Exception
{
return runAdminCommandOnAnyBroker(
- "namespaces", "create", "public/" + nsName,
- "--clusters", clusterName);
+ "namespaces", "create", "public/" + nsName,
+ "--clusters", clusterName);
}
public ContainerExecResult createPartitionedTopic(String topicName, int
partitions) throws Exception {
@@ -630,8 +689,8 @@ public class PulsarCluster {
public ContainerExecResult enableDeduplication(String nsName, boolean
enabled) throws Exception {
return runAdminCommandOnAnyBroker(
- "namespaces", "set-deduplication", "public/" + nsName,
- enabled ? "--enable" : "--disable");
+ "namespaces", "set-deduplication", "public/" + nsName,
+ enabled ? "--enable" : "--disable");
}
public void dumpFunctionLogs(String name) {
@@ -644,7 +703,8 @@ public class PulsarCluster {
});
log.info("Function {} logs {}", name, logs);
} catch (com.github.dockerjava.api.exception.NotFoundException
notFound) {
- log.info("Cannot download {} logs from {} not found exception
{}", name, container.getContainerName(), notFound.toString());
+ log.info("Cannot download {} logs from {} not found exception
{}", name, container.getContainerName(),
+ notFound.toString());
} catch (Throwable err) {
log.info("Cannot download {} logs from {}", name,
container.getContainerName(), err);
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index 81e7ae70eff..b705b347cff 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -175,4 +175,7 @@ public class PulsarClusterSpec {
*/
@Default
boolean enableTls = false;
+
+ @Default
+ boolean enableOxia = false;
}
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml
b/tests/integration/src/test/resources/pulsar-messaging.xml
index cfbdb225870..603756fab68 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -29,6 +29,8 @@
<class
name="org.apache.pulsar.tests.integration.messaging.ReaderMessagingTest" />
<class
name="org.apache.pulsar.tests.integration.messaging.NonDurableConsumerMessagingTest"
/>
<class name="org.apache.pulsar.tests.integration.admin.AdminTest"
/>
+
+ <class
name="org.apache.pulsar.tests.integration.oxia.OxiaSmokeTest" />
</classes>
</test>
</suite>
\ No newline at end of file