This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fe21441f101 [improve] [pip] PIP-363: Add callback parameters to the
method: org.apache.pulsar.client.impl.SendCallback.sendComplete. (#22940)
fe21441f101 is described below
commit fe21441f101f3d6d47a243b81157e5c8bf3ad573
Author: crossoverJie <[email protected]>
AuthorDate: Wed Aug 14 01:28:48 2024 +0800
[improve] [pip] PIP-363: Add callback parameters to the method:
org.apache.pulsar.client.impl.SendCallback.sendComplete. (#22940)
---
pip/pip-363.md | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 111 insertions(+)
diff --git a/pip/pip-363.md b/pip/pip-363.md
new file mode 100644
index 00000000000..2b250e69871
--- /dev/null
+++ b/pip/pip-363.md
@@ -0,0 +1,111 @@
+# PIP-363: Add callback parameters to the method:
`org.apache.pulsar.client.impl.SendCallback.sendComplete`.
+
+# Background knowledge
+
+
+As introduced in
[PIP-264](https://github.com/apache/pulsar/blob/master/pip/pip-264.md), Pulsar
has been fully integrated into the `OpenTelemetry` system, which defines some
metric specifications for [messaging
systems](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/#metric-messagingpublishduration).
+
+In the current Pulsar client code, it is not possible to obtain the number of
messages sent in batches (as well as some other sending data), making it
impossible to implement the `messaging.publish.messages` metric.
+
+In the `opentelemetry-java-instrumentation` code, the
`org.apache.pulsar.client.impl.SendCallback` interface is used to instrument
data points. For implementation details, see
[`ProducerImplInstrumentation.java`](https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java#L89-L135).
+
+# Motivation
+
+
+Currently, `org.apache.pulsar.client.impl.ProducerImpl` does
not provide a public method to obtain `numMessagesInBatch`.
+
+We therefore propose passing some of
`org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg`'s key data to the
`org.apache.pulsar.client.impl.SendCallback.sendComplete` method.
+
+# Detailed Design
+
+Add callback parameters to the method:
`org.apache.pulsar.client.impl.SendCallback.sendComplete`:
+
+```java
+public interface SendCallback {
+
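+ /**
+ * Invoked when the send operation completes.
+ *
+ * @param e the failure cause, or null if the send succeeded
+ * @param stats statistics of the completed send operation
+ */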
+ void sendComplete(Throwable e, OpSendMsgStats stats);
+}
+
+public interface OpSendMsgStats {
+ long getUncompressedSize();
+
+ long getSequenceId();
+
+ int getRetryCount();
+
+ long getBatchSizeByte();
+
+ int getNumMessagesInBatch();
+
+ long getHighestSequenceId();
+
+ int getTotalChunks();
+
+ int getChunkId();
+}
+
+@Builder
+public class OpSendMsgStatsImpl implements OpSendMsgStats {
+ private long uncompressedSize;
+ private long sequenceId;
+ private int retryCount;
+ private long batchSizeByte;
+ private int numMessagesInBatch;
+ private long highestSequenceId;
+ private int totalChunks;
+ private int chunkId;
+
+ @Override
+ public long getUncompressedSize() {
+ return uncompressedSize;
+ }
+
+ @Override
+ public long getSequenceId() {
+ return sequenceId;
+ }
+
+ @Override
+ public int getRetryCount() {
+ return retryCount;
+ }
+
+ @Override
+ public long getBatchSizeByte() {
+ return batchSizeByte;
+ }
+
+ @Override
+ public int getNumMessagesInBatch() {
+ return numMessagesInBatch;
+ }
+
+ @Override
+ public long getHighestSequenceId() {
+ return highestSequenceId;
+ }
+
+ @Override
+ public int getTotalChunks() {
+ return totalChunks;
+ }
+
+ @Override
+ public int getChunkId() {
+ return chunkId;
+ }
+}
+```
+
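As a rough illustration of how instrumentation might consume the new parameter, the sketch below counts published messages per completed send, which is the kind of data `messaging.publish.messages` needs. `SendCallback` and `OpSendMsgStats` are re-declared here as simplified stand-ins (the real `SendCallback` declares further methods), and `CountingSendCallback` is a hypothetical name that is not part of this proposal.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in: only the getter used by this sketch is declared.
interface OpSendMsgStats {
    int getNumMessagesInBatch();
}

// Simplified stand-in: the real interface has additional methods.
interface SendCallback {
    void sendComplete(Throwable e, OpSendMsgStats stats);
}

// Hypothetical callback that accumulates per-send statistics.
class CountingSendCallback implements SendCallback {
    private final AtomicLong publishedMessages = new AtomicLong();
    private final AtomicLong failedSends = new AtomicLong();

    @Override
    public void sendComplete(Throwable e, OpSendMsgStats stats) {
        if (e != null) {
            // A non-null throwable is treated as a failed send.
            failedSends.incrementAndGet();
            return;
        }
        // Defensive assumption: tolerate a missing stats object.
        if (stats != null) {
            publishedMessages.addAndGet(stats.getNumMessagesInBatch());
        }
    }

    long publishedMessages() {
        return publishedMessages.get();
    }

    long failedSends() {
        return failedSends.get();
    }
}
```

A real instrumentation would presumably forward these values to an OpenTelemetry counter instead of keeping them in local fields.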
+# Links
+
+<!--
+Updated afterwards
+-->
+* Mailing List discussion thread:
https://lists.apache.org/thread/8pgmsvx1bxz4z1w8prpvpnfpt1kb57c9
+* Mailing List voting thread:
https://lists.apache.org/thread/t0olt3722j17gjtdxqqsl3cpy104ogpr