This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 13f870c Add Async State manipulation methods (#3978)
13f870c is described below
commit 13f870c3bafedb7f73449a2a5b25f07b4f6667e9
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Thu Apr 4 11:26:16 2019 -0700
Add Async State manipulation methods (#3978)
* Add Async State manipulation methods
* Fix build
* Fixed unittest
---
.../org/apache/pulsar/functions/api/Context.java | 34 ++++++++++++++++++
.../pulsar/functions/instance/ContextImpl.java | 33 +++++++++++++++---
.../functions/instance/state/StateContext.java | 9 ++---
.../functions/instance/state/StateContextImpl.java | 40 ++++++++++++----------
.../pulsar/functions/instance/ContextImplTest.java | 14 ++++----
.../instance/state/StateContextImplTest.java | 8 ++---
6 files changed, 101 insertions(+), 37 deletions(-)
diff --git
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 63cbc9e..17f989e 100644
---
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -125,6 +125,15 @@ public interface Context {
void incrCounter(String key, long amount);
/**
+ * Increment the builtin distributed counter referred by key
+ * but dont wait for the completion of the increment operation
+ *
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ CompletableFuture<Void> incrCounterAsync(String key, long amount);
+
+ /**
* Retrieve the counter value for the key.
*
* @param key name of the key
@@ -133,6 +142,15 @@ public interface Context {
long getCounter(String key);
/**
+ * Retrieve the counter value for the key, but don't wait
+ * for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the amount of the counter value for this key
+ */
+ CompletableFuture<Long> getCounterAsync(String key);
+
+ /**
* Update the state value for the key.
*
* @param key name of the key
@@ -141,6 +159,14 @@ public interface Context {
void putState(String key, ByteBuffer value);
/**
+ * Update the state value for the key, but don't wait for the operation to
be completed
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+
+ /**
* Retrieve the state value for the key.
*
* @param key name of the key
@@ -149,6 +175,14 @@ public interface Context {
ByteBuffer getState(String key);
/**
+ * Retrieve the state value for the key, but don't wait for the operation
to be completed
+ *
+ * @param key name of the key
+ * @return the state value for the key.
+ */
+ CompletableFuture<ByteBuffer> getStateAsync(String key);
+
+ /**
* Get a map of all user-defined key/value configs for the function.
*
* @return The full map of user-defined config values
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index bb70b41..dc99f60 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -61,6 +61,7 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkState;
import static
org.apache.pulsar.functions.instance.stats.FunctionStatsManager.USER_METRIC_PREFIX;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
/**
* This class implements the Context interface exposed to the user.
@@ -263,40 +264,64 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
}
@Override
+ public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
+ ensureStateEnabled();
+ return stateContext.incrCounter(key, amount);
+ }
+
+ @Override
public void incrCounter(String key, long amount) {
ensureStateEnabled();
try {
- stateContext.incr(key, amount);
+ result(stateContext.incrCounter(key, amount));
} catch (Exception e) {
throw new RuntimeException("Failed to increment key '" + key + "'
by amount '" + amount + "'", e);
}
}
@Override
+ public CompletableFuture<Long> getCounterAsync(String key) {
+ ensureStateEnabled();
+ return stateContext.getCounter(key);
+ }
+
+ @Override
public long getCounter(String key) {
ensureStateEnabled();
try {
- return stateContext.getAmount(key);
+ return result(stateContext.getCounter(key));
} catch (Exception e) {
throw new RuntimeException("Failed to retrieve counter from key '"
+ key + "'");
}
}
@Override
+ public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value)
{
+ ensureStateEnabled();
+ return stateContext.put(key, value);
+ }
+
+ @Override
public void putState(String key, ByteBuffer value) {
ensureStateEnabled();
try {
- stateContext.put(key, value);
+ result(stateContext.put(key, value));
} catch (Exception e) {
throw new RuntimeException("Failed to update the state value for
key '" + key + "'");
}
}
@Override
+ public CompletableFuture<ByteBuffer> getStateAsync(String key) {
+ ensureStateEnabled();
+ return stateContext.get(key);
+ }
+
+ @Override
public ByteBuffer getState(String key) {
ensureStateEnabled();
try {
- return stateContext.getValue(key);
+ return result(stateContext.get(key));
} catch (Exception e) {
throw new RuntimeException("Failed to retrieve the state value for
key '" + key + "'");
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java
index c17e8b6..90e85f4 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.instance.state;
import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
/**
* A state context per function.
@@ -31,7 +32,7 @@ public interface StateContext {
* @param key key to increment
* @param amount the amount incremented
*/
- void incr(String key, long amount) throws Exception;
+ CompletableFuture<Void> incrCounter(String key, long amount) throws
Exception;
/**
* Update the given <i>key</i> to the provide <i>value</i>.
@@ -49,7 +50,7 @@ public interface StateContext {
* @param key key to update.
* @param value value to update
*/
- void put(String key, ByteBuffer value) throws Exception;
+ CompletableFuture<Void> put(String key, ByteBuffer value) throws Exception;
/**
* Get the value of a given <i>key</i>.
@@ -57,7 +58,7 @@ public interface StateContext {
* @param key key to retrieve
* @return a completable future representing the retrieve result.
*/
- ByteBuffer getValue(String key) throws Exception;
+ CompletableFuture<ByteBuffer> get(String key) throws Exception;
/**
* Get the amount of a given <i>key</i>.
@@ -65,6 +66,6 @@ public interface StateContext {
* @param key key to retrieve
* @return a completable future representing the retrieve result.
*/
- long getAmount(String key) throws Exception;
+ CompletableFuture<Long> getCounter(String key) throws Exception;
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
index 1a2c26d..4e60814 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
@@ -19,11 +19,12 @@
package org.apache.pulsar.functions.instance.state;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
import org.apache.bookkeeper.api.kv.Table;
/**
@@ -40,35 +41,38 @@ public class StateContextImpl implements StateContext {
}
@Override
- public void incr(String key, long amount) throws Exception {
+ public CompletableFuture<Void> incrCounter(String key, long amount) {
// TODO: this can be optimized with a batch operation.
- result(table.increment(
+ return table.increment(
Unpooled.wrappedBuffer(key.getBytes(UTF_8)),
- amount));
+ amount);
}
@Override
- public void put(String key, ByteBuffer value) throws Exception {
- result(table.put(
+ public CompletableFuture<Void> put(String key, ByteBuffer value) {
+ return table.put(
Unpooled.wrappedBuffer(key.getBytes(UTF_8)),
- Unpooled.wrappedBuffer(value)));
+ Unpooled.wrappedBuffer(value));
}
@Override
- public ByteBuffer getValue(String key) throws Exception {
- ByteBuf data =
result(table.get(Unpooled.wrappedBuffer(key.getBytes(UTF_8))));
- try {
- ByteBuffer result = ByteBuffer.allocate(data.readableBytes());
- data.readBytes(result);
- return result;
- } finally {
- data.release();
- }
+ public CompletableFuture<ByteBuffer> get(String key) {
+ return
table.get(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply(
+ data -> {
+ try {
+ ByteBuffer result =
ByteBuffer.allocate(data.readableBytes());
+ data.readBytes(result);
+ return result;
+ } finally {
+ data.release();
+ }
+ }
+ );
}
@Override
- public long getAmount(String key) throws Exception {
- return
result(table.getNumber(Unpooled.wrappedBuffer(key.getBytes(UTF_8))));
+ public CompletableFuture<Long> getCounter(String key) {
+ return table.getNumber(Unpooled.wrappedBuffer(key.getBytes(UTF_8)));
}
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index a898e54..fd541f3 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -104,16 +104,16 @@ public class ContextImplTest {
public void testIncrCounterStateEnabled() throws Exception {
StateContextImpl stateContext = mock(StateContextImpl.class);
context.setStateContext(stateContext);
- context.incrCounter("test-key", 10L);
- verify(stateContext, times(1)).incr(eq("test-key"), eq(10L));
+ context.incrCounterAsync("test-key", 10L);
+ verify(stateContext, times(1)).incrCounter(eq("test-key"), eq(10L));
}
@Test
public void testGetCounterStateEnabled() throws Exception {
StateContextImpl stateContext = mock(StateContextImpl.class);
context.setStateContext(stateContext);
- context.getCounter("test-key");
- verify(stateContext, times(1)).getAmount(eq("test-key"));
+ context.getCounterAsync("test-key");
+ verify(stateContext, times(1)).getCounter(eq("test-key"));
}
@Test
@@ -121,7 +121,7 @@ public class ContextImplTest {
StateContextImpl stateContext = mock(StateContextImpl.class);
context.setStateContext(stateContext);
ByteBuffer buffer = ByteBuffer.wrap("test-value".getBytes(UTF_8));
- context.putState("test-key", buffer);
+ context.putStateAsync("test-key", buffer);
verify(stateContext, times(1)).put(eq("test-key"), same(buffer));
}
@@ -129,8 +129,8 @@ public class ContextImplTest {
public void testGetStateStateEnabled() throws Exception {
StateContextImpl stateContext = mock(StateContextImpl.class);
context.setStateContext(stateContext);
- context.getState("test-key");
- verify(stateContext, times(1)).getValue(eq("test-key"));
+ context.getStateAsync("test-key");
+ verify(stateContext, times(1)).get(eq("test-key"));
}
@Test
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
index 338f64e..2805a3d 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
@@ -55,7 +55,7 @@ public class StateContextImplTest {
public void testIncr() throws Exception {
when(mockTable.increment(any(ByteBuf.class), anyLong()))
.thenReturn(FutureUtils.Void());
- stateContext.incr("test-key", 10L);
+ stateContext.incrCounter("test-key", 10L).get();
verify(mockTable, times(1)).increment(
eq(Unpooled.copiedBuffer("test-key", UTF_8)),
eq(10L)
@@ -66,7 +66,7 @@ public class StateContextImplTest {
public void testPut() throws Exception {
when(mockTable.put(any(ByteBuf.class), any(ByteBuf.class)))
.thenReturn(FutureUtils.Void());
- stateContext.put("test-key",
ByteBuffer.wrap("test-value".getBytes(UTF_8)));
+ stateContext.put("test-key",
ByteBuffer.wrap("test-value".getBytes(UTF_8))).get();
verify(mockTable, times(1)).put(
eq(Unpooled.copiedBuffer("test-key", UTF_8)),
eq(Unpooled.copiedBuffer("test-value", UTF_8))
@@ -78,7 +78,7 @@ public class StateContextImplTest {
ByteBuf returnedValue = Unpooled.copiedBuffer("test-value", UTF_8);
when(mockTable.get(any(ByteBuf.class)))
.thenReturn(FutureUtils.value(returnedValue));
- ByteBuffer result = stateContext.getValue("test-key");
+ ByteBuffer result = stateContext.get("test-key").get();
assertEquals("test-value", new String(result.array(), UTF_8));
verify(mockTable, times(1)).get(
eq(Unpooled.copiedBuffer("test-key", UTF_8))
@@ -89,7 +89,7 @@ public class StateContextImplTest {
public void testGetAmount() throws Exception {
when(mockTable.getNumber(any(ByteBuf.class)))
.thenReturn(FutureUtils.value(10L));
- assertEquals(10L, stateContext.getAmount("test-key"));
+ assertEquals((Long)10L, stateContext.getCounter("test-key").get());
verify(mockTable, times(1)).getNumber(
eq(Unpooled.copiedBuffer("test-key", UTF_8))
);