This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 23c7624 fix behavior when getting a key from functions state that
doesn't exist (#5272)
23c7624 is described below
commit 23c76241c6fc4abeb5c1bd621efcfe124eeb0a0f
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Fri Sep 27 08:34:41 2019 -0700
fix behavior when getting a key from functions state that doesn't exist
(#5272)
---
.../apache/pulsar/functions/instance/ContextImpl.java | 2 +-
.../functions/instance/state/StateContextImpl.java | 18 +++++++++++-------
.../functions/instance/state/StateContextImplTest.java | 12 ++++++++++++
3 files changed, 24 insertions(+), 8 deletions(-)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index eaadb38..ecf2e83 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -322,7 +322,7 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
try {
return result(stateContext.get(key));
} catch (Exception e) {
- throw new RuntimeException("Failed to retrieve the state value for
key '" + key + "'");
+ throw new RuntimeException("Failed to retrieve the state value for
key '" + key + "'", e);
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
index 4e60814..dd492af 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
@@ -18,14 +18,15 @@
*/
package org.apache.pulsar.functions.instance.state;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.bookkeeper.api.kv.Table;
+
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.api.kv.Table;
+import static java.nio.charset.StandardCharsets.UTF_8;
/**
* This class accumulates the state updates from one function.
@@ -60,11 +61,14 @@ public class StateContextImpl implements StateContext {
return
table.get(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply(
data -> {
try {
- ByteBuffer result =
ByteBuffer.allocate(data.readableBytes());
- data.readBytes(result);
- return result;
+ if (data != null) {
+ ByteBuffer result =
ByteBuffer.allocate(data.readableBytes());
+ data.readBytes(result);
+ return result;
+ }
+ return null;
} finally {
- data.release();
+ ReferenceCountUtil.safeRelease(data);
}
}
);
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
index 04d0482..8a72069 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
@@ -26,6 +26,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Mockito.any;
@@ -36,6 +37,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
/**
* Unit test {@link StateContextImpl}.
@@ -95,4 +97,14 @@ public class StateContextImplTest {
);
}
+ @Test
+ public void testGetKeyNotPresent() throws Exception {
+ when(mockTable.get(any(ByteBuf.class)))
+ .thenReturn(FutureUtils.value(null));
+ CompletableFuture<ByteBuffer> result = stateContext.get("test-key");
+ assertTrue(result != null);
+ assertEquals(result.get(), null);
+
+ }
+
}