Updated Branches: refs/heads/master 23d9d028f -> 4c3d1526c
CAMEL-6028 added CamelFileName Expression support to camel-hdfs producer Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4c3d1526 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4c3d1526 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4c3d1526 Branch: refs/heads/master Commit: 4c3d1526c9d285e138a16b9fd275171f4255ec65 Parents: 23d9d02 Author: boday <[email protected]> Authored: Thu Oct 17 09:41:05 2013 -0700 Committer: boday <[email protected]> Committed: Thu Oct 17 09:41:05 2013 -0700 ---------------------------------------------------------------------- .../camel/component/hdfs/HdfsProducer.java | 21 ++++++++++++++-- .../camel/component/hdfs/HdfsProducerTest.java | 25 ++++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4c3d1526/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java index 11a73e0..8ac00eb 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.security.auth.login.Configuration; import org.apache.camel.Exchange; +import org.apache.camel.Expression; import org.apache.camel.impl.DefaultProducer; import org.apache.camel.util.IOHelper; @@ -185,8 +186,7 @@ public class HdfsProducer extends DefaultProducer { if (ostream != null) { IOHelper.close(ostream, "output stream", log); } - StringBuilder actualPath = new StringBuilder(hdfsPath); - actualPath.append(exchange.getIn().getHeader(Exchange.FILE_NAME, String.class)); + StringBuilder actualPath = getHdfsPathUsingFileNameHeader(exchange); ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config); } else if (ostream == null) { // must have ostream @@ -235,6 +235,23 @@ public class HdfsProducer extends DefaultProducer { log.debug("Wrote body to hdfs-file {}", path); } + /** + * helper method to construct the hdfsPath from the CamelFileName String or Expression + * @param exchange + * @return + */ + private StringBuilder getHdfsPathUsingFileNameHeader(Exchange exchange) { + StringBuilder actualPath = new StringBuilder(hdfsPath); + String fileName = ""; + Object value = exchange.getIn().getHeader(Exchange.FILE_NAME); + if (value instanceof String) { + fileName = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value); + } else if (value instanceof Expression) { + fileName = ((Expression) value).evaluate(exchange, String.class); + } + return actualPath.append(fileName); + } + private StringBuilder newFileName() { StringBuilder actualPath = new StringBuilder(hdfsPath); actualPath.append(splitNum); http://git-wip-us.apache.org/repos/asf/camel/blob/4c3d1526/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java index 937958b..a219190 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java @@ -44,6 +44,8 @@ import org.apache.hadoop.util.ReflectionUtils; import org.junit.Before; import org.junit.Test; +import static org.apache.camel.language.simple.SimpleLanguage.simple; + public class HdfsProducerTest extends HdfsTestSupport { private static final Path TEMP_DIR = new Path(new File("target/test/").getAbsolutePath()); @@ -357,6 +359,29 @@ public class HdfsProducerTest extends HdfsTestSupport { } } + @Test + public void testWriteTextWithDynamicFilenameExpression() throws Exception { + if (!canTest()) { + return; + } + + for (int i = 0; i < 5; i++) { + template.sendBodyAndHeader("direct:write_dynamic_filename", "CIAO" + i, Exchange.FILE_NAME, simple("file-${body}")); + } + + for (int i = 0; i < 5; i++) { + InputStream in = null; + try { + in = new URL("file:///" + TEMP_DIR.toUri() + "/test-camel-dynamic/file-CIAO" + i).openStream(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + IOUtils.copyBytes(in, bos, 4096, false); + assertEquals("CIAO" + i, new String(bos.toByteArray())); + } finally { + IOHelper.close(in); + } + } + } + @Override public void tearDown() throws Exception { if (!canTest()) {
