Repository: camel Updated Branches: refs/heads/master e8bfc8cfb -> 38c3cfa50
Camel connector: allow custom logic to run before the producer or consumer does anything. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/38c3cfa5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/38c3cfa5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/38c3cfa5 Branch: refs/heads/master Commit: 38c3cfa505c75dc52ea3cc80a1723250dfd925dc Parents: e8bfc8c Author: Claus Ibsen <[email protected]> Authored: Fri Apr 7 14:32:07 2017 +0200 Committer: Claus Ibsen <[email protected]> Committed: Fri Apr 7 14:32:19 2017 +0200 ---------------------------------------------------------------------- .../component/connector/ConnectorComponent.java | 28 +++++++ .../component/connector/ConnectorProducer.java | 82 ++++++++++++++++++++ .../connector/DefaultConnectorComponent.java | 45 +++++++++++ .../connector/DefaultConnectorEndpoint.java | 12 ++- 4 files changed, 164 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/38c3cfa5/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorComponent.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorComponent.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorComponent.java index 4a7dbce..901ed0b 100644 --- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorComponent.java +++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorComponent.java @@ -20,6 +20,7 @@ import java.net.URISyntaxException; import java.util.Map; import org.apache.camel.Component; +import org.apache.camel.Processor; import org.apache.camel.catalog.CamelCatalog; /** @@ -76,4 +77,31 @@ public interface ConnectorComponent extends 
Component { */ void setComponentOptions(Map<String, Object> baseComponentOptions); + /** + * To perform custom processing before the producer is sending the message. + */ + void setBeforeProducer(Processor processor); + + Processor getBeforeProducer(); + + /** + * To perform custom processing after the producer has sent the message and received any reply (if InOut). + */ + void setAfterProducer(Processor processor); + + Processor getAfterProducer(); + + /** + * To perform custom processing when the consumer has just received a new incoming message. + */ + void setBeforeConsumer(Processor processor); + + Processor getBeforeConsumer(); + + /** + * To perform custom processing when the consumer is about to send back a reply message to the caller (if InOut). + */ + void setAfterConsumer(Processor processor); + + Processor getAfterConsumer(); } http://git-wip-us.apache.org/repos/asf/camel/blob/38c3cfa5/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java new file mode 100644 index 0000000..b0d7225 --- /dev/null +++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.connector; + +import java.util.concurrent.RejectedExecutionException; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.ServiceHelper; + +public class ConnectorProducer extends DefaultProducer { + + private final Producer producer; + private final Processor beforeProducer; + private final Processor afterProducer; + + public ConnectorProducer(Endpoint endpoint, Producer producer, Processor beforeProducer, Processor afterProducer) { + super(endpoint); + this.producer = producer; + this.beforeProducer = beforeProducer; + this.afterProducer = afterProducer; + } + + @Override + public void process(Exchange exchange) throws Exception { + if (!isRunAllowed()) { + throw new RejectedExecutionException(); + } + + if (beforeProducer != null) { + beforeProducer.process(exchange); + } + + producer.process(exchange); + + if (afterProducer != null) { + afterProducer.process(exchange); + } + } + + @Override + protected void doStart() throws Exception { + ServiceHelper.startService(producer); + } + + @Override + protected void doStop() throws Exception { + ServiceHelper.stopService(producer); + } + + @Override + protected void doSuspend() throws Exception { + ServiceHelper.suspendService(producer); + } + + @Override + protected void doResume() throws Exception { + ServiceHelper.resumeService(producer); + } + + @Override + protected void doShutdown() throws 
Exception { + ServiceHelper.stopAndShutdownService(producer); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/38c3cfa5/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java index f841ec0..4f3fd15 100644 --- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java +++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.camel.Component; import org.apache.camel.ComponentVerifier; import org.apache.camel.Endpoint; +import org.apache.camel.Processor; import org.apache.camel.VerifiableComponent; import org.apache.camel.catalog.CamelCatalog; import org.apache.camel.catalog.DefaultCamelCatalog; @@ -47,6 +48,10 @@ public abstract class DefaultConnectorComponent extends DefaultComponent impleme private final String componentName; private final ConnectorModel model; private Map<String, Object> componentOptions; + private Processor beforeProducer; + private Processor afterProducer; + private Processor beforeConsumer; + private Processor afterConsumer; protected DefaultConnectorComponent(String componentName, String className) { this.componentName = componentName; @@ -197,6 +202,46 @@ public abstract class DefaultConnectorComponent extends DefaultComponent impleme super.doStop(); } + @Override + public Processor getBeforeProducer() { + return beforeProducer; + } + + @Override + public void setBeforeProducer(Processor beforeProducer) { + this.beforeProducer = beforeProducer; + } + + @Override + public Processor getAfterProducer() { + return 
afterProducer; + } + + @Override + public void setAfterProducer(Processor afterProducer) { + this.afterProducer = afterProducer; + } + + @Override + public Processor getBeforeConsumer() { + return beforeConsumer; + } + + @Override + public void setBeforeConsumer(Processor beforeConsumer) { + this.beforeConsumer = beforeConsumer; + } + + @Override + public Processor getAfterConsumer() { + return afterConsumer; + } + + @Override + public void setAfterConsumer(Processor afterConsumer) { + this.afterConsumer = afterConsumer; + } + // *************************************** // Helpers // *************************************** http://git-wip-us.apache.org/repos/asf/camel/blob/38c3cfa5/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java ---------------------------------------------------------------------- diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java index 6550907..ba9e93b 100644 --- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java +++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java @@ -16,7 +16,6 @@ */ package org.apache.camel.component.connector; -import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.DelegateEndpoint; import org.apache.camel.Endpoint; @@ -44,12 +43,19 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat @Override public Producer createProducer() throws Exception { - return endpoint.createProducer(); + Producer producer = endpoint.createProducer(); + return new ConnectorProducer(endpoint, producer, getComponent().getBeforeProducer(), getComponent().getAfterProducer()); } @Override public Consumer createConsumer(Processor 
processor) throws Exception { - return endpoint.createConsumer(processor); + Consumer answer = endpoint.createConsumer(processor); + return answer; + } + + @Override + public ConnectorComponent getComponent() { + return (ConnectorComponent) super.getComponent(); } @Override
