This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a59f988 Derive source/sink arg-class name from function-class for
file-url (#2258)
a59f988 is described below
commit a59f9884865d7430b1bc762444207358b16618a5
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Jul 31 11:32:49 2018 -0700
Derive source/sink arg-class name from function-class for file-url (#2258)
* Derive source/sink arg-class name from functio-class for file-url archive
* fix set type-arg if src/sink arg-class is not set
* add unit-test
---
.../org/apache/pulsar/io/PulsarSinkE2ETest.java | 99 ++++++++++++++++------
.../org/apache/pulsar/functions/utils/Utils.java | 15 ++--
.../functions/worker/rest/api/FunctionsImpl.java | 42 ++++++---
3 files changed, 113 insertions(+), 43 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 16f1a76..5398bc9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -21,6 +21,10 @@ package org.apache.pulsar.io;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
import java.io.File;
import java.lang.reflect.Method;
@@ -62,7 +66,6 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
-import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
import
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.Reflections;
@@ -70,11 +73,9 @@ import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.functions.worker.rest.WorkerServer;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
@@ -124,7 +125,7 @@ public class PulsarSinkE2ETest {
public Object[][] validRoleName() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}
-
+
@BeforeMethod
void setup(Method method) throws Exception {
@@ -147,7 +148,6 @@ public class PulsarSinkE2ETest {
config.setBrokerServicePortTls(brokerServiceTlsPort);
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
-
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
config.setAuthenticationEnabled(true);
@@ -156,7 +156,6 @@ public class PulsarSinkE2ETest {
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsAllowInsecureConnection(true);
-
functionsWorkerService = createPulsarFunctionWorker(config);
urlTls = new URL(brokerServiceUrl);
@@ -190,12 +189,12 @@ public class PulsarSinkE2ETest {
workerConfig.getClientAuthenticationParameters());
}
pulsarClient = clientBuilder.build();
-
+
TenantInfo propAdmin = new TenantInfo();
propAdmin.getAdminRoles().add("superUser");
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
admin.tenants().updateTenant(tenant, propAdmin);
-
+
Thread.sleep(100);
}
@@ -237,7 +236,7 @@ public class PulsarSinkE2ETest {
workerConfig.setUseTls(true);
workerConfig.setTlsAllowInsecureConnection(true);
workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
-
+
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);
@@ -285,7 +284,7 @@ public class PulsarSinkE2ETest {
}
}, 5, 150);
// validate pulsar sink consumer has started on the topic
-
Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(),
1);
+
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
int totalMsgs = 5;
for (int i = 0; i < totalMsgs; i++) {
@@ -303,17 +302,15 @@ public class PulsarSinkE2ETest {
Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedPropertyValue = msg.getProperty(propertyKey);
- Assert.assertEquals(propertyValue, receivedPropertyValue);
+ assertEquals(propertyValue, receivedPropertyValue);
// validate pulsar-sink consumer has consumed all messages and
delivered to Pulsar sink but unacked messages
// due to publish failure
- Assert.assertNotEquals(
-
admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+
assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
totalMsgs);
}
-
@Test(timeOut = 20000)
public void testPulsarSinkStats() throws Exception {
@@ -349,7 +346,7 @@ public class PulsarSinkE2ETest {
}
}, 5, 150);
// validate pulsar sink consumer has started on the topic
-
Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(),
1);
+
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
int totalMsgs = 10;
for (int i = 0; i < totalMsgs; i++) {
@@ -371,7 +368,7 @@ public class PulsarSinkE2ETest {
functionName);
int numInstances = functionStats.getFunctionStatusListCount();
- Assert.assertEquals(numInstances, 1);
+ assertEquals(numInstances, 1);
FunctionStatus stats =
functionStats.getFunctionStatusListList().get(0);
Map<String, DataDigest> metricsData =
stats.getMetrics().getMetricsMap();
@@ -379,12 +376,13 @@ public class PulsarSinkE2ETest {
double count =
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_PROCESSED).getCount();
double success =
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_SUCCESS).getCount();
String ownerWorkerId = stats.getWorkerId();
- Assert.assertEquals((int) count, totalMsgs);
- Assert.assertEquals((int) success, totalMsgs);
- Assert.assertEquals(ownerWorkerId, workerId);
+ assertEquals((int) count, totalMsgs);
+ assertEquals((int) success, totalMsgs);
+ assertEquals(ownerWorkerId, workerId);
}
- protected FunctionDetails createSinkConfig(String jarFile, String tenant,
String namespace, String functionName, String sinkTopic, String
subscriptionName) {
+ protected FunctionDetails createSinkConfig(String jarFile, String tenant,
String namespace, String functionName,
+ String sinkTopic, String subscriptionName) {
File file = new File(jarFile);
try {
@@ -407,7 +405,7 @@ public class PulsarSinkE2ETest {
// source spec classname should be empty so that the default pulsar
source will be used
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
- sourceSpecBuilder.setTypeClassName(byte[].class.getName());
+ sourceSpecBuilder.setTypeClassName(typeArg.getName());
sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
sourceSpecBuilder.setSubscriptionName(subscriptionName);
sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern,
DefaultSerDe.class.getName());
@@ -425,7 +423,7 @@ public class PulsarSinkE2ETest {
return functionDetailsBuilder.build();
}
-
+
@Test(dataProvider = "validRoleName")
public void testAuthorization(boolean validRoleName) throws Exception {
@@ -450,9 +448,62 @@ public class PulsarSinkE2ETest {
sinkTopic, subscriptionName);
try {
admin.functions().createFunctionWithUrl(functionDetails,
jarFilePathUrl);
- Assert.assertTrue(validRoleName);
+ assertTrue(validRoleName);
} catch
(org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne)
{
- Assert.assertFalse(validRoleName);
+ assertFalse(validRoleName);
}
}
+
+ /**
+ * Test to verify: function-server loads jar using file-url and derives
type-args classes if not provided
+ * @throws Exception
+ */
+ @Test(timeOut = 20000)
+ public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {
+
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String functionName = "PulsarSink-test";
+ admin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
+
+ String jarFilePathUrl = Utils.FILE + ":"
+ +
IdentityFunction.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+
+ FunctionDetails.Builder functionDetailsBuilder =
FunctionDetails.newBuilder();
+ functionDetailsBuilder.setTenant(tenant);
+ functionDetailsBuilder.setNamespace(namespacePortion);
+ functionDetailsBuilder.setName(functionName);
+ functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
+ functionDetailsBuilder.setParallelism(1);
+ functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+
+ Class<?>[] typeArgs =
org.apache.pulsar.functions.utils.Utils.getFunctionTypes(new
IdentityFunction(), false);
+
+ // set source spec
+ // source spec classname should be empty so that the default pulsar
source will be used
+ SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
+
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+ sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic,
DefaultSerDe.class.getName());
+ functionDetailsBuilder.setAutoAck(true);
+ functionDetailsBuilder.setSource(sourceSpecBuilder);
+
+ // set up sink spec
+ SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
+ sinkSpecBuilder.setTopic(sinkTopic);
+ Map<String, Object> sinkConfigMap = Maps.newHashMap();
+ sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap));
+ functionDetailsBuilder.setSink(sinkSpecBuilder);
+
+ FunctionDetails functionDetails = functionDetailsBuilder.build();
+ admin.functions().createFunctionWithUrl(functionDetails,
jarFilePathUrl);
+
+ FunctionDetails functionMetadata =
admin.functions().getFunction(tenant, namespacePortion, functionName);
+
+ assertEquals(functionMetadata.getSource().getTypeClassName(),
typeArgs[0].getName());
+ assertEquals(functionMetadata.getSink().getTypeClassName(),
typeArgs[1].getName());
+
+ }
}
\ No newline at end of file
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 0c25be2..94c315d 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -102,16 +102,21 @@ public class Utils {
}
public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig) {
-
- Object userClass = createInstance(functionConfig.getClassName(),
Thread.currentThread().getContextClassLoader());
+ Object userClass = createInstance(functionConfig.getClassName(),
+ Thread.currentThread().getContextClassLoader());
+ boolean isWindowConfigPresent = functionConfig.getWindowConfig() !=
null;
+ return getFunctionTypes(userClass, isWindowConfigPresent);
+ }
+
+ public static Class<?>[] getFunctionTypes(Object userClass, boolean
isWindowConfigPresent) {
Class<?>[] typeArgs;
// if window function
- if (functionConfig.getWindowConfig() != null) {
+ if (isWindowConfigPresent) {
java.util.function.Function function =
(java.util.function.Function) userClass;
if (function == null) {
- throw new IllegalArgumentException(String.format("The Java
util function class %s could not be instantiated",
- functionConfig.getClassName()));
+ throw new IllegalArgumentException(
+ String.format("The Java util function class %s could
not be instantiated", userClass));
}
typeArgs =
TypeResolver.resolveRawArguments(java.util.function.Function.class,
function.getClass());
if (!typeArgs[0].equals(Collection.class)) {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index e590b0e..50c09a0 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -56,7 +56,10 @@ import javax.ws.rs.core.StreamingOutput;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.join;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -139,7 +142,7 @@ public class FunctionsImpl {
}
FunctionDetails functionDetails;
- boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
+ boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
// validate parameters
try {
if (isPkgUrlProvided) {
@@ -203,7 +206,7 @@ public class FunctionsImpl {
}
FunctionDetails functionDetails;
- boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
+ boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
// validate parameters
try {
if (isPkgUrlProvided) {
@@ -745,14 +748,14 @@ public class FunctionsImpl {
private boolean isFunctionCodeBuiltin(FunctionDetails functionDetails) {
if (functionDetails.hasSource()) {
SourceSpec sourceSpec = functionDetails.getSource();
- if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+ if (!isEmpty(sourceSpec.getBuiltin())) {
return true;
}
}
if (functionDetails.hasSink()) {
SinkSpec sinkSpec = functionDetails.getSink();
- if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+ if (!isEmpty(sinkSpec.getBuiltin())) {
return true;
}
}
@@ -763,14 +766,14 @@ public class FunctionsImpl {
private String getFunctionCodeBuiltin(FunctionDetails functionDetails) {
if (functionDetails.hasSource()) {
SourceSpec sourceSpec = functionDetails.getSource();
- if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+ if (!isEmpty(sourceSpec.getBuiltin())) {
return sourceSpec.getBuiltin();
}
}
if (functionDetails.hasSink()) {
SinkSpec sinkSpec = functionDetails.getSink();
- if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+ if (!isEmpty(sinkSpec.getBuiltin())) {
return sinkSpec.getBuiltin();
}
}
@@ -822,7 +825,7 @@ public class FunctionsImpl {
missingFields.add("Sink");
}
if (!missingFields.isEmpty()) {
- String errorMessage = StringUtils.join(missingFields, ",");
+ String errorMessage = join(missingFields, ",");
throw new IllegalArgumentException(errorMessage + " is not
provided");
}
if (functionDetails.getParallelism() <= 0) {
@@ -844,7 +847,7 @@ public class FunctionsImpl {
return;
}
- if (StringUtils.isBlank(functionDetailsBuilder.getClassName())) {
+ if (isBlank(functionDetailsBuilder.getClassName())) {
throw new IllegalArgumentException("function class-name can't be
empty");
}
@@ -854,13 +857,15 @@ public class FunctionsImpl {
// validate function class-type
Object functionObject =
createInstance(functionDetailsBuilder.getClassName(), classLoader);
+ Class<?>[] typeArgs =
org.apache.pulsar.functions.utils.Utils.getFunctionTypes(functionObject, false);
+
if (!(functionObject instanceof
org.apache.pulsar.functions.api.Function)
&& !(functionObject instanceof java.util.function.Function)) {
throw new RuntimeException("User class must either be Function or
java.util.Function");
}
-
+
if (functionDetailsBuilder.hasSource() &&
functionDetailsBuilder.getSource() != null
- &&
StringUtils.isNotBlank(functionDetailsBuilder.getSource().getClassName())) {
+ &&
isNotBlank(functionDetailsBuilder.getSource().getClassName())) {
try {
String sourceClassName =
functionDetailsBuilder.getSource().getClassName();
String argClassName = getTypeArg(sourceClassName,
Source.class, classLoader).getName();
@@ -869,7 +874,7 @@ public class FunctionsImpl {
// if sink-class not present then set same arg as source
if (!functionDetailsBuilder.hasSink()
- ||
StringUtils.isBlank(functionDetailsBuilder.getSink().getClassName())) {
+ ||
isBlank(functionDetailsBuilder.getSink().getClassName())) {
functionDetailsBuilder
.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
}
@@ -880,10 +885,14 @@ public class FunctionsImpl {
log.error("Failed to validate source class", e);
throw new IllegalArgumentException("Failed to validate source
class-name", e);
}
+ } else if
(isBlank(functionDetailsBuilder.getSourceBuilder().getTypeClassName())) {
+ // if function-src-class is not present then set function-src
type-class according to function class
+ functionDetailsBuilder
+
.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(typeArgs[0].getName()));
}
if (functionDetailsBuilder.hasSink() &&
functionDetailsBuilder.getSink() != null
- &&
StringUtils.isNotBlank(functionDetailsBuilder.getSink().getClassName())) {
+ &&
isNotBlank(functionDetailsBuilder.getSink().getClassName())) {
try {
String sinkClassName =
functionDetailsBuilder.getSink().getClassName();
String argClassName = getTypeArg(sinkClassName, Sink.class,
classLoader).getName();
@@ -891,7 +900,7 @@ public class FunctionsImpl {
// if source-class not present then set same arg as sink
if (!functionDetailsBuilder.hasSource()
- ||
StringUtils.isBlank(functionDetailsBuilder.getSource().getClassName())) {
+ ||
isBlank(functionDetailsBuilder.getSource().getClassName())) {
functionDetailsBuilder
.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
}
@@ -902,7 +911,12 @@ public class FunctionsImpl {
log.error("Failed to validate sink class", e);
throw new IllegalArgumentException("Failed to validate sink
class-name", e);
}
+ } else
if(isBlank(functionDetailsBuilder.getSinkBuilder().getTypeClassName())){
+ // if function-sink-class is not present then set function-sink
type-class according to function class
+ functionDetailsBuilder
+
.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(typeArgs[1].getName()));
}
+
}
private Class<?> getTypeArg(String className, Class<?> funClass,
URLClassLoader classLoader)