This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch poll-dyn in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4a89cabad6cfe28f5b7cd560e19b2c1bbf6a400e Author: Claus Ibsen <[email protected]> AuthorDate: Sun Feb 9 10:53:19 2025 +0100 CAMEL-21733: camel-core - Poll EIP to support DynamicAware to reuse endpoints during dynamic poll EIP --- .../org/apache/camel/poll-dynamic/azure-files | 2 + .../org/apache/camel/send-dynamic/azure-files | 2 + .../file/azure/FilesPollDynamicAware.java | 24 ++++ .../file/azure/FilesSendDynamicAware.java | 24 ++++ .../services/org/apache/camel/poll-dynamic/file | 2 + .../camel/component/file/FilePollDynamicAware.java | 23 ++++ .../file/GenericFilePollDynamicAware.java | 67 ++++++++++ .../services/org/apache/camel/poll-dynamic/ftp | 2 + .../services/org/apache/camel/poll-dynamic/ftps | 2 + .../services/org/apache/camel/poll-dynamic/sftp | 2 + .../component/file/remote/FtpPollDynamicAware.java | 24 ++++ .../file/remote/FtpsPollDynamicAware.java | 24 ++++ .../file/remote/SftpPollDynamicAware.java | 24 ++++ .../services/org/apache/camel/poll-dynamic/smb | 2 + .../camel/component/smb/SmbPollDynamicAware.java | 24 ++++ .../component/smb/PollDynamicFileNameTest.java | 99 ++++++++++++++ .../apache/camel/spi/annotations/PollDynamic.java | 33 +++++ .../org/apache/camel/spi/PollDynamicAware.java | 119 +++++++++++++++++ .../camel/processor/PollDynamicAwareResolver.java | 67 ++++++++++ .../org/apache/camel/processor/PollEnricher.java | 36 ++---- .../management/mbean/ManagedPollEnricherMBean.java | 6 + .../management/mbean/ManagedPollEnricher.java | 10 ++ .../support/component/PollDynamicAwareSupport.java | 144 +++++++++++++++++++++ .../apache/camel/spi/annotations/PollDynamic.java | 33 +++++ 24 files changed, 768 insertions(+), 27 deletions(-) diff --git a/components/camel-azure/camel-azure-files/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/azure-files b/components/camel-azure/camel-azure-files/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/azure-files new file mode 100644 index 
00000000000..c9844afecd1 --- /dev/null +++ b/components/camel-azure/camel-azure-files/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/azure-files @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.component.file.azure.FilesPollDynamicAware diff --git a/components/camel-azure/camel-azure-files/src/generated/resources/META-INF/services/org/apache/camel/send-dynamic/azure-files b/components/camel-azure/camel-azure-files/src/generated/resources/META-INF/services/org/apache/camel/send-dynamic/azure-files new file mode 100644 index 00000000000..d7daea26902 --- /dev/null +++ b/components/camel-azure/camel-azure-files/src/generated/resources/META-INF/services/org/apache/camel/send-dynamic/azure-files @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.component.file.azure.FilesSendDynamicAware diff --git a/components/camel-azure/camel-azure-files/src/main/java/org/apache/camel/component/file/azure/FilesPollDynamicAware.java b/components/camel-azure/camel-azure-files/src/main/java/org/apache/camel/component/file/azure/FilesPollDynamicAware.java new file mode 100644 index 00000000000..a7c54d39281 --- /dev/null +++ b/components/camel-azure/camel-azure-files/src/main/java/org/apache/camel/component/file/azure/FilesPollDynamicAware.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.azure; + +import org.apache.camel.component.file.GenericFilePollDynamicAware; +import org.apache.camel.spi.annotations.PollDynamic; + +@PollDynamic(FilesComponent.SCHEME) +public class FilesPollDynamicAware extends GenericFilePollDynamicAware { +} diff --git a/components/camel-azure/camel-azure-files/src/main/java/org/apache/camel/component/file/azure/FilesSendDynamicAware.java b/components/camel-azure/camel-azure-files/src/main/java/org/apache/camel/component/file/azure/FilesSendDynamicAware.java new file mode 100644 index 00000000000..a224262eb0d --- /dev/null +++ b/components/camel-azure/camel-azure-files/src/main/java/org/apache/camel/component/file/azure/FilesSendDynamicAware.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.file.azure; + +import org.apache.camel.component.file.GenericFileSendDynamicAware; +import org.apache.camel.spi.annotations.SendDynamic; + +@SendDynamic(FilesComponent.SCHEME) +public class FilesSendDynamicAware extends GenericFileSendDynamicAware { +} diff --git a/components/camel-file/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/file b/components/camel-file/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/file new file mode 100644 index 00000000000..122b90b3c89 --- /dev/null +++ b/components/camel-file/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/file @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.component.file.FilePollDynamicAware diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FilePollDynamicAware.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FilePollDynamicAware.java new file mode 100644 index 00000000000..e0a63b50650 --- /dev/null +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FilePollDynamicAware.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import org.apache.camel.spi.annotations.PollDynamic; + +@PollDynamic("file") +public class FilePollDynamicAware extends GenericFilePollDynamicAware { +} diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFilePollDynamicAware.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFilePollDynamicAware.java new file mode 100644 index 00000000000..0e002f60df3 --- /dev/null +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFilePollDynamicAware.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.file; + +import java.util.Map; + +import org.apache.camel.Exchange; +import org.apache.camel.support.component.PollDynamicAwareSupport; +import org.apache.camel.util.URISupport; + +public abstract class GenericFilePollDynamicAware extends PollDynamicAwareSupport { + + public static final String PROP_FILE_NAME = "fileName"; + + @Override + public boolean isLenientProperties() { + return false; + } + + @Override + public DynamicAwareEntry prepare(Exchange exchange, String uri, String originalUri) throws Exception { + Map<String, Object> properties = endpointProperties(exchange, uri); + return new DynamicAwareEntry(uri, originalUri, properties, null); + } + + @Override + public String resolveStaticUri(Exchange exchange, DynamicAwareEntry entry) throws Exception { + String uri = entry.getUri(); + // on Windows a path such as C:\temp had its \t evaluated as a tab character by the simple language, + // which must then be reversed + uri = uri.replace("\t", "\\\\t"); + + boolean fileName = entry.getProperties().containsKey(PROP_FILE_NAME); + // if fileName is in use, then it should not be pre-evaluated + // and we need to rebuild a new uri with the original value as-is + if (fileName) { + Map<String, Object> params = entry.getProperties(); + Map<String, Object> originalParams = URISupport.parseQuery(URISupport.extractQuery(entry.getOriginalUri())); + compute(originalParams, PROP_FILE_NAME, params); + return asEndpointUri(exchange, uri, params); + } else { + return uri; + } + } + + private static void compute(Map<String, Object> originalParams, String propFileName, Map<String, Object> params) { + Object val = originalParams.get(propFileName); + if (val != null) { + params.put(propFileName, val.toString()); + } + } + +} diff --git a/components/camel-ftp/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/ftp b/components/camel-ftp/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/ftp new file mode 
100644 index 00000000000..ff02a276d11 --- /dev/null +++ b/components/camel-ftp/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/ftp @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.component.file.remote.FtpPollDynamicAware diff --git a/components/camel-ftp/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/ftps b/components/camel-ftp/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/ftps new file mode 100644 index 00000000000..5797fc23040 --- /dev/null +++ b/components/camel-ftp/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/ftps @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.component.file.remote.FtpsPollDynamicAware diff --git a/components/camel-ftp/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/sftp b/components/camel-ftp/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/sftp new file mode 100644 index 00000000000..e56ca8dc8b3 --- /dev/null +++ b/components/camel-ftp/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/sftp @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.component.file.remote.SftpPollDynamicAware diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpPollDynamicAware.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpPollDynamicAware.java new file mode 100644 index 00000000000..4311acd528c --- /dev/null +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpPollDynamicAware.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote; + +import org.apache.camel.component.file.GenericFilePollDynamicAware; +import org.apache.camel.spi.annotations.PollDynamic; + +@PollDynamic("ftp") +public class FtpPollDynamicAware extends GenericFilePollDynamicAware { +} diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpsPollDynamicAware.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpsPollDynamicAware.java new file mode 100644 index 00000000000..3693f994150 --- /dev/null +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpsPollDynamicAware.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote; + +import org.apache.camel.component.file.GenericFilePollDynamicAware; +import org.apache.camel.spi.annotations.PollDynamic; + +@PollDynamic("ftps") +public class FtpsPollDynamicAware extends GenericFilePollDynamicAware { +} diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpPollDynamicAware.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpPollDynamicAware.java new file mode 100644 index 00000000000..14117e61383 --- /dev/null +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpPollDynamicAware.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.file.remote; + +import org.apache.camel.component.file.GenericFilePollDynamicAware; +import org.apache.camel.spi.annotations.PollDynamic; + +@PollDynamic("sftp") +public class SftpPollDynamicAware extends GenericFilePollDynamicAware { +} diff --git a/components/camel-smb/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/smb b/components/camel-smb/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/smb new file mode 100644 index 00000000000..913461c58fd --- /dev/null +++ b/components/camel-smb/src/generated/resources/META-INF/services/org/apache/camel/poll-dynamic/smb @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.component.smb.SmbPollDynamicAware diff --git a/components/camel-smb/src/main/java/org/apache/camel/component/smb/SmbPollDynamicAware.java b/components/camel-smb/src/main/java/org/apache/camel/component/smb/SmbPollDynamicAware.java new file mode 100644 index 00000000000..9a6dd8b186c --- /dev/null +++ b/components/camel-smb/src/main/java/org/apache/camel/component/smb/SmbPollDynamicAware.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.smb; + +import org.apache.camel.component.file.GenericFilePollDynamicAware; +import org.apache.camel.spi.annotations.PollDynamic; + +@PollDynamic("smb") +public class SmbPollDynamicAware extends GenericFilePollDynamicAware { +} diff --git a/components/camel-smb/src/test/java/org/apache/camel/component/smb/PollDynamicFileNameTest.java b/components/camel-smb/src/test/java/org/apache/camel/component/smb/PollDynamicFileNameTest.java new file mode 100644 index 00000000000..2fd7f496e74 --- /dev/null +++ b/components/camel-smb/src/test/java/org/apache/camel/component/smb/PollDynamicFileNameTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.smb; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Producer; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class PollDynamicFileNameTest extends SmbServerTestSupport { + + protected String getSmbUrl() { + return String.format( + "smb:%s/%s?username=%s&password=%s&path=/noop&noop=true", + service.address(), service.shareName(), service.userName(), service.password()); + } + + @Override + public void doPostSetup() throws Exception { + prepareSmbServer(); + } + + @Test + public void testPollEnrichFileOne() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(2); + getMockEndpoint("mock:result").message(0).body().isEqualTo("Hello World"); + getMockEndpoint("mock:result").message(1).body().isNull(); + + template.sendBodyAndHeader("direct:start", "Foo", "target", "myfile.txt"); + template.sendBodyAndHeader("direct:start", "Bar", "target", "unknown.txt"); + + MockEndpoint.assertIsSatisfied(context); + + // there should only be 1 file endpoint + long c = context.getEndpoints().stream() + .filter(e -> e.getEndpointKey().startsWith("smb") && e.getEndpointUri().contains("?fileName=")).count(); + Assertions.assertEquals(1, c, "There should only be 1 smb endpoint"); + } + + @Test + public void testPollEnrichFileTwo() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceivedInAnyOrder("Hello World", "Bye World"); + + template.sendBodyAndHeader(getSmbUrl(), "Bye World", Exchange.FILE_NAME, "myfile2.txt"); + + template.sendBodyAndHeader("direct:start", "Foo", "target", "myfile.txt"); + template.sendBodyAndHeader("direct:start", "Bar", "target", "myfile2.txt"); + + MockEndpoint.assertIsSatisfied(context); + + // there should only be 1 file endpoint + long c = context.getEndpoints().stream() + .filter(e -> 
e.getEndpointKey().startsWith("smb") && e.getEndpointUri().contains("?fileName=")).count(); + Assertions.assertEquals(1, c, "There should only be 1 smb endpoint"); + } + + private void prepareSmbServer() throws Exception { + // prepares the smb Server by creating a file on the server that we want + // to unit test that we can poll and store as a local file + Endpoint endpoint = context.getEndpoint(getSmbUrl()); + Exchange exchange = endpoint.createExchange(); + exchange.getIn().setBody("Hello World"); + exchange.getIn().setHeader(Exchange.FILE_NAME, "myfile.txt"); + + Producer producer = endpoint.createProducer(); + producer.start(); + producer.process(exchange); + producer.stop(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:start") + .poll(getSmbUrl() + "&fileName=${header.target}", 2000) + .to("mock:result"); + } + }; + } + +} diff --git a/core/camel-api/src/generated/java/org/apache/camel/spi/annotations/PollDynamic.java b/core/camel-api/src/generated/java/org/apache/camel/spi/annotations/PollDynamic.java new file mode 100644 index 00000000000..fd5325c72e4 --- /dev/null +++ b/core/camel-api/src/generated/java/org/apache/camel/spi/annotations/PollDynamic.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Target({ ElementType.TYPE }) +@ServiceFactory("poll-dynamic") +public @interface PollDynamic { + + String value(); + +} diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/PollDynamicAware.java b/core/camel-api/src/main/java/org/apache/camel/spi/PollDynamicAware.java new file mode 100644 index 00000000000..9e6b22d6cd3 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/spi/PollDynamicAware.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.spi; + +import java.util.Map; + +import org.apache.camel.CamelContextAware; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Service; + +/** + * Used for components that can optimise the usage of {@link org.apache.camel.processor.PollProcessor} (poll/pollEnrich) + * to reuse a static {@link Endpoint} and {@link org.apache.camel.DynamicPollingConsumer} that supports using headers to + * provide the dynamic parts. For example many of the file based components supports this. + */ +public interface PollDynamicAware extends Service, CamelContextAware { + + /** + * Sets the component name. + * + * @param scheme name of the component + */ + void setScheme(String scheme); + + /** + * Gets the component name + */ + String getScheme(); + + /** + * Whether to traverse the given parameters, and resolve any parameter values which uses the RAW token syntax: + * <tt>key=RAW(value)</tt>. And then remove the RAW tokens, and replace the content of the value, with just the + * value. + */ + default boolean resolveRawParameterValues() { + return true; + } + + /** + * Whether the endpoint is lenient or not. + * + * @see Endpoint#isLenientProperties() + */ + boolean isLenientProperties(); + + /** + * An entry of detailed information from the recipient uri, which allows the {@link PollDynamicAware} implementation + * to prepare the static uri to be used for the optimised poll. 
+ */ + class DynamicAwareEntry { + + private final String uri; + private final String originalUri; + private final Map<String, Object> properties; + private final Map<String, Object> lenientProperties; + + public DynamicAwareEntry(String uri, String originalUri, Map<String, Object> properties, + Map<String, Object> lenientProperties) { + this.uri = uri; + this.originalUri = originalUri; + this.properties = properties; + this.lenientProperties = lenientProperties; + } + + public String getUri() { + return uri; + } + + public String getOriginalUri() { + return originalUri; + } + + public Map<String, Object> getProperties() { + return properties; + } + + public Map<String, Object> getLenientProperties() { + return lenientProperties; + } + } + + /** + * Prepares for using optimised dynamic polling consumer by parsing the uri and returning an entry of details. + * + * @param exchange the exchange + * @param uri the resolved uri which is intended to be used + * @param originalUri the original uri of the endpoint before any dynamic evaluation + * @return prepared information about the dynamic endpoint to use + * @throws Exception is thrown if error parsing the uri + */ + DynamicAwareEntry prepare(Exchange exchange, String uri, String originalUri) throws Exception; + + /** + * Resolves the static part of the uri that are used for creating a single {@link Endpoint} and + * {@link org.apache.camel.DynamicPollingConsumer} that will be reused for processing the optimised poll/pollEnrich. + * + * @param exchange the exchange + * @param entry prepared information about the dynamic endpoint to use + * @return the static uri, or <tt>null</tt> to not let poll/pollEnrich use this optimisation. + * @throws Exception is thrown if error resolving the static uri. 
+ */ + String resolveStaticUri(Exchange exchange, DynamicAwareEntry entry) throws Exception; + +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollDynamicAwareResolver.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollDynamicAwareResolver.java new file mode 100644 index 00000000000..dc743ca4901 --- /dev/null +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollDynamicAwareResolver.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.FactoryFinder; +import org.apache.camel.spi.PollDynamicAware; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PollDynamicAwareResolver { + + public static final String RESOURCE_PATH = "META-INF/services/org/apache/camel/poll-dynamic/"; + + private static final Logger LOG = LoggerFactory.getLogger(PollDynamicAwareResolver.class); + + private FactoryFinder factoryFinder; + + public PollDynamicAware resolve(CamelContext context, String scheme) { + + // use factory finder to find a custom implementation + Class<?> type = null; + try { + type = findFactory(scheme, context); + } catch (Exception e) { + // ignore + } + + if (type != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found PollDynamicAware: {} via: {}{}", type.getName(), factoryFinder.getResourcePath(), scheme); + } + if (PollDynamicAware.class.isAssignableFrom(type)) { + PollDynamicAware answer = (PollDynamicAware) context.getInjector().newInstance(type, false); + answer.setScheme(scheme); + answer.setCamelContext(context); + return answer; + } else { + throw new IllegalArgumentException("Type is not a PollDynamicAware implementation. 
Found: " + type.getName()); + } + } + + return null; + } + + private Class<?> findFactory(String name, CamelContext context) { + if (factoryFinder == null) { + factoryFinder = context.getCamelContextExtension().getFactoryFinder(RESOURCE_PATH); + } + return factoryFinder.findClass(name).orElse(null); + } + +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java index 0e81b621b02..12cfd2db423 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -32,15 +32,14 @@ import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Expression; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.PollingConsumer; -import org.apache.camel.Processor; import org.apache.camel.ResolveEndpointFailedException; import org.apache.camel.spi.ConsumerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.HeadersMapFactory; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.PollDynamicAware; import org.apache.camel.spi.RouteIdAware; -import org.apache.camel.spi.SendDynamicAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler; import org.apache.camel.support.DefaultConsumer; @@ -70,7 +69,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class); - private SendDynamicAware dynamicAware; + private PollDynamicAware dynamicAware; private volatile String scheme; private CamelContext camelContext; private ConsumerCache consumerCache; @@ -143,7 +142,7 @@ public class PollEnricher extends AsyncProcessorSupport 
implements IdAware, Rout this.routeId = routeId; } - public SendDynamicAware getDynamicAware() { + public PollDynamicAware getDynamicAware() { return dynamicAware; } @@ -260,8 +259,6 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout // use dynamic endpoint so calculate the endpoint to use Object recipient = null; - Processor preAwareProcessor = null; - Processor postAwareProcessor = null; String staticUri = null; boolean prototype = cacheSize < 0; try { @@ -272,14 +269,12 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout String uri = resolveUri(exchange, recipient); String scheme = resolveScheme(exchange, uri); if (dynamicAware.getScheme().equals(scheme)) { - SendDynamicAware.DynamicAwareEntry entry = dynamicAware.prepare(exchange, uri, originalUri); + PollDynamicAware.DynamicAwareEntry entry = dynamicAware.prepare(exchange, uri, originalUri); if (entry != null) { staticUri = dynamicAware.resolveStaticUri(exchange, entry); - preAwareProcessor = dynamicAware.createPreProcessor(exchange, entry); - postAwareProcessor = dynamicAware.createPostProcessor(exchange, entry); if (staticUri != null) { if (LOG.isDebugEnabled()) { - LOG.debug("Optimising toD via SendDynamicAware component: {} to use static uri: {}", scheme, + LOG.debug("Optimising poll via PollDynamicAware component: {} to use static uri: {}", scheme, URISupport.sanitizeUri(staticUri)); } } @@ -290,7 +285,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout targetRecipient = prepareRecipient(exchange, targetRecipient); if (targetRecipient == null) { if (LOG.isDebugEnabled()) { - LOG.debug("Send dynamic evaluated as null so cannot send to any endpoint"); + LOG.debug("Poll dynamic evaluated as null so cannot poll from any endpoint"); } // no endpoint to send to, so ignore callback.done(true); @@ -321,8 +316,6 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout // grab the real 
delegate consumer that performs the actual polling final boolean bridgeErrorHandler = isBridgeErrorHandler(consumer); - final Processor preProcessor = preAwareProcessor; - final Processor postProcessor = postAwareProcessor; DynamicPollingConsumer dynamicConsumer = null; if (consumer instanceof DynamicPollingConsumer dyn) { @@ -331,10 +324,6 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout Exchange resourceExchange; try { - if (preProcessor != null) { - preProcessor.process(exchange); - } - if (timeout < 0) { LOG.debug("Consumer receive: {}", consumer); resourceExchange = dynamicConsumer != null ? dynamicConsumer.receive(exchange) : consumer.receive(); @@ -359,13 +348,6 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout callback.done(true); return true; } finally { - try { - if (postProcessor != null) { - postProcessor.process(exchange); - } - } catch (Exception e) { - exchange.setException(e); - } // return the consumer back to the cache consumerCache.releasePollingConsumer(endpoint, consumer); // and stop prototype endpoints @@ -577,7 +559,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout try { if (scheme != null) { // find out if the component can be optimised for send-dynamic - SendDynamicAwareResolver resolver = new SendDynamicAwareResolver(); + PollDynamicAwareResolver resolver = new PollDynamicAwareResolver(); dynamicAware = resolver.resolve(camelContext, scheme); if (dynamicAware == null) { // okay fallback and try with default component name @@ -592,7 +574,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout } if (dynamicAware != null) { if (LOG.isDebugEnabled()) { - LOG.debug("Detected SendDynamicAware component: {} optimising poll: {}", scheme, + LOG.debug("Detected PollDynamicAware component: {} optimising poll: {}", scheme, URISupport.sanitizeUri(uri)); } } @@ -601,7 +583,7 @@ public class PollEnricher extends 
AsyncProcessorSupport implements IdAware, Rout // ignore if (LOG.isDebugEnabled()) { LOG.debug( - "Error creating optimised SendDynamicAwareResolver for uri: {} due to {}. This exception is ignored", + "Error creating optimised PollDynamicAwareResolver for uri: {} due to {}. This exception is ignored", URISupport.sanitizeUri(uri), e.getMessage(), e); } } diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java index 9e677be2500..439a8675033 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedPollEnricherMBean.java @@ -45,6 +45,12 @@ public interface ManagedPollEnricherMBean extends ManagedProcessorMBean, Managed @ManagedAttribute(description = "Whether to aggregate when there was an exception thrown during calling the resource endpoint") Boolean isAggregateOnException(); + @ManagedAttribute(description = "Whether to allow components to optimise poll if they are PollDynamicAware") + Boolean isAllowOptimisedComponents(); + + @ManagedAttribute(description = "Whether an optimised component (PollDynamicAware) is in use") + Boolean isOptimised(); + @Override @ManagedOperation(description = "Statistics of the endpoints that has been poll enriched from") TabularData extendedInformation(); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java index ab6b19f602d..db939964dc0 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java @@ -114,6 +114,16 @@ 
public class ManagedPollEnricher extends ManagedProcessor implements ManagedPoll return processor.isAggregateOnException(); } + @Override + public Boolean isAllowOptimisedComponents() { + return processor.isAllowOptimisedComponents(); + } + + @Override + public Boolean isOptimised() { + return processor.getDynamicAware() != null; + } + @Override public TabularData extendedInformation() { try { diff --git a/core/camel-support/src/main/java/org/apache/camel/support/component/PollDynamicAwareSupport.java b/core/camel-support/src/main/java/org/apache/camel/support/component/PollDynamicAwareSupport.java new file mode 100644 index 00000000000..6b8dfe9dda4 --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/component/PollDynamicAwareSupport.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.camel.support.component;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.EndpointUriFactory;
+import org.apache.camel.spi.PollDynamicAware;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.StringHelper;
+import org.apache.camel.util.URISupport;
+
+/**
+ * Base class for {@link PollDynamicAware} implementations.
+ * <p/>
+ * Keeps the scheme and {@link CamelContext}, loads the known endpoint property names and multi-value prefixes
+ * from the {@link EndpointUriFactory} of the scheme during init, and offers helpers that split the query
+ * parameters of an uri into known and lenient (unknown) properties.
+ */
+public abstract class PollDynamicAwareSupport extends ServiceSupport implements PollDynamicAware {
+
+    private CamelContext camelContext;
+    private String scheme;
+    // both sets are loaded from the EndpointUriFactory in doInit
+    private Set<String> knownProperties;
+    private Set<String> knownPrefixes;
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public String getScheme() {
+        return scheme;
+    }
+
+    @Override
+    public void setScheme(String scheme) {
+        this.scheme = scheme;
+    }
+
+    @Override
+    public boolean resolveRawParameterValues() {
+        return true;
+    }
+
+    /**
+     * Eagerly loads the known property names and multi-value prefixes, unless both are already set.
+     *
+     * @throws IllegalStateException if no {@link EndpointUriFactory} is found for the scheme
+     */
+    @Override
+    protected void doInit() throws Exception {
+        if (knownProperties != null && knownPrefixes != null) {
+            // nothing to load
+            return;
+        }
+        EndpointUriFactory uriFactory
+                = getCamelContext().getCamelContextExtension().getEndpointUriFactory(getScheme());
+        if (uriFactory == null) {
+            throw new IllegalStateException("Cannot find EndpointUriFactory for component: " + getScheme());
+        }
+        knownProperties = uriFactory.propertyNames();
+        knownPrefixes = uriFactory.multiValuePrefixes();
+    }
+
+    /**
+     * Parses the query parameters of the uri and, when {@link #isLenientProperties()} is <tt>true</tt>, keeps
+     * only those that are known properties or whose key starts with a known multi-value prefix.
+     *
+     * @param  exchange the exchange, whose CamelContext is used when resolving RAW parameter values
+     * @param  uri      the uri with the query parameters
+     * @return          the filtered parameters in their original order, or the parsed parameters as-is when
+     *                  there are none or lenient properties are not in use
+     */
+    public Map<String, Object> endpointProperties(Exchange exchange, String uri) throws Exception {
+        // only the query parameters can be dynamic, so that is all we need to parse
+        Map<String, Object> parameters = URISupport.parseQuery(URISupport.extractQuery(uri));
+        if (parameters == null || parameters.isEmpty() || !isLenientProperties()) {
+            return parameters;
+        }
+
+        if (resolveRawParameterValues()) {
+            // strip the RAW(value) token so only the value itself remains, as the parameters
+            // are about to be used for creating an endpoint
+            RawParameterHelper.resolveRawParameterValues(exchange.getContext(), parameters);
+        }
+
+        // the known properties are the non lenient properties, so they are the only ones to keep
+        Map<String, Object> answer = new LinkedHashMap<>();
+        for (Map.Entry<String, Object> entry : parameters.entrySet()) {
+            String key = entry.getKey();
+            if (isKnownProperty(key)) {
+                answer.put(key, entry.getValue());
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * Parses the query parameters of the uri and keeps only those that are neither a known property nor
+     * prefixed with a known multi-value prefix. The kept values are converted with <tt>toString()</tt>.
+     *
+     * @param  exchange the exchange, whose CamelContext is used when resolving RAW parameter values
+     * @param  uri      the uri with the query parameters
+     * @return          the lenient parameters in their original order, or the parsed parameters as-is when
+     *                  there are none
+     */
+    public Map<String, Object> endpointLenientProperties(Exchange exchange, String uri) throws Exception {
+        // only the query parameters can be dynamic, so that is all we need to parse
+        Map<String, Object> parameters = URISupport.parseQuery(URISupport.extractQuery(uri));
+        if (parameters == null || parameters.isEmpty()) {
+            return parameters;
+        }
+
+        if (resolveRawParameterValues()) {
+            // strip the RAW(value) token so only the value itself remains, as the parameters
+            // are about to be used for creating an endpoint
+            RawParameterHelper.resolveRawParameterValues(exchange.getContext(), parameters);
+        }
+
+        Map<String, Object> answer = new LinkedHashMap<>();
+        for (Map.Entry<String, Object> entry : parameters.entrySet()) {
+            String key = entry.getKey();
+            if (!isKnownProperty(key)) {
+                answer.put(key, entry.getValue().toString());
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * Creates an endpoint uri from everything {@link StringHelper#before(String, String, String)} returns for
+     * the <tt>?</tt> separator (with the uri itself as default), followed by <tt>?</tt> and the query string
+     * built from the given properties.
+     */
+    public String asEndpointUri(Exchange exchange, String uri, Map<String, Object> properties) throws Exception {
+        String query = URISupport.createQueryString(properties, false);
+        String base = StringHelper.before(uri, "?", uri);
+        return base + "?" + query;
+    }
+
+    /**
+     * Whether the key is a known property, or starts with one of the known multi-value prefixes.
+     */
+    private boolean isKnownProperty(String key) {
+        if (knownProperties.contains(key)) {
+            return true;
+        }
+        // a key from a multi-value (prefix) counts as known too, or the property may be lost
+        for (String prefix : knownPrefixes) {
+            if (key.startsWith(prefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}
diff --git a/tooling/spi-annotations/src/main/java/org/apache/camel/spi/annotations/PollDynamic.java b/tooling/spi-annotations/src/main/java/org/apache/camel/spi/annotations/PollDynamic.java
new file mode 100644
index 00000000000..fd5325c72e4
--- /dev/null
+++ b/tooling/spi-annotations/src/main/java/org/apache/camel/spi/annotations/PollDynamic.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Target({ ElementType.TYPE })
+@ServiceFactory("poll-dynamic")
+public @interface PollDynamic {
+
+    String value();
+
+}
