This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 18cb458f73b [improve][broker] Add callback parameters to the SendCallback.sendComplete (#23196)
18cb458f73b is described below
commit 18cb458f73b8967eb7c1f1f133e0b7f9f1a60e93
Author: crossoverJie <[email protected]>
AuthorDate: Wed Aug 21 09:41:53 2024 +0800
[improve][broker] Add callback parameters to the SendCallback.sendComplete (#23196)
---
.../nonpersistent/NonPersistentReplicator.java | 3 +-
.../service/persistent/PersistentReplicator.java | 3 +-
.../client/impl/ProduceWithMessageIdTest.java | 85 +++++++++++++++++++++-
.../client/impl/BatchMessageContainerImpl.java | 2 +-
.../apache/pulsar/client/impl/OpSendMsgStats.java | 38 ++++++++++
.../pulsar/client/impl/OpSendMsgStatsImpl.java | 73 +++++++++++++++++++
.../apache/pulsar/client/impl/ProducerImpl.java | 34 ++++++---
.../apache/pulsar/client/impl/SendCallback.java | 5 +-
8 files changed, 224 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index 6441230fad8..45b4ebf6e17 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -33,6 +33,7 @@ import
org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.OpSendMsgStats;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
@@ -173,7 +174,7 @@ public class NonPersistentReplicator extends
AbstractReplicator implements Repli
private MessageImpl msg;
@Override
- public void sendComplete(Exception exception) {
+ public void sendComplete(Throwable exception, OpSendMsgStats opSendMsgStats) {
if (exception != null) {
log.error("[{}] Error producing on remote broker",
replicator.replicatorId, exception);
} else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 33e883ab940..b3d7546beed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -60,6 +60,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.OpSendMsgStats;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
@@ -377,7 +378,7 @@ public abstract class PersistentReplicator extends
AbstractReplicator
private MessageImpl msg;
@Override
- public void sendComplete(Exception exception) {
+ public void sendComplete(Throwable exception, OpSendMsgStats opSendMsgStats) {
if (exception != null && !(exception instanceof
PulsarClientException.InvalidMessageException)) {
log.error("[{}] Error producing on remote broker",
replicator.replicatorId, exception);
// cursor should be rewinded since it was incremented when
readMoreEntries
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
index b8efdeb9969..45f9a9c52e8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
@@ -18,14 +18,18 @@
*/
package org.apache.pulsar.client.impl;
+import static org.apache.pulsar.client.impl.AbstractBatchMessageContainer.INITIAL_BATCH_BUFFER_SIZE;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MockBrokerService;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -38,13 +42,20 @@ import org.testng.annotations.Test;
@Test(groups = "broker-impl")
@Slf4j
-public class ProduceWithMessageIdTest {
+public class ProduceWithMessageIdTest extends ProducerConsumerBase {
MockBrokerService mockBrokerService;
@BeforeClass(alwaysRun = true)
- public void setup() {
+ public void setup() throws Exception {
mockBrokerService = new MockBrokerService();
mockBrokerService.start();
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
}
@AfterClass(alwaysRun = true)
@@ -86,7 +97,7 @@ public class ProduceWithMessageIdTest {
AtomicBoolean result = new AtomicBoolean(false);
producer.sendAsync(msg, new SendCallback() {
@Override
- public void sendComplete(Exception e) {
+ public void sendComplete(Throwable e, OpSendMsgStats opSendMsgStats) {
log.info("sendComplete", e);
result.set(e == null);
}
@@ -115,4 +126,72 @@ public class ProduceWithMessageIdTest {
// the result is true only if broker received right message id.
Awaitility.await().untilTrue(result);
}
+
+ @Test
+ public void sendWithCallBack() throws Exception {
+
+ int batchSize = 10;
+
+ String topic = "persistent://public/default/testSendWithCallBack";
+ ProducerImpl<byte[]> producer =
+ (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topic)
+ .enableBatching(true)
+ .batchingMaxMessages(batchSize)
+ .create();
+
+ CountDownLatch cdl = new CountDownLatch(1);
+ AtomicReference<OpSendMsgStats> sendMsgStats = new AtomicReference<>();
+ SendCallback sendComplete = new SendCallback() {
+ @Override
+ public void sendComplete(Throwable e, OpSendMsgStats opSendMsgStats) {
+ log.info("sendComplete", e);
+ if (e == null){
+ cdl.countDown();
+ sendMsgStats.set(opSendMsgStats);
+ }
+ }
+
+ @Override
+ public void addCallback(MessageImpl<?> msg, SendCallback scb) {
+
+ }
+
+ @Override
+ public SendCallback getNextSendCallback() {
+ return null;
+ }
+
+ @Override
+ public MessageImpl<?> getNextMessage() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<MessageId> getFuture() {
+ return null;
+ }
+ };
+ int totalReadabled = 0;
+ int totalUncompressedSize = 0;
+ for (int i = 0; i < batchSize; i++) {
+ MessageMetadata metadata = new MessageMetadata();
+ ByteBuffer buffer = ByteBuffer.wrap("data".getBytes(StandardCharsets.UTF_8));
+ MessageImpl<byte[]> msg = MessageImpl.create(metadata, buffer, Schema.BYTES, topic);
+ msg.getDataBuffer().retain();
+ totalReadabled += msg.getDataBuffer().readableBytes();
+ totalUncompressedSize += msg.getUncompressedSize();
+ producer.sendAsync(msg, sendComplete);
+ }
+
+ cdl.await();
+ OpSendMsgStats opSendMsgStats = sendMsgStats.get();
+ Assert.assertEquals(opSendMsgStats.getUncompressedSize(), totalUncompressedSize + INITIAL_BATCH_BUFFER_SIZE);
+ Assert.assertEquals(opSendMsgStats.getSequenceId(), 0);
+ Assert.assertEquals(opSendMsgStats.getRetryCount(), 1);
+ Assert.assertEquals(opSendMsgStats.getBatchSizeByte(), totalReadabled);
+ Assert.assertEquals(opSendMsgStats.getNumMessagesInBatch(), batchSize);
+ Assert.assertEquals(opSendMsgStats.getHighestSequenceId(), batchSize-1);
+ Assert.assertEquals(opSendMsgStats.getTotalChunks(), 0);
+ Assert.assertEquals(opSendMsgStats.getChunkId(), -1);
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index a3c9d1bc9ab..44f1fb27465 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -229,7 +229,7 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
try {
// Need to protect ourselves from any exception being thrown in
the future handler from the application
if (firstCallback != null) {
- firstCallback.sendComplete(ex);
+ firstCallback.sendComplete(ex, null);
}
if (batchedMessageMetadataAndPayload != null) {
ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStats.java
new file mode 100644
index 00000000000..dc28df50f28
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStats.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+
+public interface OpSendMsgStats {
+ long getUncompressedSize();
+
+ long getSequenceId();
+
+ int getRetryCount();
+
+ long getBatchSizeByte();
+
+ int getNumMessagesInBatch();
+
+ long getHighestSequenceId();
+
+ int getTotalChunks();
+
+ int getChunkId();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStatsImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStatsImpl.java
new file mode 100644
index 00000000000..41bb742776c
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/OpSendMsgStatsImpl.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.Builder;
+
+@Builder
+public class OpSendMsgStatsImpl implements OpSendMsgStats {
+ private long uncompressedSize;
+ private long sequenceId;
+ private int retryCount;
+ private long batchSizeByte;
+ private int numMessagesInBatch;
+ private long highestSequenceId;
+ private int totalChunks;
+ private int chunkId;
+
+ @Override
+ public long getUncompressedSize() {
+ return uncompressedSize;
+ }
+
+ @Override
+ public long getSequenceId() {
+ return sequenceId;
+ }
+
+ @Override
+ public int getRetryCount() {
+ return retryCount;
+ }
+
+ @Override
+ public long getBatchSizeByte() {
+ return batchSizeByte;
+ }
+
+ @Override
+ public int getNumMessagesInBatch() {
+ return numMessagesInBatch;
+ }
+
+ @Override
+ public long getHighestSequenceId() {
+ return highestSequenceId;
+ }
+
+ @Override
+ public int getTotalChunks() {
+ return totalChunks;
+ }
+
+ @Override
+ public int getChunkId() {
+ return chunkId;
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 6d5a8145463..5c46057ae30 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -414,7 +414,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
@Override
- public void sendComplete(Exception e) {
+ public void sendComplete(Throwable e, OpSendMsgStats opSendMsgStats) {
SendCallback loopingCallback = this;
MessageImpl<?> loopingMsg = currentMsg;
while (loopingCallback != null) {
@@ -424,7 +424,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
}
- private void onSendComplete(Exception e, SendCallback sendCallback, MessageImpl<?> msg) {
+ private void onSendComplete(Throwable e, SendCallback sendCallback, MessageImpl<?> msg) {
long createdAt = (sendCallback instanceof
ProducerImpl.DefaultSendMessageCallback)
? ((DefaultSendMessageCallback) sendCallback).createdAt :
this.createdAt;
long latencyNanos = System.nanoTime() - createdAt;
@@ -842,7 +842,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
log.warn("[{}] [{}] GetOrCreateSchema error", topic,
producerName, t);
if (t instanceof
PulsarClientException.IncompatibleSchemaException) {
msg.setSchemaState(MessageImpl.SchemaState.Broken);
- callback.sendComplete((PulsarClientException.IncompatibleSchemaException) t);
+ callback.sendComplete(t, null);
}
} else {
log.info("[{}] [{}] GetOrCreateSchema succeed", topic,
producerName);
@@ -985,19 +985,19 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
case Closing:
case Closed:
callback.sendComplete(
- new PulsarClientException.AlreadyClosedException("Producer already closed", sequenceId));
+ new PulsarClientException.AlreadyClosedException("Producer already closed", sequenceId), null);
return false;
case ProducerFenced:
- callback.sendComplete(new PulsarClientException.ProducerFencedException("Producer was fenced"));
+ callback.sendComplete(new PulsarClientException.ProducerFencedException("Producer was fenced"), null);
return false;
case Terminated:
callback.sendComplete(
- new PulsarClientException.TopicTerminatedException("Topic was terminated", sequenceId));
+ new PulsarClientException.TopicTerminatedException("Topic was terminated", sequenceId), null);
return false;
case Failed:
case Uninitialized:
default:
- callback.sendComplete(new PulsarClientException.NotConnectedException(sequenceId));
+ callback.sendComplete(new PulsarClientException.NotConnectedException(sequenceId), null);
return false;
}
}
@@ -1012,20 +1012,20 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
} else {
if (!semaphore.map(Semaphore::tryAcquire).orElse(true)) {
callback.sendComplete(new
PulsarClientException.ProducerQueueIsFullError(
- "Producer send queue is full", sequenceId));
+ "Producer send queue is full", sequenceId), null);
return false;
}
if
(!client.getMemoryLimitController().tryReserveMemory(payloadSize)) {
semaphore.ifPresent(Semaphore::release);
callback.sendComplete(new
PulsarClientException.MemoryBufferIsFullError(
- "Client memory buffer is full", sequenceId));
+ "Client memory buffer is full", sequenceId), null);
return false;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- callback.sendComplete(new PulsarClientException(e, sequenceId));
+ callback.sendComplete(new PulsarClientException(e, sequenceId), null);
return false;
}
@@ -1302,7 +1302,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
private void completeCallbackAndReleaseSemaphore(long payloadSize,
SendCallback callback, Exception exception) {
semaphore.ifPresent(Semaphore::release);
client.getMemoryLimitController().releaseMemory(payloadSize);
- callback.sendComplete(exception);
+ callback.sendComplete(exception, null);
}
/**
@@ -1595,7 +1595,17 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
rpcLatencyHistogram.recordFailure(now - this.lastSentAt);
}
- callback.sendComplete(finalEx);
+ OpSendMsgStats opSendMsgStats = OpSendMsgStatsImpl.builder()
+ .uncompressedSize(uncompressedSize)
+ .sequenceId(sequenceId)
+ .retryCount(retryCount)
+ .batchSizeByte(batchSizeByte)
+ .numMessagesInBatch(numMessagesInBatch)
+ .highestSequenceId(highestSequenceId)
+ .totalChunks(totalChunks)
+ .chunkId(chunkId)
+ .build();
+ callback.sendComplete(finalEx, opSendMsgStats);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java
index 369bb34a29a..f55d7ae7912 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SendCallback.java
@@ -20,18 +20,21 @@ package org.apache.pulsar.client.impl;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.classification.InterfaceStability;
/**
*
*/
+@InterfaceStability.Evolving
public interface SendCallback {
/**
* invoked when send operation completes.
*
* @param e
+ * @param opSendMsgStats stats associated with the send operation
*/
- void sendComplete(Exception e);
+ void sendComplete(Throwable e, OpSendMsgStats opSendMsgStats);
/**
* used to specify a callback to be invoked on completion of a send
operation for individual messages sent in a