This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 80257637213 [fix][fn] Add missing `version` field back to `querystate`
API (#21966)
80257637213 is described below
commit 802576372132617b5076a44004846f2dbabede08
Author: jiangpengcheng <[email protected]>
AuthorDate: Sat Jan 27 19:33:31 2024 +0800
[fix][fn] Add missing `version` field back to `querystate` API (#21966)
---
.../functions/api/state/ByteBufferStateStore.java | 27 +++++++++++++++++++
.../pulsar/functions/api/state/StateValue.java | 30 ++++++++++++++++++++++
.../src/main/resources/findbugsExclude.xml | 9 +++++++
.../functions/instance/state/BKStateStoreImpl.java | 30 ++++++++++++++++++++++
.../state/PulsarMetadataStateStoreImpl.java | 15 +++++++++++
.../instance/state/BKStateStoreImplTest.java | 26 +++++++++++++++++++
.../state/PulsarMetadataStateStoreImplTest.java | 5 ++++
.../functions/worker/rest/api/ComponentImpl.java | 25 +++++++++++-------
.../integration/functions/PulsarStateTest.java | 11 +++++---
9 files changed, 165 insertions(+), 13 deletions(-)
diff --git
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
index 8dbd7b322a5..d938fe0c82b 100644
---
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
+++
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
@@ -73,4 +73,31 @@ public interface ByteBufferStateStore extends StateStore {
*/
CompletableFuture<ByteBuffer> getAsync(String key);
+ /**
+ * Retrieve the StateValue for the key.
+ *
+ * @param key name of the key
+ * @return the StateValue.
+ */
+ default StateValue getStateValue(String key) {
+ return getStateValueAsync(key).join();
+ }
+
+ /**
+ * Retrieve the StateValue for the key, but don't wait for the operation
to be completed.
+ *
+ * @param key name of the key
+ * @return the StateValue.
+ */
+ default CompletableFuture<StateValue> getStateValueAsync(String key) {
+ return getAsync(key).thenApply(val -> {
+ if (val != null && val.remaining() >= 0) {
+ byte[] data = new byte[val.remaining()];
+ val.get(data);
+ return new StateValue(data, null, null);
+ } else {
+ return null;
+ }
+ });
+ }
}
diff --git
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/StateValue.java
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/StateValue.java
new file mode 100644
index 00000000000..ce06b54a6e4
--- /dev/null
+++
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/StateValue.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@Getter
+@AllArgsConstructor
+public class StateValue {
+ private final byte[] value;
+ private final Long version;
+ private final Boolean isNumber;
+}
\ No newline at end of file
diff --git a/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml
b/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml
index 9638cfcca8d..d593536d467 100644
--- a/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml
+++ b/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml
@@ -29,6 +29,11 @@
<Method name="getSchema"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
+ <Match>
+ <Class name="org.apache.pulsar.functions.api.state.StateValue"/>
+ <Method name="getValue"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
<Match>
<Class
name="org.apache.pulsar.functions.api.utils.FunctionRecord$FunctionRecordBuilder"/>
<Method name="properties"/>
@@ -39,4 +44,8 @@
<Method name="schema"/>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
+ <Match>
+ <Class name="org.apache.pulsar.functions.api.state.StateValue"/>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ </Match>
</FindBugsFilter>
\ No newline at end of file
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
index bf43f18b175..d85e4afd762 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.options.Options;
import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.functions.api.state.StateValue;
import org.apache.pulsar.functions.utils.FunctionCommon;
/**
@@ -190,4 +191,33 @@ public class BKStateStoreImpl implements DefaultStateStore
{
throw new RuntimeException("Failed to retrieve the state value for
key '" + key + "'", e);
}
}
+
+ @Override
+ public StateValue getStateValue(String key) {
+ try {
+ return result(getStateValueAsync(key));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to retrieve the state value for
key '" + key + "'", e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<StateValue> getStateValueAsync(String key) {
+ return
table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply(
+ data -> {
+ try {
+ if (data != null && data.value() != null &&
data.value().readableBytes() >= 0) {
+ byte[] result = new
byte[data.value().readableBytes()];
+ data.value().readBytes(result);
+ return new StateValue(result, data.version(),
data.isNumber());
+ }
+ return null;
+ } finally {
+ if (data != null) {
+ ReferenceCountUtil.safeRelease(data);
+ }
+ }
+ }
+ );
+ }
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
index 50541c40ae9..bba3cea0d8f 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.functions.api.state.StateValue;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -111,6 +112,20 @@ public class PulsarMetadataStateStoreImpl implements
DefaultStateStore {
.orElse(null));
}
+ @Override
+ public StateValue getStateValue(String key) {
+ return getStateValueAsync(key).join();
+ }
+
+ @Override
+ public CompletableFuture<StateValue> getStateValueAsync(String key) {
+ return store.get(getPath(key))
+ .thenApply(optRes ->
+ optRes.map(x ->
+ new StateValue(x.getValue(),
x.getStat().getVersion(), null))
+ .orElse(null));
+ }
+
@Override
public void incrCounter(String key, long amount) {
incrCounterAsync(key, amount).join();
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
index 1d35f3dfe5b..7696c71d5d1 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
@@ -35,7 +35,9 @@ import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.options.Options;
import org.apache.bookkeeper.api.kv.result.DeleteResult;
+import org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.functions.api.state.StateValue;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -114,6 +116,24 @@ public class BKStateStoreImplTest {
);
}
+ @Test
+ public void testGetStateValue() throws Exception {
+ KeyValue returnedKeyValue = mock(KeyValue.class);
+ ByteBuf returnedValue = Unpooled.copiedBuffer("test-value", UTF_8);
+ when(returnedKeyValue.value()).thenReturn(returnedValue);
+ when(returnedKeyValue.version()).thenReturn(1l);
+ when(returnedKeyValue.isNumber()).thenReturn(false);
+ when(mockTable.getKv(any(ByteBuf.class)))
+ .thenReturn(FutureUtils.value(returnedKeyValue));
+ StateValue result = stateContext.getStateValue("test-key");
+ assertEquals("test-value", new String(result.getValue(), UTF_8));
+ assertEquals(1l, result.getVersion().longValue());
+ assertEquals(false, result.getIsNumber().booleanValue());
+ verify(mockTable, times(1)).getKv(
+ eq(Unpooled.copiedBuffer("test-key", UTF_8))
+ );
+ }
+
@Test
public void testGetAmount() throws Exception {
when(mockTable.getNumber(any(ByteBuf.class)))
@@ -132,6 +152,12 @@ public class BKStateStoreImplTest {
assertTrue(result != null);
assertEquals(result.get(), null);
+ when(mockTable.getKv(any(ByteBuf.class)))
+ .thenReturn(FutureUtils.value(null));
+ CompletableFuture<StateValue> stateValueResult =
stateContext.getStateValueAsync("test-key");
+ assertTrue(stateValueResult != null);
+ assertEquals(stateValueResult.get(), null);
+
}
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
index 3b8cb02c3bb..4d1a1f73fe6 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.functions.api.state.StateValue;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -101,6 +102,10 @@ public class PulsarMetadataStateStoreImplTest {
CompletableFuture<ByteBuffer> result =
stateContext.getAsync("test-key");
assertTrue(result != null);
assertEquals(result.get(), null);
+
+ CompletableFuture<StateValue> stateValueResult =
stateContext.getStateValueAsync("test-key");
+ assertTrue(stateValueResult != null);
+ assertEquals(stateValueResult.get(), null);
}
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 613158aef44..db31847f91c 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -74,6 +74,7 @@ import
org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.RestException;
+import org.apache.pulsar.functions.api.state.StateValue;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
import org.apache.pulsar.functions.proto.Function;
@@ -1151,23 +1152,29 @@ public abstract class ComponentImpl implements
Component<PulsarWorkerService> {
try {
DefaultStateStore store =
worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
- ByteBuffer buf = store.get(key);
- if (buf == null) {
+ StateValue value = store.getStateValue(key);
+ if (value == null) {
+ throw new RestException(Status.NOT_FOUND, "key '" + key + "'
doesn't exist.");
+ }
+ byte[] data = value.getValue();
+ if (data == null) {
throw new RestException(Status.NOT_FOUND, "key '" + key + "'
doesn't exist.");
}
- // try to parse the state as a long
- // but even if it can be parsed as a long, this number may not be
the actual state,
- // so we will always return a `stringValue` or `bytesValue` with
the number value
+ ByteBuffer buf = ByteBuffer.wrap(data);
+
Long number = null;
if (buf.remaining() == Long.BYTES) {
number = buf.getLong();
}
+ if (Boolean.TRUE.equals(value.getIsNumber())) {
+ return new FunctionState(key, null, null, number,
value.getVersion());
+ }
- if (Utf8.isWellFormed(buf.array())) {
- return new FunctionState(key, new String(buf.array(), UTF_8),
null, number, null);
+ if (Utf8.isWellFormed(data)) {
+ return new FunctionState(key, new String(data, UTF_8), null,
number, value.getVersion());
} else {
- return new FunctionState(key, null, buf.array(), number, null);
+ return new FunctionState(key, null, data, number,
value.getVersion());
}
} catch (RestException e) {
throw e;
@@ -1215,7 +1222,7 @@ public abstract class ComponentImpl implements
Component<PulsarWorkerService> {
try {
DefaultStateStore store =
worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
ByteBuffer data;
- if (StringUtils.isNotEmpty(state.getStringValue())) {
+ if (state.getStringValue() != null) {
data = ByteBuffer.wrap(state.getStringValue().getBytes(UTF_8));
} else if (state.getByteValue() != null) {
data = ByteBuffer.wrap(state.getByteValue());
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
index 5e80c3ebd54..a292e0e0dd1 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
@@ -97,10 +97,10 @@ public class PulsarStateTest extends
PulsarStandaloneTestSuite {
getFunctionStatus(functionName, numMessages);
// get state
- queryState(functionName, "hello", numMessages);
- queryState(functionName, "test", numMessages);
+ queryState(functionName, "hello", numMessages, numMessages - 1);
+ queryState(functionName, "test", numMessages, numMessages - 1);
for (int i = 0; i < numMessages; i++) {
- queryState(functionName, "message-" + i, 1);
+ queryState(functionName, "message-" + i, 1, 0);
}
// test put state
@@ -468,7 +468,7 @@ public class PulsarStateTest extends
PulsarStandaloneTestSuite {
assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\" :
" + numMessages));
}
- private void queryState(String functionName, String key, int amount)
+ private void queryState(String functionName, String key, int amount, long
version)
throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
@@ -480,6 +480,9 @@ public class PulsarStateTest extends
PulsarStandaloneTestSuite {
"--key", key
);
assertTrue(result.getStdout().contains("\"numberValue\": " + amount));
+ assertTrue(result.getStdout().contains("\"version\": " + version));
+ assertFalse(result.getStdout().contains("stringValue"));
+ assertFalse(result.getStdout().contains("byteValue"));
}
private void putAndQueryState(String functionName, String key, String
state, String expect)