sijie closed pull request #2570: Fix BC issue in functions trigger function
submitted by old CLI
URL: https://github.com/apache/pulsar/pull/2570
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 9de92269e4..d297a7899c 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker;
import com.google.common.annotations.VisibleForTesting;
+import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@@ -27,6 +28,8 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.worker.request.RequestResult;
@@ -111,6 +114,26 @@ public void initialize() {
}
}
+ /**
+  * Converts function metadata that still uses the legacy
+  * {@code topicsToSerDeClassName} map into the {@code inputSpecs} format.
+  *
+  * <p>Each legacy entry is copied into {@code inputSpecs} as a
+  * {@link ConsumerSpec} carrying the same serde class name, and the legacy
+  * map is then cleared. An entry is flagged as a regex pattern when its
+  * topic name equals the source's {@code topicsPattern}.
+  *
+  * @param fmd the function metadata to normalize
+  * @return a normalized copy, or {@code fmd} itself when the legacy map is empty
+  */
+ static FunctionMetaData normalizeFunctionMetaData(FunctionMetaData fmd) {
+     Map<String, String> legacySerDeByTopic =
+         fmd.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap();
+     if (legacySerDeByTopic.isEmpty()) {
+         // nothing in the legacy map, hand back the instance untouched
+         return fmd;
+     }
+
+     String topicsPattern = fmd.getFunctionDetails().getSource().getTopicsPattern();
+     FunctionDetails.Builder fdb = FunctionDetails.newBuilder(fmd.getFunctionDetails());
+     for (Map.Entry<String, String> topicEntry : legacySerDeByTopic.entrySet()) {
+         fdb.getSourceBuilder().putInputSpecs(
+             topicEntry.getKey(),
+             ConsumerSpec.newBuilder()
+                 .setSerdeClassName(topicEntry.getValue())
+                 // compare by value: `==` is only true when both strings are the same object
+                 .setIsRegexPattern(topicEntry.getKey().equals(topicsPattern))
+                 .build());
+     }
+     fdb.getSourceBuilder().clearTopicsToSerDeClassName();
+     return FunctionMetaData.newBuilder(fmd)
+         .setFunctionDetails(fdb)
+         .build();
+ }
+
/**
* Get the function metadata for a function
* @param tenant the tenant the function belongs to
@@ -119,7 +142,7 @@ public void initialize() {
* @return FunctionMetaData that contains the function metadata
*/
public synchronized FunctionMetaData getFunctionMetaData(String tenant,
String namespace, String functionName) {
- return
this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
+ return
normalizeFunctionMetaData(this.functionMetaDataMap.get(tenant).get(namespace).get(functionName));
}
/**
@@ -130,7 +153,7 @@ public synchronized FunctionMetaData
getFunctionMetaData(String tenant, String n
List<FunctionMetaData> ret = new LinkedList<>();
for (Map<String, Map<String, FunctionMetaData>> i :
this.functionMetaDataMap.values()) {
for (Map<String, FunctionMetaData> j : i.values()) {
- ret.addAll(j.values());
+
ret.addAll(j.values().stream().map(FunctionMetaDataManager::normalizeFunctionMetaData).collect(Collectors.toList()));
}
}
return ret;
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index df82c0d25c..f15d4b2c73 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -574,7 +574,6 @@ public Response triggerFunction(final String tenant, final
String namespace, fin
return getUnavailableResponse();
}
- FunctionDetails functionDetails;
// validate parameters
try {
validateTriggerRequestParams(tenant, namespace, functionName,
topic, input, uploadedInputStream);
@@ -591,6 +590,8 @@ public Response triggerFunction(final String tenant, final
String namespace, fin
.entity(new ErrorData(String.format("Function %s doesn't
exist", functionName))).build();
}
+ // function metadata will be normalized by function metadata manager
to ensure validation logic
+ // only has to handle the latest format.
FunctionMetaData functionMetaData =
functionMetaDataManager.getFunctionMetaData(tenant, namespace,
functionName);
@@ -598,15 +599,17 @@ public Response triggerFunction(final String tenant,
final String namespace, fin
if (topic != null) {
inputTopicToWrite = topic;
} else if
(functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() == 1) {
+ // the function was submitted by a newer CLI which is using input
specs
inputTopicToWrite =
functionMetaData.getFunctionDetails().getSource().getInputSpecsMap()
.keySet().iterator().next();
} else {
log.error("Function in trigger function has more than 1 input
topics @ /{}/{}/{}", tenant, namespace, functionName);
return Response.status(Status.BAD_REQUEST).build();
}
- if
(functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() == 0
- ||
!functionMetaData.getFunctionDetails().getSource().getInputSpecsMap()
- .containsKey(inputTopicToWrite)) {
+ boolean topicIdentified =
+
functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() > 0
+ &&
functionMetaData.getFunctionDetails().getSource().getInputSpecsMap().containsKey(inputTopicToWrite);
+ if (!topicIdentified) {
log.error("Function in trigger function has unidentified topic @
/{}/{}/{} {}", tenant, namespace, functionName, inputTopicToWrite);
return Response.status(Status.BAD_REQUEST).build();
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index ac99f9a6a4..561ec5bf4e 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -26,6 +26,10 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import java.util.HashMap;
import java.util.Map;
@@ -37,6 +41,10 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.Request;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
@@ -58,6 +66,39 @@ private static PulsarClient mockPulsarClient() throws
PulsarClientException {
return client;
}
+ @Test // checks that normalizeFunctionMetaData turns legacy serde entries into input specs
+ public void testNormalizeFunctionMetadata() throws Exception {
+ FunctionMetaData fmd = FunctionMetaData.newBuilder()
+ .setFunctionDetails(FunctionDetails.newBuilder()
+ .setSource(SourceSpec.newBuilder()
+ .setTopicsPattern("test-pattern")
+ .putTopicsToSerDeClassName("test-pattern", "class-pattern")
+ .putTopicsToSerDeClassName("test-topic-1", "class1")
+ .putTopicsToSerDeClassName("test-topic-2", "class2")
+ .build())
+ .build())
+ .build();
+ /*
+  * After normalization the legacy map must be empty and all three entries
+  * must appear in inputSpecs; only the entry whose key equals the topics
+  * pattern is expected to be flagged as a regex pattern.
+  * NOTE(review): the pattern and its map key are the same string literal
+  * here, so this test cannot tell reference equality from value equality.
+  */
FunctionMetaData normalizedFmd =
FunctionMetaDataManager.normalizeFunctionMetaData(fmd);
+ SourceSpec ss = normalizedFmd.getFunctionDetails().getSource();
+ assertEquals(0, ss.getTopicsToSerDeClassNameCount());
+ assertEquals(3, ss.getInputSpecsCount());
+ ConsumerSpec cs = ss.getInputSpecsOrThrow("test-pattern");
+ assertNotNull(cs);
+ assertEquals("class-pattern", cs.getSerdeClassName());
+ assertTrue(cs.getIsRegexPattern());
+ // a key that differs from the topics pattern must not be flagged as a regex
+ cs = ss.getInputSpecsOrThrow("test-topic-1");
+ assertNotNull(cs);
+ assertEquals("class1", cs.getSerdeClassName());
+ assertFalse(cs.getIsRegexPattern());
+ // same expectation for the second plain topic
+ cs = ss.getInputSpecsOrThrow("test-topic-2");
+ assertNotNull(cs);
+ assertEquals("class2", cs.getSerdeClassName());
+ assertFalse(cs.getIsRegexPattern());
+ }
+
@Test
public void testListFunctions() throws PulsarClientException {
FunctionMetaDataManager functionMetaDataManager = spy(
@@ -81,15 +122,15 @@ public void testListFunctions() throws
PulsarClientException {
functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1",
functionMetaDataMap1);
functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-2",
functionMetaDataInfoMap2);
- Assert.assertEquals(0, functionMetaDataManager.listFunctions(
+ assertEquals(0, functionMetaDataManager.listFunctions(
"tenant", "namespace").size());
- Assert.assertEquals(2, functionMetaDataManager.listFunctions(
+ assertEquals(2, functionMetaDataManager.listFunctions(
"tenant-1", "namespace-1").size());
Assert.assertTrue(functionMetaDataManager.listFunctions(
"tenant-1", "namespace-1").contains("func-1"));
Assert.assertTrue(functionMetaDataManager.listFunctions(
"tenant-1", "namespace-1").contains("func-2"));
- Assert.assertEquals(1, functionMetaDataManager.listFunctions(
+ assertEquals(1, functionMetaDataManager.listFunctions(
"tenant-1", "namespace-2").size());
Assert.assertTrue(functionMetaDataManager.listFunctions(
"tenant-1", "namespace-2").contains("func-3"));
@@ -279,9 +320,9 @@ public void processUpdateTest() throws
PulsarClientException {
verify(functionMetaDataManager, times(1))
.setFunctionMetaData(any(Function.FunctionMetaData.class));
verify(schedulerManager, times(1)).schedule();
- Assert.assertEquals(m1,
functionMetaDataManager.functionMetaDataMap.get(
+ assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").get("func-1"));
- Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+ assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").size());
// worker has record of function
@@ -308,7 +349,7 @@ public void processUpdateTest() throws
PulsarClientException {
.build();
functionMetaDataManager.processUpdate(serviceRequest);
- Assert.assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
+ assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
"tenant-1", "namespace-1", "func-1"));
verify(functionMetaDataManager, times(1))
.setFunctionMetaData(any(Function.FunctionMetaData.class));
@@ -324,15 +365,15 @@ public void processUpdateTest() throws
PulsarClientException {
.setWorkerId("worker-2")
.build();
functionMetaDataManager.processUpdate(serviceRequest);
- Assert.assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
+ assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
"tenant-1", "namespace-1", "func-1"));
verify(functionMetaDataManager, times(1))
.setFunctionMetaData(any(Function.FunctionMetaData.class));
verify(schedulerManager, times(0)).schedule();
- Assert.assertEquals(m1,
functionMetaDataManager.functionMetaDataMap.get(
+ assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").get("func-1"));
- Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+ assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").size());
// schedule
@@ -361,10 +402,10 @@ public void processUpdateTest() throws
PulsarClientException {
.setFunctionMetaData(any(Function.FunctionMetaData.class));
verify(schedulerManager, times(1)).schedule();
- Assert.assertEquals(m1.toBuilder().setVersion(version + 1).build(),
+ assertEquals(m1.toBuilder().setVersion(version + 1).build(),
functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").get("func-1"));
- Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+ assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").size());
}
@@ -394,9 +435,9 @@ public void processDeregister() throws
PulsarClientException {
functionMetaDataManager.proccessDeregister(serviceRequest);
verify(schedulerManager, times(0)).schedule();
- Assert.assertEquals(test,
functionMetaDataManager.functionMetaDataMap.get(
+ assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").get("func-2"));
- Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+ assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").size());
// function exists but request outdated
@@ -419,11 +460,11 @@ public void processDeregister() throws
PulsarClientException {
functionMetaDataManager.proccessDeregister(serviceRequest);
verify(schedulerManager, times(0)).schedule();
- Assert.assertEquals(test,
functionMetaDataManager.functionMetaDataMap.get(
+ assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").get("func-2"));
- Assert.assertEquals(m2,
functionMetaDataManager.functionMetaDataMap.get(
+ assertEquals(m2, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").get("func-1"));
- Assert.assertEquals(2, functionMetaDataManager.functionMetaDataMap.get(
+ assertEquals(2, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").size());
// function deleted
@@ -451,9 +492,9 @@ public void processDeregister() throws
PulsarClientException {
functionMetaDataManager.proccessDeregister(serviceRequest);
verify(schedulerManager, times(1)).schedule();
- Assert.assertEquals(test,
functionMetaDataManager.functionMetaDataMap.get(
+ assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").get("func-2"));
- Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+ assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").size());
}
}
\ No newline at end of file
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services