lhotari commented on code in PR #21966:
URL: https://github.com/apache/pulsar/pull/21966#discussion_r1466187778
##########
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/StateValue.java:
##########
@@ -18,21 +18,19 @@
*/
package org.apache.pulsar.functions.api.state;
-import java.nio.ByteBuffer;
-
public class StateValue {
- private final ByteBuffer value;
+ private final byte[] value;
private final Long version;
private final Boolean isNumber;
- public StateValue(ByteBuffer value, Long version, Boolean isNumber) {
- this.value = value == null ? null : ByteBuffer.wrap(value.array());
+ public StateValue(byte[] value, Long version, Boolean isNumber) {
+ this.value = value == null ? null : value.clone();
Review Comment:
Is there a need to make a copy of the input array?
##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java:
##########
@@ -190,4 +191,39 @@ public ByteBuffer get(String key) {
throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'", e);
}
}
+
+ @Override
+ public StateValue getStateValue(String key) {
+ try {
+ return result(getStateValueAsync(key));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'", e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<StateValue> getStateValueAsync(String key) {
+ return table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply(
+ data -> {
+ try {
+ if (data != null && data.value() != null) {
+ ByteBuffer result = ByteBuffer.allocate(data.value().readableBytes());
+ data.value().readBytes(result);
+ // Set position to off the buffer to the beginning, since the position after the
+ // read is going to be end of the buffer
+ // If we do not rewind to the beginning here, users will have to explicitly do
+ // this in their function code
+ // in order to use any of the ByteBuffer operations
+ result.position(0);
+ return new StateValue(result, data.version(), data.isNumber());
+ }
+ return null;
+ } finally {
+ if (data != null) {
+ ReferenceCountUtil.safeRelease(data);
Review Comment:
Oh yes, it's a Netty ByteBuf.
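As a side note on the rewind comment in the hunk above, here is a minimal sketch of why the position needs resetting after the copy. It uses only `java.nio`, with `ByteBuffer.put` standing in for Netty's `readBytes`; the class `RewindDemo` and the helper `copyOf` are hypothetical names for illustration, not part of the PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RewindDemo {
    // Copies src into a fresh heap ByteBuffer; optionally rewinds it afterwards.
    // Hypothetical helper: ByteBuffer.put stands in for Netty's readBytes here.
    static ByteBuffer copyOf(byte[] src, boolean rewind) {
        ByteBuffer result = ByteBuffer.allocate(src.length);
        result.put(src);         // advances position to src.length
        if (rewind) {
            result.position(0);  // same call the hunk uses
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] src = "state".getBytes(StandardCharsets.UTF_8);
        // Without the rewind, a reader sees nothing left to consume.
        System.out.println(copyOf(src, false).remaining()); // 0
        // With the rewind, all five bytes are readable again.
        System.out.println(copyOf(src, true).remaining());  // 5
    }
}
```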
##########
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/StateValue.java:
##########
@@ -18,21 +18,19 @@
*/
package org.apache.pulsar.functions.api.state;
-import java.nio.ByteBuffer;
-
public class StateValue {
- private final ByteBuffer value;
+ private final byte[] value;
private final Long version;
private final Boolean isNumber;
- public StateValue(ByteBuffer value, Long version, Boolean isNumber) {
- this.value = value == null ? null : ByteBuffer.wrap(value.array());
+ public StateValue(byte[] value, Long version, Boolean isNumber) {
+ this.value = value == null ? null : value.clone();
this.version = version;
this.isNumber = isNumber;
}
- public ByteBuffer getValue() {
- return value == null ? null : ByteBuffer.wrap(value.array());
+ public byte[] getValue() {
+ return this.value == null ? null : this.value.clone();
Review Comment:
Is it necessary to clone the returned value? I understand that this could
make sense in some cases, but is there a real need for defensive copies here?
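To make the trade-off concrete, here is a minimal sketch of what the defensive copies buy and what they cost. `CopyingValue` and `SharingValue` are hypothetical stand-ins, not the PR's `StateValue`: the first clones on the way in and out, the second shares the caller's array.

```java
import java.util.Arrays;

// Hypothetical holder that copies on the way in and on the way out.
final class CopyingValue {
    private final byte[] value;

    CopyingValue(byte[] value) {
        this.value = value == null ? null : value.clone();
    }

    byte[] getValue() {
        return value == null ? null : value.clone();
    }
}

// Hypothetical holder that shares the caller's array (no allocation per call).
final class SharingValue {
    private final byte[] value;

    SharingValue(byte[] value) {
        this.value = value;
    }

    byte[] getValue() {
        return value;
    }
}

public class DefensiveCopyDemo {
    public static void main(String[] args) {
        byte[] input = {1, 2, 3};
        CopyingValue copying = new CopyingValue(input);
        SharingValue sharing = new SharingValue(input);

        input[0] = 42; // caller mutates the array after construction

        System.out.println(Arrays.toString(copying.getValue())); // [1, 2, 3]
        System.out.println(Arrays.toString(sharing.getValue())); // [42, 2, 3]

        copying.getValue()[1] = 99; // mutating the returned copy has no effect
        System.out.println(Arrays.toString(copying.getValue())); // [1, 2, 3]
    }
}
```

The copying variant pays one allocation and copy per constructor call and per getter call; whether that cost is justified depends on whether callers are expected to mutate the arrays they pass in or receive.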