This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e99ffe  Add CLI commands for schema registry (#1944)
4e99ffe is described below

commit 4e99ffedbe5d0239eb8e28c300e365bde07c9a44
Author: Dave Rusek <[email protected]>
AuthorDate: Fri Jun 15 12:44:20 2018 -0600

    Add CLI commands for schema registry (#1944)
    
    * Add CLI commands for schema registry
    
    * Rename commandline args
    
    * Fix License Headers Issue
    
    * - all schema structures used in rest apis should have default constructors
    - change version to long in rest apis
    - fix schema cli issues
    
    * Add integration tests for Schema CLI
    
    * Exclude schema example file from license check
    
    * Exclude schema_example.conf from apache-rat:check
---
 conf/schema_example.conf                           |   7 ++
 pom.xml                                            |   4 +
 .../pulsar/broker/admin/v2/SchemasResource.java    |  24 +++--
 .../broker/service/schema/LongSchemaVersion.java   |   4 +-
 .../apache/pulsar/client/admin/PulsarAdmin.java    |  10 ++
 .../org/apache/pulsar/client/admin/Schemas.java    |  65 +++++++++++++
 .../pulsar/client/admin/internal/SchemasImpl.java  | 102 +++++++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdSchemas.java    |  86 +++++++++++++++++
 .../apache/pulsar/admin/cli/PulsarAdminTool.java   |  10 +-
 .../pulsar/common/schema/DeleteSchemaResponse.java |   6 +-
 .../apache/pulsar/common/schema/EmptyVersion.java  |   2 +-
 .../pulsar/common/schema/GetSchemaResponse.java    |   6 +-
 .../apache/pulsar/common/schema/LatestVersion.java |   2 +-
 .../pulsar/common/schema/PostSchemaPayload.java    |   4 +
 .../pulsar/common/schema/PostSchemaResponse.java   |   4 +
 .../apache/pulsar/common/schema/SchemaInfo.java    |   4 +
 .../apache/pulsar/tests/integration/TestCLI.java   |  58 ++++++++++++
 17 files changed, 385 insertions(+), 13 deletions(-)

diff --git a/conf/schema_example.conf b/conf/schema_example.conf
new file mode 100644
index 0000000..6681035
--- /dev/null
+++ b/conf/schema_example.conf
@@ -0,0 +1,7 @@
+{
+    "type": "STRING",
+    "schema": "",
+    "properties": {
+        "key1" : "value1"
+    }
+}
diff --git a/pom.xml b/pom.xml
index 7b3d23e..90af15d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -927,6 +927,7 @@ flexible messaging model and an intuitive client API.</description>
             <exclude>**/ByteBufCodedOutputStream.java</exclude>
             <exclude>bin/proto/*</exclude>
             <exclude>**/*.patch</exclude>
+            <exclude>conf/schema_example.conf</exclude>
             <exclude>data/**</exclude>
             <exclude>logs/**</exclude>
             <exclude>**/*.versionsBackup</exclude>
@@ -1055,6 +1056,9 @@ flexible messaging model and an intuitive client API.</description>
 
             <!-- Python requirements files -->
             <exclude>**/requirements.txt</exclude>
+
+            <!-- Configuration Templates -->
+            <exclude>conf/schema_example.conf</exclude>
           </excludes>
         </configuration>
       </plugin>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index 043233d..8bc511d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -27,7 +27,6 @@ import com.google.common.base.Charsets;
 import io.swagger.annotations.ApiOperation;
 import java.nio.ByteBuffer;
 import java.time.Clock;
-import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.Encoded;
@@ -42,6 +41,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
+import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.DeleteSchemaResponse;
@@ -51,10 +51,14 @@ import org.apache.pulsar.common.schema.PostSchemaResponse;
 import org.apache.pulsar.common.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.schema.SchemaVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Path("/schemas")
 public class SchemasResource extends AdminResource {
 
+    private static final Logger log = LoggerFactory.getLogger(SchemasResource.class);
+
     private final Clock clock;
 
     public SchemasResource() {
@@ -67,6 +71,14 @@ public class SchemasResource extends AdminResource {
         this.clock = clock;
     }
 
+    private long getLongSchemaVersion(SchemaVersion schemaVersion) {
+        if (schemaVersion instanceof LongSchemaVersion) {
+            return ((LongSchemaVersion) schemaVersion).getVersion();
+        } else {
+            return -1L;
+        }
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/{topic}/schema")
     @Produces(MediaType.APPLICATION_JSON)
@@ -86,13 +98,13 @@ public class SchemasResource extends AdminResource {
                     if (isNull(schema)) {
                         response.resume(Response.status(Response.Status.NOT_FOUND).build());
                     } else if (schema.schema.isDeleted()) {
-                        response.resume(Response.noContent().build());
+                        response.resume(Response.status(Response.Status.NOT_FOUND).build());
                     } else {
                         response.resume(
                             Response.ok()
                                 .encoding(MediaType.APPLICATION_JSON)
                                 .entity(GetSchemaResponse.builder()
-                                    .version(schema.version)
+                                    .version(getLongSchemaVersion(schema.version))
                                     .type(schema.schema.getType())
                                     .timestamp(schema.schema.getTimestamp())
                                     .data(new String(schema.schema.getData()))
@@ -132,13 +144,13 @@ public class SchemasResource extends AdminResource {
                     if (isNull(schema)) {
                         response.resume(Response.status(Response.Status.NOT_FOUND).build());
                     } else if (schema.schema.isDeleted()) {
-                        response.resume(Response.noContent().build());
+                        response.resume(Response.status(Response.Status.NOT_FOUND).build());
                     } else {
                         response.resume(
                             Response.ok()
                                 .encoding(MediaType.APPLICATION_JSON)
                                 .entity(GetSchemaResponse.builder()
-                                    .version(schema.version)
+                                    .version(getLongSchemaVersion(schema.version))
                                     .type(schema.schema.getType())
                                     .timestamp(schema.schema.getTimestamp())
                                     .data(new String(schema.schema.getData()))
@@ -173,7 +185,7 @@ public class SchemasResource extends AdminResource {
                     response.resume(
                         Response.ok().entity(
                             DeleteSchemaResponse.builder()
-                                .version(version)
+                                .version(getLongSchemaVersion(version))
                                 .build()
                         ).build()
                     );
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java
index a837cd4..7fa0795 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/LongSchemaVersion.java
@@ -23,10 +23,10 @@ import java.nio.ByteBuffer;
 import java.util.Objects;
 import org.apache.pulsar.common.schema.SchemaVersion;
 
-class LongSchemaVersion implements SchemaVersion {
+public class LongSchemaVersion implements SchemaVersion {
     private final long version;
 
-    LongSchemaVersion(long version) {
+    public LongSchemaVersion(long version) {
         this.version = version;
     }
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 369de79..b2db7e2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
 import org.apache.pulsar.client.admin.internal.LookupImpl;
 import org.apache.pulsar.client.admin.internal.NamespacesImpl;
 import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
+import org.apache.pulsar.client.admin.internal.SchemasImpl;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.client.admin.internal.TenantsImpl;
 import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
@@ -81,6 +82,7 @@ public class PulsarAdmin implements Closeable {
     private final String serviceUrl;
     private final Lookup lookups;
     private final Functions functions;
+    private final Schemas schemas;
     protected final WebTarget root;
     protected final Authentication auth;
 
@@ -183,6 +185,7 @@ public class PulsarAdmin implements Closeable {
         this.resourceQuotas = new ResourceQuotasImpl(root, auth);
         this.lookups = new LookupImpl(root, auth, useTls);
         this.functions = new FunctionsImpl(root, auth);
+        this.schemas = new SchemasImpl(root, auth);
     }
 
     /**
@@ -363,6 +366,13 @@ public class PulsarAdmin implements Closeable {
     }
 
     /**
+     * @return the schemas
+     */
+    public Schemas schemas() {
+        return schemas;
+    }
+
+    /**
      * Close the Pulsar admin client to release all the resources
      */
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
new file mode 100644
index 0000000..dfb89d4
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import org.apache.pulsar.common.schema.PostSchemaPayload;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * Admin interface on interacting with schemas.
+ */
+public interface Schemas {
+
+    /**
+     * Retrieve the latest schema of a topic.
+     *
+     * @param topic topic name, in fully qualified format
+     * @return latest schema
+     * @throws PulsarAdminException
+     */
+    SchemaInfo getSchemaInfo(String topic) throws PulsarAdminException;
+
+    /**
+     * Retrieve the schema of a topic at a given <tt>version</tt>.
+     *
+     * @param topic topic name, in fully qualified format
+     * @param version schema version
+     * @return the schema info at a given <tt>version</tt>
+     * @throws PulsarAdminException
+     */
+    SchemaInfo getSchemaInfo(String topic, long version) throws PulsarAdminException;
+
+    /**
+     * Delete the schema associated with a given <tt>topic</tt>.
+     *
+     * @param topic topic name, in fully qualified format
+     * @throws PulsarAdminException
+     */
+    void deleteSchema(String topic) throws PulsarAdminException;
+
+    /**
+     * Create a schema for a given <tt>topic</tt>.
+     *
+     * @param topic topic name, in fully qualified format
+     * @param schemaPayload schema payload
+     * @throws PulsarAdminException
+     */
+    void createSchema(String topic, PostSchemaPayload schemaPayload) throws PulsarAdminException;
+
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
new file mode 100644
index 0000000..f2ff401
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Schemas;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.schema.DeleteSchemaResponse;
+import org.apache.pulsar.common.schema.GetSchemaResponse;
+import org.apache.pulsar.common.schema.PostSchemaPayload;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+public class SchemasImpl extends BaseResource implements Schemas {
+
+    private final WebTarget target;
+
+    public SchemasImpl(WebTarget web, Authentication auth) {
+        super(auth);
+        this.target = web.path("/admin/v2/schemas");
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo(String topic) throws PulsarAdminException {
+        try {
+            TopicName tn = TopicName.get(topic);
+            GetSchemaResponse response = request(schemaPath(tn)).get(GetSchemaResponse.class);
+            SchemaInfo info = new SchemaInfo();
+            info.setSchema(response.getData().getBytes());
+            info.setType(response.getType());
+            info.setProperties(response.getProperties());
+            info.setName(tn.getLocalName());
+            return info;
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo(String topic, long version) throws PulsarAdminException {
+        try {
+            TopicName tn = TopicName.get(topic);
+            GetSchemaResponse response = request(schemaPath(tn).path(Long.toString(version))).get(GetSchemaResponse.class);
+            SchemaInfo info = new SchemaInfo();
+            info.setSchema(response.getData().getBytes());
+            info.setType(response.getType());
+            info.setProperties(response.getProperties());
+            info.setName(tn.getLocalName());
+            return info;
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void deleteSchema(String topic) throws PulsarAdminException {
+        try {
+            TopicName tn = TopicName.get(topic);
+            request(schemaPath(tn)).delete(DeleteSchemaResponse.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void createSchema(String topic, PostSchemaPayload payload) throws PulsarAdminException {
+        try {
+            TopicName tn = TopicName.get(topic);
+            request(schemaPath(tn))
+                .post(Entity.json(payload), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    private WebTarget schemaPath(TopicName topicName) {
+        return target
+            .path(topicName.getTenant())
+            .path(topicName.getNamespacePortion())
+            .path(topicName.getEncodedLocalName())
+            .path("schema");
+    }
+}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
new file mode 100644
index 0000000..cab463f
--- /dev/null
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.schema.PostSchemaPayload;
+
+@Parameters(commandDescription = "Operations about schemas")
+public class CmdSchemas extends CmdBase {
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    public CmdSchemas(PulsarAdmin admin) {
+        super("schemas", admin);
+        jcommander.addCommand("get", new GetSchema());
+        jcommander.addCommand("delete", new DeleteSchema());
+        jcommander.addCommand("upload", new UploadSchema());
+    }
+
+    @Parameters(commandDescription = "Get the schema for a topic")
+    private class GetSchema extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--version" }, description = "version", required = false)
+        private Long version;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(params);
+            if (version == null) {
+                print(admin.schemas().getSchemaInfo(topic));
+            } else {
+                print(admin.schemas().getSchemaInfo(topic, version));
+            }
+        }
+    }
+
+    @Parameters(commandDescription = "Delete the latest schema for a topic")
+    private class DeleteSchema extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(params);
+            admin.schemas().deleteSchema(topic);
+        }
+    }
+
+    @Parameters(commandDescription = "Update the schema for a topic")
+    private class UploadSchema extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-f", "--filename" }, description = "filename", required = true)
+        private String schemaFileName;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(params);
+            PostSchemaPayload input = MAPPER.readValue(new File(schemaFileName), PostSchemaPayload.class);
+            admin.schemas().createSchema(topic, input);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index 6467072..27bac11 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -22,6 +22,7 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 
 import java.io.FileInputStream;
+import java.lang.reflect.InvocationTargetException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -88,6 +89,7 @@ public class PulsarAdminTool {
         commandMap.put("properties", CmdTenants.CmdProperties.class); // deprecated, doesn't show in usage()
         commandMap.put("namespaces", CmdNamespaces.class);
         commandMap.put("topics", CmdTopics.class);
+        commandMap.put("schemas", CmdSchemas.class);
 
         // Hidden deprecated "persistent" and "non-persistent" subcommands
         commandMap.put("persistent", CmdPersistentTopics.class);
@@ -109,7 +111,13 @@ public class PulsarAdminTool {
                 jcommander.addCommand(c.getKey(), c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin));
             }
         } catch (Exception e) {
-            System.err.println(e.getClass() + ": " + e.getMessage());
+            Throwable cause;
+            if (e instanceof InvocationTargetException && null != e.getCause()) {
+                cause = e.getCause();
+            } else {
+                cause = e;
+            }
+            System.err.println(cause.getClass() + ": " + cause.getMessage());
             System.exit(1);
         }
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java
index b9c47b2..56e5f82 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java
@@ -18,11 +18,15 @@
  */
 package org.apache.pulsar.common.schema;
 
+import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 @Data
 @Builder
+@AllArgsConstructor
+@NoArgsConstructor
 public class DeleteSchemaResponse {
-    private SchemaVersion version;
+    private long version;
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java
index 0aaefb3..d2febd0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.common.schema;
 
-final public class EmptyVersion implements SchemaVersion {
+public final class EmptyVersion implements SchemaVersion {
     private static final byte[] EMPTY = new byte[]{};
 
     @Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java
index bc98b89..4f3df27 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java
@@ -19,13 +19,17 @@
 package org.apache.pulsar.common.schema;
 
 import java.util.Map;
+import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 @Data
 @Builder
+@AllArgsConstructor
+@NoArgsConstructor
 public class GetSchemaResponse {
-    private SchemaVersion version;
+    private long version;
     private SchemaType type;
     private long timestamp;
     private String data;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java
index b26231c..25bf054 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.common.schema;
 
-final class LatestVersion implements SchemaVersion {
+public final class LatestVersion implements SchemaVersion {
     private static final byte[] EMPTY = new byte[]{};
 
     @Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java
index af04418..a01caf5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java
@@ -19,9 +19,13 @@
 package org.apache.pulsar.common.schema;
 
 import java.util.Map;
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 @Data
+@AllArgsConstructor
+@NoArgsConstructor
 public class PostSchemaPayload {
     private String type;
     private String schema;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java
index b12db5d..93c2932 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java
@@ -18,11 +18,15 @@
  */
 package org.apache.pulsar.common.schema;
 
+import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 @Data
 @Builder
+@AllArgsConstructor
+@NoArgsConstructor
 public class PostSchemaResponse {
     private SchemaVersion version;
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index d97875f..117019e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -21,9 +21,13 @@ package org.apache.pulsar.common.schema;
 import java.util.Collections;
 import java.util.Map;
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 @Data
+@AllArgsConstructor
+@NoArgsConstructor
 public class SchemaInfo {
     private String name;
     private byte[] schema;
diff --git a/tests/integration/cli/src/test/java/org/apache/pulsar/tests/integration/TestCLI.java b/tests/integration/cli/src/test/java/org/apache/pulsar/tests/integration/TestCLI.java
index 35698e5..e4a67c1 100644
--- a/tests/integration/cli/src/test/java/org/apache/pulsar/tests/integration/TestCLI.java
+++ b/tests/integration/cli/src/test/java/org/apache/pulsar/tests/integration/TestCLI.java
@@ -125,4 +125,62 @@ public class TestCLI extends Arquillian {
             // expected
         }
     }
+
+    @Test
+    public void testSchemaCLI() throws Exception {
+        String broker = PulsarClusterUtils.brokerSet(docker, clusterName).stream().findAny().get();
+
+        Assert.assertTrue(DockerUtils.runCommand(
+            docker, broker,
+            "/pulsar/bin/pulsar-client",
+            "produce",
+            "-m",
+            "\"test topic schema\"",
+            "-n",
+            "1",
+            "persistent://public/default/test-schema-cli"
+        ).contains("1 messages successfully produced"));
+
+        Assert.assertTrue(DockerUtils.runCommand(
+            docker, broker,
+            "/pulsar/bin/pulsar-admin",
+            "schemas",
+            "upload",
+            "persistent://public/default/test-schema-cli",
+            "-f",
+            "/pulsar/conf/schema_example.conf"
+        ).isEmpty());
+
+        // get schema
+        Assert.assertTrue(DockerUtils.runCommand(
+            docker, broker,
+            "/pulsar/bin/pulsar-admin",
+            "schemas",
+            "get",
+            "persistent://public/default/test-schema-cli"
+        ).contains("\"type\" : \"STRING\""));
+
+        // delete the schema
+        Assert.assertTrue(DockerUtils.runCommand(
+            docker, broker,
+            "/pulsar/bin/pulsar-admin",
+            "schemas",
+            "delete",
+            "persistent://public/default/test-schema-cli"
+        ).isEmpty());
+
+        // get schema again
+        try {
+            DockerUtils.runCommand(
+                docker, broker,
+                "/pulsar/bin/pulsar-admin",
+                "schemas",
+                "get",
+                "persistent://public/default/test-schema-cli"
+            );
+            fail("Should fail to get schema if the schema is deleted");
+        } catch (RuntimeException re) {
+            // expected
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to