This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 9630608ff1f [fix][fn] Throw 404 RestException when state key not found
(#21921)
9630608ff1f is described below
commit 9630608ff1fd726cbd124a2e65a1a50c3802b5dc
Author: jiangpengcheng <[email protected]>
AuthorDate: Fri Jan 19 11:10:57 2024 +0800
[fix][fn] Throw 404 RestException when state key not found (#21921)
---
.../functions/worker/rest/api/ComponentImpl.java | 2 ++
.../tests/integration/functions/PulsarStateTest.java | 18 ++++++++++++++++++
2 files changed, 20 insertions(+)
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index b175a7f275e..613158aef44 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -1169,6 +1169,8 @@ public abstract class ComponentImpl implements
Component<PulsarWorkerService> {
} else {
return new FunctionState(key, null, buf.array(), number, null);
}
+ } catch (RestException e) {
+ throw e;
} catch (Throwable e) {
log.error("Error while getFunctionState request @ /{}/{}/{}/{}",
tenant, namespace, functionName, key, e);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
index 8472ed3db2c..5e80c3ebd54 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
import static org.testng.Assert.fail;
import com.google.common.base.Utf8;
import java.util.Base64;
@@ -32,6 +33,7 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -144,6 +146,14 @@ public class PulsarStateTest extends
PulsarStandaloneTestSuite {
assertEquals(functionState.getStringValue(), "val1");
}
+ // query a non-exist key should get a 404 error
+ {
+ PulsarAdminException e =
expectThrows(PulsarAdminException.class, () -> {
+ admin.functions().getFunctionState("public", "default",
sourceName, "non-exist");
+ });
+ assertEquals(e.getStatusCode(), 404);
+ }
+
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
FunctionState functionState =
admin.functions().getFunctionState("public", "default", sourceName, "now");
assertTrue(functionState.getStringValue().matches("val1-.*"));
@@ -186,6 +196,14 @@ public class PulsarStateTest extends
PulsarStandaloneTestSuite {
assertEquals(functionState.getStringValue(), "val1");
}
+ // query a non-exist key should get a 404 error
+ {
+ PulsarAdminException e =
expectThrows(PulsarAdminException.class, () -> {
+ admin.functions().getFunctionState("public", "default",
sinkName, "non-exist");
+ });
+ assertEquals(e.getStatusCode(), 404);
+ }
+
for (int i = 0; i < numMessages; i++) {
producer.send("foo");
}