rdhabalia closed pull request #2258: Derive source/sink arg-class name from 
function-class for file-url
URL: https://github.com/apache/incubator-pulsar/pull/2258
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 16f1a76597..5398bc9b15 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -21,6 +21,10 @@
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
 
 import java.io.File;
 import java.lang.reflect.Method;
@@ -62,7 +66,6 @@
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import 
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
-import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
 import 
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.utils.Reflections;
@@ -70,11 +73,9 @@
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.functions.worker.rest.WorkerServer;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -124,7 +125,7 @@
     public Object[][] validRoleName() {
         return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
     }
-    
+
     @BeforeMethod
     void setup(Method method) throws Exception {
 
@@ -147,7 +148,6 @@ void setup(Method method) throws Exception {
         config.setBrokerServicePortTls(brokerServiceTlsPort);
         config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
 
-
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
         config.setAuthenticationEnabled(true);
@@ -156,7 +156,6 @@ void setup(Method method) throws Exception {
         config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config.setTlsAllowInsecureConnection(true);
-    
 
         functionsWorkerService = createPulsarFunctionWorker(config);
         urlTls = new URL(brokerServiceUrl);
@@ -190,12 +189,12 @@ void setup(Method method) throws Exception {
                     workerConfig.getClientAuthenticationParameters());
         }
         pulsarClient = clientBuilder.build();
-       
+
         TenantInfo propAdmin = new TenantInfo();
         propAdmin.getAdminRoles().add("superUser");
         
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
         admin.tenants().updateTenant(tenant, propAdmin);
-       
+
         Thread.sleep(100);
     }
 
@@ -237,7 +236,7 @@ private WorkerService 
createPulsarFunctionWorker(ServiceConfiguration config) {
         workerConfig.setUseTls(true);
         workerConfig.setTlsAllowInsecureConnection(true);
         workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
-        
+
         workerConfig.setAuthenticationEnabled(true);
         workerConfig.setAuthorizationEnabled(true);
 
@@ -285,7 +284,7 @@ public void testE2EPulsarSink() throws Exception {
             }
         }, 5, 150);
         // validate pulsar sink consumer has started on the topic
-        
Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 
1);
+        
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
 
         int totalMsgs = 5;
         for (int i = 0; i < totalMsgs; i++) {
@@ -303,17 +302,15 @@ public void testE2EPulsarSink() throws Exception {
 
         Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
         String receivedPropertyValue = msg.getProperty(propertyKey);
-        Assert.assertEquals(propertyValue, receivedPropertyValue);
+        assertEquals(propertyValue, receivedPropertyValue);
 
         // validate pulsar-sink consumer has consumed all messages and 
delivered to Pulsar sink but unacked messages
         // due to publish failure
-        Assert.assertNotEquals(
-                
admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+        
assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
                 totalMsgs);
 
     }
 
-    
     @Test(timeOut = 20000)
     public void testPulsarSinkStats() throws Exception {
 
@@ -349,7 +346,7 @@ public void testPulsarSinkStats() throws Exception {
             }
         }, 5, 150);
         // validate pulsar sink consumer has started on the topic
-        
Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 
1);
+        
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
 
         int totalMsgs = 10;
         for (int i = 0; i < totalMsgs; i++) {
@@ -371,7 +368,7 @@ public void testPulsarSinkStats() throws Exception {
                 functionName);
 
         int numInstances = functionStats.getFunctionStatusListCount();
-        Assert.assertEquals(numInstances, 1);
+        assertEquals(numInstances, 1);
 
         FunctionStatus stats = 
functionStats.getFunctionStatusListList().get(0);
         Map<String, DataDigest> metricsData = 
stats.getMetrics().getMetricsMap();
@@ -379,12 +376,13 @@ public void testPulsarSinkStats() throws Exception {
         double count = 
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_PROCESSED).getCount();
         double success = 
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_SUCCESS).getCount();
         String ownerWorkerId = stats.getWorkerId();
-        Assert.assertEquals((int) count, totalMsgs);
-        Assert.assertEquals((int) success, totalMsgs);
-        Assert.assertEquals(ownerWorkerId, workerId);
+        assertEquals((int) count, totalMsgs);
+        assertEquals((int) success, totalMsgs);
+        assertEquals(ownerWorkerId, workerId);
     }
 
-    protected FunctionDetails createSinkConfig(String jarFile, String tenant, 
String namespace, String functionName, String sinkTopic, String 
subscriptionName) {
+    protected FunctionDetails createSinkConfig(String jarFile, String tenant, 
String namespace, String functionName,
+            String sinkTopic, String subscriptionName) {
 
         File file = new File(jarFile);
         try {
@@ -407,7 +405,7 @@ protected FunctionDetails createSinkConfig(String jarFile, 
String tenant, String
         // source spec classname should be empty so that the default pulsar 
source will be used
         SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
         
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
-        sourceSpecBuilder.setTypeClassName(byte[].class.getName());
+        sourceSpecBuilder.setTypeClassName(typeArg.getName());
         sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
         sourceSpecBuilder.setSubscriptionName(subscriptionName);
         sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, 
DefaultSerDe.class.getName());
@@ -425,7 +423,7 @@ protected FunctionDetails createSinkConfig(String jarFile, 
String tenant, String
 
         return functionDetailsBuilder.build();
     }
-    
+
     @Test(dataProvider = "validRoleName")
     public void testAuthorization(boolean validRoleName) throws Exception {
 
@@ -450,9 +448,62 @@ public void testAuthorization(boolean validRoleName) 
throws Exception {
                 sinkTopic, subscriptionName);
         try {
             admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
-            Assert.assertTrue(validRoleName);
+            assertTrue(validRoleName);
         } catch 
(org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) 
{
-            Assert.assertFalse(validRoleName);
+            assertFalse(validRoleName);
         }
     }
+
+    /**
+     * Test to verify: function-server loads jar using file-url and derives 
type-args classes if not provided
+     * @throws Exception
+     */
+    @Test(timeOut = 20000)
+    public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {
+
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String functionName = "PulsarSink-test";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        String jarFilePathUrl = Utils.FILE + ":"
+                + 
IdentityFunction.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+
+        FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
+        functionDetailsBuilder.setTenant(tenant);
+        functionDetailsBuilder.setNamespace(namespacePortion);
+        functionDetailsBuilder.setName(functionName);
+        functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
+        functionDetailsBuilder.setParallelism(1);
+        functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+
+        Class<?>[] typeArgs = 
org.apache.pulsar.functions.utils.Utils.getFunctionTypes(new 
IdentityFunction(), false);
+
+        // set source spec
+        // source spec classname should be empty so that the default pulsar 
source will be used
+        SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
+        
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+        sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic, 
DefaultSerDe.class.getName());
+        functionDetailsBuilder.setAutoAck(true);
+        functionDetailsBuilder.setSource(sourceSpecBuilder);
+
+        // set up sink spec
+        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
+        sinkSpecBuilder.setTopic(sinkTopic);
+        Map<String, Object> sinkConfigMap = Maps.newHashMap();
+        sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap));
+        functionDetailsBuilder.setSink(sinkSpecBuilder);
+
+        FunctionDetails functionDetails = functionDetailsBuilder.build();
+        admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
+
+        FunctionDetails functionMetadata = 
admin.functions().getFunction(tenant, namespacePortion, functionName);
+
+        assertEquals(functionMetadata.getSource().getTypeClassName(), 
typeArgs[0].getName());
+        assertEquals(functionMetadata.getSink().getTypeClassName(), 
typeArgs[1].getName());
+
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 0c25be29ca..94c315df3f 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -102,16 +102,21 @@ public static int findAvailablePort() {
     }
 
     public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig) {
-
-        Object userClass = createInstance(functionConfig.getClassName(), 
Thread.currentThread().getContextClassLoader());
+        Object userClass = createInstance(functionConfig.getClassName(),
+                Thread.currentThread().getContextClassLoader());
+        boolean isWindowConfigPresent = functionConfig.getWindowConfig() != 
null;
+        return getFunctionTypes(userClass, isWindowConfigPresent);
+    }
+    
+    public static Class<?>[] getFunctionTypes(Object userClass, boolean 
isWindowConfigPresent) {
 
         Class<?>[] typeArgs;
         // if window function
-        if (functionConfig.getWindowConfig() != null) {
+        if (isWindowConfigPresent) {
             java.util.function.Function function = 
(java.util.function.Function) userClass;
             if (function == null) {
-                throw new IllegalArgumentException(String.format("The Java 
util function class %s could not be instantiated",
-                        functionConfig.getClassName()));
+                throw new IllegalArgumentException(
+                        String.format("The Java util function class %s could 
not be instantiated", userClass));
             }
             typeArgs = 
TypeResolver.resolveRawArguments(java.util.function.Function.class, 
function.getClass());
             if (!typeArgs[0].equals(Collection.class)) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 9b0ad98b27..989d09026b 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -56,7 +56,10 @@
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.join;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -139,7 +142,7 @@ public Response registerFunction(final String tenant, final 
String namespace, fi
         }
 
         FunctionDetails functionDetails;
-        boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
+        boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
         // validate parameters
         try {
             if (isPkgUrlProvided) {
@@ -203,7 +206,7 @@ public Response updateFunction(final String tenant, final 
String namespace, fina
         }
         
         FunctionDetails functionDetails;
-        boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
+        boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
         // validate parameters
         try {
             if (isPkgUrlProvided) {
@@ -756,14 +759,14 @@ private FunctionDetails 
validateUpdateRequestParams(String tenant, String namesp
     private boolean isFunctionCodeBuiltin(FunctionDetails functionDetails) {
         if (functionDetails.hasSource()) {
             SourceSpec sourceSpec = functionDetails.getSource();
-            if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+            if (!isEmpty(sourceSpec.getBuiltin())) {
                 return true;
             }
         }
 
         if (functionDetails.hasSink()) {
             SinkSpec sinkSpec = functionDetails.getSink();
-            if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+            if (!isEmpty(sinkSpec.getBuiltin())) {
                 return true;
             }
         }
@@ -774,14 +777,14 @@ private boolean isFunctionCodeBuiltin(FunctionDetails 
functionDetails) {
     private String getFunctionCodeBuiltin(FunctionDetails functionDetails) {
         if (functionDetails.hasSource()) {
             SourceSpec sourceSpec = functionDetails.getSource();
-            if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
+            if (!isEmpty(sourceSpec.getBuiltin())) {
                 return sourceSpec.getBuiltin();
             }
         }
 
         if (functionDetails.hasSink()) {
             SinkSpec sinkSpec = functionDetails.getSink();
-            if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
+            if (!isEmpty(sinkSpec.getBuiltin())) {
                 return sinkSpec.getBuiltin();
             }
         }
@@ -833,7 +836,7 @@ private FunctionDetails validateUpdateRequestParams(String 
tenant, String namesp
                 missingFields.add("Sink");
             }
             if (!missingFields.isEmpty()) {
-                String errorMessage = StringUtils.join(missingFields, ",");
+                String errorMessage = join(missingFields, ",");
                 throw new IllegalArgumentException(errorMessage + " is not 
provided");
             }
             if (functionDetails.getParallelism() <= 0) {
@@ -855,7 +858,7 @@ private void validateFunctionClassTypes(File jarFile, 
FunctionDetails.Builder fu
             return;
         }
 
-        if (StringUtils.isBlank(functionDetailsBuilder.getClassName())) {
+        if (isBlank(functionDetailsBuilder.getClassName())) {
             throw new IllegalArgumentException("function class-name can't be 
empty");
         }
 
@@ -865,13 +868,15 @@ private void validateFunctionClassTypes(File jarFile, 
FunctionDetails.Builder fu
 
         // validate function class-type
         Object functionObject = 
createInstance(functionDetailsBuilder.getClassName(), classLoader);
+        Class<?>[] typeArgs = 
org.apache.pulsar.functions.utils.Utils.getFunctionTypes(functionObject, false);
+        
         if (!(functionObject instanceof 
org.apache.pulsar.functions.api.Function)
                 && !(functionObject instanceof java.util.function.Function)) {
             throw new RuntimeException("User class must either be Function or 
java.util.Function");
         }
-
+        
         if (functionDetailsBuilder.hasSource() && 
functionDetailsBuilder.getSource() != null
-                && 
StringUtils.isNotBlank(functionDetailsBuilder.getSource().getClassName())) {
+                && 
isNotBlank(functionDetailsBuilder.getSource().getClassName())) {
             try {
                 String sourceClassName = 
functionDetailsBuilder.getSource().getClassName();
                 String argClassName = getTypeArg(sourceClassName, 
Source.class, classLoader).getName();
@@ -880,7 +885,7 @@ private void validateFunctionClassTypes(File jarFile, 
FunctionDetails.Builder fu
 
                 // if sink-class not present then set same arg as source
                 if (!functionDetailsBuilder.hasSink()
-                        || 
StringUtils.isBlank(functionDetailsBuilder.getSink().getClassName())) {
+                        || 
isBlank(functionDetailsBuilder.getSink().getClassName())) {
                     functionDetailsBuilder
                             
.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
                 }
@@ -891,10 +896,14 @@ private void validateFunctionClassTypes(File jarFile, 
FunctionDetails.Builder fu
                 log.error("Failed to validate source class", e);
                 throw new IllegalArgumentException("Failed to validate source 
class-name", e);
             }
+        } else if 
(isBlank(functionDetailsBuilder.getSourceBuilder().getTypeClassName())) {
+            // if function-src-class is not present then set function-src 
type-class according to function class
+            functionDetailsBuilder
+                    
.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(typeArgs[0].getName()));
         }
 
         if (functionDetailsBuilder.hasSink() && 
functionDetailsBuilder.getSink() != null
-                && 
StringUtils.isNotBlank(functionDetailsBuilder.getSink().getClassName())) {
+                && 
isNotBlank(functionDetailsBuilder.getSink().getClassName())) {
             try {
                 String sinkClassName = 
functionDetailsBuilder.getSink().getClassName();
                 String argClassName = getTypeArg(sinkClassName, Sink.class, 
classLoader).getName();
@@ -902,7 +911,7 @@ private void validateFunctionClassTypes(File jarFile, 
FunctionDetails.Builder fu
 
                 // if source-class not present then set same arg as sink
                 if (!functionDetailsBuilder.hasSource()
-                        || 
StringUtils.isBlank(functionDetailsBuilder.getSource().getClassName())) {
+                        || 
isBlank(functionDetailsBuilder.getSource().getClassName())) {
                     functionDetailsBuilder
                             
.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
                 }
@@ -913,7 +922,12 @@ private void validateFunctionClassTypes(File jarFile, 
FunctionDetails.Builder fu
                 log.error("Failed to validate sink class", e);
                 throw new IllegalArgumentException("Failed to validate sink 
class-name", e);
             }
+        } else 
if(isBlank(functionDetailsBuilder.getSinkBuilder().getTypeClassName())){
+            // if function-sink-class is not present then set function-sink 
type-class according to function class
+            functionDetailsBuilder
+                    
.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(typeArgs[1].getName()));
         }
+
     }
 
     private Class<?> getTypeArg(String className, Class<?> funClass, 
URLClassLoader classLoader)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to