This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 6170ebe Add untilStarted & untilStopped to ReactiveMessagePipeline
(#214)
6170ebe is described below
commit 6170ebe7f3b6d7ee75071d9b6fb3d6a750a622e7
Author: Lari Hotari <[email protected]>
AuthorDate: Tue May 20 23:57:15 2025 +0300
Add untilStarted & untilStopped to ReactiveMessagePipeline (#214)
---
gradle/libs.versions.toml | 1 +
pulsar-client-reactive-adapter/build.gradle | 1 +
.../adapter/ReactiveMessagePipelineE2ETests.java | 35 ++++++++++++
.../client/adapter/SingletonPulsarContainer.java | 7 +++
.../internal/adapter/ReactiveConsumerAdapter.java | 15 ++++--
.../client/api/ReactiveMessagePipeline.java | 59 ++++++++++++++++++--
.../api/DefaultReactiveMessagePipeline.java | 63 +++++++++++++++++++++-
.../api/InternalConsumerListener.java} | 37 ++++++-------
.../client/api/ReactiveMessagePipelineTests.java | 43 ++++++++++++++-
9 files changed, 228 insertions(+), 33 deletions(-)
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 05282e7..2fd4c57 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -52,6 +52,7 @@ log4j-slf4j2-impl = { module =
"org.apache.logging.log4j:log4j-slf4j2-impl", ver
mockito-core = { module = "org.mockito:mockito-core", version.ref = "mockito" }
pulsar-client-api = { module = "org.apache.pulsar:pulsar-client-api",
version.ref = "pulsar" }
pulsar-client-shaded = { module = "org.apache.pulsar:pulsar-client",
version.ref = "pulsar" }
+pulsar-client-all = { module = "org.apache.pulsar:pulsar-client-all",
version.ref = "pulsar" }
rat-gradle = { module = "org.nosphere.apache:creadur-rat-gradle", version.ref
= "rat-gradle" }
reactor-core = { module = "io.projectreactor:reactor-core", version.ref =
"reactor" }
reactor-test = { module = "io.projectreactor:reactor-test", version.ref =
"reactor" }
diff --git a/pulsar-client-reactive-adapter/build.gradle
b/pulsar-client-reactive-adapter/build.gradle
index 984775a..37e6774 100644
--- a/pulsar-client-reactive-adapter/build.gradle
+++ b/pulsar-client-reactive-adapter/build.gradle
@@ -35,6 +35,7 @@ dependencies {
testImplementation libs.bundles.log4j
testImplementation libs.mockito.core
+ intTestImplementation libs.pulsar.client.all
intTestImplementation
project(':pulsar-client-reactive-producer-cache-caffeine')
intTestImplementation project(path:
':pulsar-client-reactive-producer-cache-caffeine-shaded', configuration:
'shadow')
intTestImplementation libs.junit.jupiter
diff --git
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java
index 6c8dada..a93988d 100644
---
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java
+++
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java
@@ -36,8 +36,11 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.reactive.client.api.MessageSpec;
import org.apache.pulsar.reactive.client.api.MessageSpecBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
@@ -94,6 +97,38 @@ class ReactiveMessagePipelineE2ETests {
}
}
+ @Test
+ void shouldSupportWaitingForConsumingToStartAndStop() throws Exception {
+ try (PulsarClient pulsarClient =
SingletonPulsarContainer.createPulsarClient();
+ PulsarAdmin pulsarAdmin =
SingletonPulsarContainer.createPulsarAdmin()) {
+ String topicName = "test" + UUID.randomUUID();
+ ReactivePulsarClient reactivePulsarClient =
AdaptedReactivePulsarClientFactory.create(pulsarClient);
+ ReactiveMessagePipeline pipeline =
reactivePulsarClient.messageConsumer(Schema.STRING)
+ .subscriptionName("sub")
+ .topic(topicName)
+ .build()
+ .messagePipeline()
+ .messageHandler((message) -> Mono.empty())
+ .build()
+ .start();
+
+ // wait for consuming to start
+ pipeline.untilStarted().block(Duration.ofSeconds(5));
+ // there should be an existing subscription
+ List<String> subscriptions =
pulsarAdmin.topics().getSubscriptions(topicName);
+ assertThat(subscriptions).as("subscription should be
created").contains("sub");
+
+ // stop the pipeline
+ pipeline.stop();
+ // and wait for it to stop
+ pipeline.untilStopped().block(Duration.ofSeconds(5));
+ // there should be no consumers
+ TopicStats topicStats =
pulsarAdmin.topics().getStats(topicName);
+ SubscriptionStats subStats =
topicStats.getSubscriptions().get("sub");
+ assertThat(subStats.getConsumers()).isEmpty();
+ }
+ }
+
@ParameterizedTest
@EnumSource(MessageOrderScenario.class)
void shouldRetainMessageOrder(MessageOrderScenario
messageOrderScenario) throws Exception {
diff --git
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
index 431e448..d53e185 100644
---
a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
+++
b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.reactive.client.adapter;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.testcontainers.containers.PulsarContainer;
@@ -44,6 +45,12 @@ final class SingletonPulsarContainer {
.build();
}
+ static PulsarAdmin createPulsarAdmin() throws PulsarClientException {
+ return PulsarAdmin.builder()
+
.serviceHttpUrl(SingletonPulsarContainer.PULSAR_CONTAINER.getHttpServiceUrl())
+ .build();
+ }
+
static DockerImageName getPulsarImage() {
return DockerImageName.parse("apachepulsar/pulsar:4.0.4");
}
diff --git
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java
index 8e89a09..e66ba28 100644
---
a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java
+++
b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java
@@ -25,6 +25,7 @@ import java.util.function.Supplier;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.reactive.client.internal.api.InternalConsumerListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
@@ -45,12 +46,20 @@ class ReactiveConsumerAdapter<T> {
}
private Mono<Consumer<T>> createConsumerMono() {
- return AdapterImplementationFactory.adaptPulsarFuture(
- () ->
this.consumerBuilderFactory.apply(this.pulsarClientSupplier.get()).subscribeAsync());
+ return Mono.deferContextual((contextView) ->
AdapterImplementationFactory
+ .adaptPulsarFuture(
+ () ->
this.consumerBuilderFactory.apply(this.pulsarClientSupplier.get()).subscribeAsync())
+ .doOnSuccess((consumer) ->
contextView.<InternalConsumerListener>getOrEmpty(InternalConsumerListener.class)
+ .ifPresent((listener) ->
listener.onConsumerCreated(consumer))));
}
private Mono<Void> closeConsumer(Consumer<?> consumer) {
- return Mono.fromFuture(consumer::closeAsync).doOnSuccess((__)
-> this.LOG.info("Consumer closed {}", consumer));
+ return Mono.deferContextual((contextView) ->
Mono.fromFuture(consumer::closeAsync).doFinally((signalType) -> {
+ this.LOG.info("Consumer closed {}", consumer);
+
contextView.<InternalConsumerListener>getOrEmpty(InternalConsumerListener.class)
+ .ifPresent((listener) ->
listener.onConsumerClosed(consumer));
+ }));
+
}
<R> Mono<R> usingConsumer(Function<Consumer<T>, Mono<R>>
usingConsumerAction) {
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
index 47804a5..9613293 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
@@ -19,20 +19,26 @@
package org.apache.pulsar.reactive.client.api;
+import reactor.core.publisher.Mono;
+
/**
* Reactive message pipeline interface.
*/
public interface ReactiveMessagePipeline extends AutoCloseable {
/**
- * Starts the reactive pipeline.
- * @return the pipeline
+ * Starts the reactive pipeline asynchronously.
+ * @return the pipeline instance
+ * @see #untilStarted() For returning a reactive publisher (Mono) that
completes after
+ * the pipeline has actually started.
*/
ReactiveMessagePipeline start();
/**
- * Stops the reactive pipeline.
- * @return the reactive pipeline
+ * Stops the reactive pipeline asynchronously.
+ * @return the pipeline instance
+ * @see #untilStopped() For returning a reactive publisher (Mono) that
completes after
+ * the pipeline has actually stopped.
*/
ReactiveMessagePipeline stop();
@@ -43,11 +49,54 @@ public interface ReactiveMessagePipeline extends
AutoCloseable {
boolean isRunning();
/**
- * Closes the reactive pipeline.
+ * Closes the reactive pipeline asynchronously without waiting for
shutdown
+ * completion.
* @throws Exception if an error occurs
*/
default void close() throws Exception {
stop();
}
+ /**
+ * <p>
+ * Returns a reactive publisher (Mono) that completes after the
pipeline has
+ * successfully subscribed to the input topic(s) and started consuming
messages for
+ * the first time after pipeline creation. This method is not intended
to be used
+ * after a pipeline restarts following failure. Use this method to wait
for consumer
+ * and Pulsar subscription creation. This helps avoid race conditions
when sending
+ * messages immediately after the pipeline starts.
+ * </p>
+ * <p>
+ * The {@link #start()} method must be called before invoking this
method.
+ * </p>
+ * <p>
+ * To wait for the operation to complete synchronously, it is necessary
to call
+ * {@link Mono#block()} on the returned Mono.
+ * </p>
+ * @return a Mono that completes after the pipeline has created its
underlying Pulsar
+ * consumer
+ */
+ default Mono<Void> untilStarted() {
+ return Mono.empty();
+ }
+
+ /**
+ * <p>
+ * Returns a reactive publisher (Mono) that completes after the
pipeline has closed
+ * the underlying Pulsar consumer and stopped consuming new messages.
+ * </p>
+ * <p>
+ * The {@link #stop()} method must be called before invoking this
method.
+ * </p>
+ * <p>
+ * To wait for the operation to complete synchronously, it is necessary
to call
+ * {@link Mono#block()} on the returned Mono.
+ * </p>
+ * @return a Mono that completes when the pipeline has closed the
underlying Pulsar
+ * consumer
+ */
+ default Mono<Void> untilStopped() {
+ return Mono.empty();
+ }
+
}
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
index cd94f19..eefdcf2 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.reactive.client.internal.api;
import java.time.Duration;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
@@ -67,6 +68,10 @@ class DefaultReactiveMessagePipeline<T> implements
ReactiveMessagePipeline {
private final MessageGroupingFunction groupingFunction;
+ private final AtomicReference<InternalConsumerListenerImpl>
consumerListener = new AtomicReference<>();
+
+ private final AtomicReference<CompletableFuture<Void>>
pipelineStoppedFuture = new AtomicReference<>();
+
DefaultReactiveMessagePipeline(ReactiveMessageConsumer<T>
messageConsumer,
Function<Message<T>, Publisher<Void>> messageHandler,
BiConsumer<Message<T>, Throwable> errorLogger,
Retry pipelineRetrySpec, Duration handlingTimeout,
Function<Mono<Void>, Publisher<Void>> transformer,
@@ -83,7 +88,14 @@ class DefaultReactiveMessagePipeline<T> implements
ReactiveMessagePipeline {
this.pipeline =
messageConsumer.consumeMany(this::createMessageConsumer)
.then()
.transform(transformer)
- .transform(this::decoratePipeline);
+ .transform(this::decoratePipeline)
+ .doFinally((signalType) -> {
+ CompletableFuture<Void> f =
this.pipelineStoppedFuture.get();
+ if (f != null) {
+ f.complete(null);
+ }
+ })
+ .doFirst(() -> this.pipelineStoppedFuture.set(new
CompletableFuture<>()));
}
private Mono<Void> decorateMessageHandler(Mono<Void> messageHandler) {
@@ -168,14 +180,26 @@ class DefaultReactiveMessagePipeline<T> implements
ReactiveMessagePipeline {
if (this.killSwitch.get() != null) {
throw new IllegalStateException("Message handler is
already running.");
}
- Disposable disposable = this.pipeline.subscribe(null,
this::logError, this::logUnexpectedCompletion);
+ InternalConsumerListenerImpl consumerListener = new
InternalConsumerListenerImpl();
+ Disposable disposable =
this.pipeline.contextWrite(Context.of(InternalConsumerListener.class,
consumerListener))
+ .subscribe(null, this::logError,
this::logUnexpectedCompletion);
if (!this.killSwitch.compareAndSet(null, disposable)) {
disposable.dispose();
throw new IllegalStateException("Message handler was
already running.");
}
+ this.consumerListener.set(consumerListener);
return this;
}
+ @Override
+ public Mono<Void> untilStarted() {
+ if (!isRunning()) {
+ throw new IllegalStateException("Pipeline isn't
running. Call start first.");
+ }
+ InternalConsumerListenerImpl internalConsumerListener =
this.consumerListener.get();
+ return internalConsumerListener.waitForConsumerCreated();
+ }
+
private void logError(Throwable throwable) {
LOG.error("ReactiveMessageHandler was unexpectedly
terminated.", throwable);
}
@@ -195,9 +219,44 @@ class DefaultReactiveMessagePipeline<T> implements
ReactiveMessagePipeline {
return this;
}
+ @Override
+ public Mono<Void> untilStopped() {
+ if (isRunning()) {
+ throw new IllegalStateException("Pipeline is running.
Call stop first.");
+ }
+ CompletableFuture<Void> f = this.pipelineStoppedFuture.get();
+ if (f != null) {
+ return Mono.fromFuture(f, true);
+ }
+ else {
+ return Mono.empty();
+ }
+ }
+
@Override
public boolean isRunning() {
return this.killSwitch.get() != null;
}
+ private static final class InternalConsumerListenerImpl implements
InternalConsumerListener {
+
+ private final CompletableFuture<Void> createdFuture;
+
+ private InternalConsumerListenerImpl() {
+ this.createdFuture = new CompletableFuture<>();
+ }
+
+ @Override
+ public void onConsumerCreated(Object nativeConsumer) {
+ if (!this.createdFuture.isDone()) {
+ this.createdFuture.complete(null);
+ }
+ }
+
+ Mono<Void> waitForConsumerCreated() {
+ return Mono.fromFuture(this.createdFuture, true);
+ }
+
+ }
+
}
diff --git
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InternalConsumerListener.java
similarity index 51%
copy from
pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
copy to
pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InternalConsumerListener.java
index 47804a5..8ee8f4b 100644
---
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
+++
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InternalConsumerListener.java
@@ -17,37 +17,30 @@
* under the License.
*/
-package org.apache.pulsar.reactive.client.api;
+package org.apache.pulsar.reactive.client.internal.api;
/**
- * Reactive message pipeline interface.
+ * Internal interface to signal the creation and closing of a native consumer.
This is not
+ * to be intended to be used by applications.
*/
-public interface ReactiveMessagePipeline extends AutoCloseable {
+public interface InternalConsumerListener {
/**
- * Starts the reactive pipeline.
- * @return the pipeline
+ * Called when a new native consumer is created. This is called each
time a new
+ * consumer is created initially or as a result of a reactive pipeline
retry.
+ * @param nativeConsumer the native consumer instance
*/
- ReactiveMessagePipeline start();
-
- /**
- * Stops the reactive pipeline.
- * @return the reactive pipeline
- */
- ReactiveMessagePipeline stop();
-
- /**
- * Gets whether the reactive pipeline is running.
- * @return true if the reactive pipeline is running
- */
- boolean isRunning();
+ default void onConsumerCreated(Object nativeConsumer) {
+ // no-op
+ }
/**
- * Closes the reactive pipeline.
- * @throws Exception if an error occurs
+ * Called when a native consumer is closed. This is called each time a
consumer is
+ * closed as a result of a reactive pipeline retry or when the pipeline
is closed.
+ * @param nativeConsumer the native consumer instance
*/
- default void close() throws Exception {
- stop();
+ default void onConsumerClosed(Object nativeConsumer) {
+ // no-op
}
}
diff --git
a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java
b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java
index 3415481..f330a54 100644
---
a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java
+++
b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -38,6 +39,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.reactive.client.internal.api.InternalConsumerListener;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -49,6 +51,7 @@ import reactor.util.retry.Retry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.assertj.core.api.Assertions.assertThatNullPointerException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
class ReactiveMessagePipelineTests {
@@ -164,6 +167,27 @@ class ReactiveMessagePipelineTests {
}
+ @Test
+ void pipelineUntilStartedAndStopped() throws Exception {
+ int numMessages = 10;
+ Duration subscriptionDelay = Duration.ofSeconds(1);
+ TestConsumer testConsumer = new TestConsumer(numMessages,
subscriptionDelay);
+ CountDownLatch latch = new CountDownLatch(numMessages);
+ Function<Message<String>, Publisher<Void>> messageHandler = (
+ message) -> Mono.empty().then().doFinally((__)
-> latch.countDown());
+ ReactiveMessagePipeline pipeline =
testConsumer.messagePipeline().messageHandler(messageHandler).build();
+ pipeline.start();
+ // timeout should occur since subscription delay is 1 second in
TestConsumer
+ assertThatThrownBy(() ->
pipeline.untilStarted().block(Duration.ofMillis(100)))
+ .isInstanceOf(IllegalStateException.class)
+ .hasCauseInstanceOf(TimeoutException.class);
+ // now wait for consuming to start
+ pipeline.untilStarted().block(Duration.ofSeconds(2));
+ assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
+ // now wait for consuming to stop
+ pipeline.stop().untilStopped().block(Duration.ofSeconds(1));
+ }
+
@Test
void streamingHandler() throws Exception {
int numMessages = 10;
@@ -480,10 +504,17 @@ class ReactiveMessagePipelineTests {
private final int numMessages;
+ private final Duration subscriptionDelay;
+
private volatile Runnable finishedCallback;
TestConsumer(int numMessages) {
+ this(numMessages, null);
+ }
+
+ TestConsumer(int numMessages, Duration subscriptionDelay) {
this.numMessages = numMessages;
+ this.subscriptionDelay = subscriptionDelay;
}
private final List<MessageId> acknowledgedMessages = new
CopyOnWriteArrayList<>();
@@ -496,7 +527,10 @@ class ReactiveMessagePipelineTests {
@Override
public <R> Flux<R> consumeMany(Function<Flux<Message<String>>,
Publisher<MessageResult<R>>> messageHandler) {
- return Flux.defer(() -> {
+ Flux<R> flux = Flux.deferContextual((contextView) -> {
+ Optional<InternalConsumerListener>
internalConsumerListener = contextView
+
.getOrEmpty(InternalConsumerListener.class);
+ internalConsumerListener.ifPresent((listener)
-> listener.onConsumerCreated(this));
Flux<Message<String>> messages = Flux.range(0,
this.numMessages)
.map(Object::toString)
.map(TestMessage::new);
@@ -511,8 +545,15 @@ class ReactiveMessagePipelineTests {
if (this.finishedCallback != null) {
this.finishedCallback.run();
}
+
internalConsumerListener.ifPresent((listener) ->
listener.onConsumerClosed(this));
});
});
+ if (this.subscriptionDelay != null) {
+ return
flux.delaySubscription(this.subscriptionDelay);
+ }
+ else {
+ return flux;
+ }
}
List<MessageId> getAcknowledgedMessages() {