This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7f75993a84f [fix][ci] Fix PulsarFunctionLocalRunTest that broke after
ClusterData validation changes (#19212)
7f75993a84f is described below
commit 7f75993a84f068febb92f63b338fb0614543affc
Author: Lari Hotari <lhotari@apache.org>
AuthorDate: Fri Jan 13 00:45:45 2023 +0200
[fix][ci] Fix PulsarFunctionLocalRunTest that broke after ClusterData
validation changes (#19212)
---
.../worker/PulsarFunctionLocalRunTest.java | 30 +++++++++++++++++-----
1 file changed, 23 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index a8a1c8384dd..6555cc841e9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -272,7 +272,7 @@ public class PulsarFunctionLocalRunTest {
primaryHost = pulsar.getWebServiceAddress();
// create cluster metadata
- ClusterData clusterData = ClusterData.builder().serviceUrl(urlTls.toString()).build();
+ ClusterData clusterData = ClusterData.builder().serviceUrlTls(urlTls.toString()).build();
admin.clusters().createCluster(config.getClusterName(), clusterData);
ClientBuilder clientBuilder = PulsarClient.builder()
@@ -308,14 +308,30 @@ public class PulsarFunctionLocalRunTest {
void shutdown() throws Exception {
try {
log.info("--- Shutting down ---");
- fileServer.stop();
- pulsarClient.close();
- admin.close();
- pulsar.close();
- bkEnsemble.stop();
+ if (fileServer != null) {
+ fileServer.stop();
+ fileServer = null;
+ }
+ if (pulsarClient != null) {
+ pulsarClient.close();
+ pulsarClient = null;
+ }
+ if (admin != null) {
+ admin.close();
+ admin = null;
+ }
+ if (pulsar != null) {
+ pulsar.close();
+ pulsar = null;
+ }
+ if (bkEnsemble != null) {
+ bkEnsemble.stop();
+ bkEnsemble = null;
+ }
} finally {
if (tempDirectory != null) {
tempDirectory.delete();
+ tempDirectory = null;
}
}
}
@@ -1099,7 +1115,7 @@ public class PulsarFunctionLocalRunTest {
public void testPulsarSinkWithFunction() throws Throwable {
testPulsarSinkLocalRun(null, 1, StatsNullSink.class.getName(),
"builtin://exclamation",
"org.apache.pulsar.functions.api.examples.RecordFunction");
}
-
+
public static class TestErrorSink implements Sink<byte[]> {
private Map config;
@Override