jiangpengcheng commented on code in PR #21966:
URL: https://github.com/apache/pulsar/pull/21966#discussion_r1466181704
##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java:
##########
@@ -190,4 +191,39 @@ public ByteBuffer get(String key) {
throw new RuntimeException("Failed to retrieve the state value for
key '" + key + "'", e);
}
}
+
+ @Override
+ public StateValue getStateValue(String key) {
+ try {
+ return result(getStateValueAsync(key));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to retrieve the state value for
key '" + key + "'", e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<StateValue> getStateValueAsync(String key) {
+ return
table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply(
+ data -> {
+ try {
+ if (data != null && data.value() != null) {
+ ByteBuffer result =
ByteBuffer.allocate(data.value().readableBytes());
+ data.value().readBytes(result);
+ // Set position to off the buffer to the
beginning, since the position after the
+ // read is going to be end of the buffer
+ // If we do not rewind to the beginning here,
users will have to explicitly do
+ // this in their function code
+ // in order to use any of the ByteBuffer operations
+ result.position(0);
Review Comment:
got it now, I just used the same code with line:
https://github.com/apache/pulsar/blob/b56efff3f125b107d39eb7a88c040badc721f692/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java#L179
##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java:
##########
@@ -190,4 +191,39 @@ public ByteBuffer get(String key) {
throw new RuntimeException("Failed to retrieve the state value for
key '" + key + "'", e);
}
}
+
+ @Override
+ public StateValue getStateValue(String key) {
+ try {
+ return result(getStateValueAsync(key));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to retrieve the state value for
key '" + key + "'", e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<StateValue> getStateValueAsync(String key) {
+ return
table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply(
+ data -> {
+ try {
+ if (data != null && data.value() != null) {
+ ByteBuffer result =
ByteBuffer.allocate(data.value().readableBytes());
+ data.value().readBytes(result);
+ // Set position to off the buffer to the
beginning, since the position after the
+ // read is going to be end of the buffer
+ // If we do not rewind to the beginning here,
users will have to explicitly do
+ // this in their function code
+ // in order to use any of the ByteBuffer operations
+ result.position(0);
+ return new StateValue(result, data.version(),
data.isNumber());
+ }
+ return null;
+ } finally {
+ if (data != null) {
+ ReferenceCountUtil.safeRelease(data);
Review Comment:
got it now, I just used the same code with line:
https://github.com/apache/pulsar/blob/b56efff3f125b107d39eb7a88c040badc721f692/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java#L179
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]