This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b2678be  [REST] Rest API Produce message. (#8125)
b2678be is described below

commit b2678be0a97580d69da0b543a499efb3d9adbd5e
Author: Marvin Cai <[email protected]>
AuthorDate: Wed Sep 29 06:56:03 2021 -0700

    [REST] Rest API Produce message. (#8125)
    
    ### Motivation
    PIP 64: 
https://github.com/apache/pulsar/wiki/PIP-64%3A-Introduce-REST-endpoints-for-producing%2C-consuming-and-reading-messages
    
    Tested with Postman
    
    ### Modifications
    
    Add produce message rest api.
---
 .../org/apache/pulsar/broker/PulsarService.java    |   2 +
 .../broker/rest/RestMessagePublishContext.java     |  89 +++
 .../java/org/apache/pulsar/broker/rest/Topics.java | 160 ++++
 .../org/apache/pulsar/broker/rest/TopicsBase.java  | 768 +++++++++++++++++++
 .../apache/pulsar/broker/rest/package-info.java    |  29 +-
 .../pulsar/broker/service/BrokerService.java       |   4 +
 ...ashRangeAutoSplitStickyKeyConsumerSelector.java |   2 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   9 +-
 .../org/apache/pulsar/broker/admin/TopicsTest.java | 831 +++++++++++++++++++++
 .../impl/schema/generic/GenericJsonRecord.java     |   2 +-
 .../pulsar/websocket/data/ProducerMessages.java    |   2 +-
 11 files changed, 1861 insertions(+), 37 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5cd6813..86b9d94 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -803,6 +803,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 "org.apache.pulsar.broker.admin.v3", true, attributeMap);
         webService.addRestResources("/lookup",
                 "org.apache.pulsar.broker.lookup", true, attributeMap);
+        webService.addRestResources("/topics",
+                            "org.apache.pulsar.broker.rest", true, 
attributeMap);
 
         // Add metrics servlet
         webService.addServlet("/metrics",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/RestMessagePublishContext.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/RestMessagePublishContext.java
new file mode 100644
index 0000000..9891572
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/RestMessagePublishContext.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.rest;
+
+import io.netty.util.Recycler;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.Topic;
+
+/**
+ * {@link Topic.PublishContext} implementation for REST message publishing.
+ *
+ * <p>Instances are pooled through a Netty {@link Recycler}. Obtain one with
+ * {@link #get(CompletableFuture, Topic, long)}; the instance hands itself back to the pool at the end of
+ * {@link #completed(Exception, long, long)} and must not be used after that call.
+ */
+@Slf4j
+public class RestMessagePublishContext implements Topic.PublishContext {
+
+    private static final Recycler<RestMessagePublishContext> RECYCLER =
+            new Recycler<RestMessagePublishContext>() {
+                @Override
+                protected RestMessagePublishContext newObject(Handle<RestMessagePublishContext> handle) {
+                    return new RestMessagePublishContext(handle);
+                }
+            };
+
+    private final Recycler.Handle<RestMessagePublishContext> recyclerHandle;
+
+    // Per-publish state, set in get(...) and cleared in recycle().
+    private Topic topic;
+    private long startTimeNs;
+    private CompletableFuture<PositionImpl> positionFuture;
+
+    private RestMessagePublishContext(Recycler.Handle<RestMessagePublishContext> recyclerHandle) {
+        this.recyclerHandle = recyclerHandle;
+    }
+
+    /**
+     * Takes a context from the pool and binds it to one publish operation.
+     *
+     * @param positionFuture future completed with the persisted position, or exceptionally on failure
+     * @param topic          topic the message is published to; used for logging and latency recording
+     * @param startTimeNs    {@link System#nanoTime()} reading taken when the publish started
+     * @return a context ready to be passed to the topic's publish call
+     */
+    public static RestMessagePublishContext get(CompletableFuture<PositionImpl> positionFuture, Topic topic,
+                                                     long startTimeNs) {
+        RestMessagePublishContext callback = RECYCLER.get();
+        callback.positionFuture = positionFuture;
+        callback.topic = topic;
+        callback.startTimeNs = startTimeNs;
+        return callback;
+    }
+
+    /**
+     * Executed from managed ledger thread when the message is persisted.
+     *
+     * <p>Completes the position future (normally or exceptionally) and then recycles this instance.
+     *
+     * @param exception failure reported by the write, or {@code null} on success
+     * @param ledgerId  ledger id of the written entry
+     * @param entryId   entry id of the written entry
+     */
+    @Override
+    public void completed(Exception exception, long ledgerId, long entryId) {
+        if (exception != null) {
+            positionFuture.completeExceptionally(exception);
+            if (log.isInfoEnabled()) {
+                log.info("Failed to write entry for rest produce request: ledgerId: {}, entryId: {}. "
+                                + "triggered send callback.",
+                        ledgerId, entryId);
+            }
+        } else {
+            if (log.isInfoEnabled()) {
+                log.info("Success write topic for rest produce request: {}, ledgerId: {}, entryId: {}. "
+                                + "triggered send callback.",
+                        topic.getName(), ledgerId, entryId);
+            }
+            // Both readings come from System.nanoTime(), so the elapsed value is in nanoseconds.
+            topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
+            positionFuture.complete(PositionImpl.get(ledgerId, entryId));
+        }
+        recycle();
+    }
+
+    /**
+     * Clears the per-publish state and returns this instance to the pool.
+     */
+    public void recycle() {
+        topic = null;
+        // Drop the future as well so a pooled instance does not keep a finished request reachable.
+        positionFuture = null;
+        startTimeNs = -1;
+        recyclerHandle.recycle(this);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
new file mode 100644
index 0000000..a8095f0
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.rest;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.Encoded;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+import org.apache.pulsar.websocket.data.ProducerMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Path("/")
+@Consumes(MediaType.APPLICATION_JSON)
+@Produces(MediaType.APPLICATION_JSON)
+@Api(value = "/persistent", description = "Apis for produce,consume and ack 
message on topics.", tags = "topics")
+public class Topics extends TopicsBase {
+    private static final Logger log = LoggerFactory.getLogger(Topics.class);
+
+    @POST
+    @Path("/persistent/{tenant}/{namespace}/{topic}")
+    @ApiOperation(value = "Produce message to a persistent topic.", response = 
String.class, responseContainer = "List")
+    @ApiResponses(value = {
+            @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't 
exit"),
+            @ApiResponse(code = 412, message = "Namespace name is not valid"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    public void produceOnPersistentTopic(@Suspended final AsyncResponse 
asyncResponse,
+                               @ApiParam(value = "Specify the tenant", 
required = true)
+                               @PathParam("tenant") String tenant,
+                               @ApiParam(value = "Specify the namespace", 
required = true)
+                               @PathParam("namespace") String namespace,
+                               @ApiParam(value = "Specify topic name", 
required = true)
+                               @PathParam("topic") @Encoded String 
encodedTopic,
+                               @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative,
+                               ProducerMessages producerMessages) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            validateProducePermission();
+            publishMessages(asyncResponse, producerMessages, authoritative);
+        } catch (Exception e) {
+            log.error("[{}] Failed to produce on topic {}", clientAppId(), 
topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
+    }
+
+    @POST
+    @Path("/persistent/{tenant}/{namespace}/{topic}/partitions/{partition}")
+    @ApiOperation(value = "Produce message to a partition of a persistent 
topic.",
+            response = String.class, responseContainer = "List")
+    @ApiResponses(value = {
+            @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't 
exit"),
+            @ApiResponse(code = 412, message = "Namespace name is not valid"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    public void produceOnPersistentTopicPartition(@Suspended final 
AsyncResponse asyncResponse,
+                                        @ApiParam(value = "Specify the 
tenant", required = true)
+                                        @PathParam("tenant") String tenant,
+                                        @ApiParam(value = "Specify the 
namespace", required = true)
+                                        @PathParam("namespace") String 
namespace,
+                                        @ApiParam(value = "Specify topic 
name", required = true)
+                                        @PathParam("topic") @Encoded String 
encodedTopic,
+                                        @ApiParam(value = "Specify topic 
partition", required = true)
+                                        @PathParam("partition") int partition,
+                                        @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative,
+                                        ProducerMessages producerMessages) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            validateProducePermission();
+            publishMessagesToPartition(asyncResponse, producerMessages, 
authoritative, partition);
+        } catch (Exception e) {
+            log.error("[{}] Failed to produce on topic {}", clientAppId(), 
topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
+    }
+
+    @POST
+    @Path("/non-persistent/{tenant}/{namespace}/{topic}")
+    @ApiOperation(value = "Produce message to a persistent topic.", response = 
String.class, responseContainer = "List")
+    @ApiResponses(value = {
+            @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't 
exit"),
+            @ApiResponse(code = 412, message = "Namespace name is not valid"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    public void produceOnNonPersistentTopic(@Suspended final AsyncResponse 
asyncResponse,
+                                         @ApiParam(value = "Specify the 
tenant", required = true)
+                                         @PathParam("tenant") String tenant,
+                                         @ApiParam(value = "Specify the 
namespace", required = true)
+                                         @PathParam("namespace") String 
namespace,
+                                         @ApiParam(value = "Specify topic 
name", required = true)
+                                         @PathParam("topic") @Encoded String 
encodedTopic,
+                                         @QueryParam("authoritative") 
@DefaultValue("false")
+                                                        boolean authoritative,
+                                         ProducerMessages producerMessages) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            validateProducePermission();
+            publishMessages(asyncResponse, producerMessages, authoritative);
+        } catch (Exception e) {
+            log.error("[{}] Failed to produce on topic {}", clientAppId(), 
topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
+    }
+
+    @POST
+    
@Path("/non-persistent/{tenant}/{namespace}/{topic}/partitions/{partition}")
+    @ApiOperation(value = "Produce message to a partition of a persistent 
topic.",
+            response = String.class, responseContainer = "List")
+    @ApiResponses(value = {
+            @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't 
exit"),
+            @ApiResponse(code = 412, message = "Namespace name is not valid"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    public void produceOnNonPersistentTopicPartition(@Suspended final 
AsyncResponse asyncResponse,
+                                                  @ApiParam(value = "Specify 
the tenant", required = true)
+                                                  @PathParam("tenant") String 
tenant,
+                                                  @ApiParam(value = "Specify 
the namespace", required = true)
+                                                  @PathParam("namespace") 
String namespace,
+                                                  @ApiParam(value = "Specify 
topic name", required = true)
+                                                  @PathParam("topic") @Encoded 
String encodedTopic,
+                                                  @ApiParam(value = "Specify 
topic partition", required = true)
+                                                  @PathParam("partition") int 
partition,
+                                                  @QueryParam("authoritative") 
@DefaultValue("false")
+                                                                 boolean 
authoritative,
+                                                  ProducerMessages 
producerMessages) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            validateProducePermission();
+            publishMessagesToPartition(asyncResponse, producerMessages, 
authoritative, partition);
+        } catch (Exception e) {
+            log.error("[{}] Failed to produce on topic {}", clientAppId(), 
topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
new file mode 100644
index 0000000..76ce022
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
@@ -0,0 +1,768 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.rest;
+
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.namespace.LookupOptions;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.schema.SchemaRegistry;
+import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AvroBaseStructSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroWriter;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonWriter;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.apache.pulsar.websocket.data.ProducerAck;
+import org.apache.pulsar.websocket.data.ProducerAcks;
+import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.apache.pulsar.websocket.data.ProducerMessages;
+
+/**
+ * Contains methods used by REST api to produce/consume/read messages to/from pulsar topics.
+ */
+@Slf4j
+public class TopicsBase extends PersistentTopicsBase {
+
+    private static String defaultProducerName = "RestProducer";
+
+    // Publish message to a topic, can be partitioned or non-partitioned
+    protected void publishMessages(AsyncResponse asyncResponse, 
ProducerMessages request, boolean authoritative) {
+        String topic = topicName.getPartitionedTopicName();
+        try {
+            if 
(pulsar().getBrokerService().getOwningTopics().containsKey(topic)
+                    || !findOwnerBrokerForTopic(authoritative, asyncResponse)) 
{
+                // If we've done look up or or after look up this broker owns 
some of the partitions
+                // then proceed to publish message else asyncResponse will be 
complete by look up.
+                addOrGetSchemaForTopic(getSchemaData(request.getKeySchema(), 
request.getValueSchema()),
+                        request.getSchemaVersion() == -1 ? null : new 
LongSchemaVersion(request.getSchemaVersion()))
+                        .thenAccept(schemaMeta -> {
+                            // Both schema version and schema data are 
necessary.
+                            if (schemaMeta.getLeft() != null && 
schemaMeta.getRight() != null) {
+                                internalPublishMessages(topicName, request, 
pulsar().getBrokerService()
+                                                
.getOwningTopics().get(topic).values(), asyncResponse,
+                                        
AutoConsumeSchema.getSchema(schemaMeta.getLeft().toSchemaInfo()),
+                                        schemaMeta.getRight());
+                            } else {
+                                asyncResponse.resume(new 
RestException(Status.INTERNAL_SERVER_ERROR,
+                                        "Fail to add or retrieve schema."));
+                            }
+                        }).exceptionally(e -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Fail to publish message: " + 
e.getMessage());
+                    }
+                    asyncResponse.resume(new 
RestException(Status.INTERNAL_SERVER_ERROR, "Fail to publish message:"
+                            + e.getMessage()));
+                    return null;
+                });
+            }
+        } catch (Exception e) {
+            asyncResponse.resume(new 
RestException(Status.INTERNAL_SERVER_ERROR, "Fail to publish message: "
+                    + e.getMessage()));
+        }
+    }
+
+    /**
+     * Publishes the messages of a REST produce request to a single partition of the topic held in
+     * {@code topicName}.
+     *
+     * <p>A topic name that already carries a {@code -partition-} suffix is rejected with 400 and nothing is
+     * published. Otherwise publishing proceeds when this broker lists the partition among its owning topics,
+     * or when {@code findOwnerBrokerForTopic} returns {@code false}. The schema is added or fetched first, and
+     * the messages are published only when both schema data and schema version are available.
+     *
+     * @param asyncResponse response resumed with the publish result or with an error
+     * @param request       messages to publish together with schema information
+     * @param authoritative forwarded to the owner lookup
+     * @param partition     index of the partition to publish to
+     */
+    protected void publishMessagesToPartition(AsyncResponse asyncResponse, ProducerMessages request,
+                                                     boolean authoritative, int partition) {
+        if (topicName.isPartitioned()) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Topic name can't contain "
+                    + "'-partition-' suffix."));
+            // The request has already been answered; stop instead of going on to look up and publish.
+            return;
+        }
+        String topic = topicName.getPartitionedTopicName();
+        try {
+            // If broker owns the partition then proceed to publish message, else do look up.
+            if ((pulsar().getBrokerService().getOwningTopics().containsKey(topic)
+                    && pulsar().getBrokerService().getOwningTopics().get(topic)
+                    .contains(partition))
+                    || !findOwnerBrokerForTopic(authoritative, asyncResponse)) {
+                addOrGetSchemaForTopic(getSchemaData(request.getKeySchema(), request.getValueSchema()),
+                        request.getSchemaVersion() == -1 ? null : new LongSchemaVersion(request.getSchemaVersion()))
+                        .thenAccept(schemaMeta -> {
+                            // Both schema version and schema data are necessary.
+                            if (schemaMeta.getLeft() != null && schemaMeta.getRight() != null) {
+                                internalPublishMessagesToPartition(topicName, request, partition, asyncResponse,
+                                        AutoConsumeSchema.getSchema(schemaMeta.getLeft().toSchemaInfo()),
+                                        schemaMeta.getRight());
+                            } else {
+                                asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR,
+                                        "Fail to add or retrieve schema."));
+                            }
+                        }).exceptionally(e -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Fail to publish message to single partition: " + e.getLocalizedMessage());
+                    }
+                    asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, "Fail to publish message"
+                            + "to single partition: "
+                            + e.getMessage()));
+                    return null;
+                });
+            }
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, "Fail to publish message: "
+                    + e.getMessage()));
+        }
+    }
+
+    /**
+     * Builds the messages of the request and publishes every one of them to the given partition.
+     *
+     * <p>The response is resumed with 200 and one {@link ProducerAck} per message once all publish attempts
+     * have finished, whether or not some of them failed. A failure while building or submitting the messages
+     * resumes the response with a 500 error.
+     *
+     * @param topicName     topic the request targets
+     * @param request       messages to publish
+     * @param partition     index of the partition to publish to
+     * @param asyncResponse response to resume
+     * @param schema        schema handed to {@code buildMessage}
+     * @param schemaVersion schema version reported in the response; cast to {@link LongSchemaVersion}
+     */
+    private void internalPublishMessagesToPartition(TopicName topicName, ProducerMessages request,
+                                                  int partition, AsyncResponse asyncResponse,
+                                                  Schema schema, SchemaVersion schemaVersion) {
+        try {
+            String producerName;
+            if (null == request.getProducerName() || request.getProducerName().isEmpty()) {
+                producerName = defaultProducerName;
+            } else {
+                producerName = request.getProducerName();
+            }
+            List<Message> messages = buildMessage(request, schema, producerName, topicName);
+            List<CompletableFuture<PositionImpl>> pendingWrites = new ArrayList<>();
+            List<ProducerAck> acks = new ArrayList<>();
+            for (Message message : messages) {
+                // The ack temporarily carries the partition index; it is replaced once the write finishes.
+                ProducerAck ack = new ProducerAck();
+                ack.setMessageId(String.valueOf(partition));
+                acks.add(ack);
+                pendingWrites.add(
+                        publishSingleMessageToPartition(topicName.getPartition(partition).toString(), message));
+            }
+            // Shared by the success and the failure path: report the outcome of each individual message.
+            Runnable respond = () -> {
+                processPublishMessageResults(acks, pendingWrites);
+                asyncResponse.resume(Response.ok().entity(new ProducerAcks(acks,
+                        ((LongSchemaVersion) schemaVersion).getVersion())).build());
+            };
+            FutureUtil.waitForAll(pendingWrites).thenRun(respond).exceptionally(failure -> {
+                // Some message may published successfully, so till return ok with result for each individual message.
+                respond.run();
+                return null;
+            });
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Fail publish messages to single partition with rest produce message "
+                                + "request for topic  {}: {} ", topicName, e.getCause());
+            }
+            asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()));
+        }
+    }
+
+    /**
+     * Builds the messages of the request and spreads them round robin over the given partitions.
+     *
+     * <p>An empty partition list resumes the response with a 500 error and publishes nothing. Otherwise the
+     * response is resumed with 200 and one {@link ProducerAck} per message once all publish attempts have
+     * finished, whether or not some of them failed. A failure while building or submitting the messages
+     * resumes the response with a 500 error.
+     *
+     * @param topicName        topic the request targets
+     * @param request          messages to publish
+     * @param partitionIndexes indexes of the partitions to publish to
+     * @param asyncResponse    response to resume
+     * @param schema           schema handed to {@code buildMessage}
+     * @param schemaVersion    schema version reported in the response; cast to {@link LongSchemaVersion}
+     */
+    private void internalPublishMessages(TopicName topicName, ProducerMessages request,
+                                                     List<Integer> partitionIndexes,
+                                                     AsyncResponse asyncResponse, Schema schema,
+                                                     SchemaVersion schemaVersion) {
+        if (partitionIndexes.size() < 1) {
+            asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR,
+                    new BrokerServiceException.TopicNotFoundException("Topic not owned by current broker.")));
+            // Stop here: continuing would evaluate "index % 0" below and try to resume the response again.
+            return;
+        }
+        try {
+            String producerName = (null == request.getProducerName() || request.getProducerName().isEmpty())
+                    ? defaultProducerName : request.getProducerName();
+            List<Message> messages = buildMessage(request, schema, producerName, topicName);
+            List<CompletableFuture<PositionImpl>> publishResults = new ArrayList<>();
+            List<ProducerAck> produceMessageResults = new ArrayList<>();
+            // Try to publish messages to all partitions this broker owns in round robin mode.
+            for (int index = 0; index < messages.size(); index++) {
+                int partition = partitionIndexes.get(index % partitionIndexes.size());
+                // The ack temporarily carries the partition index; it is replaced once the write finishes.
+                ProducerAck produceMessageResult = new ProducerAck();
+                produceMessageResult.setMessageId(partition + "");
+                produceMessageResults.add(produceMessageResult);
+                publishResults.add(publishSingleMessageToPartition(topicName.getPartition(partition).toString(),
+                        messages.get(index)));
+            }
+            FutureUtil.waitForAll(publishResults).thenRun(() -> {
+                processPublishMessageResults(produceMessageResults, publishResults);
+                asyncResponse.resume(Response.ok().entity(new ProducerAcks(produceMessageResults,
+                        ((LongSchemaVersion) schemaVersion).getVersion())).build());
+            }).exceptionally(e -> {
+                // Some message may published successfully, so till return ok with result for each individual message.
+                processPublishMessageResults(produceMessageResults, publishResults);
+                asyncResponse.resume(Response.ok().entity(new ProducerAcks(produceMessageResults,
+                        ((LongSchemaVersion) schemaVersion).getVersion())).build());
+                return null;
+            });
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Fail to publish messages with rest produce message request for topic  {}: {} ",
+                        topicName, e.getCause());
+            }
+            asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()));
+        }
+    }
+
+    /**
+     * Publishes one message to the named topic partition.
+     *
+     * <p>The topic is fetched with {@code getTopic(topic, false)}. When it is absent, the partition is removed
+     * from this broker's owning-topics entry and the returned future fails with
+     * {@code TopicNotFoundException}. When the fetch itself fails, that failure is propagated to the returned
+     * future so the caller is never left waiting on a future that cannot complete.
+     *
+     * @param topic   full name of the partition to publish to
+     * @param message message to publish
+     * @return future completed with the persisted position, or exceptionally when publishing failed
+     */
+    private CompletableFuture<PositionImpl> publishSingleMessageToPartition(String topic, Message message) {
+        CompletableFuture<PositionImpl> publishResult = new CompletableFuture<>();
+        pulsar().getBrokerService().getTopic(topic, false)
+        .thenAccept(t -> {
+            // TODO: Check message backlog and fail if backlog too large.
+            if (!t.isPresent()) {
+                // Topic not found, and remove from owning partition list.
+                publishResult.completeExceptionally(new BrokerServiceException.TopicNotFoundException("Topic not "
+                        + "owned by current broker."));
+                TopicName partitionName = TopicName.get(topic);
+                pulsar().getBrokerService().getOwningTopics().get(partitionName.getPartitionedTopicName())
+                        .remove(partitionName.getPartitionIndex());
+            } else {
+                try {
+                    t.get().publishMessage(messageToByteBuf(message),
+                            RestMessagePublishContext.get(publishResult, t.get(), System.nanoTime()));
+                } catch (Exception e) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Fail to publish single messages to topic  {}: {} ",
+                                topicName, e.getCause());
+                    }
+                    publishResult.completeExceptionally(e);
+                }
+            }
+        }).exceptionally(e -> {
+            // Without this a failed topic fetch would leave publishResult incomplete forever. The call is a
+            // no-op when the branch above has already completed the future.
+            publishResult.completeExceptionally(e);
+            return null;
+        });
+
+        return publishResult;
+    }
+
+    // Process results for all message publishing attempts.
+    // For every publish future, either rewrite the matching ack's message id from the returned
+    // position, or record the failure on that ack. The two lists are matched by index.
+    private void processPublishMessageResults(List<ProducerAck> produceMessageResults,
+                                              List<CompletableFuture<PositionImpl>> publishResults) {
+        for (int index = 0; index < publishResults.size(); index++) {
+            ProducerAck ack = produceMessageResults.get(index);
+            try {
+                PositionImpl position = publishResults.get(index).get();
+                // The ack's message id is parsed as an integer here (presumably the partition index
+                // stored by the caller -- confirm) and replaced by the full message id.
+                MessageId messageId = new MessageIdImpl(position.getLedgerId(), position.getEntryId(),
+                        Integer.parseInt(ack.getMessageId()));
+                ack.setMessageId(messageId.toString());
+            } catch (Exception e) {
+                // Future.get() reports failures wrapped in ExecutionException (possibly around a
+                // CompletionException); the type checks below must look at the underlying cause.
+                Exception failure = e;
+                while ((failure instanceof java.util.concurrent.ExecutionException
+                        || failure instanceof java.util.concurrent.CompletionException)
+                        && failure.getCause() instanceof Exception) {
+                    failure = (Exception) failure.getCause();
+                }
+                if (log.isDebugEnabled()) {
+                    log.debug("Fail publish [{}] message with rest produce message request for topic  {}: {} ",
+                            index, topicName, failure.getMessage());
+                }
+                if (failure instanceof BrokerServiceException.TopicNotFoundException) {
+                    // Topic ownership might changed, force to look up again.
+                    pulsar().getBrokerService().getOwningTopics().remove(topicName.getPartitionedTopicName());
+                }
+                extractException(failure, ack);
+            }
+        }
+    }
+
+    // Return error code depends on exception we got indicating if client 
should retry with same broker.
+    private void extractException(Exception e, ProducerAck 
produceMessageResult) {
+        if (!(e instanceof BrokerServiceException.TopicFencedException && e 
instanceof ManagedLedgerException)) {
+            produceMessageResult.setErrorCode(2);
+        } else {
+            produceMessageResult.setErrorCode(1);
+        }
+        produceMessageResult.setErrorMsg(e.getMessage());
+    }
+
+    // Look up topic owner for given topic and block until every lookup has finished.
+    // Returns true if asyncResponse has been completed (redirect or exception), in which case the
+    // caller should stop processing; returns false when processLookUpResult found the topic among
+    // the topics this broker is recorded as owning.
+    private boolean findOwnerBrokerForTopic(boolean authoritative, AsyncResponse asyncResponse) {
+        PartitionedTopicMetadata metadata = internalGetPartitionedMetadata(authoritative, false);
+        List<String> redirectAddresses = Collections.synchronizedList(new ArrayList<>());
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        List<CompletableFuture<Void>> lookupFutures = new ArrayList<>();
+        if (!topicName.isPartitioned() && metadata.partitions > 1) {
+            // Partitioned topic with multiple partitions, need to do look up for each partition.
+            for (int index = 0; index < metadata.partitions; index++) {
+                lookupFutures.add(lookUpBrokerForTopic(topicName.getPartition(index),
+                        authoritative, redirectAddresses));
+            }
+        } else {
+            // Non-partitioned topic or specific topic partition.
+            lookupFutures.add(lookUpBrokerForTopic(topicName, authoritative, redirectAddresses));
+        }
+
+        // The outcome is evaluated whether or not every lookup succeeded: a partial result may
+        // still contain an owned partition or a usable redirect address.
+        FutureUtil.waitForAll(lookupFutures)
+        .thenRun(() -> {
+            processLookUpResult(redirectAddresses, asyncResponse, future);
+        }).exceptionally(e -> {
+            processLookUpResult(redirectAddresses, asyncResponse, future);
+            return null;
+        });
+        try {
+            return future.get();
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Keep the interrupt visible to the code that owns this thread.
+                Thread.currentThread().interrupt();
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Fail to lookup topic for rest produce message request for topic {}.", topicName.toString());
+            }
+            if (!asyncResponse.isDone()) {
+                asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, "Internal error: "
+                        + e.getMessage()));
+            }
+            return true;
+        }
+    }
+
+    // Evaluate the collected lookup results once all lookups have finished.
+    // Completes future with false when the topic is among the topics this broker is recorded as
+    // owning. Otherwise resumes asyncResponse (redirect to the first collected address, 404 when
+    // there is none, 500 when the redirect URI cannot be built) and completes future with true.
+    private void processLookUpResult(List<String> redirectAddresses,  AsyncResponse asyncResponse,
+                                     CompletableFuture<Boolean> future) {
+        if (pulsar().getBrokerService().getOwningTopics().containsKey(topicName.getPartitionedTopicName())) {
+            future.complete(false);
+            return;
+        }
+        // Current broker doesn't own the topic or any partition of the topic, redirect client to a broker
+        // that own partition of the topic or know who own partition of the topic.
+        try {
+            if (redirectAddresses.isEmpty()) {
+                // No broker to redirect, means look up for some partitions failed,
+                // client should retry with other brokers.
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Can't find owner of given topic."));
+                return;
+            }
+            // Redirect client to other broker owns the topic or know which broker own the topic.
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug("Redirect rest produce request for topic {} from {} to {}.",
+                            topicName, pulsar().getWebServiceAddress(), redirectAddresses.get(0));
+                }
+                URI redirectURI = new URI(String.format("%s%s", redirectAddresses.get(0), uri.getPath(false)));
+                asyncResponse.resume(Response.temporaryRedirect(redirectURI).build());
+            } catch (URISyntaxException | NullPointerException e) {
+                // Logged unconditionally: an error must not depend on debug logging being enabled.
+                log.error("Error in preparing redirect url with rest produce message request for topic  {}: {}",
+                        topicName, e.getMessage(), e);
+                asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR,
+                        "Fail to redirect client request."));
+            }
+        } finally {
+            // Always release the thread blocked on this future, even if resuming the response threw.
+            future.complete(true);
+        }
+    }
+
+    // Look up topic owner for non-partitioned topic or single topic partition.
+    // A lookup permit is taken first; when none is available the returned future fails with
+    // TooManyRequestsException. In every other case the outcome is handed to completeLookup, which
+    // completes the returned future normally, including when the lookup itself failed.
+    private CompletableFuture<Void> lookUpBrokerForTopic(TopicName partitionedTopicName,
+                                                         boolean authoritative, List<String> redirectAddresses) {
+        CompletableFuture<Void> lookupDone = new CompletableFuture<>();
+        boolean permitAcquired = pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire();
+        if (!permitAcquired) {
+            if (log.isDebugEnabled()) {
+                log.debug("Too many concurrent lookup request.");
+            }
+            lookupDone.completeExceptionally(new BrokerServiceException.TooManyRequestsException("Too many "
+                    + "concurrent lookup request"));
+            return lookupDone;
+        }
+
+        LookupOptions lookupOptions = LookupOptions.builder()
+                .authoritative(authoritative)
+                .loadTopicsInBundle(false)
+                .build();
+        pulsar().getNamespaceService().getBrokerServiceUrlAsync(partitionedTopicName, lookupOptions)
+        .thenAccept(lookup -> {
+            boolean resolved = lookup != null && lookup.isPresent();
+            if (!resolved) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Fail to lookup topic for rest produce message request for topic {}.",
+                            partitionedTopicName);
+                }
+                completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, lookupDone);
+                return;
+            }
+
+            LookupResult owner = lookup.get();
+            boolean ownedLocally = owner.getLookupData().getHttpUrl().equals(pulsar().getWebServiceAddress());
+            if (ownedLocally) {
+                // Current broker owns the topic, add to owning topic.
+                if (log.isDebugEnabled()) {
+                    log.debug("Complete topic look up for rest produce message request for topic {}, "
+                                    + "current broker is owner broker: {}",
+                            partitionedTopicName, owner.getLookupData());
+                }
+                pulsar().getBrokerService().getOwningTopics()
+                        .computeIfAbsent(partitionedTopicName.getPartitionedTopicName(),
+                                (key) -> new ConcurrentOpenHashSet<Integer>())
+                        .add(partitionedTopicName.getPartitionIndex());
+                completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, lookupDone);
+                return;
+            }
+
+            // Current broker doesn't own the topic or doesn't know who own the topic.
+            if (log.isDebugEnabled()) {
+                log.debug("Complete topic look up for rest produce message request for topic {}, "
+                                + "current broker is not owner broker: {}",
+                        partitionedTopicName, owner.getLookupData());
+            }
+            // The flag is true when the address belongs to the owner and false when it is only
+            // another broker to redirect the lookup to.
+            boolean addressIsOwner = !owner.isRedirect();
+            completeLookup(Pair.of(Arrays.asList(owner.getLookupData().getHttpUrl(),
+                    owner.getLookupData().getHttpUrlTls()), addressIsOwner), redirectAddresses, lookupDone);
+        }).exceptionally(exception -> {
+            if (log.isDebugEnabled()) {
+                log.debug("Fail to lookup broker with rest produce message request for topic {}: {}",
+                        partitionedTopicName, exception.getMessage());
+            }
+            completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, lookupDone);
+            return null;
+        });
+        return lookupDone;
+    }
+
+    // Resolve the schema to use for the request and return it together with its version.
+    // With a schema version, the existing schema is fetched from the schema registry; otherwise,
+    // with schema data, the schema is added to the topic. With neither, the future completes with a
+    // pair of nulls. Both registry calls are waited on synchronously; a failure completes the
+    // returned future exceptionally with the caught exception.
+    private CompletableFuture<Pair<SchemaData, SchemaVersion>> addOrGetSchemaForTopic(SchemaData schemaData,
+                                                                                      LongSchemaVersion schemaVersion) {
+        CompletableFuture<Pair<SchemaData, SchemaVersion>> outcome = new CompletableFuture<>();
+
+        if (schemaVersion != null) {
+            // If schema version presents try to fetch existing schema.
+            String schemaId = TopicName.get(topicName.getPartitionedTopicName()).getSchemaName();
+            try {
+                SchemaRegistry.SchemaAndMetadata existing =
+                        pulsar().getSchemaRegistryService().getSchema(schemaId, schemaVersion).get();
+                outcome.complete(Pair.of(existing.schema, existing.version));
+            } catch (Exception e) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Fail to retrieve schema of version {} for topic {}: {}",
+                            schemaVersion.getVersion(), topicName, e.getMessage());
+                }
+                outcome.completeExceptionally(e);
+            }
+            return outcome;
+        }
+
+        if (schemaData == null) {
+            // Indicating exception.
+            outcome.complete(Pair.of(null, null));
+            return outcome;
+        }
+
+        // Else try to add schema to topic.
+        try {
+            SchemaVersion addedVersion = addSchema(schemaData).get();
+            outcome.complete(Pair.of(schemaData, addedVersion));
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Fail to add schema {} for topic {}: {}",
+                        new String(schemaData.toSchemaInfo().getSchema()), topicName, e.getMessage());
+            }
+            outcome.completeExceptionally(e);
+        }
+        return outcome;
+    }
+
+    // Add a new schema to schema registry for a topic.
+    // Partitions recorded as owned by this broker are tried in turn and the first successful
+    // result wins. The returned future fails with SchemaException when no partition accepted it.
+    private CompletableFuture<SchemaVersion> addSchema(SchemaData schemaData) {
+        // Only need to add to first partition the broker owns since the schema id in schema registry are
+        // same for all partitions which is the partitionedTopicName
+        ConcurrentOpenHashSet<Integer> ownedPartitions = pulsar().getBrokerService().getOwningTopics()
+                .get(topicName.getPartitionedTopicName());
+        // The entry can be missing if ownership was dropped meanwhile; treat that as "no partitions".
+        List<Integer> partitions = ownedPartitions == null
+                ? Collections.<Integer>emptyList() : ownedPartitions.values();
+        CompletableFuture<SchemaVersion> result = new CompletableFuture<>();
+        for (int index = 0; index < partitions.size(); index++) {
+            CompletableFuture<SchemaVersion> future = new CompletableFuture<>();
+            String topicPartitionName = topicName.getPartition(partitions.get(index)).toString();
+            pulsar().getBrokerService().getTopic(topicPartitionName, false)
+            .thenAccept(topic -> {
+                if (!topic.isPresent()) {
+                    future.completeExceptionally(new BrokerServiceException.TopicNotFoundException(
+                            "Topic " + topicPartitionName + " not found"));
+                } else {
+                    topic.get().addSchema(schemaData).thenAccept(schemaVersion -> future.complete(schemaVersion))
+                    .exceptionally(exception -> {
+                        future.completeExceptionally(exception);
+                        return null;
+                    });
+                }
+            }).exceptionally(exception -> {
+                // A failed topic load must fail this attempt too, otherwise future.get() below
+                // would block forever.
+                future.completeExceptionally(exception);
+                return null;
+            });
+            try {
+                result.complete(future.get());
+                break;
+            } catch (Exception e) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Fail to add schema to topic " + topicName.getPartitionedTopicName()
+                            + " for partition " + partitions.get(index) + " for REST produce request.");
+                }
+                if (e instanceof InterruptedException) {
+                    // Stop trying further partitions and keep the interrupt visible to the caller.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
+        // Not able to add schema to any partition
+        if (!result.isDone()) {
+            result.completeExceptionally(new SchemaException("Unable to add schema " + schemaData
+                    + " to topic " + topicName.getPartitionedTopicName()));
+        }
+        return result;
+    }
+
+    // Build schemaData from passed in schema string.
+    // An absent or empty value schema falls back to the UTF-8 string schema. When a key schema is
+    // supplied as well, key and value are combined into a key/value schema with SEPARATED encoding.
+    // Returns null when either schema string cannot be parsed.
+    private SchemaData getSchemaData(String keySchema, String valueSchema) {
+        try {
+            SchemaInfoImpl valueSchemaInfo;
+            if (valueSchema == null || valueSchema.isEmpty()) {
+                valueSchemaInfo = (SchemaInfoImpl) StringSchema.utf8().getSchemaInfo();
+            } else {
+                valueSchemaInfo = ObjectMapperFactory.getThreadLocal()
+                        .readValue(valueSchema, SchemaInfoImpl.class);
+            }
+            if (valueSchemaInfo.getName() == null) {
+                valueSchemaInfo.setName(valueSchemaInfo.getType().toString());
+            }
+
+            // Value schema only, unless a key schema turns it into a Key_Value schema below.
+            SchemaInfo effectiveSchema = valueSchemaInfo;
+            boolean hasKeySchema = keySchema != null && !keySchema.isEmpty();
+            if (hasKeySchema) {
+                SchemaInfoImpl keySchemaInfo = ObjectMapperFactory.getThreadLocal()
+                        .readValue(keySchema, SchemaInfoImpl.class);
+                if (keySchemaInfo.getName() == null) {
+                    keySchemaInfo.setName(keySchemaInfo.getType().toString());
+                }
+                effectiveSchema = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
+                        "KVSchema-" + topicName.getPartitionedTopicName(),
+                        keySchemaInfo, valueSchemaInfo,
+                        KeyValueEncodingType.SEPARATED);
+            }
+
+            return SchemaData.builder()
+                    .data(effectiveSchema.getSchema())
+                    .isDeleted(false)
+                    .user("Rest Producer")
+                    .timestamp(System.currentTimeMillis())
+                    .type(effectiveSchema.getType())
+                    .props(effectiveSchema.getProperties())
+                    .build();
+        } catch (IOException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Fail to parse schema info for rest produce request with key schema {} and value schema {}"
+                        , keySchema, valueSchema);
+            }
+            return null;
+        }
+    }
+
+    // Convert message to ByteBuf.
+    // The message must be a MessageImpl. Its metadata is marked as uncompressed with the payload's
+    // readable size, then metadata and payload are serialized together with a CRC32C checksum type.
+    public ByteBuf messageToByteBuf(Message message) {
+        checkArgument(message instanceof MessageImpl, "Message must be type of MessageImpl.");
+
+        MessageImpl impl = (MessageImpl) message;
+        MessageMetadata metadata = impl.getMessageBuilder();
+        ByteBuf body = impl.getDataBuffer();
+
+        // The payload is sent as-is, so the uncompressed size is simply its readable length.
+        metadata.setCompression(CompressionCodecProvider.convertToWireProtocol(CompressionType.NONE));
+        metadata.setUncompressedSize(body.readableBytes());
+
+        return Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, metadata, body);
+    }
+
+    // Build pulsar message from REST request.
+    // Every entry of the request becomes one MessageImpl whose metadata carries producer name,
+    // publish time, sequence id, replication settings, properties, partition key, event time and
+    // delayed-delivery time. The payload is encoded with the value schema for KEY_VALUE schemas and
+    // with the schema itself otherwise.
+    private List<Message> buildMessage(ProducerMessages producerMessages, Schema schema,
+                                       String producerName, TopicName topicName) {
+        List<Message> pulsarMessages = new ArrayList<>();
+
+        for (ProducerMessage message : producerMessages.getMessages()) {
+            boolean isKeyValue = schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE;
+            long publishTime = System.currentTimeMillis();
+
+            MessageMetadata messageMetadata = new MessageMetadata();
+            messageMetadata.setProducerName(producerName);
+            messageMetadata.setPublishTime(publishTime);
+            messageMetadata.setSequenceId(message.getSequenceId());
+            if (null != message.getReplicationClusters()) {
+                messageMetadata.addAllReplicateTos(message.getReplicationClusters());
+            }
+
+            if (null != message.getProperties()) {
+                messageMetadata.addAllProperties(message.getProperties().entrySet().stream().map(entry -> {
+                    org.apache.pulsar.common.api.proto.KeyValue keyValue =
+                            new org.apache.pulsar.common.api.proto.KeyValue();
+                    keyValue.setKey(entry.getKey());
+                    keyValue.setValue(entry.getValue());
+                    return keyValue;
+                }).collect(Collectors.toList()));
+            }
+            if (null != message.getKey()) {
+                // If has key schema, encode partition key, else use plain text.
+                if (isKeyValue) {
+                    KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
+                    messageMetadata.setPartitionKey(
+                            Base64.getEncoder().encodeToString(encodeWithSchema(message.getKey(),
+                                    kvSchema.getKeySchema())));
+                    messageMetadata.setPartitionKeyB64Encoded(true);
+                } else {
+                    messageMetadata.setPartitionKey(message.getKey());
+                    messageMetadata.setPartitionKeyB64Encoded(false);
+                }
+            }
+            if (null != message.getEventTime() && !message.getEventTime().isEmpty()) {
+                messageMetadata.setEventTime(Long.valueOf(message.getEventTime()));
+            }
+            if (message.isDisableReplication()) {
+                messageMetadata.clearReplicateTo();
+                messageMetadata.addReplicateTo("__local__");
+            }
+            // Delayed delivery does not depend on the event time: an absolute deliverAt is used
+            // as given, and a relative delay is counted from the publish time. Tying either to the
+            // (optional) event time silently drops deliverAt or schedules delivery in the past.
+            if (message.getDeliverAt() != 0) {
+                messageMetadata.setDeliverAtTime(message.getDeliverAt());
+            } else if (message.getDeliverAfterMs() != 0) {
+                messageMetadata.setDeliverAtTime(publishTime + message.getDeliverAfterMs());
+            }
+
+            // For KEY_VALUE only the value part goes into the payload; the key travels in the
+            // partition key set above.
+            Schema payloadSchema = isKeyValue ? ((KeyValueSchemaImpl) schema).getValueSchema() : schema;
+            pulsarMessages.add(MessageImpl.create(messageMetadata,
+                    ByteBuffer.wrap(encodeWithSchema(message.getPayload(), payloadSchema)),
+                    schema, topicName.toString()));
+        }
+
+        return pulsarMessages;
+    }
+
+    // Encode message with corresponding schema, do necessary conversion before encoding.
+    // The textual input is first converted to the Java type matching the schema type and then
+    // encoded by the schema. Returns an empty byte array when the input cannot be converted or the
+    // schema type is not supported (PROTOBUF_NATIVE, KEY_VALUE and anything not listed).
+    private byte[] encodeWithSchema(String input, Schema schema) {
+        try {
+            switch (schema.getSchemaInfo().getType()) {
+                case INT8:
+                    return schema.encode(Byte.parseByte(input));
+                case INT16:
+                    return schema.encode(Short.parseShort(input));
+                case INT32:
+                    return schema.encode(Integer.parseInt(input));
+                case INT64:
+                    return schema.encode(Long.parseLong(input));
+                case STRING:
+                    return schema.encode(input);
+                case FLOAT:
+                    return schema.encode(Float.parseFloat(input));
+                case DOUBLE:
+                    return schema.encode(Double.parseDouble(input));
+                case BOOLEAN:
+                    return schema.encode(Boolean.parseBoolean(input));
+                case BYTES:
+                    // Fixed charset so the produced bytes do not depend on the broker's platform default.
+                    return schema.encode(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+                case DATE:
+                    return schema.encode(DateFormat.getDateInstance().parse(input));
+                case TIME:
+                    return schema.encode(new Time(Long.parseLong(input)));
+                case TIMESTAMP:
+                    return schema.encode(new Timestamp(Long.parseLong(input)));
+                case INSTANT:
+                    return schema.encode(Instant.parse(input));
+                case LOCAL_DATE:
+                    return schema.encode(LocalDate.parse(input));
+                case LOCAL_TIME:
+                    return schema.encode(LocalTime.parse(input));
+                case LOCAL_DATE_TIME:
+                    return schema.encode(LocalDateTime.parse(input));
+                case JSON:
+                    GenericJsonWriter jsonWriter = new GenericJsonWriter();
+                    return jsonWriter.write(new GenericJsonRecord(null, null,
+                          ObjectMapperFactory.getThreadLocal().readTree(input), schema.getSchemaInfo()));
+                case AVRO:
+                    // The input is treated as the Avro JSON encoding of a record of this schema.
+                    AvroBaseStructSchema avroSchema = ((AvroBaseStructSchema) schema);
+                    Decoder decoder = DecoderFactory.get().jsonDecoder(avroSchema.getAvroSchema(), input);
+                    DatumReader<GenericData.Record> reader = new GenericDatumReader<>(avroSchema.getAvroSchema());
+                    GenericRecord genericRecord = reader.read(null, decoder);
+                    GenericAvroWriter avroWriter = new GenericAvroWriter(avroSchema.getAvroSchema());
+                    return avroWriter.write(new GenericAvroRecord(null,
+                            avroSchema.getAvroSchema(), null, genericRecord));
+                case PROTOBUF_NATIVE:
+                case KEY_VALUE:
+                default:
+                    throw new PulsarClientException.InvalidMessageException("");
+            }
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                // The exception is passed as the trailing argument so the reason is not lost.
+                log.debug("Fail to encode value {} with schema {} for rest produce request", input,
+                        new String(schema.getSchemaInfo().getSchema(), java.nio.charset.StandardCharsets.UTF_8), e);
+            }
+            return new byte[0];
+        }
+    }
+
+    // Release lookup semaphore and add result to redirectAddresses if current broker doesn't own the topic.
+    // result.getLeft() is empty or holds two addresses: index 0 is used for plain requests and
+    // index 1 for HTTPS requests. result.getRight() tells whether the address belongs to the
+    // topic owner (true) or is only a broker to redirect the lookup to (false).
+    private synchronized void completeLookup(Pair<List<String>, Boolean> result, List<String> redirectAddresses,
+                                              CompletableFuture<Void> future) {
+        pulsar().getBrokerService().getLookupRequestSemaphore().release();
+
+        List<String> addresses = result.getLeft();
+        if (!addresses.isEmpty()) {
+            String target = addresses.get(isRequestHttps() ? 1 : 0);
+            if (result.getRight()) {
+                // Owner addresses go to the head so they take priority over lookup redirects.
+                redirectAddresses.add(0, target);
+            } else {
+                redirectAddresses.add(target);
+            }
+        }
+        future.complete(null);
+    }
+
+    // Check that the caller may produce to the requested topic.
+    // Nothing is checked unless both authentication and authorization are enabled. Otherwise an
+    // unauthenticated client, or one the authorization service does not allow to produce, is
+    // rejected with a RestException carrying Status.UNAUTHORIZED.
+    public void validateProducePermission() throws Exception {
+        boolean enforcementEnabled = pulsar().getConfiguration().isAuthenticationEnabled()
+                && pulsar().getBrokerService().isAuthorizationEnabled();
+        if (!enforcementEnabled) {
+            return;
+        }
+
+        if (!isClientAuthenticated(clientAppId())) {
+            throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
+        }
+
+        boolean mayProduce = pulsar().getBrokerService().getAuthorizationService()
+                .canProduce(topicName, originalPrincipal(), clientAuthData());
+        if (mayProduce) {
+            return;
+        }
+        throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to produce to topic %s"
+                                + " with clientAppId [%s] and authdata %s", topicName.toString(),
+                clientAppId(), clientAuthData()));
+    }
+
+}
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessages.java
 b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/package-info.java
similarity index 57%
copy from 
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessages.java
copy to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/package-info.java
index e74ebb1..bafd56b 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessages.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/package-info.java
@@ -16,31 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.websocket.data;
-
-import java.util.List;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-/**
- * Class represent messages to be published.
- */
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class ProducerMessages {
-    // Version of schema used for messages.
-    private long schemaVersion;
-
-    // Base64 encoded serialized schema for key
-    private String keySchema;
-
-    // Base64 encoded serialized schema for value
-    private String valueSchema;
-
-    private String producerName;
-
-    private List<ProducerMessage> messages;
-}
+package org.apache.pulsar.broker.rest;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f7cc662..58b3f43 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -191,6 +191,9 @@ public class BrokerService implements Closeable {
     // Namespace --> Bundle --> topicName --> topic
     private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, 
ConcurrentOpenHashMap<String, Topic>>>
             multiLayerTopicsMap;
+    // Keep track of topics and partitions served by this broker for fast 
lookup.
+    @Getter
+    private final ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<Integer>> owningTopics;
     private int numberOfNamespaceBundles = 0;
 
     private final EventLoopGroup acceptorGroup;
@@ -271,6 +274,7 @@ public class BrokerService implements Closeable {
         this.pendingTopicLoadingQueue = Queues.newConcurrentLinkedQueue();
 
         this.multiLayerTopicsMap = new ConcurrentOpenHashMap<>();
+        this.owningTopics = new ConcurrentOpenHashMap<>();
         this.pulsarStats = new PulsarStats(pulsar);
         this.offlineTopicStatCache = new ConcurrentOpenHashMap<>();
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
index 624108f..5cf36ee 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
@@ -35,7 +35,7 @@ import org.apache.pulsar.client.api.Range;
  * 2.The whole range of hash value could be covered by all the consumers.
  * 3.Once a consumer is removed, the left consumers could still serve the 
whole range.
  *
- * Initializing with a fixed hash range, by default 2 << 5.
+ * Initializing with a fixed hash range, by default 2 << 15.
  * First consumer added, hash range looks like:
  *
  * 0 -> 65536(consumer-1)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index ccd5a50..f0324ba 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1152,12 +1152,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
                                 return null;
                             } else {
-                                // There was an early request to create a 
producer with
-                                // same producerId. This can happen when
-                                // client
-                                // timeout is lower the broker timeouts. We 
need to wait
-                                // until the previous producer creation
-                                // request
+                                // There was an early request to create a 
producer with same producerId.
+                                // This can happen when client timeout is 
lower than the broker timeouts.
+                                // We need to wait until the previous producer 
creation request
                                 // either complete or fails.
                                 ServerError error = null;
                                 if (!existingProducerFuture.isDone()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
new file mode 100644
index 0000000..3f57806
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -0,0 +1,831 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Sets;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.admin.v2.PersistentTopics;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.rest.Topics;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.websocket.data.ProducerAcks;
+import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.apache.pulsar.websocket.data.ProducerMessages;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.reflect.Whitebox;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+@PrepareForTest(PersistentTopics.class)
+public class TopicsTest extends MockedPulsarServiceBaseTest {
+
+    private Topics topics; // REST resource under test; replaced by a fresh spy in setup().
+    private final String testLocalCluster = "test"; // Cluster created in setup().
+    private final String testTenant = "my-tenant"; // Tenant created in setup().
+    private final String testNamespace = "my-namespace"; // Namespace created under testTenant in setup().
+    private final String testTopicName = "my-topic"; // Local topic name used by the tests.
+
+    @Override
+    @BeforeMethod
+    protected void setup() throws Exception {
+        super.internalSetup();
+        topics = spy(new Topics());
+        topics.setPulsar(pulsar);
+        doReturn(TopicDomain.persistent.value()).when(topics).domain();
+        doReturn("test-app").when(topics).clientAppId();
+        
doReturn(mock(AuthenticationDataHttps.class)).when(topics).clientAuthData();
+        admin.clusters().createCluster(testLocalCluster, new 
ClusterDataImpl());
+        admin.tenants().createTenant(testTenant, new 
TenantInfoImpl(Sets.newHashSet("role1", "role2"), 
Sets.newHashSet(testLocalCluster)));
+        admin.namespaces().createNamespace(testTenant + "/" + testNamespace,
+                Sets.newHashSet(testLocalCluster));
+    }
+
+    @Override
+    @AfterMethod
+    protected void cleanup() throws Exception {
+        super.internalCleanup(); // Runs after every test method; teardown is delegated to the base class.
+    }
+
+    @Test
+    public void testProduceToNonPartitionedTopic() throws Exception {
+        admin.topics().createNonPartitionedTopic("persistent://" + testTenant 
+ "/"
+                + testNamespace + "/" + testTopicName);
+        AsyncResponse asyncResponse = mock(AsyncResponse.class);
+        Schema<String> schema = StringSchema.utf8();
+        ProducerMessages producerMessages = new ProducerMessages();
+        producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal().
+                writeValueAsString(schema.getSchemaInfo()));
+        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
+                writeValueAsString(schema.getSchemaInfo()));
+        String message = "[" +
+                
"{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},"
 +
+                
"{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},"
 +
+                
"{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}]";
+        
producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+                new TypeReference<List<ProducerMessage>>() {}));
+        topics.produceOnPersistentTopic(asyncResponse, testTenant, 
testNamespace, testTopicName, false, producerMessages);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        verify(asyncResponse, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.OK.getStatusCode());
+        Object responseEntity = responseCaptor.getValue().getEntity();
+        Assert.assertTrue(responseEntity instanceof ProducerAcks);
+        ProducerAcks response = (ProducerAcks) responseEntity;
+        Assert.assertEquals(response.getMessagePublishResults().size(), 3);
+        Assert.assertEquals(response.getSchemaVersion(), 0);
+        for (int index = 0; index < 
response.getMessagePublishResults().size(); index++) {
+            
Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
+                    .getMessageId().split(":")[2]), -1);
+            
Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(),
 0);
+            
Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length()
 > 0);
+        }
+    }
+
+    @Test
+    public void testProduceToPartitionedTopic() throws Exception {
+        // Publish ten keyed string messages to a five-partition topic without choosing a partition.
+        final String partitionedTopic = testTopicName + "-p";
+        admin.topics().createPartitionedTopic(
+                "persistent://" + testTenant + "/" + testNamespace + "/" + partitionedTopic, 5);
+        AsyncResponse asyncResponse = mock(AsyncResponse.class);
+
+        // Key and value both use the UTF-8 string schema, so its JSON form is computed once.
+        String stringSchemaJson = ObjectMapperFactory.getThreadLocal()
+                .writeValueAsString(StringSchema.utf8().getSchemaInfo());
+        ProducerMessages producerMessages = new ProducerMessages();
+        producerMessages.setKeySchema(stringSchemaJson);
+        producerMessages.setValueSchema(stringSchemaJson);
+        String message = "[" +
+                "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," +
+                "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}," +
+                "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}," +
+                "{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4}," +
+                "{\"key\":\"my-key\",\"payload\":\"RestProducer:5\",\"eventTime\":1603045262772,\"sequenceId\":5}," +
+                "{\"key\":\"my-key\",\"payload\":\"RestProducer:6\",\"eventTime\":1603045262772,\"sequenceId\":6}," +
+                "{\"key\":\"my-key\",\"payload\":\"RestProducer:7\",\"eventTime\":1603045262772,\"sequenceId\":7}," +
+                "{\"key\":\"my-key\",\"payload\":\"RestProducer:8\",\"eventTime\":1603045262772,\"sequenceId\":8}," +
+                "{\"key\":\"my-key\",\"payload\":\"RestProducer:9\",\"eventTime\":1603045262772,\"sequenceId\":9}," +
+                "{\"key\":\"my-key\",\"payload\":\"RestProducer:10\",\"eventTime\":1603045262772,\"sequenceId\":10}]";
+        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+                new TypeReference<List<ProducerMessage>>() {}));
+        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, partitionedTopic, false,
+                producerMessages);
+
+        // The endpoint answers asynchronously: wait up to 5s for exactly one resume() call.
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Response httpResponse = responseCaptor.getValue();
+        Assert.assertEquals(httpResponse.getStatus(), Response.Status.OK.getStatusCode());
+        Object responseEntity = httpResponse.getEntity();
+        Assert.assertTrue(responseEntity instanceof ProducerAcks);
+        ProducerAcks acks = (ProducerAcks) responseEntity;
+        Assert.assertEquals(acks.getMessagePublishResults().size(), 10);
+        Assert.assertEquals(acks.getSchemaVersion(), 0);
+
+        // Count acks per partition; the partition index is the third ':'-separated field of the message id.
+        int[] messagePerPartition = new int[5];
+        acks.getMessagePublishResults().forEach(ack -> {
+            String messageId = ack.getMessageId();
+            messagePerPartition[Integer.parseInt(messageId.split(":")[2])]++;
+            Assert.assertEquals(ack.getErrorCode(), 0);
+            Assert.assertFalse(messageId.isEmpty());
+        });
+        // Messages are spread over the partitions round robin, so no partition should hold more than 2 messages.
+        for (int publishedCount : messagePerPartition) {
+            Assert.assertTrue(publishedCount <= 2);
+        }
+    }
+
+    @Test
+    public void testProduceToPartitionedTopicSpecificPartition() throws 
Exception {
+        admin.topics().createPartitionedTopic("persistent://" + testTenant + 
"/"
+                + testNamespace + "/" + testTopicName, 5);
+        AsyncResponse asyncResponse = mock(AsyncResponse.class);
+        Schema<String> schema = StringSchema.utf8();
+        ProducerMessages producerMessages = new ProducerMessages();
+        producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal().
+                writeValueAsString(schema.getSchemaInfo()));
+        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
+                writeValueAsString(schema.getSchemaInfo()));
+        String message = "[" +
+                
"{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},"
 +
+                
"{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},"
 +
+                
"{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3},"
 +
+                
"{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4}]";
+        
producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+                new TypeReference<List<ProducerMessage>>() {}));
+        topics.produceOnPersistentTopicPartition(asyncResponse, testTenant, 
testNamespace, testTopicName, 2,false, producerMessages);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        verify(asyncResponse, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.OK.getStatusCode());
+        Object responseEntity = responseCaptor.getValue().getEntity();
+        Assert.assertTrue(responseEntity instanceof ProducerAcks);
+        ProducerAcks response = (ProducerAcks) responseEntity;
+        Assert.assertEquals(response.getMessagePublishResults().size(), 4);
+        Assert.assertEquals(response.getSchemaVersion(), 0);
+        for (int index = 0; index < 
response.getMessagePublishResults().size(); index++) {
+            
Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
+                    .getMessageId().split(":")[2]), 2);
+            
Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(),
 0);
+            
Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length()
 > 0);
+        }
+    }
+
+    @Test
+    public void testProduceFailed() throws Exception {
+        // The stubbed topic accepts the first two publishes and rejects every later one with a fake
+        // TopicFencedException, so a four-message request must come back with two acks and two errors.
+        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
+        admin.topics().createNonPartitionedTopic(topicName);
+        pulsar.getBrokerService().getTopic(topicName, false).thenAccept(topic -> {
+            try {
+                PersistentTopic mockPersistentTopic = spy((PersistentTopic) topic.get());
+                AtomicInteger count = new AtomicInteger();
+                doAnswer(new Answer<Object>() {
+                    @Override
+                    public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                        // Complete the publish callback directly instead of writing an entry.
+                        Topic.PublishContext publishContext = invocationOnMock.getArgument(1);
+                        if (count.getAndIncrement() < 2) {
+                            publishContext.completed(null, -1, -1);
+                        } else {
+                            publishContext.completed(new BrokerServiceException.TopicFencedException("Fake exception"),
+                                    -1, -1);
+                        }
+                        return null;
+                    }
+                }).when(mockPersistentTopic).publishMessage(any(), any());
+                // Route every topic lookup of the broker service to the stubbed topic.
+                BrokerService mockBrokerService = spy(pulsar.getBrokerService());
+                doReturn(CompletableFuture.completedFuture(Optional.of(mockPersistentTopic)))
+                        .when(mockBrokerService).getTopic(anyString(), anyBoolean());
+                doReturn(mockBrokerService).when(pulsar).getBrokerService();
+                AsyncResponse asyncResponse = mock(AsyncResponse.class);
+                Schema<String> schema = StringSchema.utf8();
+                ProducerMessages producerMessages = new ProducerMessages();
+                producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal().
+                        writeValueAsString(schema.getSchemaInfo()));
+                producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
+                        writeValueAsString(schema.getSchemaInfo()));
+                String message = "[" +
+                        "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," +
+                        "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}," +
+                        "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}," +
+                        "{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4}]";
+                producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+                        new TypeReference<List<ProducerMessage>>() {}));
+                // Produce through the REST endpoint; the stubbed topic decides which publishes succeed.
+                topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false,
+                        producerMessages);
+                ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+                verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
+                // Partial failure is still reported with HTTP 200; errors are carried per message.
+                Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
+                Object responseEntity = responseCaptor.getValue().getEntity();
+                Assert.assertTrue(responseEntity instanceof ProducerAcks);
+                ProducerAcks response = (ProducerAcks) responseEntity;
+                Assert.assertEquals(response.getMessagePublishResults().size(), 4);
+                int errorResponse = 0;
+                for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
+                    int errorCode = response.getMessagePublishResults().get(index).getErrorCode();
+                    if (0 == errorCode) {
+                        Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
+                                .getMessageId().split(":")[2]), -1);
+                        Assert.assertTrue(response.getMessagePublishResults().get(index)
+                                .getMessageId().length() > 0);
+                    } else {
+                        errorResponse++;
+                        Assert.assertEquals(errorCode, 2);
+                        Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorMsg(),
+                                "org.apache.pulsar.broker.service.BrokerServiceException$"
+                                         + "TopicFencedException: Fake exception");
+                    }
+                }
+                // Publishing starts to fail after the 2nd operation; 4 messages were sent, so expect 2 errors.
+                Assert.assertEquals(errorResponse, 2);
+            } catch (Throwable e) {
+                // Keep the original throwable as the cause so its stack trace is not lost.
+                Assert.fail(e.getMessage(), e);
+            }
+        }).get();
+    }
+
+    @Test
+    public void testLookUpWithRedirect() throws Exception {
+        // A produce request sent to a broker other than the one the topic was created on
+        // must be answered with a temporary redirect to that first broker.
+        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
+        String requestPath = "/admin/v3/topics/my-tenant/my-namespace/my-topic";
+        //create topic on one broker
+        admin.topics().createNonPartitionedTopic(topicName);
+        // PulsarService is AutoCloseable: close the extra broker started by this test even when an assertion fails.
+        try (PulsarService pulsar2 = startBroker(getDefaultConf())) {
+            doReturn(false).when(topics).isRequestHttps();
+            UriInfo uriInfo = mock(UriInfo.class);
+            doReturn(requestPath).when(uriInfo).getPath(anyBoolean());
+            Whitebox.setInternalState(topics, "uri", uriInfo);
+            //do produce on another broker
+            topics.setPulsar(pulsar2);
+            AsyncResponse asyncResponse = mock(AsyncResponse.class);
+            ProducerMessages producerMessages = new ProducerMessages();
+            producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
+                    writeValueAsString(Schema.INT64.getSchemaInfo()));
+            String message = "[]";
+            producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+                    new TypeReference<List<ProducerMessage>>() {}));
+            topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false,
+                    producerMessages);
+            ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+            verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
+            // Verify got redirect response
+            Assert.assertEquals(responseCaptor.getValue().getStatusInfo(), Response.Status.TEMPORARY_REDIRECT);
+            // Verify URI point to address of broker the topic was created on
+            Assert.assertEquals(responseCaptor.getValue().getLocation().toString(),
+                    pulsar.getWebServiceAddress() + requestPath);
+        }
+    }
+    
+    @Test
+    public void testLookUpWithException() throws Exception {
+        String topicName = "persistent://" + testTenant + "/" + testNamespace 
+ "/" + testTopicName;
+        admin.topics().createNonPartitionedTopic(topicName);
+        NamespaceService nameSpaceService = mock(NamespaceService.class);
+        CompletableFuture future = new CompletableFuture();
+        future.completeExceptionally(new BrokerServiceException("Fake 
Exception"));
+        CompletableFuture existFuture = new CompletableFuture();
+        existFuture.complete(true);
+        
doReturn(future).when(nameSpaceService).getBrokerServiceUrlAsync(any(), any());
+        doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
+        doReturn(nameSpaceService).when(pulsar).getNamespaceService();
+        AsyncResponse asyncResponse = mock(AsyncResponse.class);
+        ProducerMessages producerMessages = new ProducerMessages();
+        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
+                writeValueAsString(Schema.INT64.getSchemaInfo()));
+        String message = "[]";
+        
producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+                new TypeReference<List<ProducerMessage>>() {}));
+        topics.produceOnPersistentTopic(asyncResponse, testTenant, 
testNamespace, testTopicName, false, producerMessages);
+        ArgumentCaptor<RestException> responseCaptor = 
ArgumentCaptor.forClass(RestException.class);
+        verify(asyncResponse, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getMessage(), "Can't 
find owner of given topic.");
+    }
+
+    @Test
+    public void testLookUpTopicNotExist() throws Exception {
+        String topicName = "persistent://" + testTenant + "/" + testNamespace 
+ "/" + testTopicName;
+        NamespaceService nameSpaceService = mock(NamespaceService.class);
+        CompletableFuture existFuture = new CompletableFuture();
+        existFuture.complete(false);
+        doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
+        doReturn(nameSpaceService).when(pulsar).getNamespaceService();
+        AsyncResponse asyncResponse = mock(AsyncResponse.class);
+        ProducerMessages producerMessages = new ProducerMessages();
+        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
+                writeValueAsString(Schema.INT64.getSchemaInfo()));
+        String message = "[]";
+        
producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+                new TypeReference<List<ProducerMessage>>() {}));
+        topics.produceOnPersistentTopic(asyncResponse, testTenant, 
testNamespace, testTopicName, false, producerMessages);
+        ArgumentCaptor<RestException> responseCaptor = 
ArgumentCaptor.forClass(RestException.class);
+        verify(asyncResponse, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getMessage(), "Fail to 
publish message: Topic not exist");
+    }
+
+    @Test
+    public void testProduceWithLongSchema() throws Exception {
+        // Publish five INT64 payloads through the REST endpoint and read them back with a regular consumer.
+        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
+        admin.topics().createNonPartitionedTopic(topicName);
+        AsyncResponse asyncResponse = mock(AsyncResponse.class);
+        // Subscribe before producing, from the earliest position, so no message is missed.
+        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
+                .topic(topicName)
+                .subscriptionName("my-sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        ProducerMessages producerMessages = new ProducerMessages();
+        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
+                writeValueAsString(Schema.INT64.getSchemaInfo()));
+        String message = "[" +
+                "{\"key\":\"my-key\",\"payload\":\"111111111111\",\"eventTime\":1603045262772,\"sequenceId\":1}," +
+                "{\"key\":\"my-key\",\"payload\":\"222222222222\",\"eventTime\":1603045262772,\"sequenceId\":2}," +
+                "{\"key\":\"my-key\",\"payload\":\"333333333333\",\"eventTime\":1603045262772,\"sequenceId\":3}," +
+                "{\"key\":\"my-key\",\"payload\":\"444444444444\",\"eventTime\":1603045262772,\"sequenceId\":4}," +
+                "{\"key\":\"my-key\",\"payload\":\"555555555555\",\"eventTime\":1603045262772,\"sequenceId\":5}]";
+        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+                new TypeReference<List<ProducerMessage>>() {}));
+        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false,
+                producerMessages);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
+        Object responseEntity = responseCaptor.getValue().getEntity();
+        Assert.assertTrue(responseEntity instanceof ProducerAcks);
+        ProducerAcks response = (ProducerAcks) responseEntity;
+        Assert.assertEquals(response.getMessagePublishResults().size(), 5);
+        Assert.assertEquals(response.getSchemaVersion(), 0);
+        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
+            Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
+                    .getMessageId().split(":")[2]), -1);
+            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
+            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
+        }
+
+        List<Long> expectedMsg = Arrays.asList(111111111111L, 222222222222L, 333333333333L, 444444444444L,
+                555555555555L);
+        // Assert all messages published by REST producer can be received by consumer in expected order.
+        for (int i = 0; i < 5; i++) {
+            Message<Long> msg = consumer.receive(2, TimeUnit.SECONDS);
+            // A timed receive yields null on timeout; fail with a clear message instead of an NPE.
+            Assert.assertNotNull(msg, "Timed out waiting for message " + i);
+            // TestNG argument order is (actual, expected).
+            Assert.assertEquals(Schema.INT64.decode(msg.getData()), expectedMsg.get(i));
+            Assert.assertEquals(msg.getKey(), "my-key");
+        }
+    }
+
+    // Default schema is String schema: no key/value schema is set on the request, and the payloads are
+    // read back with a string-schema consumer.
+    @Test
+    public void testProduceNoSchema() throws Exception {
+        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
+        admin.topics().createNonPartitionedTopic(topicName);
+        AsyncResponse asyncResponse = mock(AsyncResponse.class);
+        // Subscribe before producing, from the earliest position, so no message is missed.
+        Consumer<String> consumer = pulsarClient.newConsumer(StringSchema.utf8())
+                .topic(topicName)
+                .subscriptionName("my-sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        ProducerMessages producerMessages = new ProducerMessages();
+        String message = "[" +
+                "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," +
+                "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}," +
+                "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}," +
+                "{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4}," +
+                "{\"key\":\"my-key\",\"payload\":\"RestProducer:5\",\"eventTime\":1603045262772,\"sequenceId\":5}]";
+        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+                new TypeReference<List<ProducerMessage>>() {}));
+        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false,
+                producerMessages);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
+        Object responseEntity = responseCaptor.getValue().getEntity();
+        Assert.assertTrue(responseEntity instanceof ProducerAcks);
+        ProducerAcks response = (ProducerAcks) responseEntity;
+        Assert.assertEquals(response.getMessagePublishResults().size(), 5);
+        Assert.assertEquals(response.getSchemaVersion(), 0);
+        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
+            Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
+                    .getMessageId().split(":")[2]), -1);
+            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
+            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
+        }
+
+        List<String> expectedMsg = Arrays.asList("RestProducer:1", "RestProducer:2", "RestProducer:3",
+                "RestProducer:4", "RestProducer:5");
+        // Assert all messages published by REST producer can be received by consumer in expected order.
+        for (int i = 0; i < 5; i++) {
+            Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+            // A timed receive yields null on timeout; fail with a clear message instead of an NPE.
+            Assert.assertNotNull(msg, "Timed out waiting for message " + i);
+            // TestNG argument order is (actual, expected).
+            Assert.assertEquals(StringSchema.utf8().decode(msg.getData()), expectedMsg.get(i));
+            Assert.assertEquals(msg.getKey(), "my-key");
+        }
+    }
+
+    // Nested address-like POJO held by PC.seller; it becomes part of the JSON and Avro schemas that the tests
+    // derive from PC via SchemaDefinition.builder().withPojo(PC.class).
+    // Keep the field order: the tests call the Lombok-generated all-args constructor as
+    // new Seller(state, street, zipCode).
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    private static class Seller {
+        public String state;
+        public String street;
+        public long zipCode;
+    }
+
+    // Test fixture POJO from which the JSON and Avro schema tests build their schemas
+    // (SchemaDefinition.builder().withPojo(PC.class)).
+    // Keep the field order: the tests call the Lombok-generated all-args constructor as
+    // new PC(brand, model, year, gpu, seller).
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    private static class PC {
+        public String brand;
+        public String model;
+        public int year;
+        public GPU gpu;
+        public Seller seller;
+    }
+
+    // Enum-typed field of PC. The Avro test reads it back as a GenericData.EnumSymbol and compares its
+    // toString() with the constant's toString().
+    private enum GPU {
+        AMD, NVIDIA
+    }
+
+    /**
+     * Publishes two JSON-serialized {@code PC} records through the REST produce endpoint, using a generic JSON
+     * schema derived from {@code PC}, and checks that a client consumer receives both records in order.
+     */
+    @Test
+    public void testProduceWithJsonSchema() throws Exception {
+        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
+        admin.topics().createNonPartitionedTopic(topicName);
+        AsyncResponse asyncResponse = mock(AsyncResponse.class);
+        GenericSchema jsonSchema = GenericJsonSchema.of(JSONSchema.of(SchemaDefinition.builder()
+                .withPojo(PC.class).build()).getSchemaInfo());
+        PC pc = new PC("dell", "alienware", 2021, GPU.AMD,
+                new Seller("WA", "main street", 98004));
+        PC anotherPc = new PC("asus", "rog", 2020, GPU.NVIDIA,
+                new Seller("CA", "back street", 90232));
+        Consumer consumer = pulsarClient.newConsumer(jsonSchema)
+                .topic(topicName)
+                .subscriptionName("my-sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        ProducerMessages producerMessages = new ProducerMessages();
+        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal()
+                .writeValueAsString(jsonSchema.getSchemaInfo()));
+        // Each payload is itself a JSON document, so its quotes are escaped before it is embedded as a JSON string.
+        String message = "["
+                + "{\"key\":\"my-key\",\"payload\":\""
+                + ObjectMapperFactory.getThreadLocal().writeValueAsString(pc).replace("\"", "\\\"")
+                + "\",\"eventTime\":1603045262772,\"sequenceId\":1},"
+                + "{\"key\":\"my-key\",\"payload\":\""
+                + ObjectMapperFactory.getThreadLocal().writeValueAsString(anotherPc).replace("\"", "\\\"")
+                + "\",\"eventTime\":1603045262772,\"sequenceId\":2}]";
+        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+                new TypeReference<List<ProducerMessage>>() {}));
+        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace,
+                testTopicName, false, producerMessages);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
+        Object responseEntity = responseCaptor.getValue().getEntity();
+        Assert.assertTrue(responseEntity instanceof ProducerAcks);
+        ProducerAcks response = (ProducerAcks) responseEntity;
+        Assert.assertEquals(response.getMessagePublishResults().size(), 2);
+        Assert.assertEquals(response.getSchemaVersion(), 0);
+        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
+            // The third ':'-separated component of the returned message id is expected to be -1.
+            Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
+                    .getMessageId().split(":")[2]), -1);
+            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
+            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
+        }
+
+        List<PC> expected = Arrays.asList(pc, anotherPc);
+        // Assert all messages published by REST producer can be received by consumer in expected order.
+        for (int i = 0; i < expected.size(); i++) {
+            Message<?> msg = consumer.receive(2, TimeUnit.SECONDS);
+            // A timed-out receive yields null; fail with an assertion instead of a NullPointerException below.
+            Assert.assertNotNull(msg);
+            PC msgPc = ObjectMapperFactory.getThreadLocal()
+                    .treeToValue(((GenericJsonRecord) jsonSchema.decode(msg.getData())).getJsonNode(), PC.class);
+            Assert.assertEquals(msgPc.brand, expected.get(i).brand);
+            Assert.assertEquals(msgPc.model, expected.get(i).model);
+            Assert.assertEquals(msgPc.year, expected.get(i).year);
+            Assert.assertEquals(msgPc.gpu, expected.get(i).gpu);
+            Assert.assertEquals(msgPc.seller.state, expected.get(i).seller.state);
+            Assert.assertEquals(msgPc.seller.street, expected.get(i).seller.street);
+            Assert.assertEquals(msgPc.seller.zipCode, expected.get(i).seller.zipCode);
+            // (actual, expected) order, matching the other assertions in this test.
+            Assert.assertEquals(msg.getKey(), "my-key");
+        }
+    }
+
+    /**
+     * Publishes two {@code PC} records, encoded with Avro's JSON encoder, through the REST produce endpoint using
+     * a generic Avro schema derived from {@code PC}, and checks that a client consumer receives both in order.
+     */
+    @Test
+    public void testProduceWithAvroSchema() throws Exception {
+        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
+        admin.topics().createNonPartitionedTopic(topicName);
+        AsyncResponse asyncResponse = mock(AsyncResponse.class);
+        GenericSchemaImpl avroSchema = GenericAvroSchema.of(AvroSchema.of(SchemaDefinition.builder()
+                .withPojo(PC.class).build()).getSchemaInfo());
+        PC pc = new PC("dell", "alienware", 2021, GPU.AMD,
+                new Seller("WA", "main street", 98004));
+        PC anotherPc = new PC("asus", "rog", 2020, GPU.NVIDIA,
+                new Seller("CA", "back street", 90232));
+        Consumer consumer = pulsarClient.newConsumer(avroSchema)
+                .topic(topicName)
+                .subscriptionName("my-sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        ProducerMessages producerMessages = new ProducerMessages();
+        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal()
+                .writeValueAsString(avroSchema.getSchemaInfo()));
+
+        ReflectDatumWriter<PC> datumWriter = new ReflectDatumWriter<>(avroSchema.getAvroSchema());
+        ByteArrayOutputStream outputStream1 = new ByteArrayOutputStream();
+        ByteArrayOutputStream outputStream2 = new ByteArrayOutputStream();
+
+        JsonEncoder encoder1 = EncoderFactory.get().jsonEncoder(avroSchema.getAvroSchema(), outputStream1);
+        JsonEncoder encoder2 = EncoderFactory.get().jsonEncoder(avroSchema.getAvroSchema(), outputStream2);
+
+        datumWriter.write(pc, encoder1);
+        encoder1.flush();
+        datumWriter.write(anotherPc, encoder2);
+        encoder2.flush();
+
+        // Decode the encoder output explicitly as UTF-8 rather than with the platform default charset, and
+        // escape its quotes because the Avro JSON document is embedded as a JSON string payload.
+        String message = "["
+                + "{\"key\":\"my-key\",\"payload\":\""
+                + outputStream1.toString("UTF-8").replace("\"", "\\\"")
+                + "\",\"eventTime\":1603045262772,\"sequenceId\":1},"
+                + "{\"key\":\"my-key\",\"payload\":\""
+                + outputStream2.toString("UTF-8").replace("\"", "\\\"")
+                + "\",\"eventTime\":1603045262772,\"sequenceId\":2}]";
+        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+                new TypeReference<List<ProducerMessage>>() {}));
+        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace,
+                testTopicName, false, producerMessages);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
+        Object responseEntity = responseCaptor.getValue().getEntity();
+        Assert.assertTrue(responseEntity instanceof ProducerAcks);
+        ProducerAcks response = (ProducerAcks) responseEntity;
+        Assert.assertEquals(response.getMessagePublishResults().size(), 2);
+        Assert.assertEquals(response.getSchemaVersion(), 0);
+        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
+            // The third ':'-separated component of the returned message id is expected to be -1.
+            Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
+                    .getMessageId().split(":")[2]), -1);
+            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
+            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
+        }
+
+        List<PC> expected = Arrays.asList(pc, anotherPc);
+        // Assert all messages published by REST producer can be received by consumer in expected order.
+        for (int i = 0; i < expected.size(); i++) {
+            Message<?> msg = consumer.receive(2, TimeUnit.SECONDS);
+            // A timed-out receive yields null; fail with an assertion instead of a NullPointerException below.
+            Assert.assertNotNull(msg);
+            GenericAvroRecord avroRecord = (GenericAvroRecord) avroSchema.decode(msg.getData());
+            Assert.assertEquals(((Utf8) avroRecord.getAvroRecord().get("brand")).toString(),
+                    expected.get(i).brand);
+            Assert.assertEquals(((Utf8) avroRecord.getAvroRecord().get("model")).toString(),
+                    expected.get(i).model);
+            Assert.assertEquals((int) avroRecord.getAvroRecord().get("year"), expected.get(i).year);
+            Assert.assertEquals(((GenericData.EnumSymbol) avroRecord.getAvroRecord().get("gpu")).toString(),
+                    expected.get(i).gpu.toString());
+            Assert.assertEquals(
+                    ((Utf8) ((GenericRecord) avroRecord.getAvroRecord().get("seller")).get("state")).toString(),
+                    expected.get(i).seller.state);
+            Assert.assertEquals(
+                    ((Utf8) ((GenericRecord) avroRecord.getAvroRecord().get("seller")).get("street")).toString(),
+                    expected.get(i).seller.street);
+            Assert.assertEquals(((GenericRecord) avroRecord.getAvroRecord().get("seller")).get("zipCode"),
+                    expected.get(i).seller.zipCode);
+            // (actual, expected) order, matching the other assertions in this test.
+            Assert.assertEquals(msg.getKey(), "my-key");
+        }
+    }
+
+    /**
+     * Sends three messages with a client producer and three more through the REST produce endpoint, both using a
+     * SEPARATED key/value schema of two strings, and checks that a client consumer receives all six in order.
+     */
+    @Test
+    public void testProduceWithRestAndClientThenConsumeWithClient() throws Exception {
+        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
+        admin.topics().createNonPartitionedTopic(topicName);
+        AsyncResponse asyncResponse = mock(AsyncResponse.class);
+        Schema keyValueSchema = KeyValueSchemaImpl.of(StringSchema.utf8(), StringSchema.utf8(),
+                KeyValueEncodingType.SEPARATED);
+        Producer producer = pulsarClient.newProducer(keyValueSchema)
+                .topic(topicName)
+                .create();
+        Consumer consumer = pulsarClient.newConsumer(keyValueSchema)
+                .topic(topicName)
+                .subscriptionName("my-sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        for (int i = 0; i < 3; i++) {
+            producer.newMessage(keyValueSchema)
+                    .value(new KeyValue<>("my-key", "ClientProducer:" + i))
+                    .send();
+        }
+        ProducerMessages producerMessages = new ProducerMessages();
+        producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal()
+                .writeValueAsString(StringSchema.utf8().getSchemaInfo()));
+        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal()
+                .writeValueAsString(StringSchema.utf8().getSchemaInfo()));
+        String message = "["
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},"
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},"
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}]";
+        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+                new TypeReference<List<ProducerMessage>>() {}));
+        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false,
+                producerMessages);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
+        Object responseEntity = responseCaptor.getValue().getEntity();
+        Assert.assertTrue(responseEntity instanceof ProducerAcks);
+        ProducerAcks response = (ProducerAcks) responseEntity;
+        Assert.assertEquals(response.getMessagePublishResults().size(), 3);
+        Assert.assertEquals(response.getSchemaVersion(), 0);
+        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
+            // The third ':'-separated component of the returned message id is expected to be -1.
+            Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
+                    .getMessageId().split(":")[2]), -1);
+            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
+            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
+        }
+
+        List<String> expectedMsg = Arrays.asList("ClientProducer:0", "ClientProducer:1", "ClientProducer:2",
+                "RestProducer:1", "RestProducer:2", "RestProducer:3");
+        // Assert both messages published by client producer and REST producer can be received
+        // by consumer in expected order.
+        for (int i = 0; i < expectedMsg.size(); i++) {
+            Message<?> msg = consumer.receive(2, TimeUnit.SECONDS);
+            // A timed-out receive yields null; fail with an assertion instead of a NullPointerException below.
+            Assert.assertNotNull(msg);
+            // Assertions use (actual, expected) order, matching the response checks above.
+            Assert.assertEquals(StringSchema.utf8().decode(msg.getData()), expectedMsg.get(i));
+            // "bXkta2V5" is the Base64 encoding of "my-key".
+            Assert.assertEquals(msg.getKey(), "bXkta2V5");
+        }
+    }
+
+    /**
+     * Publishes two batches of five messages through the REST produce endpoint and checks that a client consumer
+     * receives all ten in order. The first batch supplies key and value schemas; the second supplies only the
+     * schema version returned by the first response.
+     */
+    @Test
+    public void testProduceWithRestThenConsumeWithClient() throws Exception {
+        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
+        admin.topics().createNonPartitionedTopic(topicName);
+        AsyncResponse asyncResponse = mock(AsyncResponse.class);
+        Schema keyValueSchema = KeyValueSchemaImpl.of(StringSchema.utf8(), StringSchema.utf8(),
+                KeyValueEncodingType.SEPARATED);
+        Consumer consumer = pulsarClient.newConsumer(keyValueSchema)
+                .topic(topicName)
+                .subscriptionName("my-sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        ProducerMessages producerMessages = new ProducerMessages();
+        producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal()
+                .writeValueAsString(StringSchema.utf8().getSchemaInfo()));
+        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal()
+                .writeValueAsString(StringSchema.utf8().getSchemaInfo()));
+        String message = "["
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},"
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},"
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3},"
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4},"
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:5\",\"eventTime\":1603045262772,\"sequenceId\":5}]";
+        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+                new TypeReference<List<ProducerMessage>>() {}));
+        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName,
+                false, producerMessages);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
+        Object responseEntity = responseCaptor.getValue().getEntity();
+        Assert.assertTrue(responseEntity instanceof ProducerAcks);
+        ProducerAcks response = (ProducerAcks) responseEntity;
+        Assert.assertEquals(response.getMessagePublishResults().size(), 5);
+        Assert.assertEquals(response.getSchemaVersion(), 0);
+        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
+            // The third ':'-separated component of the returned message id is expected to be -1.
+            Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
+                    .getMessageId().split(":")[2]), -1);
+            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
+            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
+        }
+        // Specify schema version to use existing schema.
+        producerMessages = new ProducerMessages();
+        producerMessages.setSchemaVersion(response.getSchemaVersion());
+        message = "["
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:6\",\"eventTime\":1603045262772,\"sequenceId\":1},"
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:7\",\"eventTime\":1603045262772,\"sequenceId\":2},"
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:8\",\"eventTime\":1603045262772,\"sequenceId\":3},"
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:9\",\"eventTime\":1603045262772,\"sequenceId\":4},"
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:10\",\"eventTime\":1603045262772,\"sequenceId\":5}]";
+        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+                new TypeReference<List<ProducerMessage>>() {}));
+        // Use a fresh mock and captor for the second request. Re-verifying times(1) on the first mock could
+        // succeed immediately against the first response, or fail once the second resume() has been recorded,
+        // so the assertions below would not reliably inspect the second response.
+        AsyncResponse secondAsyncResponse = mock(AsyncResponse.class);
+        topics.produceOnPersistentTopic(secondAsyncResponse, testTenant, testNamespace, testTopicName,
+                false, producerMessages);
+        ArgumentCaptor<Response> secondResponseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(secondAsyncResponse, timeout(5000).times(1)).resume(secondResponseCaptor.capture());
+        Assert.assertEquals(secondResponseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
+        responseEntity = secondResponseCaptor.getValue().getEntity();
+        Assert.assertTrue(responseEntity instanceof ProducerAcks);
+        response = (ProducerAcks) responseEntity;
+        Assert.assertEquals(response.getMessagePublishResults().size(), 5);
+        Assert.assertEquals(response.getSchemaVersion(), 0);
+        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
+            Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
+                    .getMessageId().split(":")[2]), -1);
+            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
+            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
+        }
+
+        List<String> expectedMsg = Arrays.asList("RestProducer:1", "RestProducer:2", "RestProducer:3",
+                "RestProducer:4", "RestProducer:5", "RestProducer:6",
+                "RestProducer:7", "RestProducer:8", "RestProducer:9",
+                "RestProducer:10");
+        // Assert all messages published by REST producer can be received by consumer in expected order.
+        for (int i = 0; i < expectedMsg.size(); i++) {
+            Message<?> msg = consumer.receive(2, TimeUnit.SECONDS);
+            // A timed-out receive yields null; fail with an assertion instead of a NullPointerException below.
+            Assert.assertNotNull(msg);
+            // Assertions use (actual, expected) order, matching the response checks above.
+            Assert.assertEquals(StringSchema.utf8().decode(msg.getData()), expectedMsg.get(i));
+            // "bXkta2V5" is the Base64 encoding of "my-key".
+            Assert.assertEquals(msg.getKey(), "bXkta2V5");
+        }
+    }
+
+    /**
+     * Sends three messages with a client producer created with {@code StringSchema.utf8()}, then publishes
+     * through the REST produce endpoint with both key and value schemas set, and expects the request to be
+     * resumed with a {@code RestException} whose message reports that the KEY_VALUE schema could not be added.
+     */
+    @Test
+    public void testProduceWithInCompatibleSchema() throws Exception {
+        String fullTopicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
+        admin.topics().createNonPartitionedTopic(fullTopicName);
+        AsyncResponse restResponse = mock(AsyncResponse.class);
+        Producer stringProducer = pulsarClient.newProducer(StringSchema.utf8())
+                .topic(fullTopicName)
+                .create();
+
+        int sent = 0;
+        while (sent < 3) {
+            stringProducer.send("message");
+            sent++;
+        }
+
+        ProducerMessages restRequest = new ProducerMessages();
+        restRequest.setKeySchema(ObjectMapperFactory.getThreadLocal()
+                .writeValueAsString(StringSchema.utf8().getSchemaInfo()));
+        restRequest.setValueSchema(ObjectMapperFactory.getThreadLocal()
+                .writeValueAsString(StringSchema.utf8().getSchemaInfo()));
+        String messagesJson = "["
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},"
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},"
+                + "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}]";
+        restRequest.setMessages(ObjectMapperFactory.getThreadLocal().readValue(messagesJson,
+                new TypeReference<List<ProducerMessage>>() {}));
+        topics.produceOnPersistentTopic(restResponse, testTenant, testNamespace, testTopicName, false,
+                restRequest);
+
+        ArgumentCaptor<RestException> failureCaptor = ArgumentCaptor.forClass(RestException.class);
+        verify(restResponse, timeout(5000).times(1)).resume(failureCaptor.capture());
+
+        // The timestamp in the middle of the message varies, so only the fixed prefix and suffix are compared.
+        String expectedPrefix = "Fail to publish message:"
+                + "java.util.concurrent.ExecutionException: org.apache.pulsar.broker.service.schema.exceptions."
+                + "SchemaException: Unable to add schema SchemaData(type=KEY_VALUE, isDeleted=false, "
+                + "timestamp=";
+        String expectedSuffix = "user=Rest Producer, data=[0, 0, 0, 0, 0, 0, 0, 0], "
+                + "props={key.schema.properties={\"__charset\":\"UTF-8\"}, value.schema.properties={\"__charset\":"
+                + "\"UTF-8\"}, value.schema.type=STRING, key.schema.name=String, value.schema.name=String, "
+                + "kv.encoding.type=SEPARATED, key.schema.type=STRING}) to topic persistent:"
+                + "//my-tenant/my-namespace/my-topic";
+        Assert.assertTrue(failureCaptor.getValue().getMessage().startsWith(expectedPrefix));
+        Assert.assertTrue(failureCaptor.getValue().getMessage().endsWith(expectedSuffix));
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
index 040842b..4b3a158 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
@@ -45,7 +45,7 @@ public class GenericJsonRecord extends VersionedGenericRecord {
         this(schemaVersion, fields, jn, null);
     }
 
-    GenericJsonRecord(byte[] schemaVersion,
+    public GenericJsonRecord(byte[] schemaVersion,
                       List<Field> fields,
                       JsonNode jn, SchemaInfo schemaInfo) {
         super(schemaVersion, fields);
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessages.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessages.java
index e74ebb1..821d5d3 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessages.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessages.java
@@ -32,7 +32,7 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor
 public class ProducerMessages {
     // Version of schema used for messages.
-    private long schemaVersion;
+    private long schemaVersion = -1;
 
     // Base64 encoded serialized schema for key
     private String keySchema;

Reply via email to