This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 23c7624  fix behavior when getting a key from functions state that 
doesn't exist (#5272)
23c7624 is described below

commit 23c76241c6fc4abeb5c1bd621efcfe124eeb0a0f
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Fri Sep 27 08:34:41 2019 -0700

    fix behavior when getting a key from functions state that doesn't exist 
(#5272)
---
 .../apache/pulsar/functions/instance/ContextImpl.java  |  2 +-
 .../functions/instance/state/StateContextImpl.java     | 18 +++++++++++-------
 .../functions/instance/state/StateContextImplTest.java | 12 ++++++++++++
 3 files changed, 24 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index eaadb38..ecf2e83 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -322,7 +322,7 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
         try {
             return result(stateContext.get(key));
         } catch (Exception e) {
-            throw new RuntimeException("Failed to retrieve the state value for 
key '" + key + "'");
+            throw new RuntimeException("Failed to retrieve the state value for 
key '" + key + "'", e);
         }
     }
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
index 4e60814..dd492af 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
@@ -18,14 +18,15 @@
  */
 package org.apache.pulsar.functions.instance.state;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.bookkeeper.api.kv.Table;
+
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 
-import org.apache.bookkeeper.api.kv.Table;
+import static java.nio.charset.StandardCharsets.UTF_8;
 
 /**
  * This class accumulates the state updates from one function.
@@ -60,11 +61,14 @@ public class StateContextImpl implements StateContext {
         return 
table.get(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply(
                 data -> {
                     try {
-                        ByteBuffer result = 
ByteBuffer.allocate(data.readableBytes());
-                        data.readBytes(result);
-                        return result;
+                        if (data != null) {
+                            ByteBuffer result = 
ByteBuffer.allocate(data.readableBytes());
+                            data.readBytes(result);
+                            return result;
+                        }
+                        return null;
                     } finally {
-                        data.release();
+                        ReferenceCountUtil.safeRelease(data);
                     }
                 }
         );
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
index 04d0482..8a72069 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
@@ -26,6 +26,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.Mockito.any;
@@ -36,6 +37,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
 
 /**
  * Unit test {@link StateContextImpl}.
@@ -95,4 +97,14 @@ public class StateContextImplTest {
         );
     }
 
+    @Test
+    public void testGetKeyNotPresent() throws Exception {
+        when(mockTable.get(any(ByteBuf.class)))
+                .thenReturn(FutureUtils.value(null));
+        CompletableFuture<ByteBuffer> result = stateContext.get("test-key");
+        assertTrue(result != null);
+        assertEquals(result.get(), null);
+
+    }
+
 }

Reply via email to