Repository: camel Updated Branches: refs/heads/master 24c92d464 -> 2d4194852
CAMEL-10228: camel-sql - Preserve attachments Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2d419485 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2d419485 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2d419485 Branch: refs/heads/master Commit: 2d4194852f1ca11c5376d7c6843ff4962060198d Parents: 24c92d4 Author: Arno Noordover <[email protected]> Authored: Sat Aug 20 22:20:04 2016 +0200 Committer: Arno Noordover <[email protected]> Committed: Sat Aug 20 22:20:04 2016 +0200 ---------------------------------------------------------------------- .../apache/camel/component/sql/SqlProducer.java | 4 + .../sql/SqlProducerOutputAttachment.java | 96 ++++++++++++++++++++ 2 files changed, 100 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2d419485/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java index f369234..2467335 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java @@ -162,6 +162,7 @@ public class SqlProducer extends DefaultProducer { if (isResultSet) { // preserve headers first, so we can override the SQL_ROW_COUNT header exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); + exchange.getOut().getAttachments().putAll(exchange.getIn().getAttachments()); rs = ps.getResultSet(); SqlOutputType outputType = getEndpoint().getOutputType(); @@ -211,6 +212,7 @@ public class SqlProducer extends DefaultProducer { // if no OUT message yet then create one and propagate headers if 
(!exchange.hasOut()) { exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); + exchange.getOut().getAttachments().putAll(exchange.getIn().getAttachments()); } if (isResultSet) { @@ -282,6 +284,8 @@ public class SqlProducer extends DefaultProducer { ResultSetIterator iterator = getEndpoint().queryForStreamList(con, ps, rs); //pass through all headers exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); + exchange.getOut().getAttachments().putAll(exchange.getIn().getAttachments()); + if (getEndpoint().isNoop()) { exchange.getOut().setBody(exchange.getIn().getBody()); } else if (getEndpoint().getOutputHeader() != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/2d419485/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerOutputAttachment.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerOutputAttachment.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerOutputAttachment.java new file mode 100644 index 0000000..5b128fa --- /dev/null +++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerOutputAttachment.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sql; + +import java.net.MalformedURLException; +import java.net.URL; +import javax.activation.DataHandler; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; + +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNull.notNullValue; + +public class SqlProducerOutputAttachment extends CamelTestSupport { + + private EmbeddedDatabase db; + + @Before + public void setUp() throws Exception { + db = new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build(); + + super.setUp(); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + + db.shutdown(); + } + + @Test + public void testHeaderAndAttachmentAreAvailableAfterProducer() + throws InterruptedException, MalformedURLException { + MockEndpoint mock = getMockEndpoint("mock:query"); + DataHandler content = new DataHandler(new URL("http://www.nu.nl")); + + mock.expectedMessageCount(1); + mock.expectedHeaderReceived(SqlConstants.SQL_ROW_COUNT, 1); + mock.expectedHeaderReceived("TheProjectID", 1); + mock.expectedHeaderReceived("maintain", "this"); + mock.expects(() -> { + assertThat(mock.getReceivedExchanges().get(0).getIn().getAttachments().size(), is(1)); + assertThat(mock.getReceivedExchanges().get(0).getIn().getAttachment("att1"), notNullValue()); + assertThat(mock.getReceivedExchanges().get(0).getIn().getAttachment("att1"), 
is(content)); + }); + mock.message(0).body().isEqualTo("Hi there!"); + + Exchange exchange = context.getEndpoint("direct:query").createExchange(); + exchange.getIn().setBody("Hi there!"); + exchange.getIn().setHeader("myProject", "Camel"); + exchange.getIn().setHeader("maintain", "this"); + exchange.getIn().addAttachment("att1", content); + template.send("direct:query", exchange); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // required for the sql component + getContext().getComponent("sql", SqlComponent.class).setDataSource(db); + + from("direct:query") + .to("sql:select id from projects where project = :#myProject?outputType=SelectOne&outputHeader=TheProjectID").to("mock:query"); + } + }; + } +}
