This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4e99ffe Add CLI commands for schema registry (#1944)
4e99ffe is described below
commit 4e99ffedbe5d0239eb8e28c300e365bde07c9a44
Author: Dave Rusek <[email protected]>
AuthorDate: Fri Jun 15 12:44:20 2018 -0600
Add CLI commands for schema registry (#1944)
* Add CLI commands for schema registry
* Rename commandline args
* Fix License Headers Issue
* - all schema structures used in rest apis should have default constructors
- change version to long in rest apis
- fix schema cli issues
* Add integration tests for Schema CLI
* Exclude schema example file from license check
* Exclude schema_example.conf from apache-rat:check
---
conf/schema_example.conf | 7 ++
pom.xml | 4 +
.../pulsar/broker/admin/v2/SchemasResource.java | 24 +++--
.../broker/service/schema/LongSchemaVersion.java | 4 +-
.../apache/pulsar/client/admin/PulsarAdmin.java | 10 ++
.../org/apache/pulsar/client/admin/Schemas.java | 65 +++++++++++++
.../pulsar/client/admin/internal/SchemasImpl.java | 102 +++++++++++++++++++++
.../org/apache/pulsar/admin/cli/CmdSchemas.java | 86 +++++++++++++++++
.../apache/pulsar/admin/cli/PulsarAdminTool.java | 10 +-
.../pulsar/common/schema/DeleteSchemaResponse.java | 6 +-
.../apache/pulsar/common/schema/EmptyVersion.java | 2 +-
.../pulsar/common/schema/GetSchemaResponse.java | 6 +-
.../apache/pulsar/common/schema/LatestVersion.java | 2 +-
.../pulsar/common/schema/PostSchemaPayload.java | 4 +
.../pulsar/common/schema/PostSchemaResponse.java | 4 +
.../apache/pulsar/common/schema/SchemaInfo.java | 4 +
.../apache/pulsar/tests/integration/TestCLI.java | 58 ++++++++++++
17 files changed, 385 insertions(+), 13 deletions(-)
diff --git a/conf/schema_example.conf b/conf/schema_example.conf
new file mode 100644
index 0000000..6681035
--- /dev/null
+++ b/conf/schema_example.conf
@@ -0,0 +1,7 @@
+{
+ "type": "STRING",
+ "schema": "",
+ "properties": {
+ "key1" : "value1"
+ }
+}
diff --git a/pom.xml b/pom.xml
index 7b3d23e..90af15d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -927,6 +927,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>**/ByteBufCodedOutputStream.java</exclude>
<exclude>bin/proto/*</exclude>
<exclude>**/*.patch</exclude>
+ <exclude>conf/schema_example.conf</exclude>
<exclude>data/**</exclude>
<exclude>logs/**</exclude>
<exclude>**/*.versionsBackup</exclude>
@@ -1055,6 +1056,9 @@ flexible messaging model and an intuitive client API.</description>
<!-- Python requirements files -->
<exclude>**/requirements.txt</exclude>
+
+ <!-- Configuration Templates -->
+ <exclude>conf/schema_example.conf</exclude>
</excludes>
</configuration>
</plugin>
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index 043233d..8bc511d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -27,7 +27,6 @@ import com.google.common.base.Charsets;
import io.swagger.annotations.ApiOperation;
import java.nio.ByteBuffer;
import java.time.Clock;
-import java.util.concurrent.CompletableFuture;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.Encoded;
@@ -42,6 +41,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
+import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.DeleteSchemaResponse;
@@ -51,10 +51,14 @@ import org.apache.pulsar.common.schema.PostSchemaResponse;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.schema.SchemaVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Path("/schemas")
public class SchemasResource extends AdminResource {
+ private static final Logger log = LoggerFactory.getLogger(SchemasResource.class);
+
private final Clock clock;
public SchemasResource() {
@@ -67,6 +71,14 @@ public class SchemasResource extends AdminResource {
this.clock = clock;
}
+ private long getLongSchemaVersion(SchemaVersion schemaVersion) {
+ if (schemaVersion instanceof LongSchemaVersion) {
+ return ((LongSchemaVersion) schemaVersion).getVersion();
+ } else {
+ return -1L;
+ }
+ }
+
@GET
@Path("/{tenant}/{namespace}/{topic}/schema")
@Produces(MediaType.APPLICATION_JSON)
@@ -86,13 +98,13 @@ public class SchemasResource extends AdminResource {
if (isNull(schema)) {
response.resume(Response.status(Response.Status.NOT_FOUND).build());
} else if (schema.schema.isDeleted()) {
- response.resume(Response.noContent().build());
+ response.resume(Response.status(Response.Status.NOT_FOUND).build());
} else {
response.resume(
Response.ok()
.encoding(MediaType.APPLICATION_JSON)
.entity(GetSchemaResponse.builder()
- .version(schema.version)
+ .version(getLongSchemaVersion(schema.version))
.type(schema.schema.getType())
.timestamp(schema.schema.getTimestamp())
.data(new String(schema.schema.getData()))
@@ -132,13 +144,13 @@ public class SchemasResource extends AdminResource {
if (isNull(schema)) {
response.resume(Response.status(Response.Status.NOT_FOUND).build());
} else if (schema.schema.isDeleted()) {
- response.resume(Response.noContent().build());
+ response.resume(Response.status(Response.Status.NOT_FOUND).build());
} else {
response.resume(
Response.ok()
.encoding(MediaType.APPLICATION_JSON)
.entity(GetSchemaResponse.builder()
- .version(schema.version)
+ .version(getLongSchemaVersion(schema.version))
.type(schema.schema.getType())
.timestamp(schema.schema.getTimestamp())
.data(new String(schema.schema.getData()))
@@ -173,7 +185,7 @@ public class SchemasResource extends AdminResource {
response.resume(
Response.ok().entity(
DeleteSchemaResponse.builder()
- .version(version)
+ .version(getLongSchemaVersion(version))
.build()
).build()
);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java
index a837cd4..7fa0795 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java
@@ -23,10 +23,10 @@ import java.nio.ByteBuffer;
import java.util.Objects;
import org.apache.pulsar.common.schema.SchemaVersion;
-class LongSchemaVersion implements SchemaVersion {
+public class LongSchemaVersion implements SchemaVersion {
private final long version;
- LongSchemaVersion(long version) {
+ public LongSchemaVersion(long version) {
this.version = version;
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 369de79..b2db7e2 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
import org.apache.pulsar.client.admin.internal.LookupImpl;
import org.apache.pulsar.client.admin.internal.NamespacesImpl;
import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
+import org.apache.pulsar.client.admin.internal.SchemasImpl;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.admin.internal.TenantsImpl;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
@@ -81,6 +82,7 @@ public class PulsarAdmin implements Closeable {
private final String serviceUrl;
private final Lookup lookups;
private final Functions functions;
+ private final Schemas schemas;
protected final WebTarget root;
protected final Authentication auth;
@@ -183,6 +185,7 @@ public class PulsarAdmin implements Closeable {
this.resourceQuotas = new ResourceQuotasImpl(root, auth);
this.lookups = new LookupImpl(root, auth, useTls);
this.functions = new FunctionsImpl(root, auth);
+ this.schemas = new SchemasImpl(root, auth);
}
/**
@@ -363,6 +366,13 @@ public class PulsarAdmin implements Closeable {
}
/**
+ * @return the schemas
+ */
+ public Schemas schemas() {
+ return schemas;
+ }
+
+ /**
* Close the Pulsar admin client to release all the resources
*/
@Override
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
new file mode 100644
index 0000000..dfb89d4
--- /dev/null
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import org.apache.pulsar.common.schema.PostSchemaPayload;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * Admin interface on interacting with schemas.
+ */
+public interface Schemas {
+
+ /**
+ * Retrieve the latest schema of a topic.
+ *
+ * @param topic topic name, in fully qualified format
+ * @return latest schema
+ * @throws PulsarAdminException
+ */
+ SchemaInfo getSchemaInfo(String topic) throws PulsarAdminException;
+
+ /**
+ * Retrieve the schema of a topic at a given <tt>version</tt>.
+ *
+ * @param topic topic name, in fully qualified format
+ * @param version schema version
+ * @return the schema info at a given <tt>version</tt>
+ * @throws PulsarAdminException
+ */
+ SchemaInfo getSchemaInfo(String topic, long version) throws PulsarAdminException;
+
+ /**
+ * Delete the schema associated with a given <tt>topic</tt>.
+ *
+ * @param topic topic name, in fully qualified format
+ * @throws PulsarAdminException
+ */
+ void deleteSchema(String topic) throws PulsarAdminException;
+
+ /**
+ * Create a schema for a given <tt>topic</tt>.
+ *
+ * @param topic topic name, in fully qualified format
+ * @param schemaPayload schema payload
+ * @throws PulsarAdminException
+ */
+ void createSchema(String topic, PostSchemaPayload schemaPayload) throws PulsarAdminException;
+
+}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
new file mode 100644
index 0000000..f2ff401
--- /dev/null
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Schemas;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.schema.DeleteSchemaResponse;
+import org.apache.pulsar.common.schema.GetSchemaResponse;
+import org.apache.pulsar.common.schema.PostSchemaPayload;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+public class SchemasImpl extends BaseResource implements Schemas {
+
+ private final WebTarget target;
+
+ public SchemasImpl(WebTarget web, Authentication auth) {
+ super(auth);
+ this.target = web.path("/admin/v2/schemas");
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo(String topic) throws PulsarAdminException {
+ try {
+ TopicName tn = TopicName.get(topic);
+ GetSchemaResponse response = request(schemaPath(tn)).get(GetSchemaResponse.class);
+ SchemaInfo info = new SchemaInfo();
+ info.setSchema(response.getData().getBytes());
+ info.setType(response.getType());
+ info.setProperties(response.getProperties());
+ info.setName(tn.getLocalName());
+ return info;
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo(String topic, long version) throws PulsarAdminException {
+ try {
+ TopicName tn = TopicName.get(topic);
+ GetSchemaResponse response = request(schemaPath(tn).path(Long.toString(version))).get(GetSchemaResponse.class);
+ SchemaInfo info = new SchemaInfo();
+ info.setSchema(response.getData().getBytes());
+ info.setType(response.getType());
+ info.setProperties(response.getProperties());
+ info.setName(tn.getLocalName());
+ return info;
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void deleteSchema(String topic) throws PulsarAdminException {
+ try {
+ TopicName tn = TopicName.get(topic);
+ request(schemaPath(tn)).delete(DeleteSchemaResponse.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void createSchema(String topic, PostSchemaPayload payload) throws PulsarAdminException {
+ try {
+ TopicName tn = TopicName.get(topic);
+ request(schemaPath(tn))
+ .post(Entity.json(payload), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ private WebTarget schemaPath(TopicName topicName) {
+ return target
+ .path(topicName.getTenant())
+ .path(topicName.getNamespacePortion())
+ .path(topicName.getEncodedLocalName())
+ .path("schema");
+ }
+}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
new file mode 100644
index 0000000..cab463f
--- /dev/null
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.schema.PostSchemaPayload;
+
+@Parameters(commandDescription = "Operations about schemas")
+public class CmdSchemas extends CmdBase {
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ public CmdSchemas(PulsarAdmin admin) {
+ super("schemas", admin);
+ jcommander.addCommand("get", new GetSchema());
+ jcommander.addCommand("delete", new DeleteSchema());
+ jcommander.addCommand("upload", new UploadSchema());
+ }
+
+ @Parameters(commandDescription = "Get the schema for a topic")
+ private class GetSchema extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--version" }, description = "version", required = false)
+ private Long version;
+
+ @Override
+ void run() throws Exception {
+ String topic = validateTopicName(params);
+ if (version == null) {
+ print(admin.schemas().getSchemaInfo(topic));
+ } else {
+ print(admin.schemas().getSchemaInfo(topic, version));
+ }
+ }
+ }
+
+ @Parameters(commandDescription = "Delete the latest schema for a topic")
+ private class DeleteSchema extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws Exception {
+ String topic = validateTopicName(params);
+ admin.schemas().deleteSchema(topic);
+ }
+ }
+
+ @Parameters(commandDescription = "Update the schema for a topic")
+ private class UploadSchema extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "-f", "--filename" }, description = "filename", required = true)
+ private String schemaFileName;
+
+ @Override
+ void run() throws Exception {
+ String topic = validateTopicName(params);
+ PostSchemaPayload input = MAPPER.readValue(new File(schemaFileName), PostSchemaPayload.class);
+ admin.schemas().createSchema(topic, input);
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index 6467072..27bac11 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -22,6 +22,7 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.io.FileInputStream;
+import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -88,6 +89,7 @@ public class PulsarAdminTool {
commandMap.put("properties", CmdTenants.CmdProperties.class); // deprecated, doesn't show in usage()
commandMap.put("namespaces", CmdNamespaces.class);
commandMap.put("topics", CmdTopics.class);
+ commandMap.put("schemas", CmdSchemas.class);
// Hidden deprecated "persistent" and "non-persistent" subcommands
commandMap.put("persistent", CmdPersistentTopics.class);
@@ -109,7 +111,13 @@ public class PulsarAdminTool {
jcommander.addCommand(c.getKey(), c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin));
}
} catch (Exception e) {
- System.err.println(e.getClass() + ": " + e.getMessage());
+ Throwable cause;
+ if (e instanceof InvocationTargetException && null != e.getCause()) {
+ cause = e.getCause();
+ } else {
+ cause = e;
+ }
+ System.err.println(cause.getClass() + ": " + cause.getMessage());
System.exit(1);
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java
index b9c47b2..56e5f82 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java
@@ -18,11 +18,15 @@
*/
package org.apache.pulsar.common.schema;
+import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
@Data
@Builder
+@AllArgsConstructor
+@NoArgsConstructor
public class DeleteSchemaResponse {
- private SchemaVersion version;
+ private long version;
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java
index 0aaefb3..d2febd0 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.common.schema;
-final public class EmptyVersion implements SchemaVersion {
+public final class EmptyVersion implements SchemaVersion {
private static final byte[] EMPTY = new byte[]{};
@Override
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java
index bc98b89..4f3df27 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java
@@ -19,13 +19,17 @@
package org.apache.pulsar.common.schema;
import java.util.Map;
+import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
@Data
@Builder
+@AllArgsConstructor
+@NoArgsConstructor
public class GetSchemaResponse {
- private SchemaVersion version;
+ private long version;
private SchemaType type;
private long timestamp;
private String data;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java
index b26231c..25bf054 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.common.schema;
-final class LatestVersion implements SchemaVersion {
+public final class LatestVersion implements SchemaVersion {
private static final byte[] EMPTY = new byte[]{};
@Override
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java
index af04418..a01caf5 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java
@@ -19,9 +19,13 @@
package org.apache.pulsar.common.schema;
import java.util.Map;
+import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
@Data
+@AllArgsConstructor
+@NoArgsConstructor
public class PostSchemaPayload {
private String type;
private String schema;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java
index b12db5d..93c2932 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java
@@ -18,11 +18,15 @@
*/
package org.apache.pulsar.common.schema;
+import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
@Data
@Builder
+@AllArgsConstructor
+@NoArgsConstructor
public class PostSchemaResponse {
private SchemaVersion version;
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index d97875f..117019e 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -21,9 +21,13 @@ package org.apache.pulsar.common.schema;
import java.util.Collections;
import java.util.Map;
+import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
@Data
+@AllArgsConstructor
+@NoArgsConstructor
public class SchemaInfo {
private String name;
private byte[] schema;
diff --git
a/tests/integration/cli/src/test/java/org/apache/pulsar/tests/integration/TestCLI.java
b/tests/integration/cli/src/test/java/org/apache/pulsar/tests/integration/TestCLI.java
index 35698e5..e4a67c1 100644
---
a/tests/integration/cli/src/test/java/org/apache/pulsar/tests/integration/TestCLI.java
+++
b/tests/integration/cli/src/test/java/org/apache/pulsar/tests/integration/TestCLI.java
@@ -125,4 +125,62 @@ public class TestCLI extends Arquillian {
// expected
}
}
+
+ @Test
+ public void testSchemaCLI() throws Exception {
+ String broker = PulsarClusterUtils.brokerSet(docker, clusterName).stream().findAny().get();
+
+ Assert.assertTrue(DockerUtils.runCommand(
+ docker, broker,
+ "/pulsar/bin/pulsar-client",
+ "produce",
+ "-m",
+ "\"test topic schema\"",
+ "-n",
+ "1",
+ "persistent://public/default/test-schema-cli"
+ ).contains("1 messages successfully produced"));
+
+ Assert.assertTrue(DockerUtils.runCommand(
+ docker, broker,
+ "/pulsar/bin/pulsar-admin",
+ "schemas",
+ "upload",
+ "persistent://public/default/test-schema-cli",
+ "-f",
+ "/pulsar/conf/schema_example.conf"
+ ).isEmpty());
+
+ // get schema
+ Assert.assertTrue(DockerUtils.runCommand(
+ docker, broker,
+ "/pulsar/bin/pulsar-admin",
+ "schemas",
+ "get",
+ "persistent://public/default/test-schema-cli"
+ ).contains("\"type\" : \"STRING\""));
+
+ // delete the schema
+ Assert.assertTrue(DockerUtils.runCommand(
+ docker, broker,
+ "/pulsar/bin/pulsar-admin",
+ "schemas",
+ "delete",
+ "persistent://public/default/test-schema-cli"
+ ).isEmpty());
+
+ // get schema again
+ try {
+ DockerUtils.runCommand(
+ docker, broker,
+ "/pulsar/bin/pulsar-admin",
+ "schemas",
+ "get",
+ "persistent://public/default/test-schema-cli"
+ );
+ fail("Should fail to get schema if the schema is deleted");
+ } catch (RuntimeException re) {
+ // expected
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
[email protected].