orpiske commented on code in PR #7474:
URL: https://github.com/apache/camel/pull/7474#discussion_r854146162


##########
core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc:
##########
@@ -219,3 +219,47 @@ from(from)
     .process(exchange -> LOG.info("Received an exchange: {}", exchange.getMessage().getBody()))
     .to(destination);
 ----
+
+You can also integrate the pausable API and the consumer listener with the circuit breaker EIP. For instance, it's
+possible to configure the circuit breaker so that it can manipulate the state of the listener based on success or on
+error conditions on the circuit.
+
+One example, would be to create a event watcher that checks for a downstream system availability. It watches for error events and, when they happen, it triggers a scheduled check. On success, it shuts down the scheduled check.
+
+An example implementation of this approach would be similar to this:
+
+[source,java]
+----
+CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("pausable");
+
+circuitBreaker.getEventPublisher()
+    .onSuccess(event -> {
+        LOG.info("Downstream call succeeded");
+        if (executorService != null) {
+            executorService.shutdownNow();
+            executorService = null;
+        }
+    })
+    .onError(event -> {
+        LOG.info(
+                "Downstream call error. Starting a thread to simulate checking for the downstream availability");
+
+        if (executorService == null) {
+            executorService = Executors.newSingleThreadScheduledExecutor();
+            // In a real world scenario, instead of incrementing, it could be pinging a remote system or
+            // running a similar check to determine whether it's available. That
+            executorService.scheduleAtFixedRate(() -> someCheckMethod(), 1, 1, TimeUnit.SECONDS);
+        }
+    });
+
+// Binds the configuration to the registry
+ getCamelContext().getRegistry().bind("pausableCircuit", circuitBreaker);
+
+from(from)
+    .pausable(new KafkaConsumerListener(), o -> canContinue())
+    .routeId("pausable-it")
+    .process(exchange -> LOG.info("Got record from Kafka: {}", exchange.getMessage().getBody()))
+    .circuitBreaker()
+        .resilience4jConfiguration().circuitBreaker("pausableCircuit").end()
+    .to(to);

Review Comment:
   Thanks @davsclaus. Regarding calling `end()` on the circuit breaker, do you mean something like this?
   
   ```
   .circuitBreaker()
           .resilience4jConfiguration().circuitBreaker("pausableCircuit").end()
           .to(to);
   ```
   
   I tried a different interpretation of your suggestion (below), but the DSL rejects it because `Resilience4jConfigurationDefinition` doesn't resolve `to`.
   
   ```
   .circuitBreaker()
           .resilience4jConfiguration().circuitBreaker("pausableCircuit")
           .to(to)
   .end();
   ```
   
   Any suggestions?


