This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b2678be [REST] Rest API Produce message. (#8125)
b2678be is described below
commit b2678be0a97580d69da0b543a499efb3d9adbd5e
Author: Marvin Cai <[email protected]>
AuthorDate: Wed Sep 29 06:56:03 2021 -0700
[REST] Rest API Produce message. (#8125)
### Motivation
PIP 64:
https://github.com/apache/pulsar/wiki/PIP-64%3A-Introduce-REST-endpoints-for-producing%2C-consuming-and-reading-messages
Tested with Postman
### Modifications
Add produce message rest api.
---
.../org/apache/pulsar/broker/PulsarService.java | 2 +
.../broker/rest/RestMessagePublishContext.java | 89 +++
.../java/org/apache/pulsar/broker/rest/Topics.java | 160 ++++
.../org/apache/pulsar/broker/rest/TopicsBase.java | 768 +++++++++++++++++++
.../apache/pulsar/broker/rest/package-info.java | 29 +-
.../pulsar/broker/service/BrokerService.java | 4 +
...ashRangeAutoSplitStickyKeyConsumerSelector.java | 2 +-
.../apache/pulsar/broker/service/ServerCnx.java | 9 +-
.../org/apache/pulsar/broker/admin/TopicsTest.java | 831 +++++++++++++++++++++
.../impl/schema/generic/GenericJsonRecord.java | 2 +-
.../pulsar/websocket/data/ProducerMessages.java | 2 +-
11 files changed, 1861 insertions(+), 37 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5cd6813..86b9d94 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -803,6 +803,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
"org.apache.pulsar.broker.admin.v3", true, attributeMap);
webService.addRestResources("/lookup",
"org.apache.pulsar.broker.lookup", true, attributeMap);
+ webService.addRestResources("/topics",
+ "org.apache.pulsar.broker.rest", true,
attributeMap);
// Add metrics servlet
webService.addServlet("/metrics",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/RestMessagePublishContext.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/RestMessagePublishContext.java
new file mode 100644
index 0000000..9891572
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/RestMessagePublishContext.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.rest;
+
+import io.netty.util.Recycler;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.Topic;
+
+/**
+ * PublishContext implementation for REST message publishing.
+ */
+@Slf4j
+public class RestMessagePublishContext implements Topic.PublishContext {
+
+ private Topic topic;
+ private long startTimeNs;
+ private CompletableFuture<PositionImpl> positionFuture;
+
+ /**
+ * Executed from managed ledger thread when the message is persisted.
+ */
+ @Override
+ public void completed(Exception exception, long ledgerId, long entryId) {
+ if (exception != null) {
+ positionFuture.completeExceptionally(exception);
+ if (log.isInfoEnabled()) {
+ log.info("Failed to write entry for rest produce request:
ledgerId: {}, entryId: {}. "
+ + "triggered send callback.",
+ ledgerId, entryId);
+ }
+ } else {
+ if (log.isInfoEnabled()) {
+ log.info("Success write topic for rest produce request: {},
ledgerId: {}, entryId: {}. "
+ + "triggered send callback.",
+ topic.getName(), ledgerId, entryId);
+ }
+ topic.recordAddLatency(System.nanoTime() - startTimeNs,
TimeUnit.MICROSECONDS);
+ positionFuture.complete(PositionImpl.get(ledgerId, entryId));
+ }
+ recycle();
+ }
+
+ // recycler
+ public static RestMessagePublishContext
get(CompletableFuture<PositionImpl> positionFuture, Topic topic,
+ long startTimeNs) {
+ RestMessagePublishContext callback = RECYCLER.get();
+ callback.positionFuture = positionFuture;
+ callback.topic = topic;
+ callback.startTimeNs = startTimeNs;
+ return callback;
+ }
+
+ private final Recycler.Handle<RestMessagePublishContext> recyclerHandle;
+
+ private
RestMessagePublishContext(Recycler.Handle<RestMessagePublishContext>
recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private static final Recycler<RestMessagePublishContext> RECYCLER = new
Recycler<RestMessagePublishContext>() {
+ protected RestMessagePublishContext
newObject(Handle<RestMessagePublishContext> handle) {
+ return new RestMessagePublishContext(handle);
+ }
+ };
+
+ public void recycle() {
+ topic = null;
+ startTimeNs = -1;
+ recyclerHandle.recycle(this);
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
new file mode 100644
index 0000000..a8095f0
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.rest;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.Encoded;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+import org.apache.pulsar.websocket.data.ProducerMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Path("/")
+@Consumes(MediaType.APPLICATION_JSON)
+@Produces(MediaType.APPLICATION_JSON)
+@Api(value = "/persistent", description = "Apis for produce,consume and ack
message on topics.", tags = "topics")
+public class Topics extends TopicsBase {
+ private static final Logger log = LoggerFactory.getLogger(Topics.class);
+
+ @POST
+ @Path("/persistent/{tenant}/{namespace}/{topic}")
+ @ApiOperation(value = "Produce message to a persistent topic.", response =
String.class, responseContainer = "List")
+ @ApiResponses(value = {
+ @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't
exit"),
+ @ApiResponse(code = 412, message = "Namespace name is not valid"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ public void produceOnPersistentTopic(@Suspended final AsyncResponse
asyncResponse,
+ @ApiParam(value = "Specify the tenant",
required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace",
required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify topic name",
required = true)
+ @PathParam("topic") @Encoded String
encodedTopic,
+ @QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
+ ProducerMessages producerMessages) {
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ validateProducePermission();
+ publishMessages(asyncResponse, producerMessages, authoritative);
+ } catch (Exception e) {
+ log.error("[{}] Failed to produce on topic {}", clientAppId(),
topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
+ }
+
+ @POST
+ @Path("/persistent/{tenant}/{namespace}/{topic}/partitions/{partition}")
+ @ApiOperation(value = "Produce message to a partition of a persistent
topic.",
+ response = String.class, responseContainer = "List")
+ @ApiResponses(value = {
+ @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't
exit"),
+ @ApiResponse(code = 412, message = "Namespace name is not valid"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ public void produceOnPersistentTopicPartition(@Suspended final
AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the
tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the
namespace", required = true)
+ @PathParam("namespace") String
namespace,
+ @ApiParam(value = "Specify topic
name", required = true)
+ @PathParam("topic") @Encoded String
encodedTopic,
+ @ApiParam(value = "Specify topic
partition", required = true)
+ @PathParam("partition") int partition,
+ @QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
+ ProducerMessages producerMessages) {
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ validateProducePermission();
+ publishMessagesToPartition(asyncResponse, producerMessages,
authoritative, partition);
+ } catch (Exception e) {
+ log.error("[{}] Failed to produce on topic {}", clientAppId(),
topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
+ }
+
+ @POST
+ @Path("/non-persistent/{tenant}/{namespace}/{topic}")
+ @ApiOperation(value = "Produce message to a persistent topic.", response =
String.class, responseContainer = "List")
+ @ApiResponses(value = {
+ @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't
exit"),
+ @ApiResponse(code = 412, message = "Namespace name is not valid"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ public void produceOnNonPersistentTopic(@Suspended final AsyncResponse
asyncResponse,
+ @ApiParam(value = "Specify the
tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the
namespace", required = true)
+ @PathParam("namespace") String
namespace,
+ @ApiParam(value = "Specify topic
name", required = true)
+ @PathParam("topic") @Encoded String
encodedTopic,
+ @QueryParam("authoritative")
@DefaultValue("false")
+ boolean authoritative,
+ ProducerMessages producerMessages) {
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ validateProducePermission();
+ publishMessages(asyncResponse, producerMessages, authoritative);
+ } catch (Exception e) {
+ log.error("[{}] Failed to produce on topic {}", clientAppId(),
topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
+ }
+
+ @POST
+
@Path("/non-persistent/{tenant}/{namespace}/{topic}/partitions/{partition}")
+ @ApiOperation(value = "Produce message to a partition of a persistent
topic.",
+ response = String.class, responseContainer = "List")
+ @ApiResponses(value = {
+ @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't
exit"),
+ @ApiResponse(code = 412, message = "Namespace name is not valid"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ public void produceOnNonPersistentTopicPartition(@Suspended final
AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify
the tenant", required = true)
+ @PathParam("tenant") String
tenant,
+ @ApiParam(value = "Specify
the namespace", required = true)
+ @PathParam("namespace")
String namespace,
+ @ApiParam(value = "Specify
topic name", required = true)
+ @PathParam("topic") @Encoded
String encodedTopic,
+ @ApiParam(value = "Specify
topic partition", required = true)
+ @PathParam("partition") int
partition,
+ @QueryParam("authoritative")
@DefaultValue("false")
+ boolean
authoritative,
+ ProducerMessages
producerMessages) {
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ validateProducePermission();
+ publishMessagesToPartition(asyncResponse, producerMessages,
authoritative, partition);
+ } catch (Exception e) {
+ log.error("[{}] Failed to produce on topic {}", clientAppId(),
topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
+ }
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
new file mode 100644
index 0000000..76ce022
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
@@ -0,0 +1,768 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.rest;
+
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.namespace.LookupOptions;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.schema.SchemaRegistry;
+import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AvroBaseStructSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroWriter;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonWriter;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.apache.pulsar.websocket.data.ProducerAck;
+import org.apache.pulsar.websocket.data.ProducerAcks;
+import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.apache.pulsar.websocket.data.ProducerMessages;
+
+/**
+ * Contains methods used by REST api to producer/consumer/read messages
to/from pulsar topics.
+ */
+@Slf4j
+public class TopicsBase extends PersistentTopicsBase {
+
+ private static String defaultProducerName = "RestProducer";
+
+ // Publish message to a topic, can be partitioned or non-partitioned
+ protected void publishMessages(AsyncResponse asyncResponse,
ProducerMessages request, boolean authoritative) {
+ String topic = topicName.getPartitionedTopicName();
+ try {
+ if
(pulsar().getBrokerService().getOwningTopics().containsKey(topic)
+ || !findOwnerBrokerForTopic(authoritative, asyncResponse))
{
+ // If we've done look up or or after look up this broker owns
some of the partitions
+ // then proceed to publish message else asyncResponse will be
complete by look up.
+ addOrGetSchemaForTopic(getSchemaData(request.getKeySchema(),
request.getValueSchema()),
+ request.getSchemaVersion() == -1 ? null : new
LongSchemaVersion(request.getSchemaVersion()))
+ .thenAccept(schemaMeta -> {
+ // Both schema version and schema data are
necessary.
+ if (schemaMeta.getLeft() != null &&
schemaMeta.getRight() != null) {
+ internalPublishMessages(topicName, request,
pulsar().getBrokerService()
+
.getOwningTopics().get(topic).values(), asyncResponse,
+
AutoConsumeSchema.getSchema(schemaMeta.getLeft().toSchemaInfo()),
+ schemaMeta.getRight());
+ } else {
+ asyncResponse.resume(new
RestException(Status.INTERNAL_SERVER_ERROR,
+ "Fail to add or retrieve schema."));
+ }
+ }).exceptionally(e -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Fail to publish message: " +
e.getMessage());
+ }
+ asyncResponse.resume(new
RestException(Status.INTERNAL_SERVER_ERROR, "Fail to publish message:"
+ + e.getMessage()));
+ return null;
+ });
+ }
+ } catch (Exception e) {
+ asyncResponse.resume(new
RestException(Status.INTERNAL_SERVER_ERROR, "Fail to publish message: "
+ + e.getMessage()));
+ }
+ }
+
+ // Publish message to single partition of a partitioned topic.
+ protected void publishMessagesToPartition(AsyncResponse asyncResponse,
ProducerMessages request,
+ boolean authoritative,
int partition) {
+ if (topicName.isPartitioned()) {
+ asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Topic
name can't contain "
+ + "'-partition-' suffix."));
+ }
+ String topic = topicName.getPartitionedTopicName();
+ try {
+ // If broker owns the partition then proceed to publish message,
else do look up.
+ if
((pulsar().getBrokerService().getOwningTopics().containsKey(topic)
+ && pulsar().getBrokerService().getOwningTopics().get(topic)
+ .contains(partition))
+ || !findOwnerBrokerForTopic(authoritative, asyncResponse))
{
+ addOrGetSchemaForTopic(getSchemaData(request.getKeySchema(),
request.getValueSchema()),
+ request.getSchemaVersion() == -1 ? null : new
LongSchemaVersion(request.getSchemaVersion()))
+ .thenAccept(schemaMeta -> {
+ // Both schema version and schema data are
necessary.
+ if (schemaMeta.getLeft() != null &&
schemaMeta.getRight() != null) {
+ internalPublishMessagesToPartition(topicName,
request, partition, asyncResponse,
+
AutoConsumeSchema.getSchema(schemaMeta.getLeft().toSchemaInfo()),
+ schemaMeta.getRight());
+ } else {
+ asyncResponse.resume(new
RestException(Status.INTERNAL_SERVER_ERROR,
+ "Fail to add or retrieve schema."));
+ }
+ }).exceptionally(e -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Fail to publish message to single
partition: " + e.getLocalizedMessage());
+ }
+ asyncResponse.resume(new
RestException(Status.INTERNAL_SERVER_ERROR, "Fail to publish message"
+ + "to single partition: "
+ + e.getMessage()));
+ return null;
+ });
+ }
+ } catch (Exception e) {
+ asyncResponse.resume(new
RestException(Status.INTERNAL_SERVER_ERROR, "Fail to publish message: "
+ + e.getMessage()));
+ }
+ }
+
+ private void internalPublishMessagesToPartition(TopicName topicName,
ProducerMessages request,
+ int partition, AsyncResponse
asyncResponse,
+ Schema schema, SchemaVersion
schemaVersion) {
+ try {
+ String producerName = (null == request.getProducerName() ||
request.getProducerName().isEmpty())
+ ? defaultProducerName : request.getProducerName();
+ List<Message> messages = buildMessage(request, schema,
producerName, topicName);
+ List<CompletableFuture<PositionImpl>> publishResults = new
ArrayList<>();
+ List<ProducerAck> produceMessageResults = new ArrayList<>();
+ for (int index = 0; index < messages.size(); index++) {
+ ProducerAck produceMessageResult = new ProducerAck();
+ produceMessageResult.setMessageId(partition + "");
+ produceMessageResults.add(produceMessageResult);
+
publishResults.add(publishSingleMessageToPartition(topicName.getPartition(partition).toString(),
+ messages.get(index)));
+ }
+ FutureUtil.waitForAll(publishResults).thenRun(() -> {
+ processPublishMessageResults(produceMessageResults,
publishResults);
+ asyncResponse.resume(Response.ok().entity(new
ProducerAcks(produceMessageResults,
+ ((LongSchemaVersion)
schemaVersion).getVersion())).build());
+ }).exceptionally(e -> {
+ // Some message may published successfully, so till return ok
with result for each individual message.
+ processPublishMessageResults(produceMessageResults,
publishResults);
+ asyncResponse.resume(Response.ok().entity(new
ProducerAcks(produceMessageResults,
+ ((LongSchemaVersion)
schemaVersion).getVersion())).build());
+ return null;
+ });
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Fail publish messages to single partition with rest
produce message "
+ + "request for topic {}: {} ", topicName,
e.getCause());
+ }
+ asyncResponse.resume(new
RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()));
+ }
+ }
+
+ private void internalPublishMessages(TopicName topicName, ProducerMessages
request,
+ List<Integer>
partitionIndexes,
+ AsyncResponse
asyncResponse, Schema schema,
+ SchemaVersion
schemaVersion) {
+ if (partitionIndexes.size() < 1) {
+ asyncResponse.resume(new
RestException(Status.INTERNAL_SERVER_ERROR,
+ new BrokerServiceException.TopicNotFoundException("Topic
not owned by current broker.")));
+ }
+ try {
+ String producerName = (null == request.getProducerName() ||
request.getProducerName().isEmpty())
+ ? defaultProducerName : request.getProducerName();
+ List<Message> messages = buildMessage(request, schema,
producerName, topicName);
+ List<CompletableFuture<PositionImpl>> publishResults = new
ArrayList<>();
+ List<ProducerAck> produceMessageResults = new ArrayList<>();
+ // Try to publish messages to all partitions this broker owns in
round robin mode.
+ for (int index = 0; index < messages.size(); index++) {
+ ProducerAck produceMessageResult = new ProducerAck();
+ produceMessageResult.setMessageId(partitionIndexes.get(index %
(int) partitionIndexes.size()) + "");
+ produceMessageResults.add(produceMessageResult);
+
publishResults.add(publishSingleMessageToPartition(topicName.getPartition(partitionIndexes
+ .get(index % (int)
partitionIndexes.size())).toString(),
+ messages.get(index)));
+ }
+ FutureUtil.waitForAll(publishResults).thenRun(() -> {
+ processPublishMessageResults(produceMessageResults,
publishResults);
+ asyncResponse.resume(Response.ok().entity(new
ProducerAcks(produceMessageResults,
+ ((LongSchemaVersion)
schemaVersion).getVersion())).build());
+ }).exceptionally(e -> {
+ // Some message may published successfully, so till return ok
with result for each individual message.
+ processPublishMessageResults(produceMessageResults,
publishResults);
+ asyncResponse.resume(Response.ok().entity(new
ProducerAcks(produceMessageResults,
+ ((LongSchemaVersion)
schemaVersion).getVersion())).build());
+ return null;
+ });
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Fail to publish messages with rest produce message
request for topic {}: {} ",
+ topicName, e.getCause());
+ }
+ asyncResponse.resume(new
RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()));
+ }
+ }
+
+ private CompletableFuture<PositionImpl>
publishSingleMessageToPartition(String topic, Message message) {
+ CompletableFuture<PositionImpl> publishResult = new
CompletableFuture<>();
+ pulsar().getBrokerService().getTopic(topic, false)
+ .thenAccept(t -> {
+ // TODO: Check message backlog and fail if backlog too large.
+ if (!t.isPresent()) {
+ // Topic not found, and remove from owning partition list.
+ publishResult.completeExceptionally(new
BrokerServiceException.TopicNotFoundException("Topic not "
+ + "owned by current broker."));
+ TopicName topicName = TopicName.get(topic);
+
pulsar().getBrokerService().getOwningTopics().get(topicName.getPartitionedTopicName())
+ .remove(topicName.getPartitionIndex());
+ } else {
+ try {
+ t.get().publishMessage(messageToByteBuf(message),
+ RestMessagePublishContext.get(publishResult,
t.get(), System.nanoTime()));
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Fail to publish single messages to topic
{}: {} ",
+ topicName, e.getCause());
+ }
+ publishResult.completeExceptionally(e);
+ }
+ }
+ });
+
+ return publishResult;
+ }
+
+ // Process results for all message publishing attempts
+ private void processPublishMessageResults(List<ProducerAck>
produceMessageResults,
+
List<CompletableFuture<PositionImpl>> publishResults) {
+ // process publish message result
+ for (int index = 0; index < publishResults.size(); index++) {
+ try {
+ PositionImpl position = publishResults.get(index).get();
+ MessageId messageId = new
MessageIdImpl(position.getLedgerId(), position.getEntryId(),
+
Integer.parseInt(produceMessageResults.get(index).getMessageId()));
+
produceMessageResults.get(index).setMessageId(messageId.toString());
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Fail publish [{}] message with rest produce
message request for topic {}: {} ",
+ index, topicName);
+ }
+ if (e instanceof
BrokerServiceException.TopicNotFoundException) {
+ // Topic ownership might changed, force to look up again.
+
pulsar().getBrokerService().getOwningTopics().remove(topicName.getPartitionedTopicName());
+ }
+ extractException(e, produceMessageResults.get(index));
+ }
+ }
+ }
+
+ // Return error code depends on exception we got indicating if client
should retry with same broker.
+ private void extractException(Exception e, ProducerAck
produceMessageResult) {
+ if (!(e instanceof BrokerServiceException.TopicFencedException && e
instanceof ManagedLedgerException)) {
+ produceMessageResult.setErrorCode(2);
+ } else {
+ produceMessageResult.setErrorCode(1);
+ }
+ produceMessageResult.setErrorMsg(e.getMessage());
+ }
+
    // Look up topic owner for given topic. Return if asyncResponse has been completed
    // which indicating redirect or exception.
    //
    // Returns true when the response has already been completed (client was
    // redirected, or an error was returned); false when this broker owns at least
    // one partition and the caller should proceed to publish.
    private boolean findOwnerBrokerForTopic(boolean authoritative, AsyncResponse asyncResponse) {
        PartitionedTopicMetadata metadata = internalGetPartitionedMetadata(authoritative, false);
        // Collected by the per-partition lookups; synchronized because lookups
        // complete on different threads.
        List<String> redirectAddresses = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        List<CompletableFuture<Void>> lookupFutures = new ArrayList<>();
        if (!topicName.isPartitioned() && metadata.partitions > 1) {
            // Partitioned topic with multiple partitions, need to do look up for each partition.
            for (int index = 0; index < metadata.partitions; index++) {
                lookupFutures.add(lookUpBrokerForTopic(topicName.getPartition(index),
                        authoritative, redirectAddresses));
            }
        } else {
            // Non-partitioned topic or specific topic partition.
            lookupFutures.add(lookUpBrokerForTopic(topicName, authoritative, redirectAddresses));
        }

        // Whether the lookups succeed or fail, decide from whatever ended up in
        // redirectAddresses / the owning-topics map.
        FutureUtil.waitForAll(lookupFutures)
                .thenRun(() -> {
                    processLookUpResult(redirectAddresses, asyncResponse, future);
                }).exceptionally(e -> {
                    processLookUpResult(redirectAddresses, asyncResponse, future);
                    return null;
                });
        try {
            // NOTE(review): this blocks the calling (servlet) thread until all
            // per-partition lookups finish — TODO confirm that is acceptable here.
            return future.get();
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug("Fail to lookup topic for rest produce message request for topic {}.", topicName.toString());
            }
            if (!asyncResponse.isDone()) {
                asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, "Internal error: "
                        + e.getMessage()));
            }
            return true;
        }
    }
+
    // Completes `future` with true when the asyncResponse has been resolved here
    // (redirect or error), or false when this broker owns partition(s) of the
    // topic and the caller should publish locally.
    private void processLookUpResult(List<String> redirectAddresses, AsyncResponse asyncResponse,
                                     CompletableFuture<Boolean> future) {
        // Current broker doesn't own the topic or any partition of the topic, redirect client to a broker
        // that own partition of the topic or know who own partition of the topic.
        if (!pulsar().getBrokerService().getOwningTopics().containsKey(topicName.getPartitionedTopicName())) {
            if (redirectAddresses.isEmpty()) {
                // No broker to redirect, means look up for some partitions failed,
                // client should retry with other brokers.
                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Can't find owner of given topic."));
                future.complete(true);
            } else {
                // Redirect client to other broker owns the topic or know which broker own the topic.
                try {
                    if (log.isDebugEnabled()) {
                        log.debug("Redirect rest produce request for topic {} from {} to {}.",
                                topicName, pulsar().getWebServiceAddress(), redirectAddresses.get(0));
                    }
                    // Preserve the original request path on the target broker.
                    URI redirectURI = new URI(String.format("%s%s", redirectAddresses.get(0), uri.getPath(false)));
                    asyncResponse.resume(Response.temporaryRedirect(redirectURI).build());
                    future.complete(true);
                } catch (URISyntaxException | NullPointerException e) {
                    // NOTE(review): log.error is guarded by isDebugEnabled(), so this
                    // error is silent unless debug logging is on — confirm intended.
                    if (log.isDebugEnabled()) {
                        log.error("Error in preparing redirect url with rest produce message request for topic {}: {}",
                                topicName, e.getMessage(), e);
                    }
                    asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR,
                            "Fail to redirect client request."));
                    future.complete(true);
                }
            }
        } else {
            // This broker owns partition(s) of the topic; publish locally.
            future.complete(false);
        }
    }
+
    // Look up topic owner for non-partitioned topic or single topic partition.
    // Acquires one permit from the broker-wide lookup semaphore; completeLookup() is
    // responsible for releasing it on every completion path below. The returned future
    // never completes exceptionally from the lookup itself — failures are mapped to an
    // empty lookup result so the caller can aggregate partial failures across partitions.
    private CompletableFuture<Void> lookUpBrokerForTopic(TopicName partitionedTopicName,
                                                         boolean authoritative, List<String> redirectAddresses) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        // Back-pressure: reject the lookup outright when the shared semaphore is exhausted.
        // No permit was taken here, so no release is needed on this path.
        if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
            if (log.isDebugEnabled()) {
                log.debug("Too many concurrent lookup request.");
            }
            future.completeExceptionally(new BrokerServiceException.TooManyRequestsException("Too many "
                    + "concurrent lookup request"));
            return future;
        }
        // loadTopicsInBundle=false: we only need ownership info, not topic pre-loading.
        CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
                .getBrokerServiceUrlAsync(partitionedTopicName,
                        LookupOptions.builder().authoritative(authoritative).loadTopicsInBundle(false).build());

        lookupFuture.thenAccept(optionalResult -> {
            if (optionalResult == null || !optionalResult.isPresent()) {
                if (log.isDebugEnabled()) {
                    log.debug("Fail to lookup topic for rest produce message request for topic {}.",
                            partitionedTopicName);
                }
                // Empty result pair: nothing to redirect to for this partition.
                completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, future);
                return;
            }

            LookupResult result = optionalResult.get();
            // NOTE(review): getHttpUrl() may be null if the owner broker has no plain-HTTP
            // listener configured — the equals() call would then NPE; confirm deployment
            // always exposes an HTTP service URL.
            if (result.getLookupData().getHttpUrl().equals(pulsar().getWebServiceAddress())) {
                // Current broker owns the topic, add to owning topic.
                if (log.isDebugEnabled()) {
                    log.debug("Complete topic look up for rest produce message request for topic {}, "
                                    + "current broker is owner broker: {}",
                            partitionedTopicName, result.getLookupData());
                }
                // Record ownership so subsequent requests skip the lookup fast-path check.
                pulsar().getBrokerService().getOwningTopics().computeIfAbsent(partitionedTopicName
                        .getPartitionedTopicName(), (key) -> new ConcurrentOpenHashSet<Integer>())
                        .add(partitionedTopicName.getPartitionIndex());
                completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, future);
            } else {
                // Current broker doesn't own the topic or doesn't know who own the topic.
                if (log.isDebugEnabled()) {
                    log.debug("Complete topic look up for rest produce message request for topic {}, "
                                    + "current broker is not owner broker: {}",
                            partitionedTopicName, result.getLookupData());
                }
                if (result.isRedirect()) {
                    // Redirect lookup: the returned broker only knows who to ask next
                    // (right flag = false → appended to the tail of redirectAddresses).
                    completeLookup(Pair.of(Arrays.asList(result.getLookupData().getHttpUrl(),
                            result.getLookupData().getHttpUrlTls()), false), redirectAddresses, future);
                } else {
                    // Found owner for topic (right flag = true → prepended, higher priority).
                    completeLookup(Pair.of(Arrays.asList(result.getLookupData().getHttpUrl(),
                            result.getLookupData().getHttpUrlTls()), true), redirectAddresses, future);
                }
            }
        }).exceptionally(exception -> {
            if (log.isDebugEnabled()) {
                log.debug("Fail to lookup broker with rest produce message request for topic {}: {}",
                        partitionedTopicName, exception.getMessage());
            }
            // Swallow the lookup error into an empty result; the aggregate caller decides
            // whether the overall request fails.
            completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, future);
            return null;
        });
        return future;
    }
+
    /**
     * Fetch an existing schema by version, or register the supplied schema for the topic.
     * Returns a pair of (schema, version); completes with Pair.of(null, null) when neither
     * a version nor schema data was supplied (callers treat that as an error marker).
     *
     * NOTE(review): despite returning a CompletableFuture, both branches block the calling
     * thread on .get() of the registry/add futures — confirm this is acceptable on the
     * web-service thread pool, or chain asynchronously instead.
     */
    private CompletableFuture<Pair<SchemaData, SchemaVersion>> addOrGetSchemaForTopic(SchemaData schemaData,
                                                                                      LongSchemaVersion schemaVersion) {
        CompletableFuture<Pair<SchemaData, SchemaVersion>> future = new CompletableFuture<>();
        // If schema version presents try to fetch existing schema.
        if (null != schemaVersion) {
            // Schema id is keyed by the partitioned topic name (shared by all partitions).
            String id = TopicName.get(topicName.getPartitionedTopicName()).getSchemaName();
            SchemaRegistry.SchemaAndMetadata schemaAndMetadata;
            try {
                schemaAndMetadata = pulsar().getSchemaRegistryService().getSchema(id, schemaVersion).get();
                future.complete(Pair.of(schemaAndMetadata.schema, schemaAndMetadata.version));
            } catch (Exception e) {
                if (log.isDebugEnabled()) {
                    log.debug("Fail to retrieve schema of version {} for topic {}: {}",
                            schemaVersion.getVersion(), topicName, e.getMessage());
                }
                future.completeExceptionally(e);
            }
        } else if (null != schemaData) {
            // Else try to add schema to topic.
            SchemaVersion sv;
            try {
                sv = addSchema(schemaData).get();
                future.complete(Pair.of(schemaData, sv));
            } catch (Exception e) {
                if (log.isDebugEnabled()) {
                    log.debug("Fail to add schema {} for topic {}: {}",
                            new String(schemaData.toSchemaInfo().getSchema()), topicName, e.getMessage());
                }
                future.completeExceptionally(e);
            }
        } else {
            // Indicating exception: no version and no schema data supplied.
            future.complete(Pair.of(null, null));
        }
        return future;
    }
+
    // Add a new schema to schema registry for a topic
    // Tries each owned partition in turn and stops at the first success; the schema id in
    // the registry is shared by all partitions (the partitioned topic name), so one
    // successful add is sufficient.
    //
    // NOTE(review): getOwningTopics().get(...) returns null (→ NPE) when this broker owns
    // no partition of the topic — confirm callers always establish ownership (via lookup)
    // before invoking this. Also note each iteration blocks on future.get().
    private CompletableFuture<SchemaVersion> addSchema(SchemaData schemaData) {
        // Only need to add to first partition the broker owns since the schema id in schema registry are
        // same for all partitions which is the partitionedTopicName
        List<Integer> partitions = pulsar().getBrokerService().getOwningTopics()
                .get(topicName.getPartitionedTopicName()).values();
        CompletableFuture<SchemaVersion> result = new CompletableFuture<>();
        for (int index = 0; index < partitions.size(); index++) {
            // Fresh per-partition future; a failure here falls through to the next partition.
            CompletableFuture<SchemaVersion> future = new CompletableFuture<>();
            String topicPartitionName = topicName.getPartition(partitions.get(index)).toString();
            // createIfMissing=false: the partition must already exist on this broker.
            pulsar().getBrokerService().getTopic(topicPartitionName, false)
                    .thenAccept(topic -> {
                        if (!topic.isPresent()) {
                            future.completeExceptionally(new BrokerServiceException.TopicNotFoundException(
                                    "Topic " + topicPartitionName + " not found"));
                        } else {
                            topic.get().addSchema(schemaData).thenAccept(schemaVersion -> future.complete(schemaVersion))
                            .exceptionally(exception -> {
                                future.completeExceptionally(exception);
                                return null;
                            });
                        }
                    });
            try {
                // Blocks until this partition's add completes; first success wins.
                result.complete(future.get());
                break;
            } catch (Exception e) {
                if (log.isDebugEnabled()) {
                    log.debug("Fail to add schema to topic " + topicName.getPartitionedTopicName()
                            + " for partition " + partitions.get(index) + " for REST produce request.");
                }
            }
        }
        // Not able to add schema to any partition
        if (!result.isDone()) {
            result.completeExceptionally(new SchemaException("Unable to add schema " + schemaData
                    + " to topic " + topicName.getPartitionedTopicName()));
        }
        return result;
    }
+
+ // Build schemaData from passed in schema string.
+ private SchemaData getSchemaData(String keySchema, String valueSchema) {
+ try {
+ SchemaInfoImpl valueSchemaInfo = (valueSchema == null ||
valueSchema.isEmpty())
+ ? (SchemaInfoImpl) StringSchema.utf8().getSchemaInfo() :
+ ObjectMapperFactory.getThreadLocal()
+ .readValue(valueSchema, SchemaInfoImpl.class);
+ if (null == valueSchemaInfo.getName()) {
+ valueSchemaInfo.setName(valueSchemaInfo.getType().toString());
+ }
+ // Value schema only
+ if (keySchema == null || keySchema.isEmpty()) {
+ return SchemaData.builder()
+ .data(valueSchemaInfo.getSchema())
+ .isDeleted(false)
+ .user("Rest Producer")
+ .timestamp(System.currentTimeMillis())
+ .type(valueSchemaInfo.getType())
+ .props(valueSchemaInfo.getProperties())
+ .build();
+ } else {
+ // Key_Value schema
+ SchemaInfoImpl keySchemaInfo =
ObjectMapperFactory.getThreadLocal()
+ .readValue(keySchema, SchemaInfoImpl.class);
+ if (null == keySchemaInfo.getName()) {
+ keySchemaInfo.setName(keySchemaInfo.getType().toString());
+ }
+ SchemaInfo schemaInfo =
KeyValueSchemaInfo.encodeKeyValueSchemaInfo("KVSchema-"
+ + topicName.getPartitionedTopicName(),
+ keySchemaInfo, valueSchemaInfo,
+ KeyValueEncodingType.SEPARATED);
+ return SchemaData.builder()
+ .data(schemaInfo.getSchema())
+ .isDeleted(false)
+ .user("Rest Producer")
+ .timestamp(System.currentTimeMillis())
+ .type(schemaInfo.getType())
+ .props(schemaInfo.getProperties())
+ .build();
+ }
+ } catch (IOException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Fail to parse schema info for rest produce request
with key schema {} and value schema {}"
+ , keySchema, valueSchema);
+ }
+ return null;
+ }
+ }
+
+ // Convert message to ByteBuf
+ public ByteBuf messageToByteBuf(Message message) {
+ checkArgument(message instanceof MessageImpl, "Message must be type of
MessageImpl.");
+
+ MessageImpl msg = (MessageImpl) message;
+ MessageMetadata messageMetadata = msg.getMessageBuilder();
+ ByteBuf payload = msg.getDataBuffer();
+
messageMetadata.setCompression(CompressionCodecProvider.convertToWireProtocol(CompressionType.NONE));
+ messageMetadata.setUncompressedSize(payload.readableBytes());
+
+ return
Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
messageMetadata, payload);
+ }
+
    // Build pulsar message from REST request.
    // Converts each ProducerMessage in the request into a client MessageImpl, copying
    // sequence id, replication clusters, properties, partition key, event time and
    // delayed-delivery settings into the wire metadata, and encoding key/payload with
    // the resolved schema.
    private List<Message> buildMessage(ProducerMessages producerMessages, Schema schema,
                                       String producerName, TopicName topicName) {
        List<ProducerMessage> messages;
        List<Message> pulsarMessages = new ArrayList<>();

        messages = producerMessages.getMessages();
        for (ProducerMessage message : messages) {
            MessageMetadata messageMetadata = new MessageMetadata();
            messageMetadata.setProducerName(producerName);
            // Publish time is stamped broker-side at build time, not taken from the request.
            messageMetadata.setPublishTime(System.currentTimeMillis());
            messageMetadata.setSequenceId(message.getSequenceId());
            if (null != message.getReplicationClusters()) {
                messageMetadata.addAllReplicateTos(message.getReplicationClusters());
            }

            if (null != message.getProperties()) {
                // Copy user properties into proto KeyValue entries.
                messageMetadata.addAllProperties(message.getProperties().entrySet().stream().map(entry -> {
                    org.apache.pulsar.common.api.proto.KeyValue keyValue =
                            new org.apache.pulsar.common.api.proto.KeyValue();
                    keyValue.setKey(entry.getKey());
                    keyValue.setValue(entry.getValue());
                    return keyValue;
                }).collect(Collectors.toList()));
            }
            if (null != message.getKey()) {
                // If has key schema, encode partition key, else use plain text.
                if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
                    KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
                    messageMetadata.setPartitionKey(
                            Base64.getEncoder().encodeToString(encodeWithSchema(message.getKey(),
                                    kvSchema.getKeySchema())));
                    messageMetadata.setPartitionKeyB64Encoded(true);
                } else {
                    messageMetadata.setPartitionKey(message.getKey());
                    messageMetadata.setPartitionKeyB64Encoded(false);
                }
            }
            if (null != message.getEventTime() && !message.getEventTime().isEmpty()) {
                messageMetadata.setEventTime(Long.valueOf(message.getEventTime()));
            }
            if (message.isDisableReplication()) {
                messageMetadata.clearReplicateTo();
                messageMetadata.addReplicateTo("__local__");
            }
            // NOTE(review): deliverAt is only honored when eventTime is also set, and
            // deliverAfterMs is offset from eventTime (0 when absent) rather than from
            // publish time — confirm both behaviors are intended.
            if (message.getDeliverAt() != 0 && messageMetadata.hasEventTime()) {
                messageMetadata.setDeliverAtTime(message.getDeliverAt());
            } else if (message.getDeliverAfterMs() != 0) {
                messageMetadata.setDeliverAtTime(messageMetadata.getEventTime() + message.getDeliverAfterMs());
            }
            if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
                // For KV schema the payload is encoded with the value half of the schema.
                KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
                pulsarMessages.add(MessageImpl.create(messageMetadata,
                        ByteBuffer.wrap(encodeWithSchema(message.getPayload(), kvSchema.getValueSchema())),
                        schema, topicName.toString()));
            } else {
                pulsarMessages.add(MessageImpl.create(messageMetadata,
                        ByteBuffer.wrap(encodeWithSchema(message.getPayload(), schema)), schema,
                        topicName.toString()));
            }
        }

        return pulsarMessages;
    }
+
+ // Encode message with corresponding schema, do necessary conversion
before encoding
+ private byte[] encodeWithSchema(String input, Schema schema) {
+ try {
+ switch (schema.getSchemaInfo().getType()) {
+ case INT8:
+ return schema.encode(Byte.parseByte(input));
+ case INT16:
+ return schema.encode(Short.parseShort(input));
+ case INT32:
+ return schema.encode(Integer.parseInt(input));
+ case INT64:
+ return schema.encode(Long.parseLong(input));
+ case STRING:
+ return schema.encode(input);
+ case FLOAT:
+ return schema.encode(Float.parseFloat(input));
+ case DOUBLE:
+ return schema.encode(Double.parseDouble(input));
+ case BOOLEAN:
+ return schema.encode(Boolean.parseBoolean(input));
+ case BYTES:
+ return schema.encode(input.getBytes());
+ case DATE:
+ return
schema.encode(DateFormat.getDateInstance().parse(input));
+ case TIME:
+ return schema.encode(new Time(Long.parseLong(input)));
+ case TIMESTAMP:
+ return schema.encode(new Timestamp(Long.parseLong(input)));
+ case INSTANT:
+ return schema.encode(Instant.parse(input));
+ case LOCAL_DATE:
+ return schema.encode(LocalDate.parse(input));
+ case LOCAL_TIME:
+ return schema.encode(LocalTime.parse(input));
+ case LOCAL_DATE_TIME:
+ return schema.encode(LocalDateTime.parse(input));
+ case JSON:
+ GenericJsonWriter jsonWriter = new GenericJsonWriter();
+ return jsonWriter.write(new GenericJsonRecord(null, null,
+
ObjectMapperFactory.getThreadLocal().readTree(input), schema.getSchemaInfo()));
+ case AVRO:
+ AvroBaseStructSchema avroSchema = ((AvroBaseStructSchema)
schema);
+ Decoder decoder =
DecoderFactory.get().jsonDecoder(avroSchema.getAvroSchema(), input);
+ DatumReader<GenericData.Record> reader = new
GenericDatumReader(avroSchema.getAvroSchema());
+ GenericRecord genericRecord = reader.read(null, decoder);
+ GenericAvroWriter avroWriter = new
GenericAvroWriter(avroSchema.getAvroSchema());
+ return avroWriter.write(new GenericAvroRecord(null,
+ avroSchema.getAvroSchema(), null, genericRecord));
+ case PROTOBUF_NATIVE:
+ case KEY_VALUE:
+ default:
+ throw new
PulsarClientException.InvalidMessageException("");
+ }
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Fail to encode value {} with schema {} for rest
produce request", input,
+ new String(schema.getSchemaInfo().getSchema()));
+ }
+ return new byte[0];
+ }
+ }
+
+ // Release lookup semaphore and add result to redirectAddresses if current
broker doesn't own the topic.
+ private synchronized void completeLookup(Pair<List<String>, Boolean>
result, List<String> redirectAddresses,
+ CompletableFuture<Void> future) {
+ pulsar().getBrokerService().getLookupRequestSemaphore().release();
+ // Left is lookup result of secure/insecure address if lookup succeed,
Right is address is the owner's address
+ // or it's a address to redirect lookup.
+ if (!result.getLeft().isEmpty()) {
+ if (result.getRight()) {
+ // If address is for owner of topic partition, add to head and
it'll have higher priority
+ // compare to broker for look redirect.
+ redirectAddresses.add(0, isRequestHttps() ?
result.getLeft().get(1) : result.getLeft().get(0));
+ } else {
+ redirectAddresses.add(redirectAddresses.size(),
isRequestHttps()
+ ? result.getLeft().get(1) : result.getLeft().get(0));
+ }
+ }
+ future.complete(null);
+ }
+
+ public void validateProducePermission() throws Exception {
+ if (pulsar().getConfiguration().isAuthenticationEnabled()
+ && pulsar().getBrokerService().isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId())) {
+ throw new RestException(Status.UNAUTHORIZED, "Need to
authenticate to perform the request");
+ }
+
+ boolean isAuthorized =
pulsar().getBrokerService().getAuthorizationService()
+ .canProduce(topicName, originalPrincipal(),
clientAuthData());
+ if (!isAuthorized) {
+ throw new RestException(Status.UNAUTHORIZED,
String.format("Unauthorized to produce to topic %s"
+ + " with clientAppId [%s] and authdata
%s", topicName.toString(),
+ clientAppId(), clientAuthData()));
+ }
+ }
+ }
+
+}
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessages.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/package-info.java
similarity index 57%
copy from
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessages.java
copy to
pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/package-info.java
index e74ebb1..bafd56b 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessages.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/package-info.java
@@ -16,31 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.websocket.data;
-
-import java.util.List;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-/**
- * Class represent messages to be published.
- */
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class ProducerMessages {
- // Version of schema used for messages.
- private long schemaVersion;
-
- // Base64 encoded serialized schema for key
- private String keySchema;
-
- // Base64 encoded serialized schema for value
- private String valueSchema;
-
- private String producerName;
-
- private List<ProducerMessage> messages;
-}
+package org.apache.pulsar.broker.rest;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f7cc662..58b3f43 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -191,6 +191,9 @@ public class BrokerService implements Closeable {
// Namespace --> Bundle --> topicName --> topic
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String,
ConcurrentOpenHashMap<String, Topic>>>
multiLayerTopicsMap;
+ // Keep track of topics and partitions served by this broker for fast
lookup.
+ @Getter
+ private final ConcurrentOpenHashMap<String,
ConcurrentOpenHashSet<Integer>> owningTopics;
private int numberOfNamespaceBundles = 0;
private final EventLoopGroup acceptorGroup;
@@ -271,6 +274,7 @@ public class BrokerService implements Closeable {
this.pendingTopicLoadingQueue = Queues.newConcurrentLinkedQueue();
this.multiLayerTopicsMap = new ConcurrentOpenHashMap<>();
+ this.owningTopics = new ConcurrentOpenHashMap<>();
this.pulsarStats = new PulsarStats(pulsar);
this.offlineTopicStatCache = new ConcurrentOpenHashMap<>();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
index 624108f..5cf36ee 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
@@ -35,7 +35,7 @@ import org.apache.pulsar.client.api.Range;
* 2.The whole range of hash value could be covered by all the consumers.
* 3.Once a consumer is removed, the left consumers could still serve the
whole range.
*
- * Initializing with a fixed hash range, by default 2 << 5.
+ * Initializing with a fixed hash range, by default 2 << 15.
* First consumer added, hash range looks like:
*
* 0 -> 65536(consumer-1)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index ccd5a50..f0324ba 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1152,12 +1152,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return null;
} else {
- // There was an early request to create a
producer with
- // same producerId. This can happen when
- // client
- // timeout is lower the broker timeouts. We
need to wait
- // until the previous producer creation
- // request
+ // There was an early request to create a
producer with same producerId.
+ // This can happen when client timeout is
lower than the broker timeouts.
+ // We need to wait until the previous producer
creation request
// either complete or fails.
ServerError error = null;
if (!existingProducerFuture.isDone()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
new file mode 100644
index 0000000..3f57806
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -0,0 +1,831 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Sets;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.admin.v2.PersistentTopics;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.rest.Topics;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.websocket.data.ProducerAcks;
+import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.apache.pulsar.websocket.data.ProducerMessages;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.reflect.Whitebox;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+@PrepareForTest(PersistentTopics.class)
+public class TopicsTest extends MockedPulsarServiceBaseTest {
+
+ private Topics topics;
+ private final String testLocalCluster = "test";
+ private final String testTenant = "my-tenant";
+ private final String testNamespace = "my-namespace";
+ private final String testTopicName = "my-topic";
+
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        // Boot an in-process broker, then spy the REST resource so request-context
        // accessors (domain, clientAppId, clientAuthData) can be stubbed per-test.
        super.internalSetup();
        topics = spy(new Topics());
        topics.setPulsar(pulsar);
        doReturn(TopicDomain.persistent.value()).when(topics).domain();
        doReturn("test-app").when(topics).clientAppId();
        doReturn(mock(AuthenticationDataHttps.class)).when(topics).clientAuthData();
        // Minimal cluster/tenant/namespace scaffolding for the produce tests.
        admin.clusters().createCluster(testLocalCluster, new ClusterDataImpl());
        admin.tenants().createTenant(testTenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet(testLocalCluster)));
        admin.namespaces().createNamespace(testTenant + "/" + testNamespace,
                Sets.newHashSet(testLocalCluster));
    }
+
    @Override
    @AfterMethod
    protected void cleanup() throws Exception {
        // Tear down the in-process broker started by setup().
        super.internalCleanup();
    }
+
    @Test
    public void testProduceToNonPartitionedTopic() throws Exception {
        admin.topics().createNonPartitionedTopic("persistent://" + testTenant + "/"
                + testNamespace + "/" + testTopicName);
        AsyncResponse asyncResponse = mock(AsyncResponse.class);
        // Request carries a UTF-8 string schema for both key and value.
        Schema<String> schema = StringSchema.utf8();
        ProducerMessages producerMessages = new ProducerMessages();
        producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(schema.getSchemaInfo()));
        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(schema.getSchemaInfo()));
        String message = "[" +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}]";
        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
                new TypeReference<List<ProducerMessage>>() {}));
        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, producerMessages);
        // The resource responds asynchronously; capture whatever is resumed on the response.
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
        Object responseEntity = responseCaptor.getValue().getEntity();
        Assert.assertTrue(responseEntity instanceof ProducerAcks);
        ProducerAcks response = (ProducerAcks) responseEntity;
        Assert.assertEquals(response.getMessagePublishResults().size(), 3);
        Assert.assertEquals(response.getSchemaVersion(), 0);
        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
            // Message id format is ledgerId:entryId:partitionIndex; -1 partition index
            // marks a non-partitioned topic.
            Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
                    .getMessageId().split(":")[2]), -1);
            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
        }
    }
+
    @Test
    public void testProduceToPartitionedTopic() throws Exception {
        admin.topics().createPartitionedTopic("persistent://" + testTenant + "/" + testNamespace
                + "/" + testTopicName + "-p", 5);
        AsyncResponse asyncResponse = mock(AsyncResponse.class);
        // Request carries a UTF-8 string schema for both key and value.
        Schema<String> schema = StringSchema.utf8();
        ProducerMessages producerMessages = new ProducerMessages();
        producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(schema.getSchemaInfo()));
        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(schema.getSchemaInfo()));
        // 10 messages across 5 partitions so round-robin distribution can be asserted below.
        String message = "[" +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:5\",\"eventTime\":1603045262772,\"sequenceId\":5}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:6\",\"eventTime\":1603045262772,\"sequenceId\":6}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:7\",\"eventTime\":1603045262772,\"sequenceId\":7}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:8\",\"eventTime\":1603045262772,\"sequenceId\":8}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:9\",\"eventTime\":1603045262772,\"sequenceId\":9}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:10\",\"eventTime\":1603045262772,\"sequenceId\":10}]";
        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
                new TypeReference<List<ProducerMessage>>() {}));
        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName + "-p", false, producerMessages);
        // The resource responds asynchronously; capture whatever is resumed on the response.
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
        Object responseEntity = responseCaptor.getValue().getEntity();
        Assert.assertTrue(responseEntity instanceof ProducerAcks);
        ProducerAcks response = (ProducerAcks) responseEntity;
        Assert.assertEquals(response.getMessagePublishResults().size(), 10);
        Assert.assertEquals(response.getSchemaVersion(), 0);
        int[] messagePerPartition = new int[5];
        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
            // Message id format is ledgerId:entryId:partitionIndex; tally per partition.
            messagePerPartition[Integer.parseInt(response.getMessagePublishResults().get(index)
                    .getMessageId().split(":")[2])]++;
            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
        }
        for (int index = 0; index < messagePerPartition.length; index++) {
            // We publish to each partition in round robin mode so each partition should get at most 2 message.
            Assert.assertTrue(messagePerPartition[index] <= 2);
        }
    }
+
    /**
     * Publishing through the partition-specific REST endpoint must route every
     * message to the requested partition (2 here) instead of round-robin
     * across the 5 partitions of the topic.
     */
    @Test
    public void testProduceToPartitionedTopicSpecificPartition() throws Exception {
        admin.topics().createPartitionedTopic("persistent://" + testTenant + "/"
                + testNamespace + "/" + testTopicName, 5);
        AsyncResponse asyncResponse = mock(AsyncResponse.class);
        Schema<String> schema = StringSchema.utf8();
        ProducerMessages producerMessages = new ProducerMessages();
        producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(schema.getSchemaInfo()));
        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(schema.getSchemaInfo()));
        String message = "[" +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4}]";
        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
                new TypeReference<List<ProducerMessage>>() {}));
        // Target partition 2 explicitly.
        topics.produceOnPersistentTopicPartition(asyncResponse, testTenant, testNamespace, testTopicName, 2, false, producerMessages);
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
        Object responseEntity = responseCaptor.getValue().getEntity();
        Assert.assertTrue(responseEntity instanceof ProducerAcks);
        ProducerAcks response = (ProducerAcks) responseEntity;
        Assert.assertEquals(response.getMessagePublishResults().size(), 4);
        Assert.assertEquals(response.getSchemaVersion(), 0);
        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
            // The third ':'-separated component of the message id is the partition
            // index; every ack must come from partition 2.
            Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
                    .getMessageId().split(":")[2]), 2);
            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
        }
    }
+
+ @Test
+ public void testProduceFailed() throws Exception {
+ admin.topics().createNonPartitionedTopic("persistent://" + testTenant
+ "/" + testNamespace + "/" + testTopicName);
+ pulsar.getBrokerService().getTopic("persistent://" + testTenant + "/"
+ testNamespace
+ + "/" + testTopicName, false).thenAccept(topic -> {
+ try {
+ PersistentTopic mockPersistentTopic = spy((PersistentTopic)
topic.get());
+ AtomicInteger count = new AtomicInteger();
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
+ Topic.PublishContext publishContext =
invocationOnMock.getArgument(1);
+ if (count.getAndIncrement() < 2) {
+ publishContext.completed(null, -1, -1);
+ } else {
+ publishContext.completed(new
BrokerServiceException.TopicFencedException("Fake exception"),
+ -1, -1);
+ }
+ return null;
+ }
+ }).when(mockPersistentTopic).publishMessage(any(), any());
+ BrokerService mockBrokerService =
spy(pulsar.getBrokerService());
+
doReturn(CompletableFuture.completedFuture(Optional.of(mockPersistentTopic)))
+ .when(mockBrokerService).getTopic(anyString(),
anyBoolean());
+ doReturn(mockBrokerService).when(pulsar).getBrokerService();
+ AsyncResponse asyncResponse = mock(AsyncResponse.class);
+ Schema<String> schema = StringSchema.utf8();
+ ProducerMessages producerMessages = new ProducerMessages();
+
producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal().
+ writeValueAsString(schema.getSchemaInfo()));
+
producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
+ writeValueAsString(schema.getSchemaInfo()));
+ String message = "[" +
+
"{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},"
+
+
"{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2},"
+
+
"{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3},"
+
+
"{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4}]";
+
producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+ new TypeReference<List<ProducerMessage>>() {}));
+ // Previous request should trigger namespace bundle loading,
retry produce.
+ topics.produceOnPersistentTopic(asyncResponse, testTenant,
testNamespace, testTopicName, false, producerMessages);
+ ArgumentCaptor<Response> responseCaptor =
ArgumentCaptor.forClass(Response.class);
+ verify(asyncResponse,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.OK.getStatusCode());
+ Object responseEntity = responseCaptor.getValue().getEntity();
+ Assert.assertTrue(responseEntity instanceof ProducerAcks);
+ ProducerAcks response = (ProducerAcks) responseEntity;
+
Assert.assertEquals(response.getMessagePublishResults().size(), 4);
+ int errorResponse = 0;
+ for (int index = 0; index <
response.getMessagePublishResults().size(); index++) {
+ int errorCode =
response.getMessagePublishResults().get(index).getErrorCode();
+ if (0 == errorCode) {
+
Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
+ .getMessageId().split(":")[2]), -1);
+
Assert.assertTrue(response.getMessagePublishResults().get(index)
+ .getMessageId().length() > 0);
+ } else {
+ errorResponse++;
+ Assert.assertEquals(errorCode, 2);
+
Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorMsg(),
+
"org.apache.pulsar.broker.service.BrokerServiceException$"
+ + "TopicFencedException: Fake
exception");
+ }
+ }
+ // Add entry start to fail after 2nd operation, we published 4
msg so expecting 2 error response.
+ Assert.assertTrue(errorResponse == 2);
+ } catch (Throwable e) {
+ Assert.fail(e.getMessage());
+ }
+ }).get();
+ }
+
    /**
     * When the request lands on a broker that does not own the topic, the REST
     * endpoint must answer 307 Temporary Redirect pointing at the owner broker.
     */
    @Test
    public void testLookUpWithRedirect() throws Exception {
        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
        String requestPath = "/admin/v3/topics/my-tenant/my-namespace/my-topic";
        //create topic on one broker
        admin.topics().createNonPartitionedTopic(topicName);
        PulsarService pulsar2 = startBroker(getDefaultConf());
        doReturn(false).when(topics).isRequestHttps();
        UriInfo uriInfo = mock(UriInfo.class);
        doReturn(requestPath).when(uriInfo).getPath(anyBoolean());
        // Inject the mocked request URI into the resource under test.
        Whitebox.setInternalState(topics, "uri", uriInfo);
        //do produce on another broker
        topics.setPulsar(pulsar2);
        AsyncResponse asyncResponse = mock(AsyncResponse.class);
        ProducerMessages producerMessages = new ProducerMessages();
        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(Schema.INT64.getSchemaInfo()));
        // Empty batch: only the lookup/redirect behaviour is under test.
        String message = "[]";
        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
                new TypeReference<List<ProducerMessage>>() {}));
        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, producerMessages);
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
        // Verify got redirect response
        Assert.assertEquals(responseCaptor.getValue().getStatusInfo(), Response.Status.TEMPORARY_REDIRECT);
        // Verify URI point to address of broker the topic was created on
        Assert.assertEquals(responseCaptor.getValue().getLocation().toString(),
                pulsar.getWebServiceAddress() + requestPath);
    }
+
    /**
     * A failed owner-broker lookup must surface to the caller as a
     * RestException with a "Can't find owner" message.
     */
    @Test
    public void testLookUpWithException() throws Exception {
        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
        admin.topics().createNonPartitionedTopic(topicName);
        NamespaceService nameSpaceService = mock(NamespaceService.class);
        // Owner lookup fails even though the topic-existence check succeeds.
        CompletableFuture future = new CompletableFuture();
        future.completeExceptionally(new BrokerServiceException("Fake Exception"));
        CompletableFuture existFuture = new CompletableFuture();
        existFuture.complete(true);
        doReturn(future).when(nameSpaceService).getBrokerServiceUrlAsync(any(), any());
        doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
        doReturn(nameSpaceService).when(pulsar).getNamespaceService();
        AsyncResponse asyncResponse = mock(AsyncResponse.class);
        ProducerMessages producerMessages = new ProducerMessages();
        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(Schema.INT64.getSchemaInfo()));
        // Empty batch: only the failing lookup path is under test.
        String message = "[]";
        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
                new TypeReference<List<ProducerMessage>>() {}));
        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, producerMessages);
        ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getMessage(), "Can't find owner of given topic.");
    }
+
+ @Test
+ public void testLookUpTopicNotExist() throws Exception {
+ String topicName = "persistent://" + testTenant + "/" + testNamespace
+ "/" + testTopicName;
+ NamespaceService nameSpaceService = mock(NamespaceService.class);
+ CompletableFuture existFuture = new CompletableFuture();
+ existFuture.complete(false);
+ doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
+ doReturn(nameSpaceService).when(pulsar).getNamespaceService();
+ AsyncResponse asyncResponse = mock(AsyncResponse.class);
+ ProducerMessages producerMessages = new ProducerMessages();
+ producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
+ writeValueAsString(Schema.INT64.getSchemaInfo()));
+ String message = "[]";
+
producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+ new TypeReference<List<ProducerMessage>>() {}));
+ topics.produceOnPersistentTopic(asyncResponse, testTenant,
testNamespace, testTopicName, false, producerMessages);
+ ArgumentCaptor<RestException> responseCaptor =
ArgumentCaptor.forClass(RestException.class);
+ verify(asyncResponse,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getMessage(), "Fail to
publish message: Topic not exist");
+ }
+
+ @Test
+ public void testProduceWithLongSchema() throws Exception {
+ String topicName = "persistent://" + testTenant + "/" + testNamespace
+ "/" + testTopicName;
+ admin.topics().createNonPartitionedTopic(topicName);
+ AsyncResponse asyncResponse = mock(AsyncResponse.class);
+ Consumer consumer = pulsarClient.newConsumer(Schema.INT64)
+ .topic(topicName)
+ .subscriptionName("my-sub")
+ .subscriptionType(SubscriptionType.Exclusive)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ ProducerMessages producerMessages = new ProducerMessages();
+ producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
+ writeValueAsString(Schema.INT64.getSchemaInfo()));
+ String message = "[" +
+
"{\"key\":\"my-key\",\"payload\":\"111111111111\",\"eventTime\":1603045262772,\"sequenceId\":1},"
+
+
"{\"key\":\"my-key\",\"payload\":\"222222222222\",\"eventTime\":1603045262772,\"sequenceId\":2},"
+
+
"{\"key\":\"my-key\",\"payload\":\"333333333333\",\"eventTime\":1603045262772,\"sequenceId\":3},"
+
+
"{\"key\":\"my-key\",\"payload\":\"444444444444\",\"eventTime\":1603045262772,\"sequenceId\":4},"
+
+
"{\"key\":\"my-key\",\"payload\":\"555555555555\",\"eventTime\":1603045262772,\"sequenceId\":5}]";
+
producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
+ new TypeReference<List<ProducerMessage>>() {}));
+ topics.produceOnPersistentTopic(asyncResponse, testTenant,
testNamespace, testTopicName, false, producerMessages);
+ ArgumentCaptor<Response> responseCaptor =
ArgumentCaptor.forClass(Response.class);
+ verify(asyncResponse,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.OK.getStatusCode());
+ Object responseEntity = responseCaptor.getValue().getEntity();
+ Assert.assertTrue(responseEntity instanceof ProducerAcks);
+ ProducerAcks response = (ProducerAcks) responseEntity;
+ Assert.assertEquals(response.getMessagePublishResults().size(), 5);
+ Assert.assertEquals(response.getSchemaVersion(), 0);
+ for (int index = 0; index <
response.getMessagePublishResults().size(); index++) {
+
Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
+ .getMessageId().split(":")[2]), -1);
+
Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(),
0);
+
Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length()
> 0);
+ }
+
+ List<Long> expectedMsg = Arrays.asList(111111111111l, 222222222222l,
333333333333l, 444444444444l, 555555555555l);
+ Message<Long> msg = null;
+ // Assert all messages published by REST producer can be received by
consumer in expected order.
+ for (int i = 0; i < 5; i++) {
+ msg = consumer.receive(2, TimeUnit.SECONDS);
+ Assert.assertEquals(expectedMsg.get(i),
Schema.INT64.decode(msg.getData()));
+ Assert.assertEquals("my-key", msg.getKey());
+ }
+ }
+
    // Default schema is String schema
    /**
     * When the request carries no key/value schema the endpoint falls back to
     * the STRING schema, so a utf8 consumer must be able to decode the payloads.
     */
    @Test
    public void testProduceNoSchema() throws Exception {
        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
        admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = mock(AsyncResponse.class);
        Consumer consumer = pulsarClient.newConsumer(StringSchema.utf8())
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        // Deliberately no setKeySchema/setValueSchema: exercises the default.
        ProducerMessages producerMessages = new ProducerMessages();
        String message = "[" +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:5\",\"eventTime\":1603045262772,\"sequenceId\":5}]";
        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
                new TypeReference<List<ProducerMessage>>() {}));
        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, producerMessages);
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
        Object responseEntity = responseCaptor.getValue().getEntity();
        Assert.assertTrue(responseEntity instanceof ProducerAcks);
        ProducerAcks response = (ProducerAcks) responseEntity;
        Assert.assertEquals(response.getMessagePublishResults().size(), 5);
        Assert.assertEquals(response.getSchemaVersion(), 0);
        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
            // Non-partitioned topic: partition index in the message id is -1.
            Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
                    .getMessageId().split(":")[2]), -1);
            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
        }

        List<String> expectedMsg = Arrays.asList("RestProducer:1", "RestProducer:2", "RestProducer:3", "RestProducer:4",
                "RestProducer:5");
        Message<String> msg = null;
        // Assert all messages published by REST producer can be received by consumer in expected order.
        for (int i = 0; i < 5; i++) {
            msg = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertEquals(expectedMsg.get(i), StringSchema.utf8().decode(msg.getData()));
            Assert.assertEquals("my-key", msg.getKey());
        }
    }
+
    // Test POJO used as the nested "seller" field of PC when deriving
    // JSON/Avro schemas. Fields are public so assertions can read them directly.
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    private static class Seller {
        public String state;
        public String street;
        public long zipCode;
    }
+
    // Test POJO from which JSON and Avro schemas are generated; includes an
    // enum field and a nested POJO to exercise non-trivial schema handling.
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    private static class PC {
        public String brand;
        public String model;
        public int year;
        public GPU gpu;
        public Seller seller;
    }
+
    // Enum field of PC; exercises enum encoding in the generated schemas.
    private enum GPU {
        AMD, NVIDIA
    }
+
    /**
     * Publish JSON-schema payloads through the REST endpoint and verify a
     * client consumer decodes them back into equal {@code PC} objects.
     */
    @Test
    public void testProduceWithJsonSchema() throws Exception {
        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
        admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = mock(AsyncResponse.class);
        GenericSchema jsonSchema = GenericJsonSchema.of(JSONSchema.of(SchemaDefinition.builder()
                .withPojo(PC.class).build()).getSchemaInfo());
        PC pc = new PC("dell", "alienware", 2021, GPU.AMD,
                new Seller("WA", "main street", 98004));
        PC anotherPc = new PC("asus", "rog", 2020, GPU.NVIDIA,
                new Seller("CA", "back street", 90232));
        Consumer consumer = pulsarClient.newConsumer(jsonSchema)
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        ProducerMessages producerMessages = new ProducerMessages();
        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(jsonSchema.getSchemaInfo()));
        // Payloads are JSON documents embedded as strings, so inner quotes must be escaped.
        String message = "[" +
                "{\"key\":\"my-key\",\"payload\":\""
                + ObjectMapperFactory.getThreadLocal().writeValueAsString(pc).replace("\"", "\\\"")
                + "\",\"eventTime\":1603045262772,\"sequenceId\":1},"
                + "{\"key\":\"my-key\",\"payload\":\""
                + ObjectMapperFactory.getThreadLocal().writeValueAsString(anotherPc).replace("\"", "\\\"")
                + "\",\"eventTime\":1603045262772,\"sequenceId\":2}]";
        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
                new TypeReference<List<ProducerMessage>>() {}));
        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace,
                testTopicName, false, producerMessages);
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
        Object responseEntity = responseCaptor.getValue().getEntity();
        Assert.assertTrue(responseEntity instanceof ProducerAcks);
        ProducerAcks response = (ProducerAcks) responseEntity;
        Assert.assertEquals(response.getMessagePublishResults().size(), 2);
        Assert.assertEquals(response.getSchemaVersion(), 0);
        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
            // Non-partitioned topic: partition index in the message id is -1.
            Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
                    .getMessageId().split(":")[2]), -1);
            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
        }

        List<PC> expected = Arrays.asList(pc, anotherPc);
        Message<String> msg = null;
        // Assert all messages published by REST producer can be received by consumer in expected order.
        for (int i = 0; i < 2; i++) {
            msg = consumer.receive(2, TimeUnit.SECONDS);
            // Map the decoded generic JSON record back to the POJO for field-wise comparison.
            PC msgPc = ObjectMapperFactory.getThreadLocal().
                    treeToValue(((GenericJsonRecord) jsonSchema.decode(msg.getData())).getJsonNode(), PC.class);
            Assert.assertEquals(msgPc.brand, expected.get(i).brand);
            Assert.assertEquals(msgPc.model, expected.get(i).model);
            Assert.assertEquals(msgPc.year, expected.get(i).year);
            Assert.assertEquals(msgPc.gpu, expected.get(i).gpu);
            Assert.assertEquals(msgPc.seller.state, expected.get(i).seller.state);
            Assert.assertEquals(msgPc.seller.street, expected.get(i).seller.street);
            Assert.assertEquals(msgPc.seller.zipCode, expected.get(i).seller.zipCode);
            Assert.assertEquals("my-key", msg.getKey());
        }
    }
+
    /**
     * Publish Avro-schema payloads (JSON-encoded via Avro's JsonEncoder)
     * through the REST endpoint and verify a client consumer decodes them
     * back to the original field values.
     */
    @Test
    public void testProduceWithAvroSchema() throws Exception {
        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
        admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = mock(AsyncResponse.class);
        GenericSchemaImpl avroSchema = GenericAvroSchema.of(AvroSchema.of(SchemaDefinition.builder()
                .withPojo(PC.class).build()).getSchemaInfo());
        PC pc = new PC("dell", "alienware", 2021, GPU.AMD,
                new Seller("WA", "main street", 98004));
        PC anotherPc = new PC("asus", "rog", 2020, GPU.NVIDIA,
                new Seller("CA", "back street", 90232));
        Consumer consumer = pulsarClient.newConsumer(avroSchema)
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        ProducerMessages producerMessages = new ProducerMessages();
        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(avroSchema.getSchemaInfo()));

        // Serialize each POJO to Avro's JSON encoding; that text becomes the payload.
        ReflectDatumWriter<PC> datumWriter = new ReflectDatumWriter(avroSchema.getAvroSchema());
        ByteArrayOutputStream outputStream1 = new ByteArrayOutputStream();
        ByteArrayOutputStream outputStream2 = new ByteArrayOutputStream();

        JsonEncoder encoder1 = EncoderFactory.get().jsonEncoder(avroSchema.getAvroSchema(), outputStream1);
        JsonEncoder encoder2 = EncoderFactory.get().jsonEncoder(avroSchema.getAvroSchema(), outputStream2);

        datumWriter.write(pc, encoder1);
        encoder1.flush();
        datumWriter.write(anotherPc, encoder2);
        encoder2.flush();

        // Inner quotes must be escaped because the payload is embedded in a JSON string.
        String message = "[" +
                "{\"key\":\"my-key\",\"payload\":\""
                + outputStream1.toString().replace("\"", "\\\"")
                + "\",\"eventTime\":1603045262772,\"sequenceId\":1},"
                + "{\"key\":\"my-key\",\"payload\":\""
                + outputStream2.toString().replace("\"", "\\\"")
                + "\",\"eventTime\":1603045262772,\"sequenceId\":2}]";
        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
                new TypeReference<List<ProducerMessage>>() {}));
        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace,
                testTopicName, false, producerMessages);
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
        Object responseEntity = responseCaptor.getValue().getEntity();
        Assert.assertTrue(responseEntity instanceof ProducerAcks);
        ProducerAcks response = (ProducerAcks) responseEntity;
        Assert.assertEquals(response.getMessagePublishResults().size(), 2);
        Assert.assertEquals(response.getSchemaVersion(), 0);
        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
            // Non-partitioned topic: partition index in the message id is -1.
            Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
                    .getMessageId().split(":")[2]), -1);
            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
        }

        List<PC> expected = Arrays.asList(pc, anotherPc);
        Message<String> msg = null;
        // Assert all messages published by REST producer can be received by consumer in expected order.
        for (int i = 0; i < 2; i++) {
            msg = consumer.receive(2, TimeUnit.SECONDS);
            GenericAvroRecord avroRecord = (GenericAvroRecord) avroSchema.decode(msg.getData());
            // Avro surfaces strings as Utf8 and enums as EnumSymbol, hence the conversions below.
            Assert.assertEquals(((Utf8) avroRecord.getAvroRecord().get("brand")).toString(), expected.get(i).brand);
            Assert.assertEquals(((Utf8) avroRecord.getAvroRecord().get("model")).toString(), expected.get(i).model);
            Assert.assertEquals((int) avroRecord.getAvroRecord().get("year"), expected.get(i).year);
            Assert.assertEquals(((GenericData.EnumSymbol) avroRecord.getAvroRecord().get("gpu")).toString(), expected.get(i).gpu.toString());
            Assert.assertEquals(((Utf8) ((GenericRecord) avroRecord.getAvroRecord().get("seller")).get("state")).toString(), expected.get(i).seller.state);
            Assert.assertEquals(((Utf8) ((GenericRecord) avroRecord.getAvroRecord().get("seller")).get("street")).toString(), expected.get(i).seller.street);
            Assert.assertEquals(((GenericRecord) avroRecord.getAvroRecord().get("seller")).get("zipCode"), expected.get(i).seller.zipCode);
            Assert.assertEquals("my-key", msg.getKey());
        }
    }
+
    /**
     * Interleave a regular Pulsar client producer (KeyValue schema, SEPARATED
     * encoding) with the REST producer on the same topic and verify a single
     * consumer receives all six messages in publish order.
     */
    @Test
    public void testProduceWithRestAndClientThenConsumeWithClient() throws Exception {
        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
        admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = mock(AsyncResponse.class);
        Schema keyValueSchema = KeyValueSchemaImpl.of(StringSchema.utf8(), StringSchema.utf8(),
                KeyValueEncodingType.SEPARATED);
        Producer producer = pulsarClient.newProducer(keyValueSchema)
                .topic(topicName)
                .create();
        Consumer consumer = pulsarClient.newConsumer(keyValueSchema)
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        // Three messages from the regular client producer first.
        for (int i = 0; i < 3; i++) {
            producer.newMessage(keyValueSchema)
                    .value(new KeyValue<>("my-key", "ClientProducer:" + i))
                    .send();
        }
        // Then three more through the REST endpoint with matching STRING schemas.
        ProducerMessages producerMessages = new ProducerMessages();
        producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(StringSchema.utf8().getSchemaInfo()));
        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(StringSchema.utf8().getSchemaInfo()));
        String message = "[" +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}]";
        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
                new TypeReference<List<ProducerMessage>>() {}));
        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, producerMessages);
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
        Object responseEntity = responseCaptor.getValue().getEntity();
        Assert.assertTrue(responseEntity instanceof ProducerAcks);
        ProducerAcks response = (ProducerAcks) responseEntity;
        Assert.assertEquals(response.getMessagePublishResults().size(), 3);
        Assert.assertEquals(response.getSchemaVersion(), 0);
        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
            // Non-partitioned topic: partition index in the message id is -1.
            Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
                    .getMessageId().split(":")[2]), -1);
            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
        }

        List<String> expectedMsg = Arrays.asList("ClientProducer:0", "ClientProducer:1", "ClientProducer:2",
                "RestProducer:1", "RestProducer:2", "RestProducer:3");
        Message<String> msg = null;
        // Assert both messages published by client producer and REST producer can be received
        // by consumer in expected order.
        for (int i = 0; i < 6; i++) {
            msg = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertEquals(expectedMsg.get(i), StringSchema.utf8().decode(msg.getData()));
            // "bXkta2V5" is the Base64 form of "my-key" as stored by this test setup.
            Assert.assertEquals("bXkta2V5", msg.getKey());
        }
    }
+
    /**
     * Publish two REST batches — the first registering the key/value schemas,
     * the second referencing them only by the returned schema version — and
     * verify a client consumer receives all ten messages in publish order.
     */
    @Test
    public void testProduceWithRestThenConsumeWithClient() throws Exception {
        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
        admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = mock(AsyncResponse.class);
        Schema keyValueSchema = KeyValueSchemaImpl.of(StringSchema.utf8(), StringSchema.utf8(),
                KeyValueEncodingType.SEPARATED);
        Consumer consumer = pulsarClient.newConsumer(keyValueSchema)
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        ProducerMessages producerMessages = new ProducerMessages();
        producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(StringSchema.utf8().getSchemaInfo()));
        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(StringSchema.utf8().getSchemaInfo()));
        String message = "[" +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:5\",\"eventTime\":1603045262772,\"sequenceId\":5}]";
        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
                new TypeReference<List<ProducerMessage>>() {}));
        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName,
                false, producerMessages);
        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
        Object responseEntity = responseCaptor.getValue().getEntity();
        Assert.assertTrue(responseEntity instanceof ProducerAcks);
        ProducerAcks response = (ProducerAcks) responseEntity;
        Assert.assertEquals(response.getMessagePublishResults().size(), 5);
        Assert.assertEquals(response.getSchemaVersion(), 0);
        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
            // Non-partitioned topic: partition index in the message id is -1.
            Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
                    .getMessageId().split(":")[2]), -1);
            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
        }
        // Specify schema version to use existing schema.
        producerMessages = new ProducerMessages();
        producerMessages.setSchemaVersion(response.getSchemaVersion());
        message = "[" +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:6\",\"eventTime\":1603045262772,\"sequenceId\":1}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:7\",\"eventTime\":1603045262772,\"sequenceId\":2}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:8\",\"eventTime\":1603045262772,\"sequenceId\":3}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:9\",\"eventTime\":1603045262772,\"sequenceId\":4}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:10\",\"eventTime\":1603045262772,\"sequenceId\":5}]";
        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
                new TypeReference<List<ProducerMessage>>() {}));
        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, producerMessages);
        // NOTE(review): asyncResponse has now been resumed twice but is verified
        // with times(1) again; confirm this verification is reliable (a fresh
        // mock or times(2) would be unambiguous).
        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
        responseEntity = responseCaptor.getValue().getEntity();
        Assert.assertTrue(responseEntity instanceof ProducerAcks);
        response = (ProducerAcks) responseEntity;
        Assert.assertEquals(response.getMessagePublishResults().size(), 5);
        Assert.assertEquals(response.getSchemaVersion(), 0);
        for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
            Assert.assertEquals(Integer.parseInt(response.getMessagePublishResults().get(index)
                    .getMessageId().split(":")[2]), -1);
            Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
            Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
        }

        List<String> expectedMsg = Arrays.asList("RestProducer:1", "RestProducer:2", "RestProducer:3",
                "RestProducer:4", "RestProducer:5", "RestProducer:6",
                "RestProducer:7", "RestProducer:8", "RestProducer:9",
                "RestProducer:10");
        Message<String> msg = null;
        // Assert all messages published by REST producer can be received by consumer in expected order.
        for (int i = 0; i < 10; i++) {
            msg = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertEquals(expectedMsg.get(i), StringSchema.utf8().decode(msg.getData()));
            // "bXkta2V5" is the Base64 form of "my-key" as stored by this test setup.
            Assert.assertEquals("bXkta2V5", msg.getKey());
        }
    }
+
    /**
     * The topic already carries a STRING schema (set by the client producer);
     * a REST request that supplies a key+value (KEY_VALUE) schema is
     * incompatible and must be rejected with a RestException describing the
     * schema error.
     */
    @Test
    public void testProduceWithInCompatibleSchema() throws Exception {
        String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
        admin.topics().createNonPartitionedTopic(topicName);
        AsyncResponse asyncResponse = mock(AsyncResponse.class);
        Producer producer = pulsarClient.newProducer(StringSchema.utf8())
                .topic(topicName)
                .create();

        // Establish the topic's STRING schema before the REST request arrives.
        for (int i = 0; i < 3; i++) {
            producer.send("message");
        }
        ProducerMessages producerMessages = new ProducerMessages();
        producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(StringSchema.utf8().getSchemaInfo()));
        producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal().
                writeValueAsString(StringSchema.utf8().getSchemaInfo()));
        String message = "[" +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}," +
                "{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}]";
        producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message,
                new TypeReference<List<ProducerMessage>>() {}));
        topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, producerMessages);
        ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
        verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
        // The middle of the message contains a nondeterministic timestamp, so
        // only the prefix and suffix are asserted.
        Assert.assertTrue(responseCaptor.getValue().getMessage().startsWith("Fail to publish message:"
                + "java.util.concurrent.ExecutionException: org.apache.pulsar.broker.service.schema.exceptions."
                + "SchemaException: Unable to add schema SchemaData(type=KEY_VALUE, isDeleted=false, "
                + "timestamp="));
        Assert.assertTrue(responseCaptor.getValue().getMessage().endsWith(
                "user=Rest Producer, data=[0, 0, 0, 0, 0, 0, 0, 0], "
                + "props={key.schema.properties={\"__charset\":\"UTF-8\"}, value.schema.properties={\"__charset\":"
                + "\"UTF-8\"}, value.schema.type=STRING, key.schema.name=String, value.schema.name=String, "
                + "kv.encoding.type=SEPARATED, key.schema.type=STRING}) to topic persistent:"
                + "//my-tenant/my-namespace/my-topic"));
    }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
index 040842b..4b3a158 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
@@ -45,7 +45,7 @@ public class GenericJsonRecord extends VersionedGenericRecord
{
this(schemaVersion, fields, jn, null);
}
- GenericJsonRecord(byte[] schemaVersion,
+ public GenericJsonRecord(byte[] schemaVersion,
List<Field> fields,
JsonNode jn, SchemaInfo schemaInfo) {
super(schemaVersion, fields);
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessages.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessages.java
index e74ebb1..821d5d3 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessages.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessages.java
@@ -32,7 +32,7 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
public class ProducerMessages {
// Version of schema used for messages.
- private long schemaVersion;
+ private long schemaVersion = -1;
// Base64 encoded serialized schema for key
private String keySchema;