This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dbb157793b9  [fix][test]Fix flaky test because the too short receive 
time (#21273)
dbb157793b9 is described below

commit dbb157793b90e203f01a630fb0f0abafbb337762
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat Oct 7 18:01:02 2023 +0800

     [fix][test]Fix flaky test because the too short receive time (#21273)
---
 .../client/impl/TransactionEndToEndTest.java       | 104 +++++++++++----------
 1 file changed, 53 insertions(+), 51 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 348fb04b7dd..da8492e612f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -96,6 +96,8 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
     protected static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
     protected static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + 
"/message-ack-test";
     protected static final int NUM_PARTITIONS = 16;
+    private static final int waitTimeForCanReceiveMsgInSec = 5;
+    private static final int waitTimeForCannotReceiveMsgInSec = 5;
     @BeforeClass
     protected void setup() throws Exception {
         conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
@@ -173,7 +175,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         }
 
         // can't receive message anymore
-        assertNull(consumer.receive(2, TimeUnit.SECONDS));
+        assertNull(consumer.receive(waitTimeForCannotReceiveMsgInSec, 
TimeUnit.SECONDS));
     }
 
 
@@ -240,14 +242,14 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                 .enableBatchIndexAcknowledgment(true)
                 .subscribe();
 
-        Message<Integer> message = consumer.receive(3, TimeUnit.SECONDS);
+        Message<Integer> message = 
consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS);
         Assert.assertNull(message);
 
         // abort txn1
         txn1.abort().get();
         // after txn1 aborted, consumer will receive messages txn1 contains
         int receiveCounter = 0;
-        while((message = consumer.receive(3, TimeUnit.SECONDS)) != null) {
+        while((message = consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS)) != null) {
             Assert.assertEquals(message.getValue().intValue(), receiveCounter);
             receiveCounter ++;
         }
@@ -347,7 +349,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         }
 
         // Can't receive transaction messages before commit.
-        Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
+        Message<byte[]> message = 
consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS);
         Assert.assertNull(message);
 
         txn1.commit().get();
@@ -355,13 +357,13 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
         int receiveCnt = 0;
         for (int i = 0; i < txnMessageCnt; i++) {
-            message = consumer.receive(5, TimeUnit.SECONDS);
+            message = consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS);
             Assert.assertNotNull(message);
             receiveCnt ++;
         }
         Assert.assertEquals(txnMessageCnt, receiveCnt);
 
-        message = consumer.receive(300, TimeUnit.MILLISECONDS);
+        message = consumer.receive(waitTimeForCannotReceiveMsgInSec, 
TimeUnit.SECONDS);
         Assert.assertNull(message);
 
         // cleanup.
@@ -398,13 +400,13 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         Awaitility.await().until(consumer::isConnected);
 
         // Can't receive transaction messages before abort.
-        Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
+        Message<byte[]> message = 
consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS);
         Assert.assertNull(message);
 
         txn.abort().get();
 
         // Cant't receive transaction messages after abort.
-        message = consumer.receive(300, TimeUnit.MILLISECONDS);
+        message = consumer.receive(waitTimeForCannotReceiveMsgInSec, 
TimeUnit.SECONDS);
         Assert.assertNull(message);
         Awaitility.await().until(() -> {
             boolean flag = true;
@@ -492,7 +494,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         Transaction txn = getTxn();
 
         for (int i = 0; i < messageCount / 2; i++) {
-            Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+            Message<byte[]> message = 
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
             consumer.acknowledgeAsync(message.getMessageId(), txn).get();
         }
 
@@ -572,14 +574,14 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
             // consume and ack messages with txn
             for (int i = 0; i < messageCnt; i++) {
-                Message<byte[]> message = consumer.receive(5, 
TimeUnit.SECONDS);
+                Message<byte[]> message = 
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
                 Assert.assertNotNull(message);
                 log.info("receive msgId: {}, count : {}", 
message.getMessageId(), i);
                 consumer.acknowledgeAsync(message.getMessageId(), txn).get();
             }
 
             // the messages are pending ack state and can't be received
-            Message<byte[]> message = consumer.receive(300, 
TimeUnit.MILLISECONDS);
+            Message<byte[]> message = 
consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS);
             Assert.assertNull(message);
 
             // 1) txn abort
@@ -588,7 +590,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
             // after transaction abort, the messages could be received
             Transaction commitTxn = getTxn();
             for (int i = 0; i < messageCnt; i++) {
-                message = consumer.receive(2, TimeUnit.SECONDS);
+                message = consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS);
                 Assert.assertNotNull(message);
                 consumer.acknowledgeAsync(message.getMessageId(), 
commitTxn).get();
                 log.info("receive msgId: {}, count: {}", 
message.getMessageId(), i);
@@ -598,7 +600,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
             commitTxn.commit().get();
 
             // after transaction commit, the messages can't be received
-            message = consumer.receive(300, TimeUnit.MILLISECONDS);
+            message = consumer.receive(waitTimeForCannotReceiveMsgInSec, 
TimeUnit.SECONDS);
             Assert.assertNull(message);
 
             Field field = TransactionImpl.class.getDeclaredField("state");
@@ -635,7 +637,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                 .topic(topicTwo).subscriptionName(sub).subscribe();
         String content = "test";
         producer.send(content);
-        assertEquals(consumer.receive(3, TimeUnit.SECONDS).getValue(), 
content);
+        assertEquals(consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS).getValue(), content);
 
         // cleanup.
         producer.close();
@@ -674,7 +676,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         log.info("produce transaction messages finished");
 
         // Can't receive transaction messages before commit.
-        Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
+        Message<byte[]> message = 
consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS);
         Assert.assertNull(message);
         log.info("transaction messages can't be received before transaction 
committed");
 
@@ -683,7 +685,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         int ackedMessageCount = 0;
         int receiveCnt = 0;
         for (int i = 0; i < messageCnt; i++) {
-            message = consumer.receive(5, TimeUnit.SECONDS);
+            message = consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS);
             Assert.assertNotNull(message);
             receiveCnt ++;
             if (i % 2 == 0) {
@@ -693,7 +695,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         }
         Assert.assertEquals(messageCnt, receiveCnt);
 
-        message = consumer.receive(300, TimeUnit.MILLISECONDS);
+        message = consumer.receive(waitTimeForCannotReceiveMsgInSec, 
TimeUnit.SECONDS);
         Assert.assertNull(message);
 
         String checkTopic = TopicName.get(topic).getPartition(0).toString();
@@ -705,14 +707,14 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
         receiveCnt = 0;
         for (int i = 0; i < messageCnt - ackedMessageCount; i++) {
-            message = consumer.receive(2, TimeUnit.SECONDS);
+            message = consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS);
             Assert.assertNotNull(message);
             consumer.acknowledge(message);
             receiveCnt ++;
         }
         Assert.assertEquals(messageCnt - ackedMessageCount, receiveCnt);
 
-        message = consumer.receive(300, TimeUnit.MILLISECONDS);
+        message = consumer.receive(waitTimeForCannotReceiveMsgInSec, 
TimeUnit.SECONDS);
         Assert.assertNull(message);
 
         topic = TopicName.get(topic).getPartition(0).toString();
@@ -803,7 +805,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
             Message<byte[]> message = null;
             Thread.sleep(1000L);
             for (int i = 0; i < messageCnt; i++) {
-                message = consumer.receive(1, TimeUnit.SECONDS);
+                message = consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS);
                 Assert.assertNotNull(message);
                 if (i % 3 == 0) {
                     
consumer.acknowledgeCumulativeAsync(message.getMessageId(), abortTxn).get();
@@ -828,14 +830,14 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
             }
 
             // the messages are pending ack state and can't be received
-            message = consumer.receive(300, TimeUnit.MILLISECONDS);
+            message = consumer.receive(waitTimeForCannotReceiveMsgInSec, 
TimeUnit.SECONDS);
             Assert.assertNull(message);
 
             abortTxn.abort().get();
             consumer.redeliverUnacknowledgedMessages();
             Transaction commitTxn = getTxn();
             for (int i = 0; i < messageCnt; i++) {
-                message = consumer.receive(1, TimeUnit.SECONDS);
+                message = consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS);
                 Assert.assertNotNull(message);
                 if (i % 3 == 0) {
                     
consumer.acknowledgeCumulativeAsync(message.getMessageId(), commitTxn).get();
@@ -857,7 +859,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                 Assert.assertTrue(reCommitError.getCause() instanceof 
TransactionNotFoundException);
             }
 
-            message = consumer.receive(300, TimeUnit.MILLISECONDS);
+            message = consumer.receive(waitTimeForCannotReceiveMsgInSec, 
TimeUnit.SECONDS);
             Assert.assertNull(message);
         }
 
@@ -919,7 +921,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         Awaitility.await().until(consumer::isConnected);
 
         for (int i = 0; i < txnCnt * messageCnt; i++) {
-            Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+            Message<byte[]> message = 
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
             Assert.assertNotNull(message);
         }
 
@@ -960,7 +962,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
             txn.commit().get();
 
             for (int i = 0; i < 1000; i++) {
-                Message<byte[]> message = consumer.receive(5, 
TimeUnit.SECONDS);
+                Message<byte[]> message = 
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
                 Assert.assertNotNull(message);
                 Assert.assertEquals(Integer.valueOf(new 
String(message.getData())), Integer.valueOf(i));
             }
@@ -1016,7 +1018,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         }
 
 
-        Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+        Message<byte[]> message = 
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
         consumer.acknowledgeAsync(message.getMessageId(), consumeTxn).get();
         consumeTxn.commit().get();
         try {
@@ -1062,7 +1064,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         constructor.setAccessible(true);
 
         TransactionImpl timeoutTxnSkipClientTimeout = 
constructor.newInstance(pulsarClient, 5,
-                        timeoutTxn.getTxnID().getLeastSigBits(), 
timeoutTxn.getTxnID().getMostSigBits());
+                timeoutTxn.getTxnID().getLeastSigBits(), 
timeoutTxn.getTxnID().getMostSigBits());
 
         try {
             timeoutTxnSkipClientTimeout.commit().get();
@@ -1092,7 +1094,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                 .newTransaction(new TransactionCoordinatorID(0), 1, 
null).get();
         Awaitility.await().until(() -> {
             try {
-               
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get();
+                
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get();
                 return false;
             } catch (Exception e) {
                 return true;
@@ -1125,17 +1127,17 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
         Transaction consumeTimeoutTxn = pulsarClient
                 .newTransaction()
-                .withTransactionTimeout(3, TimeUnit.SECONDS)
+                .withTransactionTimeout(7, TimeUnit.SECONDS)
                 .build().get();
 
-        Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
+        Message<String> message = 
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
 
         consumer.acknowledgeAsync(message.getMessageId(), 
consumeTimeoutTxn).get();
 
-        Message<String> reReceiveMessage = consumer.receive(300, 
TimeUnit.MILLISECONDS);
+        Message<String> reReceiveMessage = 
consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS);
         assertNull(reReceiveMessage);
 
-        reReceiveMessage = consumer.receive(5, TimeUnit.SECONDS);
+        reReceiveMessage = consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS);
 
         assertEquals(reReceiveMessage.getValue(), message.getValue());
 
@@ -1182,9 +1184,9 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         }
         Transaction txn = getTxn();
         if (ackType == CommandAck.AckType.Individual) {
-            consumer.acknowledgeAsync(consumer.receive(5, 
TimeUnit.SECONDS).getMessageId(), txn);
+            
consumer.acknowledgeAsync(consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS).getMessageId(), txn);
         } else {
-            consumer.acknowledgeCumulativeAsync(consumer.receive(5, 
TimeUnit.SECONDS).getMessageId(), txn);
+            
consumer.acknowledgeCumulativeAsync(consumer.receive(waitTimeForCanReceiveMsgInSec,
 TimeUnit.SECONDS).getMessageId(), txn);
         }
         topic = TopicName.get(topic).toString();
         boolean exist = false;
@@ -1307,7 +1309,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                     .InvalidTxnStatusException);
         }
         try {
-            Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
+            Message<String> message = 
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
             consumer.acknowledgeAsync(message.getMessageId(), 
transaction).get();
             Assert.fail();
         } catch (Exception e) {
@@ -1349,7 +1351,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
         Message<byte[]> message = null;
         for (int i = 0; i < transactionCumulativeAck; i++) {
-            message = consumer.receive(5, TimeUnit.SECONDS);
+            message = consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS);
         }
 
         // receive transaction in order
@@ -1372,7 +1374,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
         // receive the rest of the message
         for (int i = 0; i < count; i++) {
-            message = consumer.receive(5, TimeUnit.SECONDS);
+            message = consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS);
         }
 
         Transaction commitTransaction = getTxn();
@@ -1385,7 +1387,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         commitTransaction.commit().get();
 
         // then redeliver will not receive any message
-        message = consumer.receive(300, TimeUnit.MILLISECONDS);
+        message = consumer.receive(waitTimeForCannotReceiveMsgInSec, 
TimeUnit.SECONDS);
         assertNull(message);
 
         // cleanup.
@@ -1460,7 +1462,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
         // receive the batch messages add to a list
         for (int i = 0; i < 5; i++) {
-            messageIds.add(consumer.receive(5, 
TimeUnit.SECONDS).getMessageId());
+            messageIds.add(consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS).getMessageId());
         }
 
         MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0);
@@ -1520,7 +1522,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                 .build().get();
 
         // consumer receive the message the first time, redeliverCount = 0
-        consumer.acknowledgeAsync(consumer.receive(5, 
TimeUnit.SECONDS).getMessageId(), transaction).get();
+        
consumer.acknowledgeAsync(consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS).getMessageId(), transaction).get();
 
         transaction.abort().get();
 
@@ -1528,17 +1530,17 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                 .build().get();
 
         // consumer receive the message the second time, redeliverCount = 1, 
also can be received
-        consumer.acknowledgeAsync(consumer.receive(5, 
TimeUnit.SECONDS).getMessageId(), transaction).get();
+        
consumer.acknowledgeAsync(consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS).getMessageId(), transaction).get();
 
         transaction.abort().get();
 
         // consumer receive the message the third time, redeliverCount = 2,
         // the message will be sent to DLQ, can't receive
-        assertNull(consumer.receive(300, TimeUnit.MILLISECONDS));
+        assertNull(consumer.receive(waitTimeForCannotReceiveMsgInSec, 
TimeUnit.SECONDS));
 
         assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 3);
 
-        assertEquals(value, new String(deadLetterConsumer.receive(3, 
TimeUnit.SECONDS).getValue()));
+        assertEquals(value, new 
String(deadLetterConsumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS).getValue()));
 
         // cleanup.
         consumer.close();
@@ -1584,7 +1586,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         Transaction transaction = 
pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES)
                 .build().get();
 
-        Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+        Message<byte[]> message = 
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
         assertEquals(value1, new String(message.getValue()));
         // consumer receive the batch message one the first time, 
redeliverCount = 0
         consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
@@ -1594,7 +1596,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         // consumer will receive the batch message two and then receive
         // the message one and message two again, redeliverCount = 1
         for (int i = 0; i < 3; i ++) {
-            message = consumer.receive(5, TimeUnit.SECONDS);
+            message = consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS);
         }
 
         transaction = pulsarClient.newTransaction().withTransactionTimeout(5, 
TimeUnit.MINUTES)
@@ -1608,12 +1610,12 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
         // consumer receive the batch message the third time, redeliverCount = 
2,
         // the message will be sent to DLQ, can't receive
-        assertNull(consumer.receive(300, TimeUnit.MILLISECONDS));
+        assertNull(consumer.receive(waitTimeForCannotReceiveMsgInSec, 
TimeUnit.SECONDS));
 
         assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 6);
 
-        assertEquals(value1, new String(deadLetterConsumer.receive(3, 
TimeUnit.SECONDS).getValue()));
-        assertEquals(value2, new String(deadLetterConsumer.receive(3, 
TimeUnit.SECONDS).getValue()));
+        assertEquals(value1, new 
String(deadLetterConsumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS).getValue()));
+        assertEquals(value2, new 
String(deadLetterConsumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS).getValue()));
 
         // cleanup.
         consumer.close();
@@ -1663,17 +1665,17 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
         // Failover consumer will receive the messages immediately while
         // the shared consumer will get them after the delay
-        Message<String> msg = sharedConsumer.receive(300, 
TimeUnit.MILLISECONDS);
+        Message<String> msg = 
sharedConsumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS);
         assertNull(msg);
 
         for (int i = 0; i < 10; i++) {
-            msg = failoverConsumer.receive(100, TimeUnit.MILLISECONDS);
+            msg = failoverConsumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS);
             assertEquals(msg.getValue(), "msg-" + i);
         }
 
         Set<String> receivedMsgs = new TreeSet<>();
         for (int i = 0; i < 10; i++) {
-            msg = sharedConsumer.receive(10, TimeUnit.SECONDS);
+            msg = sharedConsumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS);
             receivedMsgs.add(msg.getValue());
         }
 

Reply via email to