This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b06ea5e Rewind ByteBuffers Read/Written to Function's state store
(#7929)
b06ea5e is described below
commit b06ea5e824557d0d5cfc5aa47e7195c03305617b
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Sat Aug 29 18:39:19 2020 -0700
Rewind ByteBuffers Read/Written to Function's state store (#7929)
Co-authored-by: Jerry Peng <[email protected]>
---
.../pulsar/functions/instance/state/StateContextImpl.java | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
index 0697d74..1c2ca5d 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
@@ -18,19 +18,17 @@
*/
package org.apache.pulsar.functions.instance.state;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import org.apache.bookkeeper.api.kv.Table;
-import org.apache.bookkeeper.api.kv.options.DeleteOption;
-import org.apache.bookkeeper.api.kv.options.Option;
import org.apache.bookkeeper.api.kv.options.Options;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
/**
* This class accumulates the state updates from one function.
*
@@ -55,6 +53,10 @@ public class StateContextImpl implements StateContext {
@Override
public CompletableFuture<Void> put(String key, ByteBuffer value) {
if(value != null) {
+ // Set position to off the buffer to the beginning.
+ // If a user used an operation like
ByteBuffer.allocate(4).putInt(count) to create a ByteBuffer to store to the
state store
+ // the position of the buffer will be at the end and nothing will
be written to table service
+ value.position(0);
return table.put(
Unpooled.wrappedBuffer(key.getBytes(UTF_8)),
Unpooled.wrappedBuffer(value));
@@ -81,6 +83,10 @@ public class StateContextImpl implements StateContext {
if (data != null) {
ByteBuffer result =
ByteBuffer.allocate(data.readableBytes());
data.readBytes(result);
+ // Set position to off the buffer to the
beginning, since the position after the read is going to be end of the buffer
+ // If we do not rewind to the begining here, users
will have to explicitly do this in their function code
+ // in order to use any of the ByteBuffer operations
+ result.position(0);
return result;
}
return null;