This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit e14e44580a823445d6f67b18a957ac57874a8543 Author: Claus Ibsen <[email protected]> AuthorDate: Thu Oct 15 09:59:21 2020 +0200 CAMEL-15176: Optimize component to do as much in init phase vs start phase. --- .../camel/component/direct/DirectEndpoint.java | 14 ++--- .../camel/component/directvm/DirectVmProducer.java | 73 +++++++++++++--------- components/components-init-work-in-progress.md | 12 ++-- 3 files changed, 56 insertions(+), 43 deletions(-) diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java index 941ce31..4b4e1fd 100644 --- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java +++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java @@ -42,6 +42,7 @@ import org.apache.camel.util.StringHelper; public class DirectEndpoint extends DefaultEndpoint { private final Map<String, DirectConsumer> consumers; + private String key; @UriPath(description = "Name of direct endpoint") @Metadata(required = true) @@ -68,6 +69,11 @@ public class DirectEndpoint extends DefaultEndpoint { } @Override + protected void doInit() throws Exception { + key = initKey(); + } + + @Override public Producer createProducer() throws Exception { return new DirectProducer(this); } @@ -80,7 +86,6 @@ public class DirectEndpoint extends DefaultEndpoint { } public void addConsumer(DirectConsumer consumer) { - String key = getKey(); synchronized (consumers) { if (consumers.putIfAbsent(key, consumer) != null) { throw new IllegalArgumentException( @@ -91,7 +96,6 @@ public class DirectEndpoint extends DefaultEndpoint { } public void removeConsumer(DirectConsumer consumer) { - String key = getKey(); synchronized (consumers) { consumers.remove(key, consumer); consumers.notifyAll(); @@ -99,7 +103,6 @@ public class DirectEndpoint extends DefaultEndpoint { } protected DirectConsumer getConsumer() throws InterruptedException { - String key = getKey(); synchronized (consumers) { DirectConsumer answer = consumers.get(key); if (answer == null && block) { @@ -116,9 +119,6 @@ public class DirectEndpoint extends DefaultEndpoint { consumers.wait(rem); } } - // if (answer != null && answer.getEndpoint() != this) { - // throw new IllegalStateException(); - // } return answer; } } @@ -160,7 +160,7 @@ public class DirectEndpoint extends DefaultEndpoint { this.failIfNoConsumers = failIfNoConsumers; } - protected String getKey() { + protected String initKey() { String uri = getEndpointUri(); if (uri.indexOf('?') != -1) { return StringHelper.before(uri, "?"); diff --git a/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java b/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java index 1eed7e1..568eef7 100644 --- a/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java +++ b/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java @@ -54,43 +54,56 @@ public class DirectVmProducer extends DefaultAsyncProducer { return true; } - final HeaderFilterStrategy headerFilterStrategy = endpoint.getHeaderFilterStrategy(); + try { + final HeaderFilterStrategy headerFilterStrategy = endpoint.getHeaderFilterStrategy(); - // Only clone the Exchange if we actually need to filter out properties or headers. - final Exchange submitted - = (!endpoint.isPropagateProperties() || headerFilterStrategy != null) ? exchange.copy() : exchange; + // Only clone the Exchange if we actually need to filter out properties or headers. + final Exchange submitted + = (!endpoint.isPropagateProperties() || headerFilterStrategy != null) ? exchange.copy() : exchange; - // Clear properties in the copy if we are not propagating them. - if (!endpoint.isPropagateProperties()) { - submitted.getProperties().clear(); - } - - // Filter headers by Header Filter Strategy if there is one set. - if (headerFilterStrategy != null) { - submitted.getIn().getHeaders().entrySet() - .removeIf(e -> headerFilterStrategy.applyFilterToCamelHeaders(e.getKey(), e.getValue(), submitted)); - } - - return consumer.getAsyncProcessor().process(submitted, done -> { - Message msg = submitted.getMessage(); - - if (headerFilterStrategy != null) { - msg.getHeaders().entrySet() - .removeIf(e -> headerFilterStrategy.applyFilterToExternalHeaders(e.getKey(), e.getValue(), submitted)); + // Clear properties in the copy if we are not propagating them. + if (!endpoint.isPropagateProperties()) { + submitted.getProperties().clear(); } - if (exchange != submitted) { - // only need to copy back if they are different - exchange.setException(submitted.getException()); - exchange.getOut().copyFrom(msg); + // Filter headers by Header Filter Strategy if there is one set. + if (headerFilterStrategy != null) { + submitted.getIn().getHeaders().entrySet() + .removeIf(e -> headerFilterStrategy.applyFilterToCamelHeaders(e.getKey(), e.getValue(), submitted)); } - if (endpoint.isPropagateProperties()) { - exchange.getProperties().putAll(submitted.getProperties()); - } + return consumer.getAsyncProcessor().process(submitted, done -> { + try { + Message msg = submitted.getMessage(); + + if (headerFilterStrategy != null) { + msg.getHeaders().entrySet() + .removeIf(e -> headerFilterStrategy.applyFilterToExternalHeaders(e.getKey(), e.getValue(), + submitted)); + } + + if (exchange != submitted) { + // only need to copy back if they are different + exchange.setException(submitted.getException()); + exchange.getOut().copyFrom(msg); + } + + if (endpoint.isPropagateProperties()) { + exchange.getProperties().putAll(submitted.getProperties()); + } + } catch (Throwable e) { + exchange.setException(e); + } finally { + callback.done(done); + } + }); + + } catch (Throwable e) { + exchange.setException(e); + } - callback.done(done); - }); + callback.done(true); + return true; } } diff --git a/components/components-init-work-in-progress.md b/components/components-init-work-in-progress.md index 487cdab..740dac6 100644 --- a/components/components-init-work-in-progress.md +++ b/components/components-init-work-in-progress.md @@ -90,13 +90,13 @@ |camel-controlbus|DONE | | |camel-corda|REJECT|open network connection| |camel-couchbase|DONE | | -|camel-couchdb| | | +|camel-couchdb|DONE | | |camel-cron|DONE | | -|camel-crypto| | | -|camel-crypto-cms| | | +|camel-crypto|DONE | | +|camel-crypto-cms|REJECT | | |camel-csv|DONE| | |camel-cxf|REJECT|start a server| -|camel-cxf-transport| | | +|camel-cxf-transport|REJECT | | |camel-dataformat|DONE| | |camel-dataset|DONE| | |camel-debezium| | | @@ -106,8 +106,8 @@ |camel-debezium-postgres| | | |camel-debezium-sqlserver| | | |camel-digitalocean|REJECT|create a http client| -|camel-direct| | | -|camel-directvm| | | +|camel-direct|DONE | | +|camel-directvm|DONE | | |camel-disruptor|REJECT|start a thread| |camel-djl| | | |camel-dns| | |
