lhotari commented on a change in pull request #14602:
URL: https://github.com/apache/pulsar/pull/14602#discussion_r822335749



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -935,6 +943,59 @@ protected WriteInEventLoopCallback newObject(Handle<WriteInEventLoopCallback> ha
         };
     }
 
+    private static final class LastSendFutureWrapper {
+        private CompletableFuture<MessageId> lastSendFuture;
+        private AtomicBoolean onceHandle;
+
+        static LastSendFutureWrapper create(CompletableFuture<MessageId> lastSendFuture) {
+            LastSendFutureWrapper lastSendFutureWrapper = RECYCLER.get();
+            lastSendFutureWrapper.lastSendFuture = lastSendFuture;
+            lastSendFutureWrapper.onceHandle = new AtomicBoolean(false);
+
+            return lastSendFutureWrapper;
+        }
+
+        public CompletableFuture<Void> handleOnce() {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            if (onceHandle.compareAndSet(false, true)) {
+                lastSendFuture.handle((ignore, throwable) -> {
+                    if (throwable == null) {
+                        future.complete(null);
+                    } else {
+                        future.completeExceptionally(throwable);
+                    }
+                    return null;
+                });
+
+                return future;
+            }
+
+            future.complete(null);
+
+            return future;
+        }
+
+        private void recycle() {
+            lastSendFuture = null;
+            onceHandle = null;
+            recyclerHandle.recycle(this);
+        }
+
+        private final Handle<LastSendFutureWrapper> recyclerHandle;
+
+        private LastSendFutureWrapper(Handle<LastSendFutureWrapper> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private static final Recycler<LastSendFutureWrapper> RECYCLER = new Recycler<LastSendFutureWrapper>() {
+            @Override
+            protected LastSendFutureWrapper newObject(Handle<LastSendFutureWrapper> handle) {
+                return new LastSendFutureWrapper(handle);
+            }
+        };

Review comment:
       I think that recycling and reusing these objects does more harm than good. I'd like to suggest that the instances aren't reused at all.

##########
File path: 
pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
##########
@@ -264,4 +266,49 @@ public void testGetNumOfPartitions() throws Exception {
         assertEquals(producerImpl.getNumOfPartitions(), 0);
     }
 
+    @Test
+    public void testFlushWhenLastSendFutureFailed() {
+        ProducerConfigurationData producerConfData = new ProducerConfigurationData();
+        ProducerImpl<Object> producerImpl = client.newProducerImpl(TOPIC_NAME, 0, producerConfData,
+                null, null, null, Optional.empty());
+
+        // 1. When no data is sent to this producerImpl,
+        // its lastSendFuture is always in normal completion state
+        CompletableFuture<Void> lastSendFuture = producerImpl.getLastSendFuture();
+        assertTrue(lastSendFuture.isDone());
+        assertFalse(lastSendFuture.isCompletedExceptionally());
+
+        // 2. We set the lastSendFuture of this partition to an abnormal state,
+        // simulating that an exception occurred during the sending process
+        final String failedMessage = "failed last send future";
+        producerImpl.setLastSendFuture(FutureUtil.failedFuture(new Throwable(failedMessage)));
+
+        // 3. So when we get its lastSendFuture again,
+        // the future is already in an abnormal failure state
+        lastSendFuture = producerImpl.getLastSendFuture();
+        assertTrue(lastSendFuture.isDone());
+        assertTrue(lastSendFuture.isCompletedExceptionally());
+        // 4. The following simple simulation application captures the exception processing
+        lastSendFuture.exceptionally(throwable -> {
+            assertNotNull(throwable);
+            assertEquals(throwable.getMessage(), failedMessage);
+            return null;
+        });
+
+        // 5. We have already handled the exception in the step 4,
+        // and then PartitionedProducerImpl will continue to send data.
+        // It should be noted that when this partition is not selected for data transmission this time,
+        // its lastSendFuture is still the future that was in an abnormal state in the previous step.
+        // When the application calls the flush operation again, in the previous logic,
+        // its exception future will be returned to the application,
+        // causing the application to always execute the exception handling logic.
+        // In fact, we have handled the exception before,
+        // and we did not send data to this partition this time,
+        // it should not affect this transmission.
+        // So we want the lastSendFuture here to be in normal state.
+        lastSendFuture = producerImpl.getLastSendFuture();
+        assertTrue(lastSendFuture.isDone());

Review comment:
       @wenbingshen Here are examples of testing the external behavior instead of observing internal state: https://github.com/apache/pulsar/blob/22a0beb5d4dc4c57fede791194573535bc7b8b10/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java#L572-L607
   
   Most Pulsar broker tests are more or less integration tests. The problem with the Pulsar coding style is that it's not very modular. Class-based unit tests aren't really meaningful, since the class is not a modular unit in most cases. A meaningful unit is hardly ever a single class, and class-based unit tests are considered an anti-pattern because they lead to bad test design and to tests that are a maintenance burden (references: [Gulliver's Travels Tests](https://dhickey.ie/2014/03/03/gullivers-travels-test/), [What is a unit test](https://tanzu.vmware.com/content/blog/what-is-a-unit-test-the-answer-might-surprise-you)).
   
   By modularity I mean in this case that "units" (not necessarily a single class) would have a well-defined public interface. In unit testing, only public interfaces should be tested and outputs should be asserted. An "output" can also be a call to another collaborator. Different developers prefer different unit testing styles, and there are multiple schools of unit testing (for example ["Detroit and London Schools of TDD"](https://blog.devgenius.io/detroit-and-london-schools-of-test-driven-development-3d2f8dca71e5)). In Pulsar, we haven't had these deep conversations about TDD styles and how to improve the situation.
   
   I think that integration tests have more value than tests that observe internal private state. Once you let go of naming the test class after the implementation class, it becomes easier to name the test class. In Pulsar, the "SimpleProducerConsumerTest" class is a big container for a lot of (integration) tests. That style isn't optimal either. I think it's better to create more test classes that are categorized by behavior or scenario instead of dumping everything into "SimpleProducerConsumerTest". There's already the "ProducerConsumerBase" base class, so creating new test classes is easy. In some ways, "TopicDuplicationTest" is a good example: related scenarios are kept in the same test class instead of mixing many different types of scenarios, as SimpleProducerConsumerTest does. I hope this information is a useful starting point.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -935,6 +943,59 @@ protected WriteInEventLoopCallback newObject(Handle<WriteInEventLoopCallback> ha
         };
     }
 
+    private static final class LastSendFutureWrapper {
+        private CompletableFuture<MessageId> lastSendFuture;
+        private AtomicBoolean onceHandle;
+
+        static LastSendFutureWrapper create(CompletableFuture<MessageId> lastSendFuture) {
+            LastSendFutureWrapper lastSendFutureWrapper = RECYCLER.get();
+            lastSendFutureWrapper.lastSendFuture = lastSendFuture;
+            lastSendFutureWrapper.onceHandle = new AtomicBoolean(false);
+
+            return lastSendFutureWrapper;
+        }
+
+        public CompletableFuture<Void> handleOnce() {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            if (onceHandle.compareAndSet(false, true)) {
+                lastSendFuture.handle((ignore, throwable) -> {
+                    if (throwable == null) {
+                        future.complete(null);
+                    } else {
+                        future.completeExceptionally(throwable);
+                    }
+                    return null;
+                });
+
+                return future;
+            }
+
+            future.complete(null);
+
+            return future;
+        }
+
+        private void recycle() {
+            lastSendFuture = null;
+            onceHandle = null;
+            recyclerHandle.recycle(this);
+        }
+
+        private final Handle<LastSendFutureWrapper> recyclerHandle;
+
+        private LastSendFutureWrapper(Handle<LastSendFutureWrapper> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private static final Recycler<LastSendFutureWrapper> RECYCLER = new Recycler<LastSendFutureWrapper>() {
+            @Override
+            protected LastSendFutureWrapper newObject(Handle<LastSendFutureWrapper> handle) {
+                return new LastSendFutureWrapper(handle);
+            }
+        };
+    }

Review comment:
       After removing the recycling solution and simplifying `handleOnce`, the 
code can be refactored to this:
   
   ```suggestion
       private static final class LastSendFutureWrapper {
           private final CompletableFuture<MessageId> lastSendFuture;
           private final AtomicBoolean throwOnce = new AtomicBoolean(false);
   
           private LastSendFutureWrapper(CompletableFuture<MessageId> lastSendFuture) {
               this.lastSendFuture = lastSendFuture;
           }
   
           static LastSendFutureWrapper create(CompletableFuture<MessageId> lastSendFuture) {
               return new LastSendFutureWrapper(lastSendFuture);
           }
   
           public CompletableFuture<Void> handleOnce() {
               return lastSendFuture.handle((ignore, t) -> {
                   if (t != null && throwOnce.compareAndSet(false, true)) {
                       throw FutureUtil.wrapToCompletionException(t);
                   }
                   return null;
               });
           }
       }
   ```
   
   `handleOnce` can call `.handle` on every invocation, so no extra `CompletableFuture` is needed, which simplifies the solution.
   Rethrowing the exception in `CompletableFuture` methods requires wrapping the exception in a `CompletionException` unless it's already 
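   
The throw-once behavior of the suggested `handleOnce` can be tried in isolation with a minimal, JDK-only sketch. It makes several assumptions that are not part of the PR: `String` stands in for `MessageId`, the class name `ThrowOnceSketch` is hypothetical, and the local `wrap` helper is only a guess at what `FutureUtil.wrapToCompletionException` does (its actual implementation is not shown in this thread).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ThrowOnceSketch {

    // Simplified stand-in for the suggested wrapper; String replaces MessageId (assumption).
    static final class LastSendFutureWrapper {
        private final CompletableFuture<String> lastSendFuture;
        private final AtomicBoolean throwOnce = new AtomicBoolean(false);

        LastSendFutureWrapper(CompletableFuture<String> lastSendFuture) {
            this.lastSendFuture = lastSendFuture;
        }

        CompletableFuture<Void> handleOnce() {
            // Each call derives a new future; only the first caller to win the
            // compareAndSet sees the failure, later callers complete normally.
            return lastSendFuture.handle((ignore, t) -> {
                if (t != null && throwOnce.compareAndSet(false, true)) {
                    throw wrap(t);
                }
                return null;
            });
        }
    }

    // Hypothetical local helper: a lambda passed to handle() may only throw
    // unchecked exceptions, so a general Throwable has to be wrapped.
    static CompletionException wrap(Throwable t) {
        return t instanceof CompletionException
                ? (CompletionException) t
                : new CompletionException(t);
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("send failed"));

        LastSendFutureWrapper wrapper = new LastSendFutureWrapper(failed);
        CompletableFuture<Void> first = wrapper.handleOnce();
        CompletableFuture<Void> second = wrapper.handleOnce();

        System.out.println(first.isCompletedExceptionally());  // true
        System.out.println(second.isCompletedExceptionally()); // false
    }
}
```

In this sketch the first `handleOnce()` call reports the failure and every later call on the same wrapper completes normally, which is the behavior the flush path would rely on.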



