Repository: camel
Updated Branches:
  refs/heads/master a49627216 -> 4233318d9


http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java
 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java
new file mode 100644
index 0000000..6b1d81d
--- /dev/null
+++ 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import com.surftools.BeanstalkClient.Client;
+import org.junit.Before;
+
+public class ConnectionSettingsTest { // unit tests for ConnectionSettingsFactory.parseUri
+    @Test
+    public void parseUriTest() { // host:port/tube, host/tube and bare tube must all parse; comparison relies on ConnectionSettings.equals (defined elsewhere)
+        final ConnectionSettingsFactory factory = 
BeanstalkComponent.connFactory;
+        assertEquals("Full URI", new ConnectionSettings("host.domain.tld", 
11300, "someTube"), factory.parseUri("host.domain.tld:11300/someTube"));
+        assertEquals("No port", new ConnectionSettings("host.domain.tld", 
Client.DEFAULT_PORT, "someTube"), factory.parseUri("host.domain.tld/someTube")); // missing port falls back to Client.DEFAULT_PORT
+        assertEquals("Only tube", new ConnectionSettings(Client.DEFAULT_HOST, 
Client.DEFAULT_PORT, "someTube"), factory.parseUri("someTube")); // missing host falls back to Client.DEFAULT_HOST
+    }
+
+    @Test
+    public void parseTubesTest() { // several tubes are written as one path segment joined by a plus sign
+        final ConnectionSettingsFactory factory = 
BeanstalkComponent.connFactory;
+        assertArrayEquals("Full URI", new String[] {"tube1", "tube2"}, 
factory.parseUri("host:90/tube1+tube2").tubes);
+        assertArrayEquals("No port", new String[] {"tube1", "tube2"}, 
factory.parseUri("host/tube1+tube2").tubes);
+        assertArrayEquals("Only tubes", new String[] {"tube1", "tube2"}, 
factory.parseUri("tube1+tube2").tubes);
+        assertArrayEquals("Empty URI", new String[0], 
factory.parseUri("").tubes); // an empty URI yields an empty tube array
+    }
+
+    @Test(expected=IllegalArgumentException.class)
+    public void notValidHost() { // parseUri is expected to throw while the fail argument is evaluated; fail itself only runs if a result came back
+        final ConnectionSettingsFactory factory = 
BeanstalkComponent.connFactory;
+        fail(String.format("Calling on not valid URI must raise exception, but 
got result %s", factory.parseUri("not_valid?host/tube?")));
+    }
+
+    @Before
+    public void setUp() { // runs before each test
+        BeanstalkComponent.connFactory = new ConnectionSettingsFactory(); // replaces the static factory with a fresh default instance
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java
 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java
new file mode 100644
index 0000000..14a0955
--- /dev/null
+++ 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import com.surftools.BeanstalkClient.BeanstalkException;
+import com.surftools.BeanstalkClient.Job;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+public class ConsumerCompletionTest extends BeanstalkMockTestSupport { // consumer completion handling: delete on success, release on failure, recovery after a client exception
+    final String testMessage = "hello, world";
+
+    volatile boolean shouldIdie = false; // volatile: set by the test method and read by the route processor, presumably on a consumer thread
+    final Processor processor = new Processor() {
+        @Override
+        public void process(Exchange exchange) throws InterruptedException {
+            if (shouldIdie) throw new InterruptedException("die"); // simulated processing failure
+        }
+    };
+
+    @Test
+    public void testDeleteOnComplete() throws Exception { // a job that is routed successfully must be deleted
+        final long jobId = 111;
+        final byte[] payload = Helper.stringToBytes(testMessage);
+        final Job jobMock = mock(Job.class);
+
+        when(jobMock.getJobId()).thenReturn(jobId);
+        when(jobMock.getData()).thenReturn(payload);
+        when(client.reserve(anyInt()))
+                .thenReturn(jobMock)
+                .thenReturn(null); // one job, then nothing more to reserve
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMinimumMessageCount(1);
+        result.expectedBodiesReceived(testMessage);
+        result.expectedPropertyReceived(Headers.JOB_ID, jobId);
+        result.message(0).header(Headers.JOB_ID).isEqualTo(jobId);
+        result.assertIsSatisfied(2000);
+
+        verify(client, atLeastOnce()).reserve(anyInt());
+        verify(client).delete(jobId);
+    }
+
+    @Test
+    public void testReleaseOnFailure() throws Exception { // the route is configured with consumer.onFailure=release
+        shouldIdie = true; // make the processor throw
+        final long jobId = 111;
+        final long priority = BeanstalkComponent.DEFAULT_PRIORITY;
+        final int delay = BeanstalkComponent.DEFAULT_DELAY;
+        final byte[] payload = Helper.stringToBytes(testMessage);
+        final Job jobMock = mock(Job.class);
+
+        when(jobMock.getJobId()).thenReturn(jobId);
+        when(jobMock.getData()).thenReturn(payload);
+        when(client.reserve(anyInt()))
+                .thenReturn(jobMock)
+                .thenReturn(null);
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMinimumMessageCount(1);
+        result.assertIsNotSatisfied(1000); // nothing may reach mock:result because the processor fails first
+
+        verify(client, atLeastOnce()).reserve(anyInt());
+        verify(client).release(jobId, priority, delay); // released with the component default priority and delay
+    }
+
+    @Test
+    public void testBeanstalkException() throws Exception { // the first reserve throws, the next one delivers the job
+        shouldIdie = false;
+        final Job jobMock = mock(Job.class);
+        final long jobId = 111;
+        final byte[] payload = Helper.stringToBytes(testMessage);
+
+        when(jobMock.getJobId()).thenReturn(jobId);
+        when(jobMock.getData()).thenReturn(payload);
+        when(client.reserve(anyInt()))
+                .thenThrow(new BeanstalkException("test"))
+                .thenReturn(jobMock);
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+        result.expectedBodiesReceived(testMessage);
+        result.expectedPropertyReceived(Headers.JOB_ID, jobId);
+        result.message(0).header(Headers.JOB_ID).isEqualTo(jobId);
+        result.assertIsSatisfied(100);
+
+        verify(client, atLeast(1)).reserve(anyInt());
+        verify(client, times(1)).close(); // the client must have been closed exactly once
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() { // beanstalk consumer -> processor -> mock:result
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                
from("beanstalk:tube?consumer.onFailure=release").process(processor).to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java
 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java
new file mode 100644
index 0000000..903f272
--- /dev/null
+++ 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.Ignore;
+
+public class EndpointTest { // checks that beanstalk endpoint URIs are parsed into the expected endpoint settings
+    CamelContext context = null;
+
+    @Before
+    public void setUp() throws Exception { // fresh started context with JMX disabled for every test
+        context = new DefaultCamelContext();
+        context.disableJMX();
+        context.start();
+    }
+
+    @Test
+    public void testPriority() { // jobPriority query option -> getJobPriority
+        BeanstalkEndpoint endpoint = 
context.getEndpoint("beanstalk:default?jobPriority=1000", 
BeanstalkEndpoint.class);
+        assertNotNull("Beanstalk endpoint", endpoint);
+        assertEquals("Priority", 1000, endpoint.getJobPriority());
+    }
+
+    @Test
+    public void testTimeToRun() { // jobTimeToRun query option -> getJobTimeToRun
+        BeanstalkEndpoint endpoint = 
context.getEndpoint("beanstalk:default?jobTimeToRun=10", 
BeanstalkEndpoint.class);
+        assertNotNull("Beanstalk endpoint", endpoint);
+        assertEquals("Time to run", 10, endpoint.getJobTimeToRun());
+    }
+
+    @Test
+    public void testDelay() { // jobDelay query option -> getJobDelay
+        BeanstalkEndpoint endpoint = 
context.getEndpoint("beanstalk:default?jobDelay=10", BeanstalkEndpoint.class);
+        assertNotNull("Beanstalk endpoint", endpoint);
+        assertEquals("Delay", 10, endpoint.getJobDelay());
+    }
+
+    @Test
+    public void testCommand() { // command query option -> endpoint.command
+        BeanstalkEndpoint endpoint = 
context.getEndpoint("beanstalk:default?command=release", 
BeanstalkEndpoint.class);
+        assertNotNull("Beanstalk endpoint", endpoint);
+        assertEquals("Command", BeanstalkComponent.COMMAND_RELEASE, 
endpoint.command);
+    }
+
+    @Test
+    public void testTubes() { // plus-separated tube names; the escapes %2B and %3F must come back decoded inside the names
+        BeanstalkEndpoint endpoint = 
context.getEndpoint("beanstalk:host:11303/tube1+tube%2B+tube%3F?command=kick", 
BeanstalkEndpoint.class);
+        assertNotNull("Beanstalk endpoint", endpoint);
+        assertEquals("Command", BeanstalkComponent.COMMAND_KICK, 
endpoint.command);
+        assertEquals("Host", "host", endpoint.conn.host);
+        assertArrayEquals("Tubes", new String[] {"tube1", "tube+", "tube?"}, 
endpoint.conn.tubes);
+    }
+
+    @Test(expected=FailedToCreateProducerException.class)
+    public void testWrongCommand() throws Exception { // adding a route to an unknown command must raise FailedToCreateProducerException
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start").to("beanstalk:default?command=noCommand");
+            }
+        });
+    }
+
+    @After
+    public void tearDown() throws Exception { // stops the context started in setUp
+        context.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java
 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java
new file mode 100644
index 0000000..3ef5cb9
--- /dev/null
+++ 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import com.surftools.BeanstalkClient.Client;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import org.apache.camel.CamelContext;
+
+public final class Helper { // static test utilities for wiring a mocked beanstalk Client into the component
+    public static ConnectionSettings mockConn(final Client client) { // connection settings that always hand out the given client
+        return new MockConnectionSettings(client);
+    }
+
+    public static void mockComponent(final Client client) { // installs a factory whose parseUri ignores the URI and returns mock settings for the client
+        BeanstalkComponent.setConnectionSettingsFactory(new 
ConnectionSettingsFactory() {
+            @Override
+            public ConnectionSettings parseUri(String uri) {
+                return new MockConnectionSettings(client);
+            }
+        });
+    }
+
+    public static void revertComponent() { // puts ConnectionSettingsFactory.DEFAULT back, undoing mockComponent
+        
BeanstalkComponent.setConnectionSettingsFactory(ConnectionSettingsFactory.DEFAULT);
+    }
+
+    public static BeanstalkEndpoint getEndpoint(String uri, CamelContext 
context, Client client) throws Exception {
+        BeanstalkEndpoint endpoint = new BeanstalkEndpoint(uri, 
context.getComponent("beanstalk"), mockConn(client)); // endpoint built directly on mock settings
+        context.addEndpoint(uri, endpoint); // registered in the context under the same URI
+        return endpoint;
+    }
+
+    public static byte[] stringToBytes(final String s) throws IOException { // converts a string to bytes through DataOutputStream.writeBytes
+        final ByteArrayOutputStream byteOS = new ByteArrayOutputStream();
+        final DataOutputStream dataStream = new DataOutputStream(byteOS);
+
+        try {
+            dataStream.writeBytes(s); // per the JDK documentation writeBytes emits one byte per char and discards the high eight bits
+            dataStream.flush();
+            return byteOS.toByteArray();
+        } finally {
+            dataStream.close(); // both streams are closed even when writing fails
+            byteOS.close();
+        }
+    }
+}
+
+class MockConnectionSettings extends ConnectionSettings { // settings stub that returns one pre-built client instead of creating new ones
+    final Client client;
+
+    public MockConnectionSettings(Client client) {
+        super("tube"); // fixed tube name passed to the ConnectionSettings constructor
+        this.client = client;
+    }
+
+    @Override
+    public Client newReadingClient(boolean useBlockIO) { // useBlockIO is ignored
+        return client;
+    }
+
+    @Override
+    public Client newWritingClient() { // same instance as the reading client
+        return client;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java
 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java
new file mode 100644
index 0000000..7a5a296
--- /dev/null
+++ 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import com.surftools.BeanstalkClient.Job;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+public class ImmediateConsumerTest extends BeanstalkMockTestSupport { // consumer with consumer.awaitJob=false: the job is deleted whether or not processing succeeds
+    final String testMessage = "hello, world";
+
+    volatile boolean shouldIdie = false; // volatile: set by the test method and read by the route processor, presumably on a consumer thread
+    final Processor processor = new Processor() {
+        @Override
+        public void process(Exchange exchange) throws InterruptedException {
+            if (shouldIdie) throw new InterruptedException("die"); // simulated processing failure
+        }
+    };
+
+    @Test
+    public void testDeleteOnSuccess() throws Exception { // job is delivered to mock:result and deleted
+        final Job jobMock = mock(Job.class);
+        final long jobId = 111;
+        final byte[] payload = Helper.stringToBytes(testMessage);
+
+        when(jobMock.getJobId()).thenReturn(jobId);
+        when(jobMock.getData()).thenReturn(payload);
+        when(client.reserve(anyInt()))
+            .thenReturn(jobMock)
+            .thenReturn(null); // one job, then nothing more to reserve
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+        result.expectedBodiesReceived(testMessage);
+        result.expectedPropertyReceived(Headers.JOB_ID, jobId);
+        result.message(0).header(Headers.JOB_ID).isEqualTo(jobId);
+        result.assertIsSatisfied(100);
+
+        verify(client, atLeast(1)).reserve(0); // reserve must have been called with a timeout of 0
+        verify(client, atLeast(1)).delete(jobId);
+    }
+
+    @Test
+    public void testDeleteOnFailure() throws Exception { // processor fails, yet delete is still expected
+        shouldIdie = true; // make the processor throw
+        final long jobId = 111;
+        final byte[] payload = Helper.stringToBytes(testMessage);
+        final Job jobMock = mock(Job.class);
+
+        when(jobMock.getJobId()).thenReturn(jobId);
+        when(jobMock.getData()).thenReturn(payload);
+        when(client.reserve(anyInt()))
+                .thenReturn(jobMock)
+                .thenReturn(null);
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMinimumMessageCount(1);
+        result.assertIsNotSatisfied(1000); // nothing may reach mock:result because the processor fails first
+
+        verify(client, atLeastOnce()).reserve(anyInt());
+        verify(client, atLeast(1)).delete(jobId);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() { // beanstalk consumer -> processor -> mock:result
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                
from("beanstalk:tube?consumer.awaitJob=false").process(processor).to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java
 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java
new file mode 100644
index 0000000..e3949a2
--- /dev/null
+++ 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import com.surftools.BeanstalkClient.BeanstalkException;
+import org.apache.camel.component.beanstalk.processors.*;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.Producer;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.*;
+import static org.mockito.Mockito.*;
+
+public class ProducerTest extends BeanstalkMockTestSupport {
+    final String testMessage = "hello, world"; // body text used by the put tests
+
+    @EndpointInject(uri = "beanstalk:tube")
+    protected BeanstalkEndpoint endpoint; // endpoint under test; individual tests change its command via setCommand
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate direct;
+
+    @Test
+    public void testPut() throws Exception { // no command set: producer uses PutCommand with the component default priority, delay and time to run
+        final long priority = BeanstalkComponent.DEFAULT_PRIORITY;
+        final int delay = BeanstalkComponent.DEFAULT_DELAY;
+        final int timeToRun = BeanstalkComponent.DEFAULT_TIME_TO_RUN;
+        final byte[] payload = Helper.stringToBytes(testMessage);
+        final long jobId = 111;
+
+        when(client.put(priority, delay, timeToRun, 
payload)).thenReturn(jobId); // mocked client reports this job id for the put
+
+        final Producer producer = endpoint.createProducer();
+        assertNotNull("Producer", producer);
+        assertThat("Producer class", producer, 
instanceOf(BeanstalkProducer.class));
+        assertThat("Processor class", ((BeanstalkProducer)producer).command, 
instanceOf(PutCommand.class));
+
+        final Exchange exchange = template.send(endpoint, 
ExchangePattern.InOnly, new Processor() { // TODO: SetBodyProcessor(?)
+            public void process(Exchange exchange) {
+                exchange.getIn().setBody(testMessage);
+            }
+        });
+
+        assertEquals("Job ID in exchange", Long.valueOf(jobId), 
exchange.getIn().getHeader(Headers.JOB_ID, Long.class)); // InOnly: the job id is expected on the In message
+        verify(client).put(priority, delay, timeToRun, payload);
+    }
+
+    @Test
+    public void testPutOut() throws Exception { // same as the InOnly put test but sent as InOut
+        final long priority = BeanstalkComponent.DEFAULT_PRIORITY;
+        final int delay = BeanstalkComponent.DEFAULT_DELAY;
+        final int timeToRun = BeanstalkComponent.DEFAULT_TIME_TO_RUN;
+        final byte[] payload = Helper.stringToBytes(testMessage);
+        final long jobId = 111;
+
+        when(client.put(priority, delay, timeToRun, 
payload)).thenReturn(jobId);
+
+        Producer producer = endpoint.createProducer();
+        assertNotNull("Producer", producer);
+        assertThat("Producer class", producer, 
instanceOf(BeanstalkProducer.class));
+        assertThat("Processor class", ((BeanstalkProducer)producer).command, 
instanceOf(PutCommand.class));
+
+        final Exchange exchange = template.send(endpoint, 
ExchangePattern.InOut, new Processor() { // TODO: SetBodyProcessor(?)
+            public void process(Exchange exchange) {
+                exchange.getIn().setBody(testMessage);
+            }
+        });
+
+        assertEquals("Job ID in exchange", Long.valueOf(jobId), 
exchange.getOut().getHeader(Headers.JOB_ID, Long.class)); // InOut: the job id is expected on the Out message
+        verify(client).put(priority, delay, timeToRun, payload);
+    }
+
+    @Test
+    public void testPutWithHeaders() throws Exception { // PRIORITY, DELAY and TIME_TO_RUN headers must be the values passed to put
+        final long priority = 111;
+        final int delay = 5;
+        final int timeToRun = 65;
+        final byte[] payload = Helper.stringToBytes(testMessage);
+        final long jobId = 111;
+
+        when(client.put(priority, delay, timeToRun, 
payload)).thenReturn(jobId);
+
+        Producer producer = endpoint.createProducer();
+        assertNotNull("Producer", producer);
+        assertThat("Producer class", producer, 
instanceOf(BeanstalkProducer.class));
+        assertThat("Processor class", ((BeanstalkProducer)producer).command, 
instanceOf(PutCommand.class));
+
+        final Exchange exchange = template.send(endpoint, 
ExchangePattern.InOnly, new Processor() { // TODO: SetBodyProcessor(?)
+            public void process(Exchange exchange) {
+                exchange.getIn().setHeader(Headers.PRIORITY, priority);
+                exchange.getIn().setHeader(Headers.DELAY, delay);
+                exchange.getIn().setHeader(Headers.TIME_TO_RUN, timeToRun);
+                exchange.getIn().setBody(testMessage);
+            }
+        });
+
+        assertEquals("Job ID in exchange", Long.valueOf(jobId), 
exchange.getIn().getHeader(Headers.JOB_ID, Long.class));
+        verify(client).put(priority, delay, timeToRun, payload); // the header values, not the defaults, reach the client
+    }
+
+    @Test
+    public void testBury() throws Exception { // bury command with only a JOB_ID header uses the default priority
+        final long priority = BeanstalkComponent.DEFAULT_PRIORITY;
+        final long jobId = 111;
+
+        endpoint.setCommand(BeanstalkComponent.COMMAND_BURY);
+        Producer producer = endpoint.createProducer();
+        assertNotNull("Producer", producer);
+        assertThat("Producer class", producer, 
instanceOf(BeanstalkProducer.class));
+        assertThat("Processor class", ((BeanstalkProducer)producer).command, 
instanceOf(BuryCommand.class));
+
+        when(client.bury(jobId, priority)).thenReturn(true);
+
+        final Exchange exchange = template.send(endpoint, 
ExchangePattern.InOnly, new Processor() {
+            public void process(Exchange exchange) {
+                exchange.getIn().setHeader(Headers.JOB_ID, jobId);
+            }
+        });
+
+        assertEquals("Op result", Boolean.TRUE, 
exchange.getIn().getHeader(Headers.RESULT, Boolean.class)); // the boolean returned by the client is exposed as the RESULT header
+        assertEquals("Job ID in exchange", Long.valueOf(jobId), 
exchange.getIn().getHeader(Headers.JOB_ID, Long.class));
+        verify(client).bury(jobId, priority);
+    }
+
+    @Test
+    public void testBuryNoJobId() throws Exception { // bury without a JOB_ID header must fail the exchange and never call the client
+        endpoint.setCommand(BeanstalkComponent.COMMAND_BURY);
+        Producer producer = endpoint.createProducer();
+        assertNotNull("Producer", producer);
+        assertThat("Producer class", producer, 
instanceOf(BeanstalkProducer.class));
+        assertThat("Processor class", ((BeanstalkProducer)producer).command, 
instanceOf(BuryCommand.class));
+
+        final Exchange exchange = template.send(endpoint, 
ExchangePattern.InOnly, new Processor() {
+            public void process(Exchange exchange) {} // deliberately sets no headers
+        });
+
+        assertTrue("Exchange failed", exchange.isFailed());
+
+        verify(client, never()).bury(anyLong(), anyLong());
+    }
+
+    @Test
+    public void testBuryWithHeaders() throws Exception { // a PRIORITY header must be the priority passed to bury
+        final long priority = 1000;
+        final long jobId = 111;
+
+        endpoint.setCommand(BeanstalkComponent.COMMAND_BURY);
+        Producer producer = endpoint.createProducer();
+        assertNotNull("Producer", producer);
+        assertThat("Producer class", producer, 
instanceOf(BeanstalkProducer.class));
+        assertThat("Processor class", ((BeanstalkProducer)producer).command, 
instanceOf(BuryCommand.class));
+
+        when(client.bury(jobId, priority)).thenReturn(true);
+
+        final Exchange exchange = template.send(endpoint, 
ExchangePattern.InOnly, new Processor() {
+            public void process(Exchange exchange) {
+                exchange.getIn().setHeader(Headers.PRIORITY, priority);
+                exchange.getIn().setHeader(Headers.JOB_ID, jobId);
+            }
+        });
+
+        assertEquals("Op result", Boolean.TRUE, 
exchange.getIn().getHeader(Headers.RESULT, Boolean.class));
+        assertEquals("Job ID in exchange", Long.valueOf(jobId), 
exchange.getIn().getHeader(Headers.JOB_ID, Long.class));
+        verify(client).bury(jobId, priority); // the header priority, not the default, reaches the client
+    }
+
+    @Test
+    public void testDelete() throws Exception { // delete command with a JOB_ID header calls client.delete and reports the result
+        final long jobId = 111;
+
+        endpoint.setCommand(BeanstalkComponent.COMMAND_DELETE);
+        Producer producer = endpoint.createProducer();
+        assertNotNull("Producer", producer);
+        assertThat("Producer class", producer, 
instanceOf(BeanstalkProducer.class));
+        assertThat("Processor class", ((BeanstalkProducer)producer).command, 
instanceOf(DeleteCommand.class));
+
+        when(client.delete(jobId)).thenReturn(true);
+
+        final Exchange exchange = template.send(endpoint, 
ExchangePattern.InOnly, new Processor() {
+            public void process(Exchange exchange) {
+                exchange.getIn().setHeader(Headers.JOB_ID, jobId);
+            }
+        });
+
+        assertEquals("Op result", Boolean.TRUE, 
exchange.getIn().getHeader(Headers.RESULT, Boolean.class)); // the boolean returned by the client is exposed as the RESULT header
+        assertEquals("Job ID in exchange", Long.valueOf(jobId), 
exchange.getIn().getHeader(Headers.JOB_ID, Long.class));
+        verify(client).delete(jobId);
+    }
+
+    @Test
+    public void testDeleteNoJobId() throws Exception { // delete without a JOB_ID header must fail the exchange and never call the client
+        endpoint.setCommand(BeanstalkComponent.COMMAND_DELETE);
+        Producer producer = endpoint.createProducer();
+        assertNotNull("Producer", producer);
+        assertThat("Producer class", producer, 
instanceOf(BeanstalkProducer.class));
+        assertThat("Processor class", ((BeanstalkProducer)producer).command, 
instanceOf(DeleteCommand.class));
+
+        final Exchange exchange = template.send(endpoint, 
ExchangePattern.InOnly, new Processor() {
+            public void process(Exchange exchange) {} // deliberately sets no headers
+        });
+
+        assertTrue("Exchange failed", exchange.isFailed());
+
+        verify(client, never()).delete(anyLong());
+    }
+
+    @Test
+    public void testRelease() throws Exception { // release command with only a JOB_ID header uses the default priority and delay
+        final long priority = BeanstalkComponent.DEFAULT_PRIORITY;
+        final int delay = BeanstalkComponent.DEFAULT_DELAY;
+        final long jobId = 111;
+
+        endpoint.setCommand(BeanstalkComponent.COMMAND_RELEASE);
+        Producer producer = endpoint.createProducer();
+        assertNotNull("Producer", producer);
+        assertThat("Producer class", producer, 
instanceOf(BeanstalkProducer.class));
+        assertThat("Processor class", ((BeanstalkProducer)producer).command, 
instanceOf(ReleaseCommand.class));
+
+        when(client.release(jobId, priority, delay)).thenReturn(true);
+
+        final Exchange exchange = template.send(endpoint, 
ExchangePattern.InOnly, new Processor() {
+            public void process(Exchange exchange) {
+                exchange.getIn().setHeader(Headers.JOB_ID, jobId);
+            }
+        });
+
+        assertEquals("Op result", Boolean.TRUE, 
exchange.getIn().getHeader(Headers.RESULT, Boolean.class)); // the boolean returned by the client is exposed as the RESULT header
+        assertEquals("Job ID in exchange", Long.valueOf(jobId), 
exchange.getIn().getHeader(Headers.JOB_ID, Long.class));
+        verify(client).release(jobId, priority, delay);
+    }
+
+    /**
+     * Sends an exchange without a {@code Headers.JOB_ID} header to an endpoint
+     * configured for the release command. The exchange must end up failed and
+     * {@code client.release} must never be invoked.
+     */
+    @Test
+    public void testReleaseNoJobId() throws Exception {
+        endpoint.setCommand(BeanstalkComponent.COMMAND_RELEASE);
+        final Producer created = endpoint.createProducer();
+        assertNotNull("Producer", created);
+        assertThat("Producer class", created, instanceOf(BeanstalkProducer.class));
+        final BeanstalkProducer beanstalkProducer = (BeanstalkProducer) created;
+        assertThat("Processor class", beanstalkProducer.command, instanceOf(ReleaseCommand.class));
+
+        // The processor deliberately leaves the message untouched: no job id header.
+        final Processor withoutHeaders = new Processor() {
+            @Override
+            public void process(Exchange unused) {
+            }
+        };
+        final Exchange outcome = template.send(endpoint, ExchangePattern.InOnly, withoutHeaders);
+
+        assertTrue("Exchange failed", outcome.isFailed());
+
+        verify(client, never()).release(anyLong(), anyLong(), anyInt());
+    }
+
+    /**
+     * Sends an exchange carrying job id, priority and delay headers to an
+     * endpoint configured for the release command. The client is expected to be
+     * called with exactly those three values, and the stubbed {@code true}
+     * result must show up in the {@code Headers.RESULT} header.
+     */
+    @Test
+    public void testReleaseWithHeaders() throws Exception {
+        final long priority = 1001;
+        final int delay = 124;
+        final long jobId = 111;
+
+        endpoint.setCommand(BeanstalkComponent.COMMAND_RELEASE);
+        final Producer created = endpoint.createProducer();
+        assertNotNull("Producer", created);
+        assertThat("Producer class", created, instanceOf(BeanstalkProducer.class));
+        final BeanstalkProducer beanstalkProducer = (BeanstalkProducer) created;
+        assertThat("Processor class", beanstalkProducer.command, instanceOf(ReleaseCommand.class));
+
+        when(client.release(jobId, priority, delay)).thenReturn(true);
+
+        final Processor allHeaders = new Processor() {
+            @Override
+            public void process(Exchange outgoing) {
+                outgoing.getIn().setHeader(Headers.JOB_ID, jobId);
+                outgoing.getIn().setHeader(Headers.PRIORITY, priority);
+                outgoing.getIn().setHeader(Headers.DELAY, delay);
+            }
+        };
+        final Exchange outcome = template.send(endpoint, ExchangePattern.InOnly, allHeaders);
+
+        final Boolean opResult = outcome.getIn().getHeader(Headers.RESULT, Boolean.class);
+        assertEquals("Op result", Boolean.TRUE, opResult);
+        final Long echoedJobId = outcome.getIn().getHeader(Headers.JOB_ID, Long.class);
+        assertEquals("Job ID in exchange", Long.valueOf(jobId), echoedJobId);
+        verify(client).release(jobId, priority, delay);
+    }
+
+    /**
+     * Sends an exchange carrying a job id to an endpoint configured for the
+     * touch command. The client is expected to be touched with that job id, and
+     * the stubbed {@code true} result must show up in the {@code Headers.RESULT}
+     * header.
+     */
+    @Test
+    public void testTouch() throws Exception {
+        final long jobId = 111;
+
+        endpoint.setCommand(BeanstalkComponent.COMMAND_TOUCH);
+        final Producer created = endpoint.createProducer();
+        assertNotNull("Producer", created);
+        assertThat("Producer class", created, instanceOf(BeanstalkProducer.class));
+        final BeanstalkProducer beanstalkProducer = (BeanstalkProducer) created;
+        assertThat("Processor class", beanstalkProducer.command, instanceOf(TouchCommand.class));
+
+        when(client.touch(jobId)).thenReturn(true);
+
+        final Processor jobIdOnly = new Processor() {
+            @Override
+            public void process(Exchange outgoing) {
+                outgoing.getIn().setHeader(Headers.JOB_ID, jobId);
+            }
+        };
+        final Exchange outcome = template.send(endpoint, ExchangePattern.InOnly, jobIdOnly);
+
+        final Boolean opResult = outcome.getIn().getHeader(Headers.RESULT, Boolean.class);
+        assertEquals("Op result", Boolean.TRUE, opResult);
+        final Long echoedJobId = outcome.getIn().getHeader(Headers.JOB_ID, Long.class);
+        assertEquals("Job ID in exchange", Long.valueOf(jobId), echoedJobId);
+        verify(client).touch(jobId);
+    }
+
+    /**
+     * Sends an exchange without a {@code Headers.JOB_ID} header to an endpoint
+     * configured for the touch command. The exchange must end up failed and
+     * {@code client.touch} must never be invoked.
+     */
+    @Test
+    public void testTouchNoJobId() throws Exception {
+        endpoint.setCommand(BeanstalkComponent.COMMAND_TOUCH);
+        final Producer created = endpoint.createProducer();
+        assertNotNull("Producer", created);
+        assertThat("Producer class", created, instanceOf(BeanstalkProducer.class));
+        final BeanstalkProducer beanstalkProducer = (BeanstalkProducer) created;
+        assertThat("Processor class", beanstalkProducer.command, instanceOf(TouchCommand.class));
+
+        // The processor deliberately leaves the message untouched: no job id header.
+        final Processor withoutHeaders = new Processor() {
+            @Override
+            public void process(Exchange unused) {
+            }
+        };
+        final Exchange outcome = template.send(endpoint, ExchangePattern.InOnly, withoutHeaders);
+
+        assertTrue("Exchange failed", outcome.isFailed());
+
+        verify(client, never()).touch(anyLong());
+    }
+
+    /**
+     * Sends the test message through {@code direct} with a
+     * {@code Headers.TIME_TO_RUN} header of 75. The stubbed and verified
+     * {@code client.put} call uses that header value as time-to-run, together
+     * with priority 1020 and delay 50, and the job id returned by the stub must
+     * arrive at the mock endpoint in the {@code Headers.JOB_ID} header.
+     */
+    @Test
+    public void testHeaderOverride() throws Exception {
+        final long priority = 1020;
+        final int delay = 50;
+        final int timeToRun = 75;
+        final byte[] payload = Helper.stringToBytes(testMessage);
+        final long jobId = 113;
+
+        when(client.put(priority, delay, timeToRun, payload)).thenReturn(jobId);
+
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.allMessages().body().isEqualTo(testMessage);
+        resultEndpoint.allMessages().header(Headers.JOB_ID).isEqualTo(Long.valueOf(jobId));
+
+        direct.sendBodyAndHeader(testMessage, Headers.TIME_TO_RUN, timeToRun);
+        resultEndpoint.assertIsSatisfied();
+
+        final Exchange firstReceived = resultEndpoint.getReceivedExchanges().get(0);
+        final Long jobIdIn = firstReceived.getIn().getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in 'In' message", jobIdIn);
+
+        verify(client).put(priority, delay, timeToRun, payload);
+    }
+
+    /**
+     * Stubs {@code client.put} to throw a {@code BeanstalkException} on the
+     * first call and return a job id on the second. The message must still
+     * arrive at the mock endpoint with that job id, and the client must have
+     * been closed once and asked to put twice.
+     */
+    @Test
+    public void test1BeanstalkException() throws Exception {
+        final long priority = 1020;
+        final int delay = 50;
+        final int timeToRun = 75;
+        final byte[] payload = Helper.stringToBytes(testMessage);
+        final long jobId = 113;
+
+        // First invocation fails, the following one succeeds.
+        when(client.put(priority, delay, timeToRun, payload))
+            .thenThrow(new BeanstalkException("test"))
+            .thenReturn(jobId);
+
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.allMessages().body().isEqualTo(testMessage);
+        resultEndpoint.allMessages().header(Headers.JOB_ID).isEqualTo(Long.valueOf(jobId));
+
+        direct.sendBodyAndHeader(testMessage, Headers.TIME_TO_RUN, timeToRun);
+        resultEndpoint.assertIsSatisfied();
+
+        final Exchange firstReceived = resultEndpoint.getReceivedExchanges().get(0);
+        final Long jobIdIn = firstReceived.getIn().getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in 'In' message", jobIdIn);
+
+        verify(client, times(1)).close();
+        verify(client, times(2)).put(priority, delay, timeToRun, payload);
+    }
+
+    /**
+     * Stubs {@code client.touch} to always throw a {@code BeanstalkException}.
+     * The exchange sent with the touch command must end up failed, with
+     * {@code touch} attempted twice and the client closed once.
+     */
+    @Test
+    public void test2BeanstalkException() throws Exception {
+        final long jobId = 111;
+
+        when(client.touch(jobId)).thenThrow(new BeanstalkException("test"));
+
+        endpoint.setCommand(BeanstalkComponent.COMMAND_TOUCH);
+        final Processor jobIdOnly = new Processor() {
+            @Override
+            public void process(Exchange outgoing) {
+                outgoing.getIn().setHeader(Headers.JOB_ID, jobId);
+            }
+        };
+        final Exchange outcome = template.send(endpoint, ExchangePattern.InOnly, jobIdOnly);
+
+        assertTrue("Exchange failed", outcome.isFailed());
+
+        verify(client, times(2)).touch(jobId);
+        verify(client, times(1)).close();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                
from("direct:start").to("beanstalk:tube?jobPriority=1020&jobDelay=50&jobTimeToRun=65").to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java
----------------------------------------------------------------------
diff --git 
a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java
 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java
new file mode 100644
index 0000000..fb87c9a
--- /dev/null
+++ 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.integration;
+
+import com.surftools.BeanstalkClient.Client;
+import org.apache.camel.component.beanstalk.ConnectionSettings;
+import org.apache.camel.component.beanstalk.ConnectionSettingsFactory;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Base class for the Beanstalk integration tests.
+ *
+ * <p>Before each test it parses {@link #tubeName} into connection settings and
+ * opens one writing and one reading {@link Client}. After each test both
+ * clients are closed again so that connections do not pile up across tests.
+ * Presumably a reachable beanstalkd server is required for the clients to be
+ * created; confirm against {@code ConnectionSettings}.
+ */
+public abstract class BeanstalkCamelTestSupport extends CamelTestSupport {
+    final ConnectionSettingsFactory connFactory = ConnectionSettingsFactory.DEFAULT;
+
+    /** Tube name derived from the current time in milliseconds, computed once per test instance. */
+    final String tubeName = String.format("test%d", System.currentTimeMillis());
+
+    /** Client created with {@code newReadingClient(false)} in {@link #setUp()}. */
+    protected Client reader = null;
+
+    /** Client created with {@code newWritingClient()} in {@link #setUp()}. */
+    protected Client writer = null;
+
+    /**
+     * Starts the Camel test context and then opens the writer and reader clients
+     * for {@link #tubeName}.
+     */
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+
+        ConnectionSettings conn = connFactory.parseUri(tubeName);
+        writer = conn.newWritingClient();
+        reader = conn.newReadingClient(false);
+    }
+
+    /**
+     * Closes the clients opened by {@link #setUp()} and then shuts the Camel
+     * test context down. Each step runs even if the previous one throws, and a
+     * client that was never created (because {@code setUp} failed early) is
+     * skipped.
+     */
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (writer != null) {
+                writer.close();
+            }
+        } finally {
+            writer = null;
+            try {
+                if (reader != null) {
+                    reader.close();
+                }
+            } finally {
+                reader = null;
+                super.tearDown();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java
 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java
new file mode 100644
index 0000000..033955e
--- /dev/null
+++ 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.integration;
+
+import org.apache.camel.component.beanstalk.Headers;
+import com.surftools.BeanstalkClient.Job;
+import java.io.IOException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Ignore;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Integration test for the beanstalk producer with {@code command=bury}.
+ *
+ * <p>The route under test sends {@code direct:start} to the beanstalk endpoint
+ * on the per-test tube and forwards the result to {@code mock:result}.
+ */
+public class BuryProducerIntegrationTest extends BeanstalkCamelTestSupport {
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate direct;
+
+    /**
+     * Puts a job, sends its id through the route and checks that the job can no
+     * longer be reserved but is reported by {@code peekBuried()}. Currently
+     * ignored; see the {@code @Ignore} reason.
+     */
+    @Ignore("requires reserve - bury sequence")
+    @Test
+    public void testBury() throws InterruptedException, IOException {
+        long jobId = writer.put(0, 0, 5, new byte[0]);
+        assertTrue("Valid Job Id", jobId > 0);
+
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull();
+        resultEndpoint.allMessages().header(Headers.RESULT).isEqualTo(true);
+        direct.sendBodyAndHeader(null, Headers.JOB_ID, jobId);
+
+        assertMockEndpointsSatisfied();
+
+        final Long messageJobId = resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in message", messageJobId);
+        assertEquals("Message Job ID equals", jobId, messageJobId.longValue());
+
+        final Job job = reader.reserve(0);
+        assertNull("Beanstalk client has no message", job);
+
+        final Job buried = reader.peekBuried();
+        assertNotNull("Job in buried", buried);
+        assertEquals("Buried job id", jobId, buried.getJobId());
+    }
+
+    /**
+     * Sending a body without a job id header must raise a
+     * {@link CamelExecutionException}, and nothing may reach the mock endpoint.
+     *
+     * <p>The exception is caught explicitly rather than declared through
+     * {@code @Test(expected = ...)}: with the annotation form the test ends at
+     * the throwing call, so the mock endpoint expectation below would never be
+     * evaluated.
+     */
+    @Test
+    public void testNoJobId() throws InterruptedException, IOException {
+        resultEndpoint.expectedMessageCount(0);
+        try {
+            direct.sendBody(new byte[0]);
+            fail("Expected CamelExecutionException for a message without a job id");
+        } catch (CamelExecutionException expected) {
+            // This is the outcome under test; the mock endpoint is checked below.
+        }
+
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start").to("beanstalk:"+tubeName+"?command=bury").to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java
 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java
new file mode 100644
index 0000000..6e2b4de
--- /dev/null
+++ 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.integration;
+
+import org.apache.camel.component.beanstalk.Headers;
+import org.apache.camel.component.beanstalk.Helper;
+import java.io.IOException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * Integration test for the beanstalk consumer: a job put on the per-test tube
+ * through the writer client must be delivered to {@code mock:result} with the
+ * expected body and job headers.
+ */
+public class ConsumerIntegrationTest extends BeanstalkCamelTestSupport {
+    final String testMessage = "Hello, world!";
+
+    @EndpointInject(uri = "mock:result")
+    MockEndpoint result;
+
+    /**
+     * Puts one job carrying {@link #testMessage} and asserts on the single
+     * message received by the mock endpoint: body, job id, priority, tube,
+     * state and the presence or positivity of the remaining job headers.
+     */
+    @Test
+    public void testReceive() throws IOException, InterruptedException {
+        final long priority = 0;
+        final int timeToRun = 10;
+        final byte[] payload = Helper.stringToBytes(testMessage);
+        final long jobId = writer.put(priority, 0, timeToRun, payload);
+
+        result.expectedMessageCount(1);
+        result.expectedPropertyReceived(Headers.JOB_ID, jobId);
+        result.message(0).header(Exchange.CREATED_TIMESTAMP).isNotNull();
+        result.message(0).header(Headers.JOB_ID).isEqualTo(Long.valueOf(jobId));
+        result.message(0).header(Headers.PRIORITY).isEqualTo(Long.valueOf(priority));
+        result.message(0).header(Headers.TUBE).isEqualTo(tubeName);
+        result.message(0).header(Headers.STATE).isEqualTo("reserved");
+        result.message(0).header(Headers.AGE).isGreaterThan(0);
+        result.message(0).header(Headers.TIME_LEFT).isGreaterThan(0);
+        result.message(0).header(Headers.TIMEOUTS).isNotNull();
+        result.message(0).header(Headers.RELEASES).isNotNull();
+        result.message(0).header(Headers.BURIES).isNotNull();
+        result.message(0).header(Headers.KICKS).isNotNull();
+        result.message(0).body().isEqualTo(testMessage);
+        result.assertIsSatisfied(500);
+    }
+
+    /** Consumes from the per-test tube and forwards every job to {@code mock:result}. */
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        final RouteBuilder routes = new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("beanstalk:"+tubeName).to("mock:result");
+            }
+        };
+        return routes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java
 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java
new file mode 100644
index 0000000..9d9dc36
--- /dev/null
+++ 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.integration;
+
+import org.apache.camel.component.beanstalk.Headers;
+import com.surftools.BeanstalkClient.Job;
+import java.io.IOException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Integration test for the beanstalk producer with {@code command=delete}.
+ *
+ * <p>The route under test sends {@code direct:start} to the beanstalk endpoint
+ * on the per-test tube and forwards the result to {@code mock:result}.
+ */
+public class DeleteProducerIntegrationTest extends BeanstalkCamelTestSupport {
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate direct;
+
+    /**
+     * Puts a job, sends its id through the route and checks that the mock
+     * endpoint sees the same job id with a {@code true} result and that
+     * {@code reader.peek(jobId)} no longer finds the job.
+     */
+    @Test
+    public void testDelete() throws InterruptedException, IOException {
+        long jobId = writer.put(0, 0, 5, new byte[0]);
+        assertTrue("Valid Job Id", jobId > 0);
+
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull();
+        resultEndpoint.allMessages().header(Headers.RESULT).isEqualTo(true);
+        direct.sendBodyAndHeader(null, Headers.JOB_ID, jobId);
+
+        assertMockEndpointsSatisfied();
+
+        final Long messageJobId = resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in message", messageJobId);
+        assertEquals("Message Job ID equals", jobId, messageJobId.longValue());
+
+        final Job job = reader.peek(jobId);
+        assertNull("Job has been deleted", job);
+    }
+
+    /**
+     * Sending a body without a job id header must raise a
+     * {@link CamelExecutionException}, and nothing may reach the mock endpoint.
+     *
+     * <p>The exception is caught explicitly rather than declared through
+     * {@code @Test(expected = ...)}: with the annotation form the test ends at
+     * the throwing call, so the mock endpoint expectation below would never be
+     * evaluated.
+     */
+    @Test
+    public void testNoJobId() throws InterruptedException, IOException {
+        resultEndpoint.expectedMessageCount(0);
+        try {
+            direct.sendBody(new byte[0]);
+            fail("Expected CamelExecutionException for a message without a job id");
+        } catch (CamelExecutionException expected) {
+            // This is the outcome under test; the mock endpoint is checked below.
+        }
+
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start").to("beanstalk:"+tubeName+"?command=delete").to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java
 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java
new file mode 100644
index 0000000..ff96d3e
--- /dev/null
+++ 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.integration;
+
+import org.apache.camel.component.beanstalk.Headers;
+import com.surftools.BeanstalkClient.Job;
+import java.io.IOException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Produce;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Integration test for the beanstalk producer in its put configuration: the
+ * route sends {@code direct:start} to the per-test tube with
+ * {@code jobPriority=1000} and {@code jobTimeToRun=5}, then on to
+ * {@code mock:result}.
+ */
+public class PutProducerIntegrationTest extends BeanstalkCamelTestSupport {
+    final String testMessage = "Hello, world!";
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate direct;
+
+    /**
+     * Sends {@link #testMessage} through the route and checks that the job id
+     * reported to the mock endpoint matches the job reserved from the server.
+     */
+    @Test
+    public void testPut() throws InterruptedException, IOException {
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull();
+        direct.sendBody(testMessage);
+
+        resultEndpoint.assertIsSatisfied();
+
+        final Exchange firstReceived = resultEndpoint.getReceivedExchanges().get(0);
+        final Long jobId = firstReceived.getIn().getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in 'In' message", jobId);
+
+        reserveVerifyAndDelete(jobId);
+    }
+
+    /**
+     * Sends {@link #testMessage} with the InOut pattern directly to the
+     * beanstalk endpoint and checks that the job id in the out message matches
+     * the job reserved from the server.
+     */
+    @Test
+    public void testOut() throws InterruptedException, IOException {
+        final Endpoint endpoint = context.getEndpoint("beanstalk:"+tubeName);
+        final Processor bodySetter = new Processor() {
+            @Override
+            public void process(Exchange outgoing) {
+                outgoing.getIn().setBody(testMessage);
+            }
+        };
+        final Exchange exchange = template.send(endpoint, ExchangePattern.InOut, bodySetter);
+
+        final Message out = exchange.getOut();
+        assertNotNull("Out message", out);
+
+        final Long jobId = out.getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in 'Out' message", jobId);
+
+        reserveVerifyAndDelete(jobId);
+    }
+
+    /**
+     * Sends an empty payload with a {@code Headers.DELAY} header of 10 and
+     * checks that an immediate {@code reserve(0)} returns no job; the job is
+     * then deleted by id.
+     */
+    @Test
+    public void testDelay() throws InterruptedException, IOException {
+        final byte[] testBytes = new byte[0];
+
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull();
+        resultEndpoint.expectedBodiesReceived(testBytes);
+        direct.sendBodyAndHeader(testBytes, Headers.DELAY, 10);
+
+        resultEndpoint.assertIsSatisfied();
+
+        final Exchange firstReceived = resultEndpoint.getReceivedExchanges().get(0);
+        final Long jobId = firstReceived.getIn().getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in message", jobId);
+
+        final Job job = reader.reserve(0);
+        assertNull("Beanstalk client has no message", job);
+        reader.delete(jobId.longValue());
+    }
+
+    /**
+     * Reserves the next job with {@code reader.reserve(5)}, asserts that its
+     * body is {@link #testMessage} and its id equals {@code jobId}, then deletes
+     * it.
+     */
+    private void reserveVerifyAndDelete(final Long jobId) throws InterruptedException, IOException {
+        final Job job = reader.reserve(5);
+        assertNotNull("Beanstalk client got message", job);
+        assertEquals("Job body from the server", testMessage, new String(job.getData()));
+        assertEquals("Job ID from the server", jobId.longValue(), job.getJobId());
+        reader.delete(jobId.longValue());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        final RouteBuilder routes = new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start")
+                    .to("beanstalk:"+tubeName+"?jobPriority=1000&jobTimeToRun=5")
+                    .to("mock:result");
+            }
+        };
+        return routes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java
 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java
new file mode 100644
index 0000000..c77dc32
--- /dev/null
+++ 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.integration;
+
+import org.apache.camel.component.beanstalk.Headers;
+import com.surftools.BeanstalkClient.Job;
+import java.io.IOException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Ignore;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Integration test for the beanstalk producer with {@code command=release}.
+ *
+ * <p>The route under test sends {@code direct:start} to the beanstalk endpoint
+ * on the per-test tube and forwards the result to {@code mock:result}.
+ */
+public class ReleaseProducerIntegrationTest extends BeanstalkCamelTestSupport {
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate direct;
+
+    /**
+     * Puts a job and sends its id through the release route. Currently ignored;
+     * see the {@code @Ignore} reason.
+     *
+     * <p>NOTE(review): the method name and the {@code peekBuried()} assertions
+     * look copied from the bury test although this route uses
+     * {@code command=release}; confirm the intended post-conditions before
+     * enabling this test.
+     */
+    @Ignore("requires reserve - release sequence")
+    @Test
+    public void testBury() throws InterruptedException, IOException {
+        long jobId = writer.put(0, 0, 5, new byte[0]);
+        assertTrue("Valid Job Id", jobId > 0);
+
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull();
+        resultEndpoint.allMessages().header(Headers.RESULT).isEqualTo(true);
+        direct.sendBodyAndHeader(null, Headers.JOB_ID, jobId);
+
+        assertMockEndpointsSatisfied();
+
+        final Long messageJobId = resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in message", messageJobId);
+        assertEquals("Message Job ID equals", jobId, messageJobId.longValue());
+
+        final Job job = reader.reserve(0);
+        assertNull("Beanstalk client has no message", job);
+
+        final Job buried = reader.peekBuried();
+        assertNotNull("Job in buried", buried);
+        assertEquals("Buried job id", jobId, buried.getJobId());
+    }
+
+    /**
+     * Sending a body without a job id header must raise a
+     * {@link CamelExecutionException}, and nothing may reach the mock endpoint.
+     *
+     * <p>The exception is caught explicitly rather than declared through
+     * {@code @Test(expected = ...)}: with the annotation form the test ends at
+     * the throwing call, so the mock endpoint expectation below would never be
+     * evaluated.
+     */
+    @Test
+    public void testNoJobId() throws InterruptedException, IOException {
+        resultEndpoint.expectedMessageCount(0);
+        try {
+            direct.sendBody(new byte[0]);
+            fail("Expected CamelExecutionException for a message without a job id");
+        } catch (CamelExecutionException expected) {
+            // This is the outcome under test; the mock endpoint is checked below.
+        }
+
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start").to("beanstalk:"+tubeName+"?command=release").to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java
 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java
new file mode 100644
index 0000000..bfb4c45
--- /dev/null
+++ 
b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.integration;
+
+import org.apache.camel.component.beanstalk.Headers;
+import com.surftools.BeanstalkClient.Job;
+import java.io.IOException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Ignore;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Integration test for a route that sends exchanges to a beanstalk endpoint
+ * configured with {@code command=touch}.
+ * <p>
+ * Uses {@code writer}, {@code reader} and {@code tubeName}, which are not
+ * declared in this class (presumably inherited from
+ * {@link BeanstalkCamelTestSupport} - confirm there).
+ */
+public class TouchProducerIntegrationTest extends BeanstalkCamelTestSupport {
+    /** Mock endpoint terminating the route; receives exchanges after the beanstalk step. */
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    /** Template used to feed exchanges into {@code direct:start}. */
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate direct;
+
+    /**
+     * Puts a job, sends its id through the route and verifies the resulting
+     * exchange and the state of the tube. Currently ignored.
+     * <p>
+     * NOTE(review): the method name and the final "buried" assertions look like
+     * they were copied from the bury producer test; the route in this class uses
+     * {@code command=touch}. Revisit name and expectations when the test is enabled.
+     *
+     * @throws InterruptedException if waiting on the mock endpoint is interrupted
+     * @throws IOException declared by the test signature
+     */
+    @Ignore("requires reserve - touch sequence")
+    @Test
+    public void testBury() throws InterruptedException, IOException {
+        long jobId = writer.put(0, 0, 5, new byte[0]);
+        assertTrue("Valid Job Id", jobId > 0);
+
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull();
+        resultEndpoint.allMessages().header(Headers.RESULT).isEqualTo(true);
+        direct.sendBodyAndHeader(null, Headers.JOB_ID, jobId);
+
+        assertMockEndpointsSatisfied();
+
+        // The job id must travel with the message unchanged.
+        final Long messageJobId =
+            resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in message", messageJobId);
+        assertEquals("Message Job ID equals", jobId, messageJobId.longValue());
+
+        final Job job = reader.reserve(0);
+        assertNull("Beanstalk client has no message", job);
+
+        final Job buried = reader.peekBuried();
+        assertNotNull("Job in buried", buried);
+        assertEquals("Buried job id", jobId, buried.getJobId());
+    }
+
+    /**
+     * Sends a body with no {@code Headers.JOB_ID} header through the route and
+     * expects the producer template to fail with a {@link CamelExecutionException}.
+     * Before the exception is propagated to JUnit, the mock endpoint is checked
+     * to confirm that no exchange reached it.
+     *
+     * @throws InterruptedException if waiting on the mock endpoint is interrupted
+     * @throws IOException declared by the test signature
+     */
+    @Test(expected = CamelExecutionException.class)
+    public void testNoJobId() throws InterruptedException, IOException {
+        resultEndpoint.expectedMessageCount(0);
+        try {
+            direct.sendBody(new byte[0]);
+        } catch (CamelExecutionException e) {
+            // Previously the mock verification sat after the throwing call and
+            // therefore never executed. Run it here, then rethrow so that the
+            // "expected" contract on @Test is still what makes the test pass.
+            // The old check that getFailures() holds exactly one entry was dropped:
+            // it was unreachable and contradicts a successful assertIsSatisfied().
+            resultEndpoint.assertIsSatisfied();
+            throw e;
+        }
+    }
+
+    /**
+     * Builds the route under test: {@code direct:start} to the beanstalk endpoint
+     * for {@code tubeName} with {@code command=touch}, then to {@code mock:result}.
+     *
+     * @return the route builder defining that single route
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start").to("beanstalk:" + tubeName + "?command=touch").to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/pom.xml
----------------------------------------------------------------------
diff --git a/components/pom.xml b/components/pom.xml
index a53d65f..e185ace 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -64,6 +64,7 @@
     <module>camel-aws</module>
     <module>camel-base64</module>
     <module>camel-beanio</module>
+    <module>camel-beanstalk</module>
     <module>camel-bean-validator</module>
     <module>camel-barcode</module>
     <module>camel-bindy</module>

http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index be6230e..e27d215 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -68,6 +68,7 @@
     <backport-util-concurrent-version>3.1</backport-util-concurrent-version>
     <bcel-bundle-version>5.2_4</bcel-bundle-version>
     <beanio-version>2.0.7</beanio-version>
+    <beanstalkd-client-version>1.4.6</beanstalkd-client-version>
     <bsh-version>2.0b5</bsh-version>
     <!-- bouncycastle 1.50 does not work in OSGi - 
http://www.bouncycastle.org/jira/browse/BJA-476 -->
     <bouncycastle-version>1.49</bouncycastle-version>

Reply via email to