sijie closed pull request #2570: Fix BC issue in functions trigger function 
submitted by old CLI
URL: https://github.com/apache/pulsar/pull/2570
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 9de92269e4..d297a7899c 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -27,6 +28,8 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Request;
 import org.apache.pulsar.functions.worker.request.RequestResult;
@@ -111,6 +114,26 @@ public void initialize() {
         }
     }
 
+    /**
+     * Converts function metadata that still uses the old {@code topicsToSerDeClassName}
+     * map into the current {@code inputSpecs} representation.
+     *
+     * <p>Every entry of the source's {@code topicsToSerDeClassName} map is copied into
+     * {@code inputSpecs} as a {@link ConsumerSpec} carrying the same SerDe class name, and
+     * the old map is then cleared on the returned copy. An entry is flagged as a regex
+     * pattern when its topic name equals the source's {@code topicsPattern}.
+     *
+     * @param fmd the metadata to normalize; may be {@code null}
+     * @return a normalized copy, or {@code fmd} itself when it is {@code null} or has no
+     *         {@code topicsToSerDeClassName} entries
+     */
+    static FunctionMetaData normalizeFunctionMetaData(FunctionMetaData fmd) {
+        // getFunctionMetaData passes the raw result of a map lookup, which is null for an
+        // unknown function name; hand null back instead of failing with an NPE.
+        if (fmd == null) {
+            return null;
+        }
+        Function.SourceSpec source = fmd.getFunctionDetails().getSource();
+        Map<String, String> legacySerDes = source.getTopicsToSerDeClassNameMap();
+        if (legacySerDes.isEmpty()) {
+            return fmd;
+        }
+        String topicsPattern = source.getTopicsPattern();
+        FunctionDetails.Builder fdb = FunctionDetails.newBuilder(fmd.getFunctionDetails());
+        for (Map.Entry<String, String> topicEntry : legacySerDes.entrySet()) {
+            fdb.getSourceBuilder().putInputSpecs(
+                topicEntry.getKey(),
+                ConsumerSpec.newBuilder()
+                    .setSerdeClassName(topicEntry.getValue())
+                    // Compare by value: reference equality (==) only matches when both
+                    // strings happen to be the very same object.
+                    .setIsRegexPattern(topicEntry.getKey().equals(topicsPattern))
+                    .build());
+        }
+        fdb.getSourceBuilder().clearTopicsToSerDeClassName();
+        return FunctionMetaData.newBuilder(fmd)
+            .setFunctionDetails(fdb)
+            .build();
+    }
+
     /**
      * Get the function metadata for a function
      * @param tenant the tenant the function belongs to
@@ -119,7 +142,7 @@ public void initialize() {
      * @return FunctionMetaData that contains the function metadata
      */
     public synchronized FunctionMetaData getFunctionMetaData(String tenant, 
String namespace, String functionName) {
-        return 
this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
+        return 
normalizeFunctionMetaData(this.functionMetaDataMap.get(tenant).get(namespace).get(functionName));
     }
 
     /**
@@ -130,7 +153,7 @@ public synchronized FunctionMetaData 
getFunctionMetaData(String tenant, String n
         List<FunctionMetaData> ret = new LinkedList<>();
         for (Map<String, Map<String, FunctionMetaData>> i : 
this.functionMetaDataMap.values()) {
             for (Map<String, FunctionMetaData> j : i.values()) {
-                ret.addAll(j.values());
+                
ret.addAll(j.values().stream().map(FunctionMetaDataManager::normalizeFunctionMetaData).collect(Collectors.toList()));
             }
         }
         return ret;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index df82c0d25c..f15d4b2c73 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -574,7 +574,6 @@ public Response triggerFunction(final String tenant, final 
String namespace, fin
             return getUnavailableResponse();
         }
 
-        FunctionDetails functionDetails;
         // validate parameters
         try {
             validateTriggerRequestParams(tenant, namespace, functionName, 
topic, input, uploadedInputStream);
@@ -591,6 +590,8 @@ public Response triggerFunction(final String tenant, final 
String namespace, fin
                     .entity(new ErrorData(String.format("Function %s doesn't 
exist", functionName))).build();
         }
 
+        // Function metadata is normalized by the function metadata manager so that the
+        // validation logic below only has to handle the latest format.
         FunctionMetaData functionMetaData = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace,
                 functionName);
 
@@ -598,15 +599,17 @@ public Response triggerFunction(final String tenant, 
final String namespace, fin
         if (topic != null) {
             inputTopicToWrite = topic;
         } else if 
(functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() == 1) {
+            // the function was submitted by a newer CLI which is using input 
specs
             inputTopicToWrite = 
functionMetaData.getFunctionDetails().getSource().getInputSpecsMap()
                     .keySet().iterator().next();
         } else {
             log.error("Function in trigger function has more than 1 input 
topics @ /{}/{}/{}", tenant, namespace, functionName);
             return Response.status(Status.BAD_REQUEST).build();
         }
-        if 
(functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() == 0
-                || 
!functionMetaData.getFunctionDetails().getSource().getInputSpecsMap()
-                        .containsKey(inputTopicToWrite)) {
+        boolean topicIdentified =
+            
functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() > 0
+                && 
functionMetaData.getFunctionDetails().getSource().getInputSpecsMap().containsKey(inputTopicToWrite);
+        if (!topicIdentified) {
             log.error("Function in trigger function has unidentified topic @ 
/{}/{}/{} {}", tenant, namespace, functionName, inputTopicToWrite);
 
             return Response.status(Status.BAD_REQUEST).build();
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index ac99f9a6a4..561ec5bf4e 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -26,6 +26,10 @@
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -37,6 +41,10 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Request;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
@@ -58,6 +66,39 @@ private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
         return client;
     }
 
+    @Test
+    public void testNormalizeFunctionMetadata() throws Exception {
+        FunctionMetaData fmd = FunctionMetaData.newBuilder()
+            .setFunctionDetails(FunctionDetails.newBuilder()
+                .setSource(SourceSpec.newBuilder()
+                    .setTopicsPattern("test-pattern")
+                    .putTopicsToSerDeClassName("test-pattern", "class-pattern")
+                    .putTopicsToSerDeClassName("test-topic-1", "class1")
+                    .putTopicsToSerDeClassName("test-topic-2", "class2")
+                    .build())
+                .build())
+            .build();
+
+        FunctionMetaData normalizedFmd = 
FunctionMetaDataManager.normalizeFunctionMetaData(fmd);
+        SourceSpec ss = normalizedFmd.getFunctionDetails().getSource();
+        assertEquals(0, ss.getTopicsToSerDeClassNameCount());
+        assertEquals(3, ss.getInputSpecsCount());
+        ConsumerSpec cs = ss.getInputSpecsOrThrow("test-pattern");
+        assertNotNull(cs);
+        assertEquals("class-pattern", cs.getSerdeClassName());
+        assertTrue(cs.getIsRegexPattern());
+
+        cs = ss.getInputSpecsOrThrow("test-topic-1");
+        assertNotNull(cs);
+        assertEquals("class1", cs.getSerdeClassName());
+        assertFalse(cs.getIsRegexPattern());
+
+        cs = ss.getInputSpecsOrThrow("test-topic-2");
+        assertNotNull(cs);
+        assertEquals("class2", cs.getSerdeClassName());
+        assertFalse(cs.getIsRegexPattern());
+    }
+
     @Test
     public void testListFunctions() throws PulsarClientException {
         FunctionMetaDataManager functionMetaDataManager = spy(
@@ -81,15 +122,15 @@ public void testListFunctions() throws 
PulsarClientException {
         
functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1", 
functionMetaDataMap1);
         
functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-2", 
functionMetaDataInfoMap2);
 
-        Assert.assertEquals(0, functionMetaDataManager.listFunctions(
+        assertEquals(0, functionMetaDataManager.listFunctions(
                 "tenant", "namespace").size());
-        Assert.assertEquals(2, functionMetaDataManager.listFunctions(
+        assertEquals(2, functionMetaDataManager.listFunctions(
                 "tenant-1", "namespace-1").size());
         Assert.assertTrue(functionMetaDataManager.listFunctions(
                 "tenant-1", "namespace-1").contains("func-1"));
         Assert.assertTrue(functionMetaDataManager.listFunctions(
                 "tenant-1", "namespace-1").contains("func-2"));
-        Assert.assertEquals(1, functionMetaDataManager.listFunctions(
+        assertEquals(1, functionMetaDataManager.listFunctions(
                 "tenant-1", "namespace-2").size());
         Assert.assertTrue(functionMetaDataManager.listFunctions(
                 "tenant-1", "namespace-2").contains("func-3"));
@@ -279,9 +320,9 @@ public void processUpdateTest() throws 
PulsarClientException {
         verify(functionMetaDataManager, times(1))
                 .setFunctionMetaData(any(Function.FunctionMetaData.class));
         verify(schedulerManager, times(1)).schedule();
-        Assert.assertEquals(m1, 
functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-1"));
-        Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
 
         // worker has record of function
@@ -308,7 +349,7 @@ public void processUpdateTest() throws 
PulsarClientException {
                 .build();
         functionMetaDataManager.processUpdate(serviceRequest);
 
-        Assert.assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
+        assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
                 "tenant-1", "namespace-1", "func-1"));
         verify(functionMetaDataManager, times(1))
                 .setFunctionMetaData(any(Function.FunctionMetaData.class));
@@ -324,15 +365,15 @@ public void processUpdateTest() throws 
PulsarClientException {
                 .setWorkerId("worker-2")
                 .build();
         functionMetaDataManager.processUpdate(serviceRequest);
-        Assert.assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
+        assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
                 "tenant-1", "namespace-1", "func-1"));
         verify(functionMetaDataManager, times(1))
                 .setFunctionMetaData(any(Function.FunctionMetaData.class));
         verify(schedulerManager, times(0)).schedule();
 
-        Assert.assertEquals(m1, 
functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-1"));
-        Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
 
         // schedule
@@ -361,10 +402,10 @@ public void processUpdateTest() throws 
PulsarClientException {
                 .setFunctionMetaData(any(Function.FunctionMetaData.class));
         verify(schedulerManager, times(1)).schedule();
 
-        Assert.assertEquals(m1.toBuilder().setVersion(version + 1).build(),
+        assertEquals(m1.toBuilder().setVersion(version + 1).build(),
                 functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-1"));
-        Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
     }
 
@@ -394,9 +435,9 @@ public void processDeregister() throws 
PulsarClientException {
         functionMetaDataManager.proccessDeregister(serviceRequest);
 
         verify(schedulerManager, times(0)).schedule();
-        Assert.assertEquals(test, 
functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-2"));
-        Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
 
         // function exists but request outdated
@@ -419,11 +460,11 @@ public void processDeregister() throws 
PulsarClientException {
         functionMetaDataManager.proccessDeregister(serviceRequest);
         verify(schedulerManager, times(0)).schedule();
 
-        Assert.assertEquals(test, 
functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-2"));
-        Assert.assertEquals(m2, 
functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(m2, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-1"));
-        Assert.assertEquals(2, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(2, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
 
         // function deleted
@@ -451,9 +492,9 @@ public void processDeregister() throws 
PulsarClientException {
         functionMetaDataManager.proccessDeregister(serviceRequest);
         verify(schedulerManager, times(1)).schedule();
 
-        Assert.assertEquals(test, 
functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").get("func-2"));
-        Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+        assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
                 "tenant-1").get("namespace-1").size());
     }
 }
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to