This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aaf1996 QueryState now spits out FunctionState (#3076)
aaf1996 is described below
commit aaf199663ab7debf674d8de71c76bc126b627e88
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Wed Nov 28 13:06:52 2018 -0800
QueryState now spits out FunctionState (#3076)
### Motivation
Instead of printing an unformatted string, querystate now prints a
proper JSON object (a serialized FunctionState).
---
.../org/apache/pulsar/client/admin/Functions.java | 4 +--
.../client/admin/internal/FunctionsImpl.java | 13 ++++----
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 6 ++--
.../pulsar/common/functions/FunctionState.java | 35 ++++++++++++++++++++++
.../functions/worker/rest/api/FunctionsImpl.java | 10 +++----
5 files changed, 51 insertions(+), 17 deletions(-)
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index f909557..ca93c02 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -24,6 +24,7 @@ import java.util.Set;
import
org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
+import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -398,6 +399,5 @@ public interface Functions {
* @throws PulsarAdminException
* Unexpected error
*/
- String getFunctionState(String tenant, String namespace, String function,
String key) throws PulsarAdminException;
-
+ FunctionState getFunctionState(String tenant, String namespace, String
function, String key) throws PulsarAdminException;
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 753955e..a068a05 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -19,8 +19,8 @@
package org.apache.pulsar.client.admin.internal;
import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import com.google.protobuf.AbstractMessage.Builder;
-import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
@@ -366,7 +367,7 @@ public class FunctionsImpl extends BaseResource implements
Functions {
}
}
- public String getFunctionState(String tenant, String namespace, String
function, String key)
+ public FunctionState getFunctionState(String tenant, String namespace,
String function, String key)
throws PulsarAdminException {
try {
Response response = request(functions.path(tenant)
@@ -374,7 +375,8 @@ public class FunctionsImpl extends BaseResource implements
Functions {
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
- return response.readEntity(String.class);
+ String value = response.readEntity(String.class);
+ return new Gson().fromJson(value, new TypeToken<FunctionState>()
{}.getType());
} catch (Exception e) {
throw getApiException(e);
}
@@ -384,9 +386,4 @@ public class FunctionsImpl extends BaseResource implements
Functions {
public static void mergeJson(String json, Builder builder) throws
IOException {
JsonFormat.parser().merge(json, builder);
}
-
- public static String printJson(MessageOrBuilder msg) throws IOException {
- return JsonFormat.printer().print(msg);
- }
-
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index cfe709d..fea0ffd 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.functions.WindowConfig;
+import org.apache.pulsar.common.functions.FunctionState;
@Slf4j
@Parameters(commandDescription = "Interface for managing Pulsar Functions
(lightweight, Lambda-style compute processes that work with Pulsar)")
@@ -756,8 +757,9 @@ public class CmdFunctions extends CmdBase {
@Override
void runCmd() throws Exception {
do {
- String valueAndVersion =
admin.functions().getFunctionState(tenant, namespace, functionName, key);
- System.out.println(valueAndVersion);
+ FunctionState functionState =
admin.functions().getFunctionState(tenant, namespace, functionName, key);
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ System.out.println(gson.toJson(functionState));
if (watch) {
Thread.sleep(1000);
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java
new file mode 100644
index 0000000..5062247
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.functions;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.*;
+
+@Getter
+@AllArgsConstructor
+@ToString
+@JsonInclude(JsonInclude.Include.USE_DEFAULTS)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class FunctionState {
+ private String key;
+ private String stringValue;
+ private Long numberValue;
+ private Long version;
+}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 7f3ed44..6d6394f 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -81,6 +81,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
@@ -1094,15 +1095,14 @@ public class FunctionsImpl {
.entity(new String("key '" + key + "' doesn't exist."))
.build();
} else {
- String value;
+ FunctionState value;
if (kv.isNumber()) {
- value = "value : " + kv.numberValue() + ", version : "
+ kv.version();
+ value = new FunctionState(key, null, kv.numberValue(),
kv.version());
} else {
- value = "value : " + new
String(ByteBufUtil.getBytes(kv.value()), UTF_8)
- + ", version : " + kv.version();
+ value = new FunctionState(key, new
String(ByteBufUtil.getBytes(kv.value()), UTF_8), null, kv.version());
}
return Response.status(Status.OK)
- .entity(new String(value))
+ .entity(new Gson().toJson(value))
.build();
}
}