This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 65592ad cache StorageClient (#3078)
65592ad is described below
commit 65592ad2ae9b3f460ef7af95c78f02dbd0f1c5e1
Author: Jia Zhai <[email protected]>
AuthorDate: Wed Nov 28 22:45:59 2018 +0800
cache StorageClient (#3078)
---
.../functions/worker/rest/api/FunctionsImpl.java | 62 ++++++++++++----------
1 file changed, 34 insertions(+), 28 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 474032e..7f3ed44 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -47,6 +47,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.ws.rs.WebApplicationException;
@@ -121,6 +122,8 @@ public class FunctionsImpl {
public static final String SOURCE = "Source";
public static final String SINK = "Sink";
+ private final AtomicReference<StorageClient> storageClient = new
AtomicReference<>();
+
private final Supplier<WorkerService> workerServiceSupplier;
public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
@@ -1074,39 +1077,42 @@ public class FunctionsImpl {
String stateStorageServiceUrl =
worker().getWorkerConfig().getStateStorageServiceUrl();
- try (StorageClient client = StorageClientBuilder.newBuilder()
- .withSettings(StorageClientSettings.newBuilder()
- .serviceUri(stateStorageServiceUrl)
- .clientName("functions-admin")
- .build())
- .withNamespace(tableNs)
- .build()) {
- try (Table<ByteBuf, ByteBuf> table =
result(client.openTable(tableName))) {
- try (KeyValue<ByteBuf, ByteBuf> kv =
result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) {
- if (null == kv) {
- return Response.status(Status.NOT_FOUND)
- .entity(new String("key '" + key + "' doesn't
exist."))
- .build();
+ if (storageClient.get() == null) {
+ storageClient.compareAndSet(null, StorageClientBuilder.newBuilder()
+ .withSettings(StorageClientSettings.newBuilder()
+ .serviceUri(stateStorageServiceUrl)
+ .clientName("functions-admin")
+ .build())
+ .withNamespace(tableNs)
+ .build());
+ }
+
+ try (Table<ByteBuf, ByteBuf> table =
result(storageClient.get().openTable(tableName))) {
+ try (KeyValue<ByteBuf, ByteBuf> kv =
result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) {
+ if (null == kv) {
+ return Response.status(Status.NOT_FOUND)
+ .entity(new String("key '" + key + "' doesn't exist."))
+ .build();
+ } else {
+ String value;
+ if (kv.isNumber()) {
+ value = "value : " + kv.numberValue() + ", version : "
+ kv.version();
} else {
- String value;
- if (kv.isNumber()) {
- value = "value : " + kv.numberValue() + ", version
: " + kv.version();
- } else {
- value = "value : " + new
String(ByteBufUtil.getBytes(kv.value()), UTF_8)
- + ", version : " + kv.version();
- }
- return Response.status(Status.OK)
- .entity(new String(value))
- .build();
+ value = "value : " + new
String(ByteBufUtil.getBytes(kv.value()), UTF_8)
+ + ", version : " + kv.version();
}
+ return Response.status(Status.OK)
+ .entity(new String(value))
+ .build();
}
- } catch (Exception e) {
- log.error("Error while getFunctionState request @
/{}/{}/{}/{}",
- tenant, namespace, functionName, key, e);
- return
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(e.getMessage())).build();
}
+ } catch (Exception e) {
+ log.error("Error while getFunctionState request @ /{}/{}/{}/{}",
+ tenant, namespace, functionName, key, e);
+ return
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(e.getMessage())).build();
}
+
}
public Response uploadFunction(final InputStream uploadedInputStream,
final String path) {