This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 42ed9bf  support async send  msg return sequenceId when throw 
Exception (#6825)
42ed9bf is described below

commit 42ed9bf12e29099e5c1655484311441402dde3d7
Author: liudezhi <[email protected]>
AuthorDate: Thu Apr 30 21:26:54 2020 +0800

    support async send  msg return sequenceId when throw Exception (#6825)
    
    Master Issue: #6824
    ## Motivation
    
    When an asynchronous send fails, an exception is thrown, but it does not 
identify which message failed, so the user cannot tell which messages need to 
be retried.
    
    ## Modifications
    
    This change is implemented on the client side: when an exception is 
thrown, the sequenceId of the failed message is set on
    org.apache.pulsar.client.api.PulsarClientException
    
    
    ```java
    public class PulsarClientException extends IOException {
        private long sequenceId = -1;
    
        public PulsarClientException(String msg, long sequenceId) {
            super(msg);
            this.sequenceId = sequenceId;
        }
    ```
    Client example:
    ```java
      producer.newMessage().sequenceId(1).value(value.getBytes())
                    .sendAsync().thenAccept(msgId -> {
                        System.out.println(msgId);
                    }).exceptionally(ex -> {
                        System.out.println( 
((PulsarClientException)ex.getCause()).getSequenceId());
                        return null;
                    });
    ```
---
 .../client/api/SimpleProducerConsumerTest.java     |  39 +++++++
 .../pulsar/client/api/PulsarClientException.java   | 121 ++++++++++++++++++++-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  33 +++---
 3 files changed, 177 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 51a7eef..a2fde6c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -47,6 +47,7 @@ import java.nio.file.Paths;
 import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -1143,6 +1144,44 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
     }
 
     @Test
+    public void testtSendCallBackReturnSequenceId() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                .enableBatching(false)
+                .topic("persistent://my-property/my-ns/my-topic5")
+                .sendTimeout(1, TimeUnit.SECONDS);
+
+        Producer<byte[]> producer = producerBuilder.create();
+        final String message = "my-message";
+
+        // Trigger the send timeout
+        stopBroker();
+        List<CompletableFuture<MessageId>> futures = new 
ArrayList<CompletableFuture<MessageId>>();
+        for(int i = 0 ; i < 3 ; i++) {
+             CompletableFuture<MessageId> future = 
producer.newMessage().sequenceId(i).value(message.getBytes()).sendAsync();
+             futures.add(future);
+        }
+        Thread.sleep(3000);
+        futures.get(0).exceptionally(ex -> {
+            long sequenceId = ((PulsarClientException) 
ex.getCause()).getSequenceId();
+            Assert.assertEquals(sequenceId, 0L);
+            return null;
+        });
+        futures.get(1).exceptionally(ex -> {
+            long sequenceId = ((PulsarClientException) 
ex.getCause()).getSequenceId();
+            Assert.assertEquals(sequenceId, 1L);
+            return null;
+        });
+        futures.get(2).exceptionally(ex -> {
+            long sequenceId = ((PulsarClientException) 
ex.getCause()).getSequenceId();
+            Assert.assertEquals(sequenceId, 2L);
+            return null;
+        });
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test
     public void testSendCallBack() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 607c3db..16af009 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -27,7 +27,7 @@ import java.util.concurrent.ExecutionException;
  */
 @SuppressWarnings("serial")
 public class PulsarClientException extends IOException {
-
+    private long sequenceId = -1;
     /**
      * Constructs an {@code PulsarClientException} with the specified detail 
message.
      *
@@ -40,6 +40,20 @@ public class PulsarClientException extends IOException {
     }
 
     /**
+     * Constructs an {@code PulsarClientException} with the specified detail 
message.
+     *
+     * @param msg
+     *        The detail message (which is saved for later retrieval
+     *        by the {@link #getMessage()} method)
+     * @param sequenceId
+     *        The sequenceId of the message
+     */
+    public PulsarClientException(String msg, long sequenceId) {
+        super(msg);
+        this.sequenceId = sequenceId;
+    }
+
+    /**
      * Constructs an {@code PulsarClientException} with the specified cause.
      *
      * @param t
@@ -52,6 +66,21 @@ public class PulsarClientException extends IOException {
     }
 
     /**
+     * Constructs an {@code PulsarClientException} with the specified cause.
+     *
+     * @param t
+     *        The cause (which is saved for later retrieval by the
+     *        {@link #getCause()} method).  (A null value is permitted,
+     *        and indicates that the cause is nonexistent or unknown.)
+     * @param sequenceId
+     *        The sequenceId of the message
+     */
+    public PulsarClientException(Throwable t, long sequenceId) {
+        super(t);
+        this.sequenceId = sequenceId;
+    }
+
+    /**
      * Invalid Service URL exception thrown by Pulsar client.
      */
     public static class InvalidServiceURL extends PulsarClientException {
@@ -141,6 +170,20 @@ public class PulsarClientException extends IOException {
         }
 
         /**
+         * Constructs an {@code TimeoutException} with the specified cause.
+         *
+         * @param t
+         *        The cause (which is saved for later retrieval by the
+         *        {@link #getCause()} method).  (A null value is permitted,
+         *        and indicates that the cause is nonexistent or unknown.)
+         * @param sequenceId
+         *        The sequenceId of the message
+         */
+        public TimeoutException(Throwable t, long sequenceId) {
+            super(t, sequenceId);
+        }
+
+        /**
          * Constructs an {@code TimeoutException} with the specified detail 
message.
          *
          * @param msg
@@ -150,6 +193,18 @@ public class PulsarClientException extends IOException {
         public TimeoutException(String msg) {
             super(msg);
         }
+
+        /**
+         * Constructs an {@code TimeoutException} with the specified detail 
message.
+         *
+         * @param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         */
+        public TimeoutException(String msg, long sequenceId) {
+            super(msg, sequenceId);
+        }
+
     }
 
     /**
@@ -270,6 +325,19 @@ public class PulsarClientException extends IOException {
         public AlreadyClosedException(String msg) {
             super(msg);
         }
+
+        /**
+         * Constructs an {@code AlreadyClosedException} with the specified 
detail message.
+         *
+         * @param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         * @param sequenceId
+         *        The sequenceId of the message
+         */
+        public AlreadyClosedException(String msg, long sequenceId) {
+            super(msg, sequenceId);
+        }
     }
 
     /**
@@ -286,6 +354,19 @@ public class PulsarClientException extends IOException {
         public TopicTerminatedException(String msg) {
             super(msg);
         }
+
+        /**
+         * Constructs an {@code TopicTerminatedException} with the specified 
detail message.
+         *
+         * @param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         * @param sequenceId
+         *        The sequenceId of the message
+         */
+        public TopicTerminatedException(String msg, long sequenceId) {
+            super(msg, sequenceId);
+        }
     }
 
     /**
@@ -448,6 +529,10 @@ public class PulsarClientException extends IOException {
         public NotConnectedException() {
             super("Not connected to broker");
         }
+
+        public NotConnectedException(long sequenceId) {
+            super("Not connected to broker", sequenceId);
+        }
     }
 
     /**
@@ -464,6 +549,19 @@ public class PulsarClientException extends IOException {
         public InvalidMessageException(String msg) {
             super(msg);
         }
+
+        /**
+         * Constructs an {@code InvalidMessageException} with the specified 
detail message.
+         *
+         * @param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         * @param sequenceId
+         *        The sequenceId of the message
+         */
+        public InvalidMessageException(String msg, long sequenceId) {
+            super(msg, sequenceId);
+        }
     }
 
     /**
@@ -512,6 +610,19 @@ public class PulsarClientException extends IOException {
         public ProducerQueueIsFullError(String msg) {
             super(msg);
         }
+
+        /**
+         * Constructs an {@code ProducerQueueIsFullError} with the specified 
detail message.
+         *
+         * @param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         * @param sequenceId
+         *        The sequenceId of the message
+         */
+        public ProducerQueueIsFullError(String msg, long sequenceId) {
+            super(msg, sequenceId);
+        }
     }
 
     /**
@@ -720,6 +831,14 @@ public class PulsarClientException extends IOException {
         }
     }
 
+    public long getSequenceId() {
+        return sequenceId;
+    }
+
+    public void setSequenceId(long sequenceId) {
+        this.sequenceId = sequenceId;
+    }
+
     public static boolean isRetriableError(Throwable t) {
         if (t instanceof AuthorizationException
                 || t instanceof InvalidServiceURL
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 6585842..f7768a4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -342,11 +342,11 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
     public void sendAsync(Message<?> message, SendCallback callback) {
         checkArgument(message instanceof MessageImpl);
 
-        if (!isValidProducerState(callback)) {
+        if (!isValidProducerState(callback, message.getSequenceId())) {
             return;
         }
 
-        if (!canEnqueueRequest(callback)) {
+        if (!canEnqueueRequest(callback, message.getSequenceId())) {
             return;
         }
 
@@ -381,7 +381,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         if (!msg.isReplicated() && msgMetadataBuilder.hasProducerName()) {
             PulsarClientException.InvalidMessageException 
invalidMessageException =
                 new PulsarClientException.InvalidMessageException(
-                    format("The producer %s of the topic %s can not reuse the 
same message", producerName, topic));
+                    format("The producer %s of the topic %s can not reuse the 
same message", producerName, topic), msg.getSequenceId());
             completeCallbackAndReleaseSemaphore(callback, 
invalidMessageException);
             compressedPayload.release();
             return;
@@ -477,9 +477,10 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 }
             }
         } catch (PulsarClientException e) {
+            e.setSequenceId(msg.getSequenceId());
             completeCallbackAndReleaseSemaphore(callback, e);
         } catch (Throwable t) {
-            completeCallbackAndReleaseSemaphore(callback, new 
PulsarClientException(t));
+            completeCallbackAndReleaseSemaphore(callback, new 
PulsarClientException(t, msg.getSequenceId()));
         }
     }
 
@@ -492,7 +493,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         }
         if (!isMultiSchemaEnabled(true)) {
             PulsarClientException.InvalidMessageException e = new 
PulsarClientException.InvalidMessageException(
-                    format("The producer %s of the topic %s is disabled the 
`MultiSchema`", producerName, topic));
+                    format("The producer %s of the topic %s is disabled the 
`MultiSchema`", producerName, topic)
+                    , msg.getSequenceId());
             completeCallbackAndReleaseSemaphore(callback, e);
             return false;
         }
@@ -626,7 +628,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         }
     }
 
-    private boolean isValidProducerState(SendCallback callback) {
+    private boolean isValidProducerState(SendCallback callback, long 
sequenceId) {
         switch (getState()) {
         case Ready:
             // OK
@@ -637,32 +639,32 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             return true;
         case Closing:
         case Closed:
-            callback.sendComplete(new 
PulsarClientException.AlreadyClosedException("Producer already closed"));
+            callback.sendComplete(new 
PulsarClientException.AlreadyClosedException("Producer already closed", 
sequenceId));
             return false;
         case Terminated:
-            callback.sendComplete(new 
PulsarClientException.TopicTerminatedException("Topic was terminated"));
+            callback.sendComplete(new 
PulsarClientException.TopicTerminatedException("Topic was terminated", 
sequenceId));
             return false;
         case Failed:
         case Uninitialized:
         default:
-            callback.sendComplete(new 
PulsarClientException.NotConnectedException());
+            callback.sendComplete(new 
PulsarClientException.NotConnectedException(sequenceId));
             return false;
         }
     }
 
-    private boolean canEnqueueRequest(SendCallback callback) {
+    private boolean canEnqueueRequest(SendCallback callback, long sequenceId) {
         try {
             if (conf.isBlockIfQueueFull()) {
                 semaphore.acquire();
             } else {
                 if (!semaphore.tryAcquire()) {
-                    callback.sendComplete(new 
PulsarClientException.ProducerQueueIsFullError("Producer send queue is full"));
+                    callback.sendComplete(new 
PulsarClientException.ProducerQueueIsFullError("Producer send queue is full", 
sequenceId));
                     return false;
                 }
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            callback.sendComplete(new PulsarClientException(e));
+            callback.sendComplete(new PulsarClientException(e, sequenceId));
             return false;
         }
 
@@ -1352,7 +1354,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
                     PulsarClientException te = new 
PulsarClientException.TimeoutException(
                         format("The producer %s can not send message to the 
topic %s within given timeout",
-                            producerName, topic));
+                            producerName, topic), firstMsg.sequenceId);
                     failPendingMessages(cnx(), te);
                     stats.incrementSendFailed(pendingMessages.size());
                     // Since the pending queue is cleared now, set timer to 
expire after configured value.
@@ -1380,6 +1382,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 try {
                     // Need to protect ourselves from any exception being 
thrown in the future handler from the
                     // application
+                    ex.setSequenceId(op.sequenceId);
                     op.callback.sendComplete(ex);
                 } catch (Throwable t) {
                     log.warn("[{}] [{}] Got exception while completing the 
callback for msg {}:", topic, producerName,
@@ -1529,13 +1532,13 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             Thread.currentThread().interrupt();
             releaseSemaphoreForSendOp(op);
             if (op != null) {
-                op.callback.sendComplete(new PulsarClientException(ie));
+                op.callback.sendComplete(new PulsarClientException(ie, 
op.sequenceId));
             }
         } catch (Throwable t) {
             releaseSemaphoreForSendOp(op);
             log.warn("[{}] [{}] error while closing out batch -- {}", topic, 
producerName, t);
             if (op != null) {
-                op.callback.sendComplete(new PulsarClientException(t));
+                op.callback.sendComplete(new PulsarClientException(t, 
op.sequenceId));
             }
         }
     }

Reply via email to