Donating camel-beanstalk component to Apache Camel. The "camel-beanstalk" component was developed and maintained by me (Alexander Azarov) for Osinka <https://github.com/osinka>. Per @davsclaus's consideration in <https://github.com/osinka/camel-beanstalk/issues/8>, we are donating the code to Apache Camel.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a4ff6b62 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a4ff6b62 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a4ff6b62 Branch: refs/heads/master Commit: a4ff6b626eceea4271bdf60a5d0983bc0b282b04 Parents: 42fbf67 Author: Alexander Azarov <[email protected]> Authored: Sat Sep 20 12:21:23 2014 +0300 Committer: Alexander Azarov <[email protected]> Committed: Sat Sep 20 12:21:25 2014 +0300 ---------------------------------------------------------------------- components/camel-beanstalk/pom.xml | 88 ++++ components/camel-beanstalk/src/etc/header.txt | 13 + .../component/beanstalk/BeanstalkComponent.java | 88 ++++ .../component/beanstalk/BeanstalkConsumer.java | 257 ++++++++++++ .../component/beanstalk/BeanstalkEndpoint.java | 125 ++++++ .../beanstalk/BeanstalkExchangeHelper.java | 47 +++ .../component/beanstalk/BeanstalkProducer.java | 130 ++++++ .../component/beanstalk/ConnectionSettings.java | 132 ++++++ .../beanstalk/ConnectionSettingsFactory.java | 43 ++ .../camel/component/beanstalk/Headers.java | 46 ++ .../beanstalk/processors/BuryCommand.java | 49 +++ .../component/beanstalk/processors/Command.java | 24 ++ .../beanstalk/processors/DefaultCommand.java | 45 ++ .../beanstalk/processors/DeleteCommand.java | 46 ++ .../beanstalk/processors/KickCommand.java | 45 ++ .../beanstalk/processors/PutCommand.java | 50 +++ .../beanstalk/processors/ReleaseCommand.java | 52 +++ .../beanstalk/processors/TouchCommand.java | 46 ++ .../src/main/resources/META-INF/LICENSE.txt | 203 +++++++++ .../src/main/resources/META-INF/NOTICE.txt | 11 + .../org/apache/camel/component/beanstalk | 18 + .../beanstalk/AwaitingConsumerTest.java | 88 ++++ .../beanstalk/BeanstalkMockTestSupport.java | 45 ++ .../beanstalk/ConnectionSettingsTest.java | 53 +++ .../beanstalk/ConsumerCompletionTest.java | 118 ++++++ .../camel/component/beanstalk/EndpointTest.java | 90 ++++ 
.../camel/component/beanstalk/Helper.java | 81 ++++ .../beanstalk/ImmediateConsumerTest.java | 93 ++++ .../camel/component/beanstalk/ProducerTest.java | 419 +++++++++++++++++++ .../integration/BeanstalkCamelTestSupport.java | 41 ++ .../BuryProducerIntegrationTest.java | 82 ++++ .../integration/ConsumerIntegrationTest.java | 66 +++ .../DeleteProducerIntegrationTest.java | 76 ++++ .../integration/PutProducerIntegrationTest.java | 112 +++++ .../ReleaseProducerIntegrationTest.java | 82 ++++ .../TouchProducerIntegrationTest.java | 82 ++++ components/pom.xml | 1 + parent/pom.xml | 1 + 38 files changed, 3088 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/pom.xml b/components/camel-beanstalk/pom.xml new file mode 100644 index 0000000..1716fc0 --- /dev/null +++ b/components/camel-beanstalk/pom.xml @@ -0,0 +1,88 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.15-SNAPSHOT</version> + </parent> + + <artifactId>camel-beanstalk</artifactId> + <packaging>bundle</packaging> + <name>Camel :: Beanstalk</name> + + <description>Camel Beanstalk component</description> + + <properties> + <camel.osgi.export.pkg>org.apache.camel.component.beanstalk.*</camel.osgi.export.pkg> + <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=beanstalk</camel.osgi.export.service> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + + <!-- Beanstalkd Java library --> + <dependency> + <groupId>com.surftools</groupId> + <artifactId>BeanstalkClient</artifactId> + <version>${beanstalkd-client-version}</version> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>integration</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.14</version> + <configuration> + <excludes /> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/etc/header.txt 
---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/etc/header.txt b/components/camel-beanstalk/src/etc/header.txt new file mode 100644 index 0000000..4091cb9 --- /dev/null +++ b/components/camel-beanstalk/src/etc/header.txt @@ -0,0 +1,13 @@ +Copyright (C) ${year} ${author} <${email}> + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkComponent.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkComponent.java new file mode 100644 index 0000000..0040697 --- /dev/null +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkComponent.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk; + +import java.util.Map; +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.impl.DefaultComponent; + +/** + * Beanstalk Camel component. + * + * URI is <code>beanstalk://[host[:port]][/tube]?query</code> + * <p> + * Parameters:<ul> + * <li><code>command</code> - one of "put", "release", "bury", "touch", "delete", "kick". + * "put" is the default for Producers.</li> + * <li><code>jobPriority</code></li> + * <li><code>jobDelay</code></li> + * <li><code>jobTimeToRun</code></li> + * <li><code>consumer.onFailure</code></li> + * <li><code>consumer.awaitJob</code></li> + * </ul> + * + * @author <a href="mailto:[email protected]">Alexander Azarov</a> + * @see BeanstalkEndpoint + * @see ConnectionSettingsFactory + */ +public class BeanstalkComponent extends DefaultComponent { + public static final String DEFAULT_TUBE = "default"; + + public final static String COMMAND_BURY = "bury"; + public final static String COMMAND_RELEASE = "release"; + public final static String COMMAND_PUT = "put"; + public final static String COMMAND_TOUCH = "touch"; + public final static String COMMAND_DELETE = "delete"; + public final static String COMMAND_KICK = "kick"; + + public final static long DEFAULT_PRIORITY = 1000; // 0 is highest + public final static int DEFAULT_DELAY = 0; + public final static int DEFAULT_TIME_TO_RUN = 60; // if 0 the daemon sets 1. 
+ + static ConnectionSettingsFactory connFactory = ConnectionSettingsFactory.DEFAULT; + + public BeanstalkComponent() { + } + + public BeanstalkComponent(final CamelContext context) { + super(context); + } + + @Override + public boolean useRawUri() { + return true; + } + + @Override + protected Endpoint createEndpoint(final String uri, final String remaining, final Map<String,Object> parameters) throws Exception { + return new BeanstalkEndpoint(uri, this, connFactory.parseUri(remaining)); + } + + /** + * Custom ConnectionSettingsFactory. + * <p> + * Specify which {@link ConnectionSettingsFactory} to use to make connections to Beanstalkd. Especially + * useful for unit testing without beanstalkd daemon (you can mock {@link ConnectionSettings}) + * + * @param connFactory + * @see ConnectionSettingsFactory + */ + public static void setConnectionSettingsFactory(ConnectionSettingsFactory connFactory) { + BeanstalkComponent.connFactory = connFactory; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java new file mode 100644 index 0000000..9c16f7d --- /dev/null +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk; + +import org.apache.camel.component.beanstalk.processors.*; +import com.surftools.BeanstalkClient.BeanstalkException; +import com.surftools.BeanstalkClient.Client; +import com.surftools.BeanstalkClient.Job; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.impl.ScheduledPollConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * PollingConsumer to read Beanstalk jobs. + * + * The consumer may delete the job immediately or based on successful {@link Exchange} + * completion. The behavior is configurable by <code>consumer.awaitJob</code> + * flag (by default <code>true</code>) + * + * This consumer will add a {@link Synchronization} object to every {@link Exchange} + * object it creates in order to react on successful exchange completion or failure. + * + * In the case of successful completion, Beanstalk's <code>delete</code> method is + * called upon the job. In the case of failure the default reaction is to call + * <code>bury</code>. 
+ * + * The reaction on failures is configurable: possible variants are "bury", "release" or "delete" + * + * @author <a href="mailto:[email protected]">Alexander Azarov</a> + */ +public class BeanstalkConsumer extends ScheduledPollConsumer { + private final transient Logger log = LoggerFactory.getLogger(getClass()); + + String onFailure = BeanstalkComponent.COMMAND_BURY; + boolean useBlockIO = true; + boolean deleteImmediately = false; + + private Client client = null; + private ExecutorService executor = null; + private Synchronization sync = null; + + private static String[] statsKeysStr = new String[] {"tube", "state"}; + private static String[] statsKeysInt = new String[] {"age", "time-left", "timeouts", "releases", "buries", "kicks"}; + + private final Runnable initTask = new Runnable() { + @Override + public void run() { + client = getEndpoint().getConnection().newReadingClient(useBlockIO); + } + }; + private final Callable<Exchange> pollTask = new Callable<Exchange>() { + final Integer NO_WAIT = Integer.valueOf(0); + + @Override + public Exchange call() throws Exception { + if (client == null) + throw new RuntimeCamelException("Beanstalk client not initialized"); + + try { + final Job job = client.reserve(NO_WAIT); + if (job == null) + return null; + + if (log.isDebugEnabled()) + log.debug(String.format("Received job ID %d (data length %d)", job.getJobId(), job.getData().length)); + + final Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly); + exchange.setProperty(Headers.JOB_ID, job.getJobId()); + exchange.getIn().setBody(job.getData(), byte[].class); + + Map<String,String> jobStats = client.statsJob(job.getJobId()); + if (jobStats != null) { + for (String key : statsKeysStr) { + if (jobStats.containsKey(key)) + exchange.setProperty(Headers.PREFIX+key, jobStats.get(key).trim()); + } + + if (jobStats.containsKey("pri")) + exchange.setProperty(Headers.PRIORITY, Long.parseLong(jobStats.get("pri").trim())); + + for (String key : 
statsKeysInt) { + if (jobStats.containsKey(key)) + exchange.setProperty(Headers.PREFIX+key, Integer.parseInt(jobStats.get(key).trim())); + } + } + + if (deleteImmediately) + client.delete(job.getJobId()); + else + exchange.addOnCompletion(sync); + + return exchange; + } catch (BeanstalkException e) { + log.error("Beanstalk client error", e); + resetClient(); + return null; + } + } + + }; + + public BeanstalkConsumer(final BeanstalkEndpoint endpoint, final Processor processor) { + super(endpoint, processor); + } + + @Override + protected int poll() throws Exception { + int messagesPolled = 0; + while (isPollAllowed()) { + final Exchange exchange = executor.submit(pollTask).get(); + if (exchange == null) + break; + + ++messagesPolled; + getProcessor().process(exchange); + } + return messagesPolled; + } + + public String getOnFailure() { + return onFailure; + } + + public void setOnFailure(String onFailure) { + this.onFailure = onFailure; + } + + public boolean getUseBlockIO() { + return useBlockIO; + } + + public void setUseBlockIO(boolean useBlockIO) { + this.useBlockIO = useBlockIO; + } + + public boolean getAwaitJob() { + return !deleteImmediately; + } + + public void setAwaitJob(boolean awaitingCompletion) { + this.deleteImmediately = !awaitingCompletion; + } + + @Override + public BeanstalkEndpoint getEndpoint() { + return (BeanstalkEndpoint) super.getEndpoint(); + } + + @Override + protected void doStart() throws Exception { + executor = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "Beanstalk-Consumer"); + executor.execute(initTask); + sync = new Sync(); + super.doStart(); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + if (executor != null) + executor.shutdown(); + } + + protected void resetClient() { + if (client != null) + client.close(); + initTask.run(); + } + + class Sync implements Synchronization { + protected final Command successCommand; + protected final Command 
failureCommand; + + public Sync() { + successCommand = new DeleteCommand(getEndpoint()); + + if (BeanstalkComponent.COMMAND_BURY.equals(onFailure)) + failureCommand = new BuryCommand(getEndpoint()); + else if (BeanstalkComponent.COMMAND_RELEASE.equals(onFailure)) + failureCommand = new ReleaseCommand(getEndpoint()); + else if (BeanstalkComponent.COMMAND_DELETE.equals(onFailure)) + failureCommand = new DeleteCommand(getEndpoint()); + else + throw new IllegalArgumentException(String.format("Unknown failure command: %s", onFailure)); + } + + @Override + public void onComplete(final Exchange exchange) { + try { + executor.submit(new RunCommand(successCommand, exchange)).get(); + } catch (Exception e) { + if (log.isErrorEnabled()) + log.error(String.format("Could not run completion of exchange %s", exchange), e); + } + } + + @Override + public void onFailure(final Exchange exchange) { + try { + executor.submit(new RunCommand(failureCommand, exchange)).get(); + } catch (Exception e) { + if (log.isErrorEnabled()) + log.error(String.format("%s could not run failure of exchange %s", failureCommand.getClass().getName(), exchange), e); + } + } + + class RunCommand implements Runnable { + private final Command command; + private final Exchange exchange; + + public RunCommand(final Command command, final Exchange exchange) { + this.command = command; + this.exchange = exchange; + } + + @Override + public void run() { + try { + try { + command.act(client, exchange); + } catch (BeanstalkException e) { + if (log.isWarnEnabled()) + log.warn(String.format("Post-processing %s of exchange %s failed, retrying.", command.getClass().getName(), exchange), e); + resetClient(); + command.act(client, exchange); + } + } catch (final Exception e) { + if (log.isErrorEnabled()) + log.error(String.format("%s could not post-process exchange %s", command.getClass().getName(), exchange), e); + exchange.setException(e); + } + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java new file mode 100644 index 0000000..62c6809 --- /dev/null +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.beanstalk; + +import com.surftools.BeanstalkClient.Client; +import org.apache.camel.Component; +import org.apache.camel.Producer; +import org.apache.camel.component.beanstalk.processors.*; +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.impl.ScheduledPollEndpoint; + +/** + * @author <a href="mailto:[email protected]">Alexander Azarov</a> + * @see BeanstalkConsumer + * @see org.apache.camel.component.beanstalk.processors.PutCommand + */ +public class BeanstalkEndpoint extends ScheduledPollEndpoint { + final ConnectionSettings conn; + + String command = BeanstalkComponent.COMMAND_PUT; + long priority = BeanstalkComponent.DEFAULT_PRIORITY; + int delay = BeanstalkComponent.DEFAULT_DELAY; + int timeToRun = BeanstalkComponent.DEFAULT_TIME_TO_RUN; + + BeanstalkEndpoint(final String uri, final Component component, final ConnectionSettings conn) { + super(uri, component); + + this.conn = conn; + } + + public ConnectionSettings getConnection() { + return conn; + } + + /** + * The command {@link Producer} must execute + * + * @param command + */ + public void setCommand(final String command) { + this.command = command; + } + + public void setJobPriority(final long priority) { + this.priority = priority; + } + + public long getJobPriority() { + return priority; + } + + public void setJobDelay(final int delay) { + this.delay = delay; + } + + public int getJobDelay() { + return delay; + } + + public void setJobTimeToRun(final int timeToRun) { + this.timeToRun = timeToRun; + } + + public int getJobTimeToRun() { + return timeToRun; + } + + /** + * Creates Camel producer. + * <p> + * Depending on the command parameter (see {@link BeanstalkComponent} URI) it + * will create one of the producer implementations. 
+ * + * @return {@link Producer} instance + * @throws IllegalArgumentException when {@link ConnectionSettings} cannot + * create a writable {@link Client} + */ + @Override + public Producer createProducer() throws Exception { + Command cmd = null; + if (BeanstalkComponent.COMMAND_PUT.equals(command)) + cmd = new PutCommand(this); + else if (BeanstalkComponent.COMMAND_RELEASE.equals(command)) + cmd = new ReleaseCommand(this); + else if (BeanstalkComponent.COMMAND_BURY.equals(command)) + cmd = new BuryCommand(this); + else if (BeanstalkComponent.COMMAND_TOUCH.equals(command)) + cmd = new TouchCommand(this); + else if (BeanstalkComponent.COMMAND_DELETE.equals(command)) + cmd = new DeleteCommand(this); + else if (BeanstalkComponent.COMMAND_KICK.equals(command)) + cmd = new KickCommand(this); + else + throw new IllegalArgumentException(String.format("Unknown command for Beanstalk endpoint: %s", command)); + + return new BeanstalkProducer(this, cmd); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + BeanstalkConsumer consumer = new BeanstalkConsumer(this, processor); + configureConsumer(consumer); + return consumer; + } + + @Override + public boolean isSingleton() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkExchangeHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkExchangeHelper.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkExchangeHelper.java new file mode 100644 index 0000000..19a298f --- /dev/null +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkExchangeHelper.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.NoSuchHeaderException; +import org.apache.camel.util.ExchangeHelper; + +/** + * + * @author <a href="mailto:[email protected]">Alexander Azarov</a> + */ +public final class BeanstalkExchangeHelper { + public static long getPriority(final BeanstalkEndpoint endpoint, final Message in) { + return in.getHeader(Headers.PRIORITY, Long.valueOf(endpoint.getJobPriority()), Long.class).longValue(); + } + + public static int getDelay(final BeanstalkEndpoint endpoint, final Message in) { + return in.getHeader(Headers.DELAY, Integer.valueOf(endpoint.getJobDelay()), Integer.class).intValue(); + } + + public static int getTimeToRun(final BeanstalkEndpoint endpoint, final Message in) { + return in.getHeader(Headers.TIME_TO_RUN, Integer.valueOf(endpoint.getJobTimeToRun()), Integer.class).intValue(); + } + + public static long getJobID(final Exchange exchange) throws NoSuchHeaderException { + Long jobId = exchange.getProperty(Headers.JOB_ID, Long.class); + if (jobId != null) + return jobId; + return ExchangeHelper.getMandatoryHeader(exchange, Headers.JOB_ID, Long.class); + } +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java new file mode 100644 index 0000000..83cfa98 --- /dev/null +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.beanstalk; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.camel.component.beanstalk.processors.Command; +import com.surftools.BeanstalkClient.BeanstalkException; +import com.surftools.BeanstalkClient.Client; +import org.apache.camel.Exchange; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.AsyncCallback; +import org.apache.camel.impl.DefaultProducer; + +/** + * + * @author <a href="mailto:[email protected]">Alexander Azarov</a> + */ +public class BeanstalkProducer extends DefaultProducer implements AsyncProcessor { + private ExecutorService executor = null; + + Client client = null; + final Command command; + + public BeanstalkProducer(BeanstalkEndpoint endpoint, final Command command) throws Exception { + super(endpoint); + this.command = command; + } + + @Override + public void process(final Exchange exchange) throws Exception { + Future f = executor.submit(new RunCommand(exchange)); + f.get(); + } + + @Override + public boolean process(final Exchange exchange, final AsyncCallback callback) { + try { + executor.submit(new RunCommand(exchange, callback)); + } catch (Throwable t) { + exchange.setException(t); + callback.done(true); + return true; + } + return false; + } + + protected void resetClient() { + closeClient(); + initClient(); + } + + protected void closeClient() { + if (client != null) + client.close(); + } + + protected void initClient() { + this.client = getEndpoint().getConnection().newWritingClient(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + executor = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "Beanstalk-Producer"); + executor.execute(new Runnable() { + public void run() { + initClient(); + } + }); + } + + @Override + protected void doStop() throws Exception { + executor.shutdown(); + closeClient(); + super.doStop(); + } + + @Override + public 
BeanstalkEndpoint getEndpoint() { + return (BeanstalkEndpoint) super.getEndpoint(); + } + + class RunCommand implements Runnable { + private final Exchange exchange; + private final AsyncCallback callback; + + public RunCommand(final Exchange exchange) { + this(exchange, null); + } + + public RunCommand(final Exchange exchange, final AsyncCallback callback) { + this.exchange = exchange; + this.callback = callback; + } + + @Override + public void run() { + try { + try { + command.act(client, exchange); + } catch (BeanstalkException e) { + /* Retry one time */ + resetClient(); + command.act(client, exchange); + } + } catch (Throwable t) { + exchange.setException(t); + } finally { + if (callback != null) + callback.done(false); + } + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettings.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettings.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettings.java new file mode 100644 index 0000000..35359b1 --- /dev/null +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettings.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk; + +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Scanner; +import java.net.URLDecoder; +import com.surftools.BeanstalkClient.Client; +import com.surftools.BeanstalkClientImpl.ClientImpl; +import java.io.UnsupportedEncodingException; + +/** + * Represents the connection to Beanstalk. + * <p> + * Along with the list of tubes it may watch. + * + * @author <a href="mailto:[email protected]">Alexander Azarov</a> + */ +public class ConnectionSettings { + final String host; + final int port; + final String[] tubes; + + public ConnectionSettings(final String tube) { + this(Client.DEFAULT_HOST, Client.DEFAULT_PORT, tube); + } + + public ConnectionSettings(final String host, final String tube) { + this(host, Client.DEFAULT_PORT, tube); + } + + public ConnectionSettings(final String host, final int port, final String tube) { + this.host = host; + this.port = port; + + final Scanner scanner = new Scanner(tube); + scanner.useDelimiter("\\+"); + final ArrayList<String> buffer = new ArrayList<String>(); + while (scanner.hasNext()) { + final String tubeRaw = scanner.next(); + try { + buffer.add( URLDecoder.decode(tubeRaw, "UTF-8") ); + } catch (UnsupportedEncodingException e) { + buffer.add(tubeRaw); + } + } + this.tubes = buffer.toArray(new String[0]); + scanner.close(); + } + + /** + * Returns the {@link Client} instance ready for writing + * operations, e.g. "put". + * <p> + * <code>use(tube)</code> is applied during this call. 
+ * + * @return {@link Client} instance + * @throws IllegalArgumentException the exception is raised when this ConnectionSettings + * has more than one tube. + */ + public Client newWritingClient() throws IllegalArgumentException { + if (tubes.length > 1) { + throw new IllegalArgumentException("There must be only one tube specified for Beanstalk producer"); + } + + final String tube = tubes.length > 0 ? tubes[0] : BeanstalkComponent.DEFAULT_TUBE; + + final ClientImpl client = new ClientImpl(host, port); + + /* FIXME: There is a problem in JavaBeanstalkClient 1.4.4 (at least in 1.4.4), + when using uniqueConnectionPerThread=false. The symptom is that ProtocolHandler + breaks the protocol, reading incomplete messages. To be investigated. */ + //client.setUniqueConnectionPerThread(false); + client.useTube(tube); + return client; + } + + /** + * Returns the {@link Client} instance for reading operations with all + * the tubes aleady watched + * <p> + * <code>watch(tube)</code> is applied for every tube during this call. + * + * @param useBlockIO configuration param to {@link Client} + * @return {@link Client} instance + */ + public Client newReadingClient(boolean useBlockIO) { + final ClientImpl client = new ClientImpl(host, port, useBlockIO); + + /* FIXME: There is a problem in JavaBeanstalkClient 1.4.4 (at least in 1.4.4), + when using uniqueConnectionPerThread=false. The symptom is that ProtocolHandler + breaks the protocol, reading incomplete messages. To be investigated. 
*/ + //client.setUniqueConnectionPerThread(false); + for (String tube : tubes) + client.watch(tube); + return client; + } + + @Override + public boolean equals(final Object obj) { + if (obj instanceof ConnectionSettings) { + final ConnectionSettings other = (ConnectionSettings) obj; + return other.host.equals(host) && other.port == port && Arrays.equals(other.tubes, tubes); + } + return false; + } + + @Override + public int hashCode() { + return 41*(41*(41+host.hashCode())+port)+Arrays.hashCode(tubes); + } + + @Override + public String toString() { + return "beanstalk://"+host+":"+port+"/"+Arrays.toString(tubes); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettingsFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettingsFactory.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettingsFactory.java new file mode 100644 index 0000000..7949cf2 --- /dev/null +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettingsFactory.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk; + +import com.surftools.BeanstalkClient.Client; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * + * @author <a href="mailto:[email protected]">Alexander Azarov</a> + */ +public class ConnectionSettingsFactory { + public static final ConnectionSettingsFactory DEFAULT = new ConnectionSettingsFactory(); + + final Pattern HostPortTubeRE = Pattern.compile("^(([\\w.-]+)(:([\\d]+))?/)?([\\w%+]*)$"); + + public ConnectionSettings parseUri(final String remaining) throws IllegalArgumentException { + final Matcher m = HostPortTubeRE.matcher(remaining); + if (!m.matches()) + throw new IllegalArgumentException(String.format("Invalid path format: %s - should be [<hostName>[:<port>]/][<tubes>]", remaining)); + + final String host = m.group(2) != null ? m.group(2) : Client.DEFAULT_HOST; + final int port = m.group(4) != null ? Integer.parseInt(m.group(4)) : Client.DEFAULT_PORT; + final String tubes = m.group(5) != null ? 
m.group(5) : ""; + return new ConnectionSettings(host, port, tubes); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/Headers.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/Headers.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/Headers.java new file mode 100644 index 0000000..b944872 --- /dev/null +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/Headers.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Names of the message headers used by the Beanstalk component. Every name
 * starts with {@link #PREFIX}.
 * <p>
 * This class only holds constants and cannot be instantiated.
 *
 * @author <a href="mailto:[email protected]">Alexander Azarov</a>
 */
public final class Headers {
    /** Common prefix of all Beanstalk header names. */
    public static final String PREFIX = "beanstalk.";

    // in
    /** Job priority (input). */
    public static final String PRIORITY = PREFIX + "priority";
    /** Job delay (input). */
    public static final String DELAY = PREFIX + "delay";
    /** Job time to run (input). */
    public static final String TIME_TO_RUN = PREFIX + "timeToRun";

    // in/out
    /** Job identifier: set by the put command, required as a Long by the touch command. */
    public static final String JOB_ID = PREFIX + "jobId";

    // out
    /** Boolean outcome set by the bury, delete, release and touch commands. */
    public static final String RESULT = PREFIX + "result";

    // other info
    // NOTE(review): presumably job statistics populated by the consumer - confirm against BeanstalkConsumer.
    public static final String TUBE = PREFIX + "tube";
    public static final String STATE = PREFIX + "state";
    public static final String AGE = PREFIX + "age";
    public static final String TIME_LEFT = PREFIX + "time-left";
    public static final String TIMEOUTS = PREFIX + "timeouts";
    public static final String RELEASES = PREFIX + "releases";
    public static final String BURIES = PREFIX + "buries";
    public static final String KICKS = PREFIX + "kicks";

    /** Constants holder; not meant to be instantiated. */
    private Headers() {
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk.processors; + +import org.apache.camel.component.beanstalk.BeanstalkEndpoint; +import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper; +import org.apache.camel.component.beanstalk.Headers; +import com.surftools.BeanstalkClient.Client; +import org.apache.camel.Exchange; +import org.apache.camel.NoSuchHeaderException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BuryCommand extends DefaultCommand { + private final transient Logger log = LoggerFactory.getLogger(getClass()); + + public BuryCommand(BeanstalkEndpoint endpoint) { + super(endpoint); + } + + @Override + public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException { + final Long jobId = BeanstalkExchangeHelper.getJobID(exchange); + final long priority = BeanstalkExchangeHelper.getPriority(endpoint, exchange.getIn()); + final boolean result = client.bury(jobId.longValue(), priority); + + if (!result && log.isWarnEnabled()) + log.warn(String.format("Failed to bury job %d (with priority %d)", jobId, priority)); + else if (log.isDebugEnabled()) + log.debug(String.format("Job %d buried with priority %d. 
Result is %b", jobId, priority, result)); + + answerWith(exchange, Headers.RESULT, result); + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/Command.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/Command.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/Command.java new file mode 100644 index 0000000..7088279 --- /dev/null +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/Command.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.beanstalk.processors; + +import com.surftools.BeanstalkClient.Client; +import org.apache.camel.Exchange; + +public interface Command { + public void act(Client client, Exchange exchange) throws Exception; +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DefaultCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DefaultCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DefaultCommand.java new file mode 100644 index 0000000..e8e9a45 --- /dev/null +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DefaultCommand.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.beanstalk.processors; + +import org.apache.camel.component.beanstalk.BeanstalkEndpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.util.ExchangeHelper; + +abstract class DefaultCommand implements Command { + protected final BeanstalkEndpoint endpoint; + + public DefaultCommand(BeanstalkEndpoint endpoint) { + this.endpoint = endpoint; + } + + protected Message getAnswerMessage(final Exchange exchange) { + Message answer = exchange.getIn(); + if (ExchangeHelper.isOutCapable(exchange)) { + answer = exchange.getOut(); + // preserve headers + answer.getHeaders().putAll(exchange.getIn().getHeaders()); + } + return answer; + } + + protected void answerWith(final Exchange exchange, final String header, final Object value) { + final Message answer = getAnswerMessage(exchange); + answer.setHeader(header, value); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DeleteCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DeleteCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DeleteCommand.java new file mode 100644 index 0000000..f738556 --- /dev/null +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DeleteCommand.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk.processors; + +import org.apache.camel.component.beanstalk.BeanstalkEndpoint; +import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper; +import org.apache.camel.component.beanstalk.Headers; +import com.surftools.BeanstalkClient.Client; +import org.apache.camel.Exchange; +import org.apache.camel.NoSuchHeaderException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DeleteCommand extends DefaultCommand { + private final transient Logger log = LoggerFactory.getLogger(getClass()); + + public DeleteCommand(BeanstalkEndpoint endpoint) { + super(endpoint); + } + + @Override + public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException { + final Long jobId = BeanstalkExchangeHelper.getJobID(exchange); + final boolean result = client.delete(jobId.longValue()); + if (!result && log.isWarnEnabled()) + log.warn(String.format("Failed to delete job %d", jobId)); + else if (log.isDebugEnabled()) + log.debug(String.format("Job %d deleted. 
Result is %b", jobId, result)); + + answerWith(exchange, Headers.RESULT, result); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/KickCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/KickCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/KickCommand.java new file mode 100644 index 0000000..7bad253 --- /dev/null +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/KickCommand.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.beanstalk.processors; + +import org.apache.camel.component.beanstalk.BeanstalkEndpoint; +import com.surftools.BeanstalkClient.Client; +import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.Message; +import org.apache.camel.NoSuchHeaderException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KickCommand extends DefaultCommand { + private final transient Logger log = LoggerFactory.getLogger(getClass()); + + public KickCommand(BeanstalkEndpoint endpoint) { + super(endpoint); + } + + @Override + public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException, InvalidPayloadException { + final Integer jobs = exchange.getIn().getMandatoryBody(Integer.class); + final int result = client.kick(jobs); + if (log.isDebugEnabled()) + log.debug(String.format("Kick %d jobs. Kicked %d actually.", jobs, result)); + + final Message answer = getAnswerMessage(exchange); + answer.setBody(result, Integer.class); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/PutCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/PutCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/PutCommand.java new file mode 100644 index 0000000..c6fa32e --- /dev/null +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/PutCommand.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.beanstalk.processors; + +import org.apache.camel.component.beanstalk.BeanstalkEndpoint; +import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper; +import org.apache.camel.component.beanstalk.Headers; +import com.surftools.BeanstalkClient.Client; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.NoSuchHeaderException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PutCommand extends DefaultCommand { + private final transient Logger log = LoggerFactory.getLogger(getClass()); + + public PutCommand(BeanstalkEndpoint endpoint) { + super(endpoint); + } + + @Override + public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException { + final Message in = exchange.getIn(); + + final long priority = BeanstalkExchangeHelper.getPriority(endpoint, in); + final int delay = BeanstalkExchangeHelper.getDelay(endpoint, in); + final int timeToRun = BeanstalkExchangeHelper.getTimeToRun(endpoint, in); + + final long jobId = client.put(priority, delay, timeToRun, in.getBody(byte[].class)); + if (log.isDebugEnabled()) + log.debug(String.format("Created job %d with priority %d, delay %d seconds and time to run %d", jobId, priority, delay, timeToRun)); + + answerWith(exchange, Headers.JOB_ID, jobId); + } +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/ReleaseCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/ReleaseCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/ReleaseCommand.java new file mode 100644 index 0000000..f41e080 --- /dev/null +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/ReleaseCommand.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.beanstalk.processors; + +import org.apache.camel.component.beanstalk.BeanstalkEndpoint; +import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper; +import org.apache.camel.component.beanstalk.Headers; +import com.surftools.BeanstalkClient.Client; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.NoSuchHeaderException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReleaseCommand extends DefaultCommand { + private final transient Logger log = LoggerFactory.getLogger(getClass()); + + public ReleaseCommand(BeanstalkEndpoint endpoint) { + super(endpoint); + } + + @Override + public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException { + final Message in = exchange.getIn(); + + final Long jobId = BeanstalkExchangeHelper.getJobID(exchange); + final long priority = BeanstalkExchangeHelper.getPriority(endpoint, in); + final int delay = BeanstalkExchangeHelper.getDelay(endpoint, in); + + final boolean result = client.release(jobId.longValue(), priority, delay); + if (!result && log.isWarnEnabled()) + log.warn(String.format("Failed to release job %d (priority %d, delay %d)", jobId, priority, delay)); + else if (log.isDebugEnabled()) + log.debug(String.format("Job %d released with priority %d, delay %d seconds. 
Result is %b", jobId, priority, delay, result)); + + answerWith(exchange, Headers.RESULT, result); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/TouchCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/TouchCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/TouchCommand.java new file mode 100644 index 0000000..c43f32c --- /dev/null +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/TouchCommand.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.beanstalk.processors; + +import org.apache.camel.component.beanstalk.BeanstalkEndpoint; +import org.apache.camel.component.beanstalk.Headers; +import com.surftools.BeanstalkClient.Client; +import org.apache.camel.Exchange; +import org.apache.camel.NoSuchHeaderException; +import org.apache.camel.util.ExchangeHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TouchCommand extends DefaultCommand { + private final transient Logger log = LoggerFactory.getLogger(getClass()); + + public TouchCommand(BeanstalkEndpoint endpoint) { + super(endpoint); + } + + @Override + public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException { + final Long jobId = ExchangeHelper.getMandatoryHeader(exchange, Headers.JOB_ID, Long.class); + final boolean result = client.touch(jobId.longValue()); + if (!result && log.isWarnEnabled()) + log.warn(String.format("Failed to touch job %d", jobId)); + else if (log.isDebugEnabled()) + log.debug(String.format("Job %d touched. Result is %b", jobId, result)); + + answerWith(exchange, Headers.RESULT, result); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/resources/META-INF/LICENSE.txt ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/resources/META-INF/LICENSE.txt b/components/camel-beanstalk/src/main/resources/META-INF/LICENSE.txt new file mode 100644 index 0000000..6b0b127 --- /dev/null +++ b/components/camel-beanstalk/src/main/resources/META-INF/LICENSE.txt @@ -0,0 +1,203 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. 
+ + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/resources/META-INF/NOTICE.txt ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/resources/META-INF/NOTICE.txt b/components/camel-beanstalk/src/main/resources/META-INF/NOTICE.txt new file mode 100644 index 0000000..2e215bf --- /dev/null +++ b/components/camel-beanstalk/src/main/resources/META-INF/NOTICE.txt @@ -0,0 +1,11 @@ + ========================================================================= + == NOTICE file corresponding to the section 4 d of == + == the Apache License, Version 2.0, == + == in this case for the Apache Camel distribution. 
== + ========================================================================= + + This product includes software developed by + The Apache Software Foundation (http://www.apache.org/). + + Please read the different LICENSE files present in the licenses directory of + this distribution. http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/resources/META-INF/services/org/apache/camel/component/beanstalk ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/resources/META-INF/services/org/apache/camel/component/beanstalk b/components/camel-beanstalk/src/main/resources/META-INF/services/org/apache/camel/component/beanstalk new file mode 100644 index 0000000..c290f54 --- /dev/null +++ b/components/camel-beanstalk/src/main/resources/META-INF/services/org/apache/camel/component/beanstalk @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +class=org.apache.camel.component.beanstalk.BeanstalkComponent http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/AwaitingConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/AwaitingConsumerTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/AwaitingConsumerTest.java new file mode 100644 index 0000000..164dff2 --- /dev/null +++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/AwaitingConsumerTest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.beanstalk; + +import com.surftools.BeanstalkClient.Job; +import com.surftools.BeanstalkClient.BeanstalkException; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; +import static org.mockito.Mockito.*; + /** Exercises the route from("beanstalk:tube") to "mock:result" using the mocked {@code client} declared in BeanstalkMockTestSupport; no real beanstalkd connection is opened by this class itself. */ +public class AwaitingConsumerTest extends BeanstalkMockTestSupport { + final String testMessage = "hello, world"; + + @EndpointInject(uri = "beanstalk:tube") + protected BeanstalkEndpoint endpoint; + /** Stubs {@code client.reserve(anyInt())} to hand out one job (id 111, body {@code testMessage}) and then {@code null}; expects exactly one exchange on mock:result carrying that body and the job id both as property and as header {@code Headers.JOB_ID}, and verifies that {@code reserve(0)} and {@code delete(jobId)} were each called at least once. */ + @Test + public void testReceive() throws Exception { + final Job jobMock = mock(Job.class); + final long jobId = 111; + final byte[] payload = Helper.stringToBytes(testMessage); + + when(jobMock.getJobId()).thenReturn(jobId); + when(jobMock.getData()).thenReturn(payload); + when(client.reserve(anyInt())) + .thenReturn(jobMock) + .thenReturn(null); + /* NOTE(review): the argument 100 is presumably a wait timeout in milliseconds; confirm against MockEndpoint.assertIsSatisfied. */ + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(1); + result.expectedBodiesReceived(testMessage); + result.expectedPropertyReceived(Headers.JOB_ID, jobId); + result.message(0).header(Headers.JOB_ID).isEqualTo(jobId); + result.assertIsSatisfied(100); + + verify(client, atLeast(1)).reserve(0); + verify(client, atLeast(1)).delete(jobId); + } + /** Stubs the first {@code client.reserve(anyInt())} call to throw {@code BeanstalkException("test")} and the next one to return the job; expects the same single exchange as testReceive and verifies that {@code reserve} was called at least once and {@code close()} exactly once. */ + @Test + public void testBeanstalkException() throws Exception { + final Job jobMock = mock(Job.class); + final long jobId = 111; + final byte[] payload = Helper.stringToBytes(testMessage); + + when(jobMock.getJobId()).thenReturn(jobId); + when(jobMock.getData()).thenReturn(payload); + when(client.reserve(anyInt())) + .thenThrow(new BeanstalkException("test")) + .thenReturn(jobMock); + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(1); + result.expectedBodiesReceived(testMessage); + result.expectedPropertyReceived(Headers.JOB_ID, jobId); + result.message(0).header(Headers.JOB_ID).isEqualTo(jobId); + result.assertIsSatisfied(100); + + verify(client, 
atLeast(1)).reserve(anyInt()); + verify(client, times(1)).close(); + } + /** Builds the single route under test: everything consumed from "beanstalk:tube" is forwarded to "mock:result". */ + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("beanstalk:tube").to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/BeanstalkMockTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/BeanstalkMockTestSupport.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/BeanstalkMockTestSupport.java new file mode 100644 index 0000000..42b3114 --- /dev/null +++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/BeanstalkMockTestSupport.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.beanstalk; + +import com.surftools.BeanstalkClient.Client; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.After; +import org.junit.Before; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import static org.mockito.Mockito.*; + /** Base class for beanstalk tests that run against a Mockito-mocked {@code Client}: the mock is created and handed to {@code Helper.mockComponent} before the Camel test context is set up, and {@code Helper.revertComponent} is called after it is torn down. */ +public class BeanstalkMockTestSupport extends CamelTestSupport { + /** Mockito mock of the beanstalk client; subclasses stub and verify it directly. */ @Mock Client client; + /** Initialises the {@code @Mock} fields, resets {@code client}, passes it to {@code Helper.mockComponent} and only then runs {@code super.setUp()}. NOTE(review): Helper is not visible here; presumably it installs the mock into the beanstalk component before the routes start - confirm. */ + @Before + @Override + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + reset(client); + Helper.mockComponent(client); + super.setUp(); + } + /** Runs {@code super.tearDown()} first, then {@code Helper.revertComponent()}. NOTE(review): presumably this undoes what {@code Helper.mockComponent} did in setUp - confirm against Helper. */ + @After + @Override + public void tearDown() throws Exception { + super.tearDown(); + Helper.revertComponent(); + } +}
