Repository: camel Updated Branches: refs/heads/master 6d48acedf -> 5bd029549
CAMEL-9964: Annotation based DefaultProducer Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5bd02954 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5bd02954 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5bd02954 Branch: refs/heads/master Commit: 5bd02954916c6821fb0aec27c136cec9a5d8320e Parents: 6d48ace Author: lburgazzoli <lburgazz...@gmail.com> Authored: Mon May 23 16:32:37 2016 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Mon May 23 16:33:13 2016 +0200 ---------------------------------------------------------------------- .../java/org/apache/camel/InvokeOnHeader.java | 36 +++++ .../java/org/apache/camel/InvokeOnHeaders.java | 34 +++++ .../apache/camel/impl/BaseSelectorProducer.java | 57 ++++++++ .../camel/impl/HeaderSelectorProducer.java | 132 +++++++++++++++++++ 4 files changed, 259 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5bd02954/camel-core/src/main/java/org/apache/camel/InvokeOnHeader.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/InvokeOnHeader.java b/camel-core/src/main/java/org/apache/camel/InvokeOnHeader.java new file mode 100644 index 0000000..8b3b369 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/InvokeOnHeader.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Repeatable; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marks a method as being invoked for a specific header value. + * + * @see Message#getHeader(String) + * @version + */ +@Repeatable(InvokeOnHeaders.class) +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface InvokeOnHeader { + String value(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/5bd02954/camel-core/src/main/java/org/apache/camel/InvokeOnHeaders.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/InvokeOnHeaders.java b/camel-core/src/main/java/org/apache/camel/InvokeOnHeaders.java new file mode 100644 index 0000000..32c1b00 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/InvokeOnHeaders.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marks a method as being invoked for a specific header value. + * + * @see Message#getHeader(String) + * @version + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface InvokeOnHeaders { + InvokeOnHeader[] value(); +} http://git-wip-us.apache.org/repos/asf/camel/blob/5bd02954/camel-core/src/main/java/org/apache/camel/impl/BaseSelectorProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/BaseSelectorProducer.java b/camel-core/src/main/java/org/apache/camel/impl/BaseSelectorProducer.java new file mode 100644 index 0000000..34a41f8 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/BaseSelectorProducer.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; + +/** + * A base class for selector-based producers. + */ +public abstract class BaseSelectorProducer extends DefaultProducer { + protected BaseSelectorProducer(Endpoint endpoint) { + super(endpoint); + } + + @Override + public void process(Exchange exchange) throws Exception { + final Processor processor = getProcessor(exchange); + if (processor != null) { + processor.process(exchange); + } else { + onMissingProcessor(exchange); + } + } + + /** + * Determine the processor to use to handle the exchange. + * + * @param exchange the message exchange + * @return the processor to processes the message exchange + * @throws Exception + */ + protected abstract Processor getProcessor(Exchange exchange) throws Exception; + + /** + * Invoked when no processor has been defined to process the message exchnage. + * + * @param exchange the message exchange + * @throws Exception + */ + protected abstract void onMissingProcessor(Exchange exchange) throws Exception; +} http://git-wip-us.apache.org/repos/asf/camel/blob/5bd02954/camel-core/src/main/java/org/apache/camel/impl/HeaderSelectorProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/HeaderSelectorProducer.java b/camel-core/src/main/java/org/apache/camel/impl/HeaderSelectorProducer.java new file mode 100644 index 0000000..2e2d629 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/HeaderSelectorProducer.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.InvokeOnHeader; +import org.apache.camel.InvokeOnHeaders; +import org.apache.camel.Message; +import org.apache.camel.NoSuchHeaderException; +import org.apache.camel.Processor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A selector-based produced which uses an header value to determine which processor + * should be invoked. + */ +public class HeaderSelectorProducer extends BaseSelectorProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(HeaderSelectorProducer.class); + + private final String header; + private final String defaultHeaderValue; + private final Object target; + private Map<String, Processor> handlers; + + public HeaderSelectorProducer(Endpoint endpoint, String header) { + this(endpoint, header, null, null); + } + + public HeaderSelectorProducer(Endpoint endpoint, String header, Object target) { + this(endpoint, header, null, target); + } + + public HeaderSelectorProducer(Endpoint endpoint, String header, String defaultHeaderValue) { + this(endpoint, header, defaultHeaderValue, null); + } + + public HeaderSelectorProducer(Endpoint endpoint, String header, String defaultHeaderValue, Object target) { + super(endpoint); + + this.header = header; + this.defaultHeaderValue = defaultHeaderValue; + this.target = target != null ? target : this; + this.handlers = new HashMap<>(); + } + + @Override + protected void doStart() throws Exception { + for (final Method method : target.getClass().getDeclaredMethods()) { + InvokeOnHeaders annotation = method.getAnnotation(InvokeOnHeaders.class); + if (annotation != null) { + for (InvokeOnHeader processor : annotation.value()) { + bind(processor, method); + } + } else { + bind(method.getAnnotation(InvokeOnHeader.class), method); + } + } + + handlers = Collections.unmodifiableMap(handlers); + + super.doStart(); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + handlers.clear(); + } + + @Override + protected Processor getProcessor(Exchange exchange) throws Exception { + final String action = exchange.getIn().getHeader(header, defaultHeaderValue, String.class); + if (action == null) { + throw new NoSuchHeaderException(exchange, header, String.class); + } + + return handlers.get(action); + } + + protected void onMissingProcessor(Exchange exchange) throws Exception { + throw new IllegalStateException( + "Unsupported operation " + exchange.getIn().getHeader(header) + ); + } + + protected final void bind(String key, Processor processor) { + if (handlers.containsKey(key)) { + LOGGER.warn("A processor is already set for action {}", key); + } + + this.handlers.put(key, processor); + } + + private void bind(InvokeOnHeader handler, final Method method) { + if (handler != null && method.getParameterCount() == 1) { + method.setAccessible(true); + + final Class<?> type = method.getParameterTypes()[0]; + + LOGGER.debug("bind key={}, class={}, method={}, type={}", + handler.value(), this.getClass(), method.getName(), type); + + if (Message.class.isAssignableFrom(type)) { + bind(handler.value(), e -> method.invoke(target, e.getIn())); + } else { + bind(handler.value(), e -> method.invoke(target, e)); + } + } + } +}