This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b06ea5e  Rewind ByteBuffers Read/Written to Function's state store 
(#7929)
b06ea5e is described below

commit b06ea5e824557d0d5cfc5aa47e7195c03305617b
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Sat Aug 29 18:39:19 2020 -0700

    Rewind ByteBuffers Read/Written to Function's state store (#7929)
    
    Co-authored-by: Jerry Peng <[email protected]>
---
 .../pulsar/functions/instance/state/StateContextImpl.java  | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
index 0697d74..1c2ca5d 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
@@ -18,19 +18,17 @@
  */
 package org.apache.pulsar.functions.instance.state;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.ReferenceCountUtil;
 import org.apache.bookkeeper.api.kv.Table;
-import org.apache.bookkeeper.api.kv.options.DeleteOption;
-import org.apache.bookkeeper.api.kv.options.Option;
 import org.apache.bookkeeper.api.kv.options.Options;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 /**
  * This class accumulates the state updates from one function.
  *
@@ -55,6 +53,10 @@ public class StateContextImpl implements StateContext {
     @Override
     public CompletableFuture<Void> put(String key, ByteBuffer value) {
         if(value != null) {
+            // Reset the buffer's position to the beginning.
+            // If a user used an operation like 
ByteBuffer.allocate(4).putInt(count) to create a ByteBuffer to store to the 
state store,
+            // the position of the buffer will be at the end and nothing will 
be written to the table service.
+            value.position(0);
             return table.put(
                     Unpooled.wrappedBuffer(key.getBytes(UTF_8)),
                     Unpooled.wrappedBuffer(value));
@@ -81,6 +83,10 @@ public class StateContextImpl implements StateContext {
                         if (data != null) {
                             ByteBuffer result = 
ByteBuffer.allocate(data.readableBytes());
                             data.readBytes(result);
+                            // Reset the buffer's position to the 
beginning, since the position after the read is going to be at the end of the buffer.
+                            // If we do not rewind to the beginning here, users 
will have to explicitly do this in their function code
+                            // in order to use any of the ByteBuffer operations
+                            result.position(0);
                             return result;
                         }
                         return null;

Reply via email to