lhotari commented on a change in pull request #14602:
URL: https://github.com/apache/pulsar/pull/14602#discussion_r822335749
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -935,6 +943,59 @@ protected WriteInEventLoopCallback newObject(Handle<WriteInEventLoopCallback> ha
};
}
+ private static final class LastSendFutureWrapper {
+ private CompletableFuture<MessageId> lastSendFuture;
+ private AtomicBoolean onceHandle;
+
+ static LastSendFutureWrapper create(CompletableFuture<MessageId> lastSendFuture) {
+ LastSendFutureWrapper lastSendFutureWrapper = RECYCLER.get();
+ lastSendFutureWrapper.lastSendFuture = lastSendFuture;
+ lastSendFutureWrapper.onceHandle = new AtomicBoolean(false);
+
+ return lastSendFutureWrapper;
+ }
+
+ public CompletableFuture<Void> handleOnce() {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ if (onceHandle.compareAndSet(false, true)) {
+ lastSendFuture.handle((ignore, throwable) -> {
+ if (throwable == null) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(throwable);
+ }
+ return null;
+ });
+
+ return future;
+ }
+
+ future.complete(null);
+
+ return future;
+ }
+
+ private void recycle() {
+ lastSendFuture = null;
+ onceHandle = null;
+ recyclerHandle.recycle(this);
+ }
+
+ private final Handle<LastSendFutureWrapper> recyclerHandle;
+
+ private LastSendFutureWrapper(Handle<LastSendFutureWrapper> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private static final Recycler<LastSendFutureWrapper> RECYCLER = new Recycler<LastSendFutureWrapper>() {
+ @Override
+ protected LastSendFutureWrapper newObject(Handle<LastSendFutureWrapper> handle) {
+ return new LastSendFutureWrapper(handle);
+ }
+ };
Review comment:
I think that recycling and reusing objects does more harm than good.
I'd like to suggest that the instances aren't reused at all.
##########
File path:
pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
##########
@@ -264,4 +266,49 @@ public void testGetNumOfPartitions() throws Exception {
assertEquals(producerImpl.getNumOfPartitions(), 0);
}
+ @Test
+ public void testFlushWhenLastSendFutureFailed() {
+ ProducerConfigurationData producerConfData = new ProducerConfigurationData();
+ ProducerImpl<Object> producerImpl = client.newProducerImpl(TOPIC_NAME, 0, producerConfData,
+ null, null, null, Optional.empty());
+
+ // 1. When no data is sent to this producerImpl,
+ // its lastSendFuture is always in normal completion state
+ CompletableFuture<Void> lastSendFuture = producerImpl.getLastSendFuture();
+ assertTrue(lastSendFuture.isDone());
+ assertFalse(lastSendFuture.isCompletedExceptionally());
+
+ // 2. We set the lastSendFuture of this partition to an abnormal state,
+ // simulating that an exception occurred during the sending process
+ final String failedMessage = "failed last send future";
+ producerImpl.setLastSendFuture(FutureUtil.failedFuture(new Throwable(failedMessage)));
+
+ // 3. So when we get its lastSendFuture again,
+ // the future is already in an abnormal failure state
+ lastSendFuture = producerImpl.getLastSendFuture();
+ assertTrue(lastSendFuture.isDone());
+ assertTrue(lastSendFuture.isCompletedExceptionally());
+ // 4. The following simple simulation application captures the exception processing
+ lastSendFuture.exceptionally(throwable -> {
+ assertNotNull(throwable);
+ assertEquals(throwable.getMessage(), failedMessage);
+ return null;
+ });
+
+ // 5. We have already handled the exception in the step 4,
+ // and then PartitionedProducerImpl will continue to send data.
+ // It should be noted that when this partition is not selected for data transmission this time,
+ // its lastSendFuture is still the future that was in an abnormal state in the previous step.
+ // When the application calls the flush operation again, in the previous logic,
+ // its exception future will be returned to the application,
+ // causing the application to always execute the exception handling logic.
+ // In fact, we have handled the exception before,
+ // and we did not send data to this partition this time,
+ // it should not affect this transmission.
+ // So we want the lastSendFuture here to be in normal state.
+ lastSendFuture = producerImpl.getLastSendFuture();
+ assertTrue(lastSendFuture.isDone());
Review comment:
@wenbingshen Here are examples of testing the external behavior instead
of observing internal state:
https://github.com/apache/pulsar/blob/22a0beb5d4dc4c57fede791194573535bc7b8b10/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java#L572-L607
Most Pulsar broker tests are more or less integration tests. The problem
with the Pulsar coding style is that it's not very modular. Class-based unit
tests aren't really meaningful, since a class is not a modular unit in most
cases. A meaningful unit is hardly ever a single class, and class-based unit
tests are considered an anti-pattern because they lead to bad test design and
to tests that are a maintenance burden (references: [Gulliver's Travels
Tests](https://dhickey.ie/2014/03/03/gullivers-travels-test/), [What is a unit
test](https://tanzu.vmware.com/content/blog/what-is-a-unit-test-the-answer-might-surprise-you)).
By modularity I mean in this case that "units" (not necessarily a single
class) would have a well-defined public interface. In unit testing, only
public interfaces should be tested and only outputs should be asserted. An
"output" can also be a call to another collaborator. Different developers
prefer different unit testing styles, and there are multiple schools of unit
testing (for example ["Detroit and London Schools of
TDD"](https://blog.devgenius.io/detroit-and-london-schools-of-test-driven-development-3d2f8dca71e5)).
In Pulsar, we haven't had these deep conversations about TDD styles and how
to improve the situation.
I think that integration tests have more value than tests that observe
internal private state. Once you let go of naming the test class after the
implementation class, it becomes easier to name the test class. In Pulsar, the
"SimpleProducerConsumerTest" class is a big container for a lot of
(integration) tests. That style isn't optimal either. I think it's better to
create more test classes that are categorized by behavior or scenario instead
of dumping everything into "SimpleProducerConsumerTest". There's already the
"ProducerConsumerBase" base class, so creating new test classes is easy. In
some ways, "TopicDuplicationTest" is a good example: related scenarios are
grouped in the same test class instead of mixing many different types of
scenarios, as SimpleProducerConsumerTest does. I hope this information gives
you a useful starting point.
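To make the "assert outputs through the public interface" idea concrete, here is a minimal, self-contained sketch. It does not use Pulsar at all: `FakeProducer`, its `send` and `flushAsync` methods, and the `String` message id are hypothetical stand-ins invented for illustration. The point is only the shape of the check, which drives the object through its public methods and asserts on what `flushAsync` returns, without reading any internal field.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class FlushBehaviorSketch {
    /** Hypothetical in-memory producer; NOT a Pulsar class. */
    static final class FakeProducer {
        private CompletableFuture<String> lastSend = CompletableFuture.completedFuture("none");
        private AtomicBoolean reported = new AtomicBoolean(false);

        /** Records the outcome of a send; the caller supplies the result future. */
        synchronized void send(CompletableFuture<String> result) {
            lastSend = result;
            reported = new AtomicBoolean(false);
        }

        /** Reports a failure of the last send at most once. */
        synchronized CompletableFuture<Void> flushAsync() {
            AtomicBoolean flag = reported;
            return lastSend.handle((id, t) -> {
                if (t != null && flag.compareAndSet(false, true)) {
                    throw new CompletionException(t);
                }
                return null;
            });
        }
    }

    /** Behavior-level check: only send/flushAsync are used, no getters for internal state. */
    static boolean secondFlushSucceedsAfterFailedSend() {
        FakeProducer producer = new FakeProducer();
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("send failed"));
        producer.send(failed);

        boolean firstFailed = producer.flushAsync().isCompletedExceptionally();
        boolean secondFailed = producer.flushAsync().isCompletedExceptionally();
        return firstFailed && !secondFailed;
    }

    public static void main(String[] args) {
        System.out.println(secondFlushSucceedsAfterFailedSend()); // prints "true"
    }
}
```

A real Pulsar test would presumably do the same against an actual producer and broker (for example in a class built on "ProducerConsumerBase"), asserting on the result of the flush call rather than on `getLastSendFuture()`.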
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -935,6 +943,59 @@ protected WriteInEventLoopCallback newObject(Handle<WriteInEventLoopCallback> ha
};
}
+ private static final class LastSendFutureWrapper {
+ private CompletableFuture<MessageId> lastSendFuture;
+ private AtomicBoolean onceHandle;
+
+ static LastSendFutureWrapper create(CompletableFuture<MessageId> lastSendFuture) {
+ LastSendFutureWrapper lastSendFutureWrapper = RECYCLER.get();
+ lastSendFutureWrapper.lastSendFuture = lastSendFuture;
+ lastSendFutureWrapper.onceHandle = new AtomicBoolean(false);
+
+ return lastSendFutureWrapper;
+ }
+
+ public CompletableFuture<Void> handleOnce() {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ if (onceHandle.compareAndSet(false, true)) {
+ lastSendFuture.handle((ignore, throwable) -> {
+ if (throwable == null) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(throwable);
+ }
+ return null;
+ });
+
+ return future;
+ }
+
+ future.complete(null);
+
+ return future;
+ }
+
+ private void recycle() {
+ lastSendFuture = null;
+ onceHandle = null;
+ recyclerHandle.recycle(this);
+ }
+
+ private final Handle<LastSendFutureWrapper> recyclerHandle;
+
+ private LastSendFutureWrapper(Handle<LastSendFutureWrapper> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private static final Recycler<LastSendFutureWrapper> RECYCLER = new Recycler<LastSendFutureWrapper>() {
+ @Override
+ protected LastSendFutureWrapper newObject(Handle<LastSendFutureWrapper> handle) {
+ return new LastSendFutureWrapper(handle);
+ }
+ };
+ }
Review comment:
After removing the recycling solution and simplifying `handleOnce`, the
code can be refactored to this:
```suggestion
    private static final class LastSendFutureWrapper {
        private final CompletableFuture<MessageId> lastSendFuture;
        private final AtomicBoolean throwOnce = new AtomicBoolean(false);

        private LastSendFutureWrapper(CompletableFuture<MessageId> lastSendFuture) {
            this.lastSendFuture = lastSendFuture;
        }

        static LastSendFutureWrapper create(CompletableFuture<MessageId> lastSendFuture) {
            return new LastSendFutureWrapper(lastSendFuture);
        }

        public CompletableFuture<Void> handleOnce() {
            return lastSendFuture.handle((ignore, t) -> {
                if (t != null && throwOnce.compareAndSet(false, true)) {
                    throw FutureUtil.wrapToCompletionException(t);
                }
                return null;
            });
        }
    }
```
`handleOnce` can call `.handle` on every invocation, so there is no need for
an extra CompletableFuture, which simplifies the solution.
Rethrowing the exception in CompletableFuture methods requires wrapping the
exception in a `CompletionException` unless it's already
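
The throw-once behavior of the suggestion can be tried out with the JDK alone. The following is only a sketch under assumptions: `ThrowOnceDemo` and `ThrowOnceWrapper` are illustrative names, `String` stands in for `MessageId`, and the inline `instanceof` check is a plain-JDK substitute for `FutureUtil.wrapToCompletionException`, whose exact behavior is not shown in this thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ThrowOnceDemo {
    /** Simplified stand-in for LastSendFutureWrapper, using String instead of MessageId. */
    static final class ThrowOnceWrapper {
        private final CompletableFuture<String> lastSendFuture;
        private final AtomicBoolean throwOnce = new AtomicBoolean(false);

        ThrowOnceWrapper(CompletableFuture<String> lastSendFuture) {
            this.lastSendFuture = lastSendFuture;
        }

        CompletableFuture<Void> handleOnce() {
            return lastSendFuture.handle((ignore, t) -> {
                if (t != null && throwOnce.compareAndSet(false, true)) {
                    // Assumption: plain-JDK substitute for FutureUtil.wrapToCompletionException.
                    throw t instanceof CompletionException
                            ? (CompletionException) t
                            : new CompletionException(t);
                }
                return null;
            });
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failedSend = new CompletableFuture<>();
        failedSend.completeExceptionally(new IllegalStateException("send failed"));
        ThrowOnceWrapper wrapper = new ThrowOnceWrapper(failedSend);

        // Only the first caller observes the failure; later callers get a normal completion.
        CompletableFuture<Void> first = wrapper.handleOnce();
        CompletableFuture<Void> second = wrapper.handleOnce();
        System.out.println(first.isCompletedExceptionally());  // prints "true"
        System.out.println(second.isCompletedExceptionally()); // prints "false"
    }
}
```

Because each call to `handleOnce` creates a fresh dependent future via `.handle`, no shared result future has to be completed by hand; the `AtomicBoolean` alone decides which caller sees the exception.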