This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit dd60e6e45969eb2cd0e6332dc5c5dfe1d9b542a4 Author: Sijie Guo <[email protected]> AuthorDate: Tue Dec 15 16:12:09 2020 -0800 Add more information in send timeout exception (#8931) *Motivation* Currently the TimeoutException doesn't provide any useful information for troubleshooting. This change adds more information for troubleshooting. (cherry picked from commit c87557310615969114c7b4e9a257ae542d5bee07) --- .../apache/pulsar/client/impl/ProducerImpl.java | 57 +++++++++++++++++++--- 1 file changed, 50 insertions(+), 7 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 3ab4207..cc76656 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -63,6 +63,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerCryptoFailureAction; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.CryptoException; +import org.apache.pulsar.client.api.PulsarClientException.TimeoutException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; @@ -743,6 +744,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne private ByteBufPair cmd; private long sequenceId; private ClientCnx cnx; + private OpSendMsg op; static WriteInEventLoopCallback create(ProducerImpl<?> producer, ClientCnx cnx, OpSendMsg op) { WriteInEventLoopCallback c = RECYCLER.get(); @@ -750,6 +752,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne c.cnx = cnx; c.sequenceId = op.sequenceId; c.cmd = op.cmd; + c.op = op; return c; } @@ -762,6 +765,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne try { cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()); + op.updateSentTimestamp(); } finally { recycle(); } @@ -772,6 +776,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne cnx = null; cmd = null; sequenceId = -1; + op = null; recyclerHandle.recycle(this); } @@ -830,7 +835,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne format("The producer %s of the topic %s was already closed when closing the producers", producerName, topic)); pendingMessages.forEach(msg -> { - msg.callback.sendComplete(ex); + msg.sendComplete(ex); msg.cmd.release(); msg.recycle(); }); @@ -954,7 +959,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne // Need to protect ourselves from any exception being thrown in the future handler from the // application - op.callback.sendComplete(null); + op.sendComplete(null); } catch (Throwable t) { log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName, sequenceId, t); @@ -1011,7 +1016,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne pendingMessages.remove(); releaseSemaphoreForSendOp(op); try { - op.callback.sendComplete( + op.sendComplete( new PulsarClientException.ChecksumException( format("The checksum of the message which is produced by producer %s to the topic " + "%s is corrupted", producerName, topic))); @@ -1045,7 +1050,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne pendingMessages.remove(); releaseSemaphoreForSendOp(op); try { - op.callback.sendComplete( + op.sendComplete( new PulsarClientException.NotAllowedException( format("The size of the message which is produced by producer %s to the topic " + "%s is not allowed", producerName, topic))); @@ -1107,6 +1112,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne Runnable rePopulate; long sequenceId; long createdAt; + long firstSentAt; + long lastSentAt; + int retryCount; long batchSizeByte = 0; int numMessagesInBatch = 1; long highestSequenceId; @@ -1145,6 +1153,38 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne return op; } + void updateSentTimestamp() { + this.lastSentAt = System.nanoTime(); + if (this.firstSentAt == -1L) { + this.firstSentAt = this.lastSentAt; + } + ++this.retryCount; + } + + void sendComplete(final Exception e) { + SendCallback callback = this.callback; + if (null != callback) { + Exception finalEx = e; + if (finalEx != null && finalEx instanceof TimeoutException) { + TimeoutException te = (TimeoutException) e; + long sequenceId = te.getSequenceId(); + long ns = System.nanoTime(); + String errMsg = String.format( + "%s : createdAt %s ns ago, firstSentAt %s ns ago, lastSentAt %s ns ago, retryCount %s", + te.getMessage(), + ns - this.createdAt, + ns - this.firstSentAt, + ns - this.lastSentAt, + retryCount + ); + + finalEx = new TimeoutException(errMsg, sequenceId); + } + + callback.sendComplete(finalEx); + } + } + void recycle() { msg = null; msgs = null; @@ -1153,6 +1193,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne rePopulate = null; sequenceId = -1L; createdAt = -1L; + firstSentAt = -1L; + lastSentAt = -1L; highestSequenceId = -1L; totalChunks = 0; chunkId = -1; @@ -1521,7 +1563,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne if (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) { // Need to protect ourselves from any exception being thrown in the future handler from the // application - op.callback.sendComplete(ex); + op.sendComplete(ex); } } catch (Throwable t) { log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName, @@ -1645,13 +1687,13 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne Thread.currentThread().interrupt(); releaseSemaphoreForSendOp(op); if (op != null) { - op.callback.sendComplete(new PulsarClientException(ie, op.sequenceId)); + op.sendComplete(new PulsarClientException(ie, op.sequenceId)); } } catch (Throwable t) { releaseSemaphoreForSendOp(op); log.warn("[{}] [{}] error while closing out batch -- {}", topic, producerName, t); if (op != null) { - op.callback.sendComplete(new PulsarClientException(t, op.sequenceId)); + op.sendComplete(new PulsarClientException(t, op.sequenceId)); } } } @@ -1694,6 +1736,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne cnx.channel(), op.sequenceId); } cnx.ctx().write(op.cmd, cnx.ctx().voidPromise()); + op.updateSentTimestamp(); stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte); } cnx.ctx().flush();
