This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 80257637213 [fix][fn] Add missing `version` field back to `querystate` API (#21966)
80257637213 is described below

commit 802576372132617b5076a44004846f2dbabede08
Author: jiangpengcheng <[email protected]>
AuthorDate: Sat Jan 27 19:33:31 2024 +0800

    [fix][fn] Add missing `version` field back to `querystate` API (#21966)
---
 .../functions/api/state/ByteBufferStateStore.java  | 27 +++++++++++++++++++
 .../pulsar/functions/api/state/StateValue.java     | 30 ++++++++++++++++++++++
 .../src/main/resources/findbugsExclude.xml         |  9 +++++++
 .../functions/instance/state/BKStateStoreImpl.java | 30 ++++++++++++++++++++++
 .../state/PulsarMetadataStateStoreImpl.java        | 15 +++++++++++
 .../instance/state/BKStateStoreImplTest.java       | 26 +++++++++++++++++++
 .../state/PulsarMetadataStateStoreImplTest.java    |  5 ++++
 .../functions/worker/rest/api/ComponentImpl.java   | 25 +++++++++++-------
 .../integration/functions/PulsarStateTest.java     | 11 +++++---
 9 files changed, 165 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
index 8dbd7b322a5..d938fe0c82b 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
@@ -73,4 +73,31 @@ public interface ByteBufferStateStore extends StateStore {
      */
     CompletableFuture<ByteBuffer> getAsync(String key);
 
+    /**
+     * Retrieve the StateValue for the key.
+     *
+     * @param key name of the key
+     * @return the StateValue.
+     */
+    default StateValue getStateValue(String key) {
+        return getStateValueAsync(key).join();
+    }
+
+    /**
+     * Retrieve the StateValue for the key, but don't wait for the operation to be completed.
+     *
+     * @param key name of the key
+     * @return the StateValue.
+     */
+    default CompletableFuture<StateValue> getStateValueAsync(String key) {
+        return getAsync(key).thenApply(val -> {
+            if (val != null && val.remaining() >= 0) {
+                byte[] data = new byte[val.remaining()];
+                val.get(data);
+                return new StateValue(data, null, null);
+            } else {
+                return null;
+            }
+        });
+    }
 }
diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/StateValue.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/StateValue.java
new file mode 100644
index 00000000000..ce06b54a6e4
--- /dev/null
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/StateValue.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@Getter
+@AllArgsConstructor
+public class StateValue {
+    private final byte[] value;
+    private final Long version;
+    private final Boolean isNumber;
+}
\ No newline at end of file
diff --git a/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml 
b/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml
index 9638cfcca8d..d593536d467 100644
--- a/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml
+++ b/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml
@@ -29,6 +29,11 @@
     <Method name="getSchema"/>
     <Bug pattern="EI_EXPOSE_REP"/>
   </Match>
+  <Match>
+    <Class name="org.apache.pulsar.functions.api.state.StateValue"/>
+    <Method name="getValue"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
   <Match>
     <Class 
name="org.apache.pulsar.functions.api.utils.FunctionRecord$FunctionRecordBuilder"/>
     <Method name="properties"/>
@@ -39,4 +44,8 @@
     <Method name="schema"/>
     <Bug pattern="EI_EXPOSE_REP2"/>
   </Match>
+  <Match>
+    <Class name="org.apache.pulsar.functions.api.state.StateValue"/>
+    <Bug pattern="EI_EXPOSE_REP2"/>
+  </Match>
 </FindBugsFilter>
\ No newline at end of file
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
index bf43f18b175..d85e4afd762 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.api.kv.options.Options;
 import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.functions.api.state.StateValue;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 
 /**
@@ -190,4 +191,33 @@ public class BKStateStoreImpl implements DefaultStateStore 
{
             throw new RuntimeException("Failed to retrieve the state value for 
key '" + key + "'", e);
         }
     }
+
+    @Override
+    public StateValue getStateValue(String key) {
+        try {
+            return result(getStateValueAsync(key));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to retrieve the state value for 
key '" + key + "'", e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<StateValue> getStateValueAsync(String key) {
+        return 
table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply(
+                data -> {
+                    try {
+                        if (data != null && data.value() != null && 
data.value().readableBytes() >= 0) {
+                            byte[] result = new 
byte[data.value().readableBytes()];
+                            data.value().readBytes(result);
+                            return new StateValue(result, data.version(), 
data.isNumber());
+                        }
+                        return null;
+                    } finally {
+                        if (data != null) {
+                            ReferenceCountUtil.safeRelease(data);
+                        }
+                    }
+                }
+        );
+    }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
index 50541c40ae9..bba3cea0d8f 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.functions.api.state.StateValue;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 
@@ -111,6 +112,20 @@ public class PulsarMetadataStateStoreImpl implements 
DefaultStateStore {
                                 .orElse(null));
     }
 
+    @Override
+    public StateValue getStateValue(String key) {
+        return getStateValueAsync(key).join();
+    }
+
+    @Override
+    public CompletableFuture<StateValue> getStateValueAsync(String key) {
+        return store.get(getPath(key))
+                .thenApply(optRes ->
+                        optRes.map(x ->
+                            new StateValue(x.getValue(), 
x.getStat().getVersion(), null))
+                                .orElse(null));
+    }
+
     @Override
     public void incrCounter(String key, long amount) {
         incrCounterAsync(key, amount).join();
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
index 1d35f3dfe5b..7696c71d5d1 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
@@ -35,7 +35,9 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.api.kv.options.Options;
 import org.apache.bookkeeper.api.kv.result.DeleteResult;
+import org.apache.bookkeeper.api.kv.result.KeyValue;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.functions.api.state.StateValue;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -114,6 +116,24 @@ public class BKStateStoreImplTest {
         );
     }
 
+    @Test
+    public void testGetStateValue() throws Exception {
+        KeyValue returnedKeyValue = mock(KeyValue.class);
+        ByteBuf returnedValue = Unpooled.copiedBuffer("test-value", UTF_8);
+        when(returnedKeyValue.value()).thenReturn(returnedValue);
+        when(returnedKeyValue.version()).thenReturn(1l);
+        when(returnedKeyValue.isNumber()).thenReturn(false);
+        when(mockTable.getKv(any(ByteBuf.class)))
+            .thenReturn(FutureUtils.value(returnedKeyValue));
+        StateValue result = stateContext.getStateValue("test-key");
+        assertEquals("test-value", new String(result.getValue(), UTF_8));
+        assertEquals(1l, result.getVersion().longValue());
+        assertEquals(false, result.getIsNumber().booleanValue());
+        verify(mockTable, times(1)).getKv(
+            eq(Unpooled.copiedBuffer("test-key", UTF_8))
+        );
+    }
+
     @Test
     public void testGetAmount() throws Exception {
         when(mockTable.getNumber(any(ByteBuf.class)))
@@ -132,6 +152,12 @@ public class BKStateStoreImplTest {
         assertTrue(result != null);
         assertEquals(result.get(), null);
 
+        when(mockTable.getKv(any(ByteBuf.class)))
+                .thenReturn(FutureUtils.value(null));
+        CompletableFuture<StateValue> stateValueResult = 
stateContext.getStateValueAsync("test-key");
+        assertTrue(stateValueResult != null);
+        assertEquals(stateValueResult.get(), null);
+
     }
 
 }
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
index 3b8cb02c3bb..4d1a1f73fe6 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.functions.api.state.StateValue;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -101,6 +102,10 @@ public class PulsarMetadataStateStoreImplTest {
         CompletableFuture<ByteBuffer> result = 
stateContext.getAsync("test-key");
         assertTrue(result != null);
         assertEquals(result.get(), null);
+
+        CompletableFuture<StateValue> stateValueResult = 
stateContext.getStateValueAsync("test-key");
+        assertTrue(stateValueResult != null);
+        assertEquals(stateValueResult.get(), null);
     }
 
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 613158aef44..db31847f91c 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -74,6 +74,7 @@ import 
org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
 import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.RestException;
+import org.apache.pulsar.functions.api.state.StateValue;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.instance.state.DefaultStateStore;
 import org.apache.pulsar.functions.proto.Function;
@@ -1151,23 +1152,29 @@ public abstract class ComponentImpl implements 
Component<PulsarWorkerService> {
 
         try {
             DefaultStateStore store = 
worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
-            ByteBuffer buf = store.get(key);
-            if (buf == null) {
+            StateValue value = store.getStateValue(key);
+            if (value == null) {
+                throw new RestException(Status.NOT_FOUND, "key '" + key + "' 
doesn't exist.");
+            }
+            byte[] data = value.getValue();
+            if (data == null) {
                 throw new RestException(Status.NOT_FOUND, "key '" + key + "' 
doesn't exist.");
             }
 
-            // try to parse the state as a long
-            // but even if it can be parsed as a long, this number may not be the actual state,
-            // so we will always return a `stringValue` or `bytesValue` with the number value
+            ByteBuffer buf = ByteBuffer.wrap(data);
+
             Long number = null;
             if (buf.remaining() == Long.BYTES) {
                 number = buf.getLong();
             }
+            if (Boolean.TRUE.equals(value.getIsNumber())) {
+                return new FunctionState(key, null, null, number, 
value.getVersion());
+            }
 
-            if (Utf8.isWellFormed(buf.array())) {
-                return new FunctionState(key, new String(buf.array(), UTF_8), 
null, number, null);
+            if (Utf8.isWellFormed(data)) {
+                return new FunctionState(key, new String(data, UTF_8), null, 
number, value.getVersion());
             } else {
-                return new FunctionState(key, null, buf.array(), number, null);
+                return new FunctionState(key, null, data, number, 
value.getVersion());
             }
         } catch (RestException e) {
             throw e;
@@ -1215,7 +1222,7 @@ public abstract class ComponentImpl implements 
Component<PulsarWorkerService> {
         try {
             DefaultStateStore store = 
worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
             ByteBuffer data;
-            if (StringUtils.isNotEmpty(state.getStringValue())) {
+            if (state.getStringValue() != null) {
                 data = ByteBuffer.wrap(state.getStringValue().getBytes(UTF_8));
             } else if (state.getByteValue() != null) {
                 data = ByteBuffer.wrap(state.getByteValue());
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
index 5e80c3ebd54..a292e0e0dd1 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
@@ -97,10 +97,10 @@ public class PulsarStateTest extends 
PulsarStandaloneTestSuite {
         getFunctionStatus(functionName, numMessages);
 
         // get state
-        queryState(functionName, "hello", numMessages);
-        queryState(functionName, "test", numMessages);
+        queryState(functionName, "hello", numMessages, numMessages - 1);
+        queryState(functionName, "test", numMessages, numMessages - 1);
         for (int i = 0; i < numMessages; i++) {
-            queryState(functionName, "message-" + i, 1);
+            queryState(functionName, "message-" + i, 1, 0);
         }
 
         // test put state
@@ -468,7 +468,7 @@ public class PulsarStateTest extends 
PulsarStandaloneTestSuite {
         assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\" : 
" + numMessages));
     }
 
-    private void queryState(String functionName, String key, int amount)
+    private void queryState(String functionName, String key, int amount, long 
version)
         throws Exception {
         ContainerExecResult result = container.execCmd(
             PulsarCluster.ADMIN_SCRIPT,
@@ -480,6 +480,9 @@ public class PulsarStateTest extends 
PulsarStandaloneTestSuite {
             "--key", key
         );
         assertTrue(result.getStdout().contains("\"numberValue\": " + amount));
+        assertTrue(result.getStdout().contains("\"version\": " + version));
+        assertFalse(result.getStdout().contains("stringValue"));
+        assertFalse(result.getStdout().contains("byteValue"));
     }
 
     private void putAndQueryState(String functionName, String key, String 
state, String expect)

Reply via email to