This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 07c502761c4 CAMEL-20881: Only create
DefaultPollingConsumerPollStrategy when needed. (#14558)
07c502761c4 is described below
commit 07c502761c4d450f5b2b7bba19ce4f69f507212d
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Jun 17 14:11:47 2024 +0200
CAMEL-20881: Only create DefaultPollingConsumerPollStrategy when needed.
(#14558)
---
.../RemoteFilePollingConsumerPollStrategy.java | 12 ++++++++----
.../engine/LimitedPollingConsumerPollStrategy.java | 21 ++++++++++-----------
.../support/DefaultPollingConsumerPollStrategy.java | 7 ++-----
.../apache/camel/support/ScheduledPollConsumer.java | 7 ++++---
.../apache/camel/support/ScheduledPollEndpoint.java | 2 +-
5 files changed, 25 insertions(+), 24 deletions(-)
diff --git
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java
index ba18b2b886b..222b806637b 100644
---
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java
+++
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java
@@ -19,12 +19,16 @@ package org.apache.camel.component.file.remote;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.support.DefaultPollingConsumerPollStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Remote file consumer polling strategy that attempts to help recovering from
lost connections.
*/
public class RemoteFilePollingConsumerPollStrategy extends
DefaultPollingConsumerPollStrategy {
+ private static final Logger LOG =
LoggerFactory.getLogger(RemoteFilePollingConsumerPollStrategy.class);
+
@Override
public boolean rollback(Consumer consumer, Endpoint endpoint, int
retryCounter, Exception e) throws Exception {
if (consumer instanceof RemoteFileConsumer) {
@@ -34,16 +38,16 @@ public class RemoteFilePollingConsumerPollStrategy extends
DefaultPollingConsume
if (rfc.isRunAllowed()) {
// disconnect from the server to force it to re login at next
// poll to recover
- if (log.isWarnEnabled()) {
- log.warn("Trying to recover by force disconnecting from
remote server and re-connecting at next poll: {}",
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Trying to recover by force disconnecting from
remote server and re-connecting at next poll: {}",
rfc.remoteServer());
}
try {
rfc.forceDisconnect();
} catch (Exception t) {
// ignore the exception
- if (log.isDebugEnabled()) {
- log.debug("Error occurred during force disconnecting
from: {}. This exception will be ignored.",
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Error occurred during force disconnecting
from: {}. This exception will be ignored.",
rfc.remoteServer(), t);
}
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/LimitedPollingConsumerPollStrategy.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/LimitedPollingConsumerPollStrategy.java
index 89c7edc3bca..25189de2786 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/LimitedPollingConsumerPollStrategy.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/LimitedPollingConsumerPollStrategy.java
@@ -21,9 +21,10 @@ import java.util.Map;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
-import org.apache.camel.Service;
import org.apache.camel.support.DefaultPollingConsumerPollStrategy;
import org.apache.camel.support.service.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A {@link org.apache.camel.spi.PollingConsumerPollStrategy} which supports
suspending consumers if they failed for X
@@ -33,7 +34,9 @@ import org.apache.camel.support.service.ServiceHelper;
* will be suspended/stopped. This prevents the log to get flooded with failed
attempts, for example during nightly
* runs.
*/
-public class LimitedPollingConsumerPollStrategy extends
DefaultPollingConsumerPollStrategy implements Service {
+public class LimitedPollingConsumerPollStrategy extends
DefaultPollingConsumerPollStrategy {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LimitedPollingConsumerPollStrategy.class);
private final Map<Consumer, Integer> state = new HashMap<>();
private int limit = 3;
@@ -69,7 +72,7 @@ public class LimitedPollingConsumerPollStrategy extends
DefaultPollingConsumerPo
} else {
times += 1;
}
- log.debug("Rollback occurred after {} times when consuming {}", times,
endpoint);
+ LOG.debug("Rollback occurred after {} times when consuming {}", times,
endpoint);
boolean retry = false;
@@ -94,7 +97,7 @@ public class LimitedPollingConsumerPollStrategy extends
DefaultPollingConsumerPo
* @throws Exception is thrown if error suspending the consumer
*/
protected void onSuspend(Consumer consumer, Endpoint endpoint) throws
Exception {
- log.warn("Suspending consumer {} after {} attempts to consume from {}.
You have to manually resume the consumer!",
+ LOG.warn("Suspending consumer {} after {} attempts to consume from {}.
You have to manually resume the consumer!",
consumer, limit, endpoint);
ServiceHelper.suspendService(consumer);
}
@@ -104,7 +107,7 @@ public class LimitedPollingConsumerPollStrategy extends
DefaultPollingConsumerPo
*
* @param consumer the consumer
* @param endpoint the endpoint
- * @return whether or not to retry immediately, is default
<tt>false</tt>
+ * @return whether to retry immediately, is default
<tt>false</tt>
* @throws Exception can be thrown in case something goes wrong
*/
protected boolean onRollback(Consumer consumer, Endpoint endpoint) throws
Exception {
@@ -113,12 +116,8 @@ public class LimitedPollingConsumerPollStrategy extends
DefaultPollingConsumerPo
}
@Override
- public void start() {
- // noop
- }
-
- @Override
- public void stop() {
+ protected void doStop() throws Exception {
state.clear();
}
+
}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPollingConsumerPollStrategy.java
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPollingConsumerPollStrategy.java
index 6cf252f4e29..d62ecf9a235 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPollingConsumerPollStrategy.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPollingConsumerPollStrategy.java
@@ -19,15 +19,12 @@ package org.apache.camel.support;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.spi.PollingConsumerPollStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.camel.support.service.ServiceSupport;
/**
* A default implementation that will not retry on rollback.
*/
-public class DefaultPollingConsumerPollStrategy implements
PollingConsumerPollStrategy {
-
- protected final Logger log = LoggerFactory.getLogger(getClass());
+public class DefaultPollingConsumerPollStrategy extends ServiceSupport
implements PollingConsumerPollStrategy {
@Override
public boolean begin(Consumer consumer, Endpoint endpoint) {
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
index 6d02602c9c6..97d38d0f5b9 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
@@ -61,7 +61,7 @@ public abstract class ScheduledPollConsumer extends
DefaultConsumer
private long delay = 500;
private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
private boolean useFixedDelay = true;
- private PollingConsumerPollStrategy pollStrategy = new
DefaultPollingConsumerPollStrategy();
+ private PollingConsumerPollStrategy pollStrategy;
private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;
private boolean sendEmptyMessageWhenIdle;
private boolean greedy;
@@ -615,8 +615,9 @@ public abstract class ScheduledPollConsumer extends
DefaultConsumer
LOG.debug("Using backoff[multiplier={}, idleThreshold={},
errorThreshold={}] on {}", backoffMultiplier,
backoffIdleThreshold, backoffErrorThreshold,
getEndpoint());
}
-
- ObjectHelper.notNull(pollStrategy, "pollStrategy", this);
+ if (pollStrategy == null) {
+ pollStrategy = new DefaultPollingConsumerPollStrategy();
+ }
}
@Override
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollEndpoint.java
b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollEndpoint.java
index ea42b8c93f5..2f3976d3db0 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollEndpoint.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollEndpoint.java
@@ -61,7 +61,7 @@ public abstract class ScheduledPollEndpoint extends
DefaultEndpoint {
@UriParam(label = "consumer,advanced",
description = "A pluggable
org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your
custom implementation"
+ " to control error handling usually occurred
during the poll operation before an Exchange have been created and being routed
in Camel.")
- private PollingConsumerPollStrategy pollStrategy = new
DefaultPollingConsumerPollStrategy();
+ private PollingConsumerPollStrategy pollStrategy;
@UriParam(defaultValue = "TRACE", label = "consumer,scheduler",
description = "The consumer logs a start/complete log line when
it polls. This option allows you to configure the logging level for that.")
private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;