This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3742483 Fix querystate key not found and add tests for function state
(#4014)
3742483 is described below
commit 374248374e512118c935bfa43c8c58e424c8f241
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Wed Apr 10 13:48:36 2019 -0700
Fix querystate key not found and add tests for function state (#4014)
* fix return code when querying function state for a key that doesn't exist
* add tests
* fix formatting
---
.../functions/worker/PulsarFunctionStateTest.java | 407 +++++++++++++++++++++
.../functions/worker/rest/api/ComponentImpl.java | 2 +
.../pulsar/zookeeper/LocalBookkeeperEnsemble.java | 10 +-
3 files changed, 418 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
new file mode 100644
index 0000000..69f19a0
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpServer;
+import lombok.ToString;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FunctionStats;
+import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.Response;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static
org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Test Pulsar function state
+ *
+ */
+public class PulsarFunctionStateTest {
+ LocalBookkeeperEnsemble bkEnsemble;
+
+ ServiceConfiguration config;
+ WorkerConfig workerConfig;
+ URL urlTls;
+ PulsarService pulsar;
+ PulsarAdmin admin;
+ PulsarClient pulsarClient;
+ BrokerStats brokerStatsClient;
+ WorkerService functionsWorkerService;
+ final String tenant = "external-repl-prop";
+ String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+ String primaryHost;
+ String workerId;
+
+ private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+ private final List<Integer> bookiePorts = new LinkedList<>();
+ private final int brokerWebServicePort = PortManager.nextFreePort();
+ private final int brokerWebServiceTlsPort = PortManager.nextFreePort();
+ private final int brokerServicePort = PortManager.nextFreePort();
+ private final int brokerServiceTlsPort = PortManager.nextFreePort();
+ private final int workerServicePort = PortManager.nextFreePort();
+
+ private final String TLS_SERVER_CERT_FILE_PATH =
"./src/test/resources/authentication/tls/broker-cert.pem";
+ private final String TLS_SERVER_KEY_FILE_PATH =
"./src/test/resources/authentication/tls/broker-key.pem";
+ private final String TLS_CLIENT_CERT_FILE_PATH =
"./src/test/resources/authentication/tls/client-cert.pem";
+ private final String TLS_CLIENT_KEY_FILE_PATH =
"./src/test/resources/authentication/tls/client-key.pem";
+ private final String TLS_TRUST_CERT_FILE_PATH =
"./src/test/resources/authentication/tls/cacert.pem";
+
+ private static final Logger log =
LoggerFactory.getLogger(PulsarFunctionStateTest.class);
+
+ @DataProvider(name = "validRoleName")
+ public Object[][] validRoleName() {
+ return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+ }
+
+ @BeforeMethod
+ void setup(Method method) throws Exception {
+
+ // delete all function temp files
+ File dir = new File(System.getProperty("java.io.tmpdir"));
+ File[] foundFiles = dir.listFiles(new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return name.startsWith("function");
+ }
+ });
+
+ for (File file : foundFiles) {
+ file.delete();
+ }
+
+ log.info("--- Setting up method {} ---", method.getName());
+
+ // Start local bookkeeper ensemble
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> {
+ int port = PortManager.nextFreePort();
+ bookiePorts.add(port);
+ return port;
+ });
+ bkEnsemble.start(true);
+
+ String brokerServiceUrl = "https://127.0.0.1:" +
brokerWebServiceTlsPort;
+
+ config = spy(new ServiceConfiguration());
+ config.setClusterName("use");
+ Set<String> superUsers = Sets.newHashSet("superUser");
+ config.setSuperUserRoles(superUsers);
+ config.setWebServicePort(brokerWebServicePort);
+ config.setWebServicePortTls(brokerWebServiceTlsPort);
+ config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+ config.setBrokerServicePort(brokerServicePort);
+ config.setBrokerServicePortTls(brokerServiceTlsPort);
+ config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
+ config.setTlsAllowInsecureConnection(true);
+ config.setAdvertisedAddress("localhost");
+
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderTls.class.getName());
+ config.setAuthenticationEnabled(true);
+ config.setAuthenticationProviders(providers);
+
+ config.setAuthorizationEnabled(true);
+
config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+
+ config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+ config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+ config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+ config.setBrokerClientAuthenticationParameters(
+ "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," +
"tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
+ config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+ config.setBrokerClientTlsEnabled(true);
+
+
+
+ functionsWorkerService = createPulsarFunctionWorker(config);
+ urlTls = new URL(brokerServiceUrl);
+ Optional<WorkerService> functionWorkerService =
Optional.of(functionsWorkerService);
+ pulsar = new PulsarService(config, functionWorkerService);
+ pulsar.start();
+
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+ authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+ Authentication authTls = new AuthenticationTls();
+ authTls.configure(authParams);
+
+ admin = spy(
+
PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+
.allowTlsInsecureConnection(true).authentication(authTls).build());
+
+ brokerStatsClient = admin.brokerStats();
+ primaryHost = String.format("http://%s:%d", "localhost",
brokerWebServicePort);
+
+ // update cluster metadata
+ ClusterData clusterData = new ClusterData(urlTls.toString());
+ admin.clusters().updateCluster(config.getClusterName(), clusterData);
+
+ ClientBuilder clientBuilder =
PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
+ if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
+ &&
isNotBlank(workerConfig.getClientAuthenticationParameters())) {
+ clientBuilder.enableTls(workerConfig.isUseTls());
+
clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
+
clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
+ workerConfig.getClientAuthenticationParameters());
+ }
+ pulsarClient = clientBuilder.build();
+
+ TenantInfo propAdmin = new TenantInfo();
+ propAdmin.getAdminRoles().add("superUser");
+
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
+ admin.tenants().updateTenant(tenant, propAdmin);
+
+ System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, "");
+
+ }
+
+ @AfterMethod
+ void shutdown() throws Exception {
+ log.info("--- Shutting down ---");
+ pulsarClient.close();
+ admin.close();
+ functionsWorkerService.stop();
+ pulsar.close();
+ bkEnsemble.stop();
+ }
+
+ private WorkerService createPulsarFunctionWorker(ServiceConfiguration
config) {
+
+ workerConfig = new WorkerConfig();
+ workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
+ workerConfig.setSchedulerClassName(
+
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
+ workerConfig.setThreadContainerFactory(new
WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+ // worker talks to local broker
+ workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" +
config.getBrokerServicePortTls().get());
+ workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" +
config.getWebServicePortTls().get());
+ workerConfig.setFailureCheckFreqMs(100);
+ workerConfig.setNumFunctionPackageReplicas(1);
+ workerConfig.setClusterCoordinationTopicName("coordinate");
+ workerConfig.setFunctionAssignmentTopicName("assignment");
+ workerConfig.setFunctionMetadataTopicName("metadata");
+ workerConfig.setInstanceLivenessCheckFreqMs(100);
+ workerConfig.setWorkerPort(workerServicePort);
+ workerConfig.setPulsarFunctionsCluster(config.getClusterName());
+ String hostname =
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+ this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname +
"-" + workerConfig.getWorkerPort();
+ workerConfig.setWorkerHostname(hostname);
+ workerConfig.setWorkerId(workerId);
+
+
workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
+ workerConfig.setClientAuthenticationParameters(
+ String.format("tlsCertFile:%s,tlsKeyFile:%s",
TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
+ workerConfig.setUseTls(true);
+ workerConfig.setTlsAllowInsecureConnection(true);
+ workerConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+ workerConfig.setAuthenticationEnabled(true);
+ workerConfig.setAuthorizationEnabled(true);
+
+ workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:4181");
+
+ return new WorkerService(workerConfig);
+ }
+
+ protected static FunctionConfig createFunctionConfig(String tenant, String
namespace, String functionName, String sourceTopic, String sinkTopic, String
subscriptionName) {
+
+ FunctionConfig functionConfig = new FunctionConfig();
+ functionConfig.setTenant(tenant);
+ functionConfig.setNamespace(namespace);
+ functionConfig.setName(functionName);
+ functionConfig.setParallelism(1);
+
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+ functionConfig.setSubName(subscriptionName);
+ functionConfig.setInputs(Collections.singleton(sourceTopic));
+ functionConfig.setAutoAck(true);
+
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.WordCountFunction");
+ functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+ functionConfig.setOutput(sinkTopic);
+ functionConfig.setCleanupSubscription(true);
+ return functionConfig;
+ }
+
+ @Test(timeOut = 20000)
+ public void testPulsarFunctionState() throws Exception {
+
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sourceTopic = "persistent://" + replNamespace + "/input";
+ final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String propertyKey = "key";
+ final String propertyValue = "value";
+ final String functionName = "PulsarFunction-test";
+ final String subscriptionName = "test-sub";
+ admin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
+
+ // create a producer that creates a topic at broker
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+
+ FunctionConfig functionConfig = createFunctionConfig(tenant,
namespacePortion, functionName,
+ sourceTopic, sinkTopic, subscriptionName);
+
+ String jarFilePathUrl = Utils.FILE + ":" +
getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+ admin.functions().createFunctionWithUrl(functionConfig,
jarFilePathUrl);
+
+ retryStrategically((test) -> {
+ try {
+ return
admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+ // validate pulsar sink consumer has started on the topic
+
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+ int totalMsgs = 5;
+ for (int i = 0; i < totalMsgs; i++) {
+ String data = "foo";
+ producer.newMessage().property(propertyKey,
propertyValue).value(data).send();
+ }
+ retryStrategically((test) -> {
+ try {
+ SubscriptionStats subStats =
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+ return subStats.unackedMessages == 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ retryStrategically((test) -> {
+ try {
+ FunctionStats functionStat =
admin.functions().getFunctionStats(tenant, namespacePortion, functionName);
+ return functionStat.getProcessedSuccessfullyTotal() == 5;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ FunctionState state = admin.functions().getFunctionState(tenant,
namespacePortion, functionName, "foo");
+ Assert.assertEquals(state.getNumberValue().intValue(), 5);
+
+ try {
+ admin.functions().getFunctionState(tenant, namespacePortion,
functionName, "bar");
+ Assert.fail("Should have failed since key shouldn't exist");
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getStatusCode(),
Response.Status.NOT_FOUND.getStatusCode());
+ }
+
+ // validate pulsar-sink consumer has consumed all messages and
delivered to Pulsar sink but unacked messages
+ // due to publish failure
+
assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+ totalMsgs);
+
+ // delete functions
+ admin.functions().deleteFunction(tenant, namespacePortion,
functionName);
+
+ retryStrategically((test) -> {
+ try {
+ return
admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ // make sure subscriptions are cleanup
+
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
+
+ // make sure all temp files are deleted
+ File dir = new File(System.getProperty("java.io.tmpdir"));
+ File[] foundFiles = dir.listFiles(new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return name.startsWith("function");
+ }
+ });
+
+ Assert.assertEquals(foundFiles.length, 0, "Temporary files left over:
" + Arrays.asList(foundFiles));
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 0695bb4..b6652cf 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -1435,6 +1435,8 @@ public abstract class ComponentImpl {
}
}
}
+ } catch (RestException e) {
+ throw e;
} catch (Exception e) {
log.error("Error while getFunctionState request @ /{}/{}/{}/{}",
tenant, namespace, functionName, key, e);
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index be39319..373f8bc 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -349,7 +349,7 @@ public class LocalBookkeeperEnsemble {
}
}
- public void start() throws Exception {
+ public void start(boolean enableStreamStorage) throws Exception {
LOG.debug("Local ZK/BK starting ...");
ServerConfiguration conf = new ServerConfiguration();
// Use minimal configuration requiring less memory for unit tests
@@ -373,6 +373,14 @@ public class LocalBookkeeperEnsemble {
runZookeeper(1000);
initializeZookeper();
runBookies(conf);
+
+ if (enableStreamStorage) {
+ runStreamStorage(new CompositeConfiguration());
+ }
+ }
+
+ public void start() throws Exception {
+ start(false);
}
public void startStandalone() throws Exception {