Repository: camel Updated Branches: refs/heads/master a49627216 -> 4233318d9
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java new file mode 100644 index 0000000..6b1d81d --- /dev/null +++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk; + +import org.junit.Test; +import static org.junit.Assert.*; + +import com.surftools.BeanstalkClient.Client; +import org.junit.Before; + +public class ConnectionSettingsTest { + @Test + public void parseUriTest() { + final ConnectionSettingsFactory factory = BeanstalkComponent.connFactory; + assertEquals("Full URI", new ConnectionSettings("host.domain.tld", 11300, "someTube"), factory.parseUri("host.domain.tld:11300/someTube")); + assertEquals("No port", new ConnectionSettings("host.domain.tld", Client.DEFAULT_PORT, "someTube"), factory.parseUri("host.domain.tld/someTube")); + assertEquals("Only tube", new ConnectionSettings(Client.DEFAULT_HOST, Client.DEFAULT_PORT, "someTube"), factory.parseUri("someTube")); + } + + @Test + public void parseTubesTest() { + final ConnectionSettingsFactory factory = BeanstalkComponent.connFactory; + assertArrayEquals("Full URI", new String[] {"tube1", "tube2"}, factory.parseUri("host:90/tube1+tube2").tubes); + assertArrayEquals("No port", new String[] {"tube1", "tube2"}, factory.parseUri("host/tube1+tube2").tubes); + assertArrayEquals("Only tubes", new String[] {"tube1", "tube2"}, factory.parseUri("tube1+tube2").tubes); + assertArrayEquals("Empty URI", new String[0], factory.parseUri("").tubes); + } + + @Test(expected=IllegalArgumentException.class) + public void notValidHost() { + final ConnectionSettingsFactory factory = BeanstalkComponent.connFactory; + fail(String.format("Calling on not valid URI must raise exception, but got result %s", factory.parseUri("not_valid?host/tube?"))); + } + + @Before + public void setUp() { + BeanstalkComponent.connFactory = new ConnectionSettingsFactory(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java new file mode 100644 index 0000000..14a0955 --- /dev/null +++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk; + +import com.surftools.BeanstalkClient.BeanstalkException; +import com.surftools.BeanstalkClient.Job; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; +import static org.mockito.Mockito.*; + +public class ConsumerCompletionTest extends BeanstalkMockTestSupport { + final String testMessage = "hello, world"; + + boolean shouldIdie = false; + final Processor processor = new Processor() { + @Override + public void process(Exchange exchange) throws InterruptedException { + if (shouldIdie) throw new InterruptedException("die"); + } + }; + + @Test + public void testDeleteOnComplete() throws Exception { + final long jobId = 111; + final byte[] payload = Helper.stringToBytes(testMessage); + final Job jobMock = mock(Job.class); + + when(jobMock.getJobId()).thenReturn(jobId); + when(jobMock.getData()).thenReturn(payload); + when(client.reserve(anyInt())) + .thenReturn(jobMock) + .thenReturn(null); + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMinimumMessageCount(1); + result.expectedBodiesReceived(testMessage); + result.expectedPropertyReceived(Headers.JOB_ID, jobId); + result.message(0).header(Headers.JOB_ID).isEqualTo(jobId); + result.assertIsSatisfied(2000); + + verify(client, atLeastOnce()).reserve(anyInt()); + verify(client).delete(jobId); + } + + @Test + public void testReleaseOnFailure() throws Exception { + shouldIdie = true; + final long jobId = 111; + final long priority = BeanstalkComponent.DEFAULT_PRIORITY; + final int delay = BeanstalkComponent.DEFAULT_DELAY; + final byte[] payload = Helper.stringToBytes(testMessage); + final Job jobMock = mock(Job.class); + + when(jobMock.getJobId()).thenReturn(jobId); + when(jobMock.getData()).thenReturn(payload); + when(client.reserve(anyInt())) + .thenReturn(jobMock) + .thenReturn(null); + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMinimumMessageCount(1); + result.assertIsNotSatisfied(1000); + + verify(client, atLeastOnce()).reserve(anyInt()); + verify(client).release(jobId, priority, delay); + } + + @Test + public void testBeanstalkException() throws Exception { + shouldIdie = false; + final Job jobMock = mock(Job.class); + final long jobId = 111; + final byte[] payload = Helper.stringToBytes(testMessage); + + when(jobMock.getJobId()).thenReturn(jobId); + when(jobMock.getData()).thenReturn(payload); + when(client.reserve(anyInt())) + .thenThrow(new BeanstalkException("test")) + .thenReturn(jobMock); + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(1); + result.expectedBodiesReceived(testMessage); + result.expectedPropertyReceived(Headers.JOB_ID, jobId); + result.message(0).header(Headers.JOB_ID).isEqualTo(jobId); + result.assertIsSatisfied(100); + + verify(client, atLeast(1)).reserve(anyInt()); + verify(client, times(1)).close(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("beanstalk:tube?consumer.onFailure=release").process(processor).to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java new file mode 100644 index 0000000..903f272 --- /dev/null +++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk; + +import org.apache.camel.CamelContext; +import org.apache.camel.FailedToCreateProducerException; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.junit.After; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.Test; +import org.junit.Ignore; + +public class EndpointTest { + CamelContext context = null; + + @Before + public void setUp() throws Exception { + context = new DefaultCamelContext(); + context.disableJMX(); + context.start(); + } + + @Test + public void testPriority() { + BeanstalkEndpoint endpoint = context.getEndpoint("beanstalk:default?jobPriority=1000", BeanstalkEndpoint.class); + assertNotNull("Beanstalk endpoint", endpoint); + assertEquals("Priority", 1000, endpoint.getJobPriority()); + } + + @Test + public void testTimeToRun() { + BeanstalkEndpoint endpoint = context.getEndpoint("beanstalk:default?jobTimeToRun=10", BeanstalkEndpoint.class); + assertNotNull("Beanstalk endpoint", endpoint); + assertEquals("Time to run", 10, endpoint.getJobTimeToRun()); + } + + @Test + public void testDelay() { + BeanstalkEndpoint endpoint = context.getEndpoint("beanstalk:default?jobDelay=10", BeanstalkEndpoint.class); + assertNotNull("Beanstalk endpoint", endpoint); + assertEquals("Delay", 10, endpoint.getJobDelay()); + } + + @Test + public void testCommand() { + BeanstalkEndpoint endpoint = context.getEndpoint("beanstalk:default?command=release", BeanstalkEndpoint.class); + assertNotNull("Beanstalk endpoint", endpoint); + assertEquals("Command", BeanstalkComponent.COMMAND_RELEASE, endpoint.command); + } + + @Test + public void testTubes() { + BeanstalkEndpoint endpoint = context.getEndpoint("beanstalk:host:11303/tube1+tube%2B+tube%3F?command=kick", BeanstalkEndpoint.class); + assertNotNull("Beanstalk endpoint", endpoint); + assertEquals("Command", BeanstalkComponent.COMMAND_KICK, endpoint.command); + assertEquals("Host", "host", endpoint.conn.host); + assertArrayEquals("Tubes", new String[] {"tube1", "tube+", "tube?"}, endpoint.conn.tubes); + } + + @Test(expected=FailedToCreateProducerException.class) + public void testWrongCommand() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("beanstalk:default?command=noCommand"); + } + }); + } + + @After + public void tearDown() throws Exception { + context.stop(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java new file mode 100644 index 0000000..3ef5cb9 --- /dev/null +++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk; + +import com.surftools.BeanstalkClient.Client; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import org.apache.camel.CamelContext; + +public final class Helper { + public static ConnectionSettings mockConn(final Client client) { + return new MockConnectionSettings(client); + } + + public static void mockComponent(final Client client) { + BeanstalkComponent.setConnectionSettingsFactory(new ConnectionSettingsFactory() { + @Override + public ConnectionSettings parseUri(String uri) { + return new MockConnectionSettings(client); + } + }); + } + + public static void revertComponent() { + BeanstalkComponent.setConnectionSettingsFactory(ConnectionSettingsFactory.DEFAULT); + } + + public static BeanstalkEndpoint getEndpoint(String uri, CamelContext context, Client client) throws Exception { + BeanstalkEndpoint endpoint = new BeanstalkEndpoint(uri, context.getComponent("beanstalk"), mockConn(client)); + context.addEndpoint(uri, endpoint); + return endpoint; + } + + public static byte[] stringToBytes(final String s) throws IOException { + final ByteArrayOutputStream byteOS = new ByteArrayOutputStream(); + final DataOutputStream dataStream = new DataOutputStream(byteOS); + + try { + dataStream.writeBytes(s); + dataStream.flush(); + return byteOS.toByteArray(); + } finally { + dataStream.close(); + byteOS.close(); + } + } +} + +class MockConnectionSettings extends ConnectionSettings { + final Client client; + + public MockConnectionSettings(Client client) { + super("tube"); + this.client = client; + } + + @Override + public Client newReadingClient(boolean useBlockIO) { + return client; + } + + @Override + public Client newWritingClient() { + return client; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java new file mode 100644 index 0000000..7a5a296 --- /dev/null +++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk; + +import com.surftools.BeanstalkClient.Job; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +public class ImmediateConsumerTest extends BeanstalkMockTestSupport { + final String testMessage = "hello, world"; + + boolean shouldIdie = false; + final Processor processor = new Processor() { + @Override + public void process(Exchange exchange) throws InterruptedException { + if (shouldIdie) throw new InterruptedException("die"); + } + }; + + @Test + public void testDeleteOnSuccess() throws Exception { + final Job jobMock = mock(Job.class); + final long jobId = 111; + final byte[] payload = Helper.stringToBytes(testMessage); + + when(jobMock.getJobId()).thenReturn(jobId); + when(jobMock.getData()).thenReturn(payload); + when(client.reserve(anyInt())) + .thenReturn(jobMock) + .thenReturn(null); + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(1); + result.expectedBodiesReceived(testMessage); + result.expectedPropertyReceived(Headers.JOB_ID, jobId); + result.message(0).header(Headers.JOB_ID).isEqualTo(jobId); + result.assertIsSatisfied(100); + + verify(client, atLeast(1)).reserve(0); + verify(client, atLeast(1)).delete(jobId); + } + + @Test + public void testDeleteOnFailure() throws Exception { + shouldIdie = true; + final long jobId = 111; + final byte[] payload = Helper.stringToBytes(testMessage); + final Job jobMock = mock(Job.class); + + when(jobMock.getJobId()).thenReturn(jobId); + when(jobMock.getData()).thenReturn(payload); + when(client.reserve(anyInt())) + .thenReturn(jobMock) + .thenReturn(null); + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMinimumMessageCount(1); + result.assertIsNotSatisfied(1000); + + verify(client, atLeastOnce()).reserve(anyInt()); + verify(client, atLeast(1)).delete(jobId); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("beanstalk:tube?consumer.awaitJob=false").process(processor).to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java new file mode 100644 index 0000000..e3949a2 --- /dev/null +++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java @@ -0,0 +1,419 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk; + +import com.surftools.BeanstalkClient.BeanstalkException; +import org.apache.camel.component.beanstalk.processors.*; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.Produce; +import org.apache.camel.Producer; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; +import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.*; +import static org.mockito.Mockito.*; + +public class ProducerTest extends BeanstalkMockTestSupport { + final String testMessage = "hello, world"; + + @EndpointInject(uri = "beanstalk:tube") + protected BeanstalkEndpoint endpoint; + + @EndpointInject(uri = "mock:result") + protected MockEndpoint resultEndpoint; + + @Produce(uri = "direct:start") + protected ProducerTemplate direct; + + @Test + public void testPut() throws Exception { + final long priority = BeanstalkComponent.DEFAULT_PRIORITY; + final int delay = BeanstalkComponent.DEFAULT_DELAY; + final int timeToRun = BeanstalkComponent.DEFAULT_TIME_TO_RUN; + final byte[] payload = Helper.stringToBytes(testMessage); + final long jobId = 111; + + when(client.put(priority, delay, timeToRun, payload)).thenReturn(jobId); + + final Producer producer = endpoint.createProducer(); + assertNotNull("Producer", producer); + assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class)); + assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(PutCommand.class)); + + final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { // TODO: SetBodyProcessor(?) + public void process(Exchange exchange) { + exchange.getIn().setBody(testMessage); + } + }); + + assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getIn().getHeader(Headers.JOB_ID, Long.class)); + verify(client).put(priority, delay, timeToRun, payload); + } + + @Test + public void testPutOut() throws Exception { + final long priority = BeanstalkComponent.DEFAULT_PRIORITY; + final int delay = BeanstalkComponent.DEFAULT_DELAY; + final int timeToRun = BeanstalkComponent.DEFAULT_TIME_TO_RUN; + final byte[] payload = Helper.stringToBytes(testMessage); + final long jobId = 111; + + when(client.put(priority, delay, timeToRun, payload)).thenReturn(jobId); + + Producer producer = endpoint.createProducer(); + assertNotNull("Producer", producer); + assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class)); + assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(PutCommand.class)); + + final Exchange exchange = template.send(endpoint, ExchangePattern.InOut, new Processor() { // TODO: SetBodyProcessor(?) + public void process(Exchange exchange) { + exchange.getIn().setBody(testMessage); + } + }); + + assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getOut().getHeader(Headers.JOB_ID, Long.class)); + verify(client).put(priority, delay, timeToRun, payload); + } + + @Test + public void testPutWithHeaders() throws Exception { + final long priority = 111; + final int delay = 5; + final int timeToRun = 65; + final byte[] payload = Helper.stringToBytes(testMessage); + final long jobId = 111; + + when(client.put(priority, delay, timeToRun, payload)).thenReturn(jobId); + + Producer producer = endpoint.createProducer(); + assertNotNull("Producer", producer); + assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class)); + assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(PutCommand.class)); + + final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { // TODO: SetBodyProcessor(?) + public void process(Exchange exchange) { + exchange.getIn().setHeader(Headers.PRIORITY, priority); + exchange.getIn().setHeader(Headers.DELAY, delay); + exchange.getIn().setHeader(Headers.TIME_TO_RUN, timeToRun); + exchange.getIn().setBody(testMessage); + } + }); + + assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getIn().getHeader(Headers.JOB_ID, Long.class)); + verify(client).put(priority, delay, timeToRun, payload); + } + + @Test + public void testBury() throws Exception { + final long priority = BeanstalkComponent.DEFAULT_PRIORITY; + final long jobId = 111; + + endpoint.setCommand(BeanstalkComponent.COMMAND_BURY); + Producer producer = endpoint.createProducer(); + assertNotNull("Producer", producer); + assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class)); + assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(BuryCommand.class)); + + when(client.bury(jobId, priority)).thenReturn(true); + + final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { + public void process(Exchange exchange) { + exchange.getIn().setHeader(Headers.JOB_ID, jobId); + } + }); + + assertEquals("Op result", Boolean.TRUE, exchange.getIn().getHeader(Headers.RESULT, Boolean.class)); + assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getIn().getHeader(Headers.JOB_ID, Long.class)); + verify(client).bury(jobId, priority); + } + + @Test + public void testBuryNoJobId() throws Exception { + endpoint.setCommand(BeanstalkComponent.COMMAND_BURY); + Producer producer = endpoint.createProducer(); + assertNotNull("Producer", producer); + assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class)); + assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(BuryCommand.class)); + + final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { + public void process(Exchange exchange) {} + }); + + assertTrue("Exchange failed", exchange.isFailed()); + + verify(client, never()).bury(anyLong(), anyLong()); + } + + @Test + public void testBuryWithHeaders() throws Exception { + final long priority = 1000; + final long jobId = 111; + + endpoint.setCommand(BeanstalkComponent.COMMAND_BURY); + Producer producer = endpoint.createProducer(); + assertNotNull("Producer", producer); + assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class)); + assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(BuryCommand.class)); + + when(client.bury(jobId, priority)).thenReturn(true); + + final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { + public void process(Exchange exchange) { + exchange.getIn().setHeader(Headers.PRIORITY, priority); + exchange.getIn().setHeader(Headers.JOB_ID, jobId); + } + }); + + assertEquals("Op result", Boolean.TRUE, exchange.getIn().getHeader(Headers.RESULT, Boolean.class)); + assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getIn().getHeader(Headers.JOB_ID, Long.class)); + verify(client).bury(jobId, priority); + } + + @Test + public void testDelete() throws Exception { + final long jobId = 111; + + endpoint.setCommand(BeanstalkComponent.COMMAND_DELETE); + Producer producer = endpoint.createProducer(); + assertNotNull("Producer", producer); + assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class)); + assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(DeleteCommand.class)); + + when(client.delete(jobId)).thenReturn(true); + + final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { + public void process(Exchange exchange) { + exchange.getIn().setHeader(Headers.JOB_ID, jobId); + } + }); + + assertEquals("Op result", Boolean.TRUE, exchange.getIn().getHeader(Headers.RESULT, Boolean.class)); + assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getIn().getHeader(Headers.JOB_ID, Long.class)); + verify(client).delete(jobId); + } + + @Test + public void testDeleteNoJobId() throws Exception { + endpoint.setCommand(BeanstalkComponent.COMMAND_DELETE); + Producer producer = endpoint.createProducer(); + assertNotNull("Producer", producer); + assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class)); + assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(DeleteCommand.class)); + + final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { + public void process(Exchange exchange) {} + }); + + assertTrue("Exchange failed", exchange.isFailed()); + + verify(client, never()).delete(anyLong()); + } + + @Test + public void testRelease() throws Exception { + final long priority = BeanstalkComponent.DEFAULT_PRIORITY; + final int delay = BeanstalkComponent.DEFAULT_DELAY; + final long jobId = 111; + + endpoint.setCommand(BeanstalkComponent.COMMAND_RELEASE); + Producer producer = endpoint.createProducer(); + assertNotNull("Producer", producer); + assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class)); + assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(ReleaseCommand.class)); + + when(client.release(jobId, priority, delay)).thenReturn(true); + + final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { + public void process(Exchange exchange) { + exchange.getIn().setHeader(Headers.JOB_ID, jobId); + } + }); + + assertEquals("Op result", Boolean.TRUE, exchange.getIn().getHeader(Headers.RESULT, Boolean.class)); + assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getIn().getHeader(Headers.JOB_ID, Long.class)); + verify(client).release(jobId, priority, delay); + } + + @Test + public void testReleaseNoJobId() throws Exception { + endpoint.setCommand(BeanstalkComponent.COMMAND_RELEASE); + Producer producer = endpoint.createProducer(); + assertNotNull("Producer", producer); + assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class)); + assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(ReleaseCommand.class)); + + final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { + public void process(Exchange exchange) {} + }); + + assertTrue("Exchange failed", exchange.isFailed()); + + verify(client, never()).release(anyLong(), anyLong(), anyInt()); + } + + @Test + public void testReleaseWithHeaders() throws Exception { + final long priority = 1001; + final int delay = 124; + final long jobId = 111; + + endpoint.setCommand(BeanstalkComponent.COMMAND_RELEASE); + Producer producer = endpoint.createProducer(); + assertNotNull("Producer", producer); + assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class)); + assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(ReleaseCommand.class)); + + when(client.release(jobId, priority, delay)).thenReturn(true); + + final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { + public void process(Exchange exchange) { + exchange.getIn().setHeader(Headers.JOB_ID, jobId); + exchange.getIn().setHeader(Headers.PRIORITY, priority); + exchange.getIn().setHeader(Headers.DELAY, delay); + } + }); + + assertEquals("Op result", Boolean.TRUE, exchange.getIn().getHeader(Headers.RESULT, Boolean.class)); + assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getIn().getHeader(Headers.JOB_ID, Long.class)); + verify(client).release(jobId, priority, delay); + } + + @Test + public void testTouch() throws Exception { + final long jobId = 111; + + endpoint.setCommand(BeanstalkComponent.COMMAND_TOUCH); + Producer producer = endpoint.createProducer(); + assertNotNull("Producer", producer); + assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class)); + assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(TouchCommand.class)); + + when(client.touch(jobId)).thenReturn(true); + + final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { + public void process(Exchange exchange) { + exchange.getIn().setHeader(Headers.JOB_ID, jobId); + } + }); + + assertEquals("Op result", Boolean.TRUE, exchange.getIn().getHeader(Headers.RESULT, Boolean.class)); + assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getIn().getHeader(Headers.JOB_ID, Long.class)); + verify(client).touch(jobId); + } + + @Test + public void testTouchNoJobId() throws Exception { + endpoint.setCommand(BeanstalkComponent.COMMAND_TOUCH); + Producer producer = endpoint.createProducer(); + assertNotNull("Producer", producer); + assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class)); + assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(TouchCommand.class)); + + final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { + public void process(Exchange exchange) {} + }); + + assertTrue("Exchange failed", exchange.isFailed()); + + verify(client, never()).touch(anyLong()); + } + + @Test + public void testHeaderOverride() throws Exception { + final long priority = 1020; + final int delay = 50; + final int timeToRun = 75; + final byte[] payload = Helper.stringToBytes(testMessage); + final long jobId = 113; + + when(client.put(priority, delay, timeToRun, payload)).thenReturn(jobId); + + resultEndpoint.expectedMessageCount(1); + resultEndpoint.allMessages().body().isEqualTo(testMessage); + resultEndpoint.allMessages().header(Headers.JOB_ID).isEqualTo(Long.valueOf(jobId)); + + direct.sendBodyAndHeader(testMessage, Headers.TIME_TO_RUN, timeToRun); + resultEndpoint.assertIsSatisfied(); + + final Long jobIdIn = resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class); + assertNotNull("Job ID in 'In' message", jobIdIn); + + verify(client).put(priority, delay, timeToRun, payload); + } + + @Test + public void test1BeanstalkException() throws Exception { + final long priority = 1020; + final int delay = 50; + final int timeToRun = 75; + final byte[] payload = Helper.stringToBytes(testMessage); + final long jobId = 113; + + when(client.put(priority, delay, timeToRun, payload)) + .thenThrow(new BeanstalkException("test")) + .thenReturn(jobId); + + resultEndpoint.expectedMessageCount(1); + resultEndpoint.allMessages().body().isEqualTo(testMessage); + resultEndpoint.allMessages().header(Headers.JOB_ID).isEqualTo(Long.valueOf(jobId)); + + direct.sendBodyAndHeader(testMessage, Headers.TIME_TO_RUN, timeToRun); + resultEndpoint.assertIsSatisfied(); + + final Long jobIdIn = resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class); + assertNotNull("Job ID in 'In' message", jobIdIn); + + verify(client, times(1)).close(); + verify(client, times(2)).put(priority, delay, timeToRun, payload); + } + + @Test + public void test2BeanstalkException() throws Exception { + final long jobId = 111; + + when(client.touch(jobId)) + .thenThrow(new BeanstalkException("test")); + + endpoint.setCommand(BeanstalkComponent.COMMAND_TOUCH); + final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { + public void process(Exchange exchange) { + exchange.getIn().setHeader(Headers.JOB_ID, jobId); + } + }); + + assertTrue("Exchange failed", exchange.isFailed()); + + verify(client, times(2)).touch(jobId); + verify(client, times(1)).close(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("beanstalk:tube?jobPriority=1020&jobDelay=50&jobTimeToRun=65").to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java new file mode 100644 index 0000000..fb87c9a --- /dev/null +++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk.integration; + +import org.apache.camel.component.beanstalk.ConnectionSettings; +import org.apache.camel.component.beanstalk.ConnectionSettingsFactory; +import com.surftools.BeanstalkClient.Client; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Before; + +public abstract class BeanstalkCamelTestSupport extends CamelTestSupport { + final ConnectionSettingsFactory connFactory = ConnectionSettingsFactory.DEFAULT; + final String tubeName = String.format("test%d", System.currentTimeMillis()); + + protected Client reader = null; + protected Client writer = null; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + + ConnectionSettings conn = connFactory.parseUri(tubeName); + writer = conn.newWritingClient(); + reader = conn.newReadingClient(false); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java new file mode 100644 index 0000000..033955e --- /dev/null +++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk.integration; + +import org.apache.camel.component.beanstalk.Headers; +import com.surftools.BeanstalkClient.Job; +import java.io.IOException; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Ignore; +import org.junit.Test; +import static org.junit.Assert.*; + +public class BuryProducerIntegrationTest extends BeanstalkCamelTestSupport { + @EndpointInject(uri = "mock:result") + protected MockEndpoint resultEndpoint; + + @Produce(uri = "direct:start") + protected ProducerTemplate direct; + + @Ignore("requires reserve - bury sequence") + @Test + public void testBury() throws InterruptedException, IOException { + long jobId = writer.put(0, 0, 5, new byte[0]); + assertTrue("Valid Job Id", jobId > 0); + + resultEndpoint.expectedMessageCount(1); + resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull(); + resultEndpoint.allMessages().header(Headers.RESULT).isEqualTo(true); + direct.sendBodyAndHeader(null, Headers.JOB_ID, jobId); + + assertMockEndpointsSatisfied(); + + final Long messageJobId = resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class); + assertNotNull("Job ID in message", messageJobId); + assertEquals("Message Job ID equals", jobId, messageJobId.longValue()); + + final Job job = reader.reserve(0); + assertNull("Beanstalk client has no message", job); + + final Job buried = reader.peekBuried(); + assertNotNull("Job in buried", buried); + assertEquals("Buried job id", jobId, buried.getJobId()); + } + + @Test(expected=CamelExecutionException.class) + public void testNoJobId() throws InterruptedException, IOException { + resultEndpoint.expectedMessageCount(0); + direct.sendBody(new byte[0]); + + resultEndpoint.assertIsSatisfied(); + assertListSize("Number of exceptions", resultEndpoint.getFailures(), 1); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("beanstalk:"+tubeName+"?command=bury").to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java new file mode 100644 index 0000000..6e2b4de --- /dev/null +++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk.integration; + +import org.apache.camel.component.beanstalk.Headers; +import org.apache.camel.component.beanstalk.Helper; +import java.io.IOException; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +public class ConsumerIntegrationTest extends BeanstalkCamelTestSupport { + final String testMessage = "Hello, world!"; + + @EndpointInject(uri = "mock:result") + MockEndpoint result; + + @Test + public void testReceive() throws IOException, InterruptedException { + long PRIO = 0; + int TTR = 10; + final long jobId = writer.put(PRIO, 0, TTR, Helper.stringToBytes(testMessage)); + + result.expectedMessageCount(1); + result.expectedPropertyReceived(Headers.JOB_ID, jobId); + result.message(0).header(Exchange.CREATED_TIMESTAMP).isNotNull(); + result.message(0).header(Headers.JOB_ID).isEqualTo(Long.valueOf(jobId)); + result.message(0).header(Headers.PRIORITY).isEqualTo(Long.valueOf(PRIO)); + result.message(0).header(Headers.TUBE).isEqualTo(tubeName); + result.message(0).header(Headers.STATE).isEqualTo("reserved"); + result.message(0).header(Headers.AGE).isGreaterThan(0); + result.message(0).header(Headers.TIME_LEFT).isGreaterThan(0); + result.message(0).header(Headers.TIMEOUTS).isNotNull(); + result.message(0).header(Headers.RELEASES).isNotNull(); + result.message(0).header(Headers.BURIES).isNotNull(); + result.message(0).header(Headers.KICKS).isNotNull(); + result.message(0).body().isEqualTo(testMessage); + result.assertIsSatisfied(500); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("beanstalk:"+tubeName).to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java new file mode 100644 index 0000000..9d9dc36 --- /dev/null +++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk.integration; + +import org.apache.camel.component.beanstalk.Headers; +import com.surftools.BeanstalkClient.Job; +import java.io.IOException; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; +import static org.junit.Assert.*; + +public class DeleteProducerIntegrationTest extends BeanstalkCamelTestSupport { + @EndpointInject(uri = "mock:result") + protected MockEndpoint resultEndpoint; + + @Produce(uri = "direct:start") + protected ProducerTemplate direct; + + @Test + public void testDelete() throws InterruptedException, IOException { + long jobId = writer.put(0, 0, 5, new byte[0]); + assertTrue("Valid Job Id", jobId > 0); + + resultEndpoint.expectedMessageCount(1); + resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull(); + resultEndpoint.allMessages().header(Headers.RESULT).isEqualTo(true); + direct.sendBodyAndHeader(null, Headers.JOB_ID, jobId); + + assertMockEndpointsSatisfied(); + + final Long messageJobId = resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class); + assertNotNull("Job ID in message", messageJobId); + assertEquals("Message Job ID equals", jobId, messageJobId.longValue()); + + final Job job = reader.peek(jobId); + assertNull("Job has been deleted", job); + } + + @Test(expected=CamelExecutionException.class) + public void testNoJobId() throws InterruptedException, IOException { + resultEndpoint.expectedMessageCount(0); + direct.sendBody(new byte[0]); + + resultEndpoint.assertIsSatisfied(); + assertListSize("Number of exceptions", resultEndpoint.getFailures(), 1); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("beanstalk:"+tubeName+"?command=delete").to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java new file mode 100644 index 0000000..ff96d3e --- /dev/null +++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk.integration; + +import org.apache.camel.component.beanstalk.Headers; +import com.surftools.BeanstalkClient.Job; +import java.io.IOException; +import org.apache.camel.EndpointInject; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Produce; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; +import static org.junit.Assert.*; + +public class PutProducerIntegrationTest extends BeanstalkCamelTestSupport { + final String testMessage = "Hello, world!"; + + @EndpointInject(uri = "mock:result") + protected MockEndpoint resultEndpoint; + + @Produce(uri = "direct:start") + protected ProducerTemplate direct; + + @Test + public void testPut() throws InterruptedException, IOException { + resultEndpoint.expectedMessageCount(1); + resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull(); + direct.sendBody(testMessage); + + resultEndpoint.assertIsSatisfied(); + + final Long jobId = resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class); + assertNotNull("Job ID in 'In' message", jobId); + + final Job job = reader.reserve(5); + assertNotNull("Beanstalk client got message", job); + assertEquals("Job body from the server", testMessage, new String(job.getData())); + assertEquals("Job ID from the server", jobId.longValue(), job.getJobId()); + reader.delete(jobId.longValue()); + } + + @Test + public void testOut() throws InterruptedException, IOException { + final Endpoint endpoint = context.getEndpoint("beanstalk:"+tubeName); + final Exchange exchange = template.send(endpoint, ExchangePattern.InOut, new Processor() { + public void process(Exchange exchange) { + exchange.getIn().setBody(testMessage); + } + }); + + final Message out = exchange.getOut(); + assertNotNull("Out message", out); + + final Long jobId = out.getHeader(Headers.JOB_ID, Long.class); + assertNotNull("Job ID in 'Out' message", jobId); + + final Job job = reader.reserve(5); + assertNotNull("Beanstalk client got message", job); + assertEquals("Job body from the server", testMessage, new String(job.getData())); + assertEquals("Job ID from the server", jobId.longValue(), job.getJobId()); + reader.delete(jobId.longValue()); + } + + @Test + public void testDelay() throws InterruptedException, IOException { + final byte[] testBytes = new byte[0]; + + resultEndpoint.expectedMessageCount(1); + resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull(); + resultEndpoint.expectedBodiesReceived(testBytes); + direct.sendBodyAndHeader(testBytes, Headers.DELAY, 10); + + resultEndpoint.assertIsSatisfied(); + + final Long jobId = resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class); + assertNotNull("Job ID in message", jobId); + + final Job job = reader.reserve(0); + assertNull("Beanstalk client has no message", job); + reader.delete(jobId.longValue()); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("beanstalk:"+tubeName+"?jobPriority=1000&jobTimeToRun=5").to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java new file mode 100644 index 0000000..c77dc32 --- /dev/null +++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk.integration; + +import org.apache.camel.component.beanstalk.Headers; +import com.surftools.BeanstalkClient.Job; +import java.io.IOException; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Ignore; +import org.junit.Test; +import static org.junit.Assert.*; + +public class ReleaseProducerIntegrationTest extends BeanstalkCamelTestSupport { + @EndpointInject(uri = "mock:result") + protected MockEndpoint resultEndpoint; + + @Produce(uri = "direct:start") + protected ProducerTemplate direct; + + @Ignore("requires reserve - release sequence") + @Test + public void testBury() throws InterruptedException, IOException { + long jobId = writer.put(0, 0, 5, new byte[0]); + assertTrue("Valid Job Id", jobId > 0); + + resultEndpoint.expectedMessageCount(1); + resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull(); + resultEndpoint.allMessages().header(Headers.RESULT).isEqualTo(true); + direct.sendBodyAndHeader(null, Headers.JOB_ID, jobId); + + assertMockEndpointsSatisfied(); + + final Long messageJobId = resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class); + assertNotNull("Job ID in message", messageJobId); + assertEquals("Message Job ID equals", jobId, messageJobId.longValue()); + + final Job job = reader.reserve(0); + assertNull("Beanstalk client has no message", job); + + final Job buried = reader.peekBuried(); + assertNotNull("Job in buried", buried); + assertEquals("Buried job id", jobId, buried.getJobId()); + } + + @Test(expected=CamelExecutionException.class) + public void testNoJobId() throws InterruptedException, IOException { + resultEndpoint.expectedMessageCount(0); + direct.sendBody(new byte[0]); + + resultEndpoint.assertIsSatisfied(); + assertListSize("Number of exceptions", resultEndpoint.getFailures(), 1); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("beanstalk:"+tubeName+"?command=release").to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java new file mode 100644 index 0000000..bfb4c45 --- /dev/null +++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk.integration; + +import org.apache.camel.component.beanstalk.Headers; +import com.surftools.BeanstalkClient.Job; +import java.io.IOException; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Ignore; +import org.junit.Test; +import static org.junit.Assert.*; + +public class TouchProducerIntegrationTest extends BeanstalkCamelTestSupport { + @EndpointInject(uri = "mock:result") + protected MockEndpoint resultEndpoint; + + @Produce(uri = "direct:start") + protected ProducerTemplate direct; + + @Ignore("requires reserve - touch sequence") + @Test + public void testBury() throws InterruptedException, IOException { + long jobId = writer.put(0, 0, 5, new byte[0]); + assertTrue("Valid Job Id", jobId > 0); + + resultEndpoint.expectedMessageCount(1); + resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull(); + resultEndpoint.allMessages().header(Headers.RESULT).isEqualTo(true); + direct.sendBodyAndHeader(null, Headers.JOB_ID, jobId); + + assertMockEndpointsSatisfied(); + + final Long messageJobId = resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class); + assertNotNull("Job ID in message", messageJobId); + assertEquals("Message Job ID equals", jobId, messageJobId.longValue()); + + final Job job = reader.reserve(0); + assertNull("Beanstalk client has no message", job); + + final Job buried = reader.peekBuried(); + assertNotNull("Job in buried", buried); + assertEquals("Buried job id", jobId, buried.getJobId()); + } + + @Test(expected=CamelExecutionException.class) + public void testNoJobId() throws InterruptedException, IOException { + resultEndpoint.expectedMessageCount(0); + direct.sendBody(new byte[0]); + + resultEndpoint.assertIsSatisfied(); + assertListSize("Number of exceptions", resultEndpoint.getFailures(), 1); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("beanstalk:"+tubeName+"?command=touch").to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/pom.xml ---------------------------------------------------------------------- diff --git a/components/pom.xml b/components/pom.xml index a53d65f..e185ace 100644 --- a/components/pom.xml +++ b/components/pom.xml @@ -64,6 +64,7 @@ <module>camel-aws</module> <module>camel-base64</module> <module>camel-beanio</module> + <module>camel-beanstalk</module> <module>camel-bean-validator</module> <module>camel-barcode</module> <module>camel-bindy</module> http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index be6230e..e27d215 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -68,6 +68,7 @@ <backport-util-concurrent-version>3.1</backport-util-concurrent-version> <bcel-bundle-version>5.2_4</bcel-bundle-version> <beanio-version>2.0.7</beanio-version> + <beanstalkd-client-version>1.4.6</beanstalkd-client-version> <bsh-version>2.0b5</bsh-version> <!-- bouncycastle 1.50 does not work in OSGi - http://www.bouncycastle.org/jira/browse/BJA-476 --> <bouncycastle-version>1.49</bouncycastle-version>
