Author: davsclaus
Date: Thu Jan 13 16:59:15 2011
New Revision: 1058659
URL: http://svn.apache.org/viewvc?rev=1058659&view=rev
Log:
CAMEL-3540: Fixed camel-jt400 receive using timeout should be in seconds.
Thanks to Joao Loureiro for the patch.
Added:
camel/trunk/components/camel-jt400/src/test/java/org/apache/camel/component/jt400/Jt400DataQueueConsumerTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java
camel/trunk/components/camel-jt400/ (props changed)
camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueProducer.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java?rev=1058659&r1=1058658&r2=1058659&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumerSupport.java
Thu Jan 13 16:59:15 2011
@@ -19,6 +19,8 @@ package org.apache.camel.impl;
import org.apache.camel.Endpoint;
import org.apache.camel.PollingConsumer;
import org.apache.camel.spi.ExceptionHandler;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* A useful base class for implementations of {@link PollingConsumer}
@@ -26,6 +28,7 @@ import org.apache.camel.spi.ExceptionHan
* @version $Revision$
*/
public abstract class PollingConsumerSupport extends ServiceSupport implements
PollingConsumer {
+ protected final transient Log log = LogFactory.getLog(getClass());
private final Endpoint endpoint;
private ExceptionHandler exceptionHandler;
Propchange: camel/trunk/components/camel-jt400/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Jan 13 16:59:15 2011
@@ -5,3 +5,5 @@
target
.settings
eclipse-classes
+*.i??
+classes
Modified:
camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java?rev=1058659&r1=1058658&r2=1058659&view=diff
==============================================================================
---
camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
(original)
+++
camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
Thu Jan 13 16:59:15 2011
@@ -17,6 +17,7 @@
package org.apache.camel.component.jt400;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import com.ibm.as400.access.AS400;
import com.ibm.as400.access.AS400SecurityException;
@@ -34,7 +35,7 @@ import org.apache.camel.impl.PollingCons
/**
- * {@link PollingConsumer} that polls a data queue for data
+ * {@link org.apache.camel.PollingConsumer} that polls a data queue for data
*/
public class Jt400DataQueueConsumer extends PollingConsumerSupport {
@@ -51,6 +52,7 @@ public class Jt400DataQueueConsumer exte
@Override
protected void doStart() throws Exception {
if (!endpoint.getSystem().isConnected()) {
+ log.info("Connecting to " + endpoint);
endpoint.getSystem().connectService(AS400.DATAQUEUE);
}
}
@@ -58,21 +60,16 @@ public class Jt400DataQueueConsumer exte
@Override
protected void doStop() throws Exception {
if (endpoint.getSystem().isConnected()) {
+ log.info("Disconnecting from " + endpoint);
endpoint.getSystem().disconnectAllServices();
}
}
- /**
- * {@link Jt400DataQueueConsumer#receive(long)}
- */
public Exchange receive() {
// -1 to indicate a blocking read from data queue
return receive(-1);
}
- /**
- * {@link Jt400DataQueueConsumer#receive(long)}
- */
public Exchange receiveNoWait() {
return receive(0);
}
@@ -93,10 +90,18 @@ public class Jt400DataQueueConsumer exte
try {
DataQueueEntry entry;
if (timeout >= 0) {
- entry = queue.read((int)timeout);
+ int seconds = (int)timeout / 1000;
+ if (log.isTraceEnabled()) {
+ log.trace("Reading from data queue: " + queue.getName() +
" with " + seconds + " seconds timeout");
+ }
+ entry = queue.read(seconds);
} else {
- entry = queue.read();
+ if (log.isTraceEnabled()) {
+ log.trace("Reading from data queue: " + queue.getName() +
" with no timeout");
+ }
+ entry = queue.read(-1);
}
+
Exchange exchange = new
DefaultExchange(endpoint.getCamelContext());
if (entry != null) {
if (endpoint.getFormat() == Format.binary) {
@@ -107,18 +112,19 @@ public class Jt400DataQueueConsumer exte
return exchange;
}
} catch (AS400SecurityException e) {
- throw new RuntimeCamelException("Unable to read from data queue: "
+ e.getMessage(), e);
+ throw new RuntimeCamelException("Unable to read from data queue: "
+ queue.getName(), e);
} catch (ErrorCompletingRequestException e) {
- throw new RuntimeCamelException("Unable to read from data queue: "
+ e.getMessage(), e);
+ throw new RuntimeCamelException("Unable to read from data queue: "
+ queue.getName(), e);
} catch (IOException e) {
- throw new RuntimeCamelException("Unable to read from data queue: "
+ e.getMessage(), e);
+ throw new RuntimeCamelException("Unable to read from data queue: "
+ queue.getName(), e);
} catch (IllegalObjectTypeException e) {
- throw new RuntimeCamelException("Unable to read from data queue: "
+ e.getMessage(), e);
+ throw new RuntimeCamelException("Unable to read from data queue: "
+ queue.getName(), e);
} catch (InterruptedException e) {
- throw new RuntimeCamelException("Unable to read from data queue: "
+ e.getMessage(), e);
+ throw new RuntimeCamelException("Unable to read from data queue: "
+ queue.getName(), e);
} catch (ObjectDoesNotExistException e) {
- throw new RuntimeCamelException("Unable to read from data queue: "
+ e.getMessage(), e);
+ throw new RuntimeCamelException("Unable to read from data queue: "
+ queue.getName(), e);
}
return null;
}
+
}
Modified:
camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueProducer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueProducer.java?rev=1058659&r1=1058658&r2=1058659&view=diff
==============================================================================
---
camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueProducer.java
(original)
+++
camel/trunk/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueProducer.java
Thu Jan 13 16:59:15 2011
@@ -54,6 +54,7 @@ public class Jt400DataQueueProducer exte
@Override
protected void doStart() throws Exception {
if (!endpoint.getSystem().isConnected()) {
+ log.info("Connecting to " + endpoint);
endpoint.getSystem().connectService(AS400.DATAQUEUE);
}
}
@@ -61,6 +62,7 @@ public class Jt400DataQueueProducer exte
@Override
protected void doStop() throws Exception {
if (endpoint.getSystem().isConnected()) {
+ log.info("Disconnecting from " + endpoint);
endpoint.getSystem().disconnectAllServices();
}
}
Added:
camel/trunk/components/camel-jt400/src/test/java/org/apache/camel/component/jt400/Jt400DataQueueConsumerTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jt400/src/test/java/org/apache/camel/component/jt400/Jt400DataQueueConsumerTest.java?rev=1058659&view=auto
==============================================================================
---
camel/trunk/components/camel-jt400/src/test/java/org/apache/camel/component/jt400/Jt400DataQueueConsumerTest.java
(added)
+++
camel/trunk/components/camel-jt400/src/test/java/org/apache/camel/component/jt400/Jt400DataQueueConsumerTest.java
Thu Jan 13 16:59:15 2011
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jt400;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test case for {@link Jt400DataQueueConsumer}.
+ * <p>
+ * So that timeout semantics can be tested, an URI to an empty data queue on an
+ * AS400 system should be provided (in a resource named
+ * <code>"jt400test.properties"</code>, in a property with key
+ * <code>"org.apache.camel.component.jt400.emptydtaq.uri"</code>).
+ * </p>
+ *
+ * @version $Revision$
+ */
+@Ignore("Test manual")
+public class Jt400DataQueueConsumerTest extends TestCase {
+
+ /**
+ * The deviation of the actual timeout value that we permit in our timeout
+ * tests.
+ */
+ private static final long TIMEOUT_TOLERANCE = 300L;
+
+ /**
+ * Timeout value in milliseconds used to test <code>receive(long)</code>.
+ */
+ private static final long TIMEOUT_VALUE = 3999L;
+
+ /**
+ * The amount of time in milliseconds to pass so that a call is assumed to
+ * be a blocking call.
+ */
+ private static final long BLOCKING_THRESHOLD = 5000L;
+
+ /**
+ * The consumer instance used in the tests.
+ */
+ private Jt400DataQueueConsumer consumer;
+
+ /**
+ * Flag that indicates whether <code>receive()</code> has returned from
+ * call.
+ */
+ private boolean receiveFlag;
+
+ @Before
+ public void setUp() throws Exception {
+ // Load endpoint URI
+ InputStream is =
getClass().getResourceAsStream("jt400test.properties");
+ Properties props = new Properties();
+ String endpointURI;
+
+ props.load(is);
+ endpointURI =
props.getProperty("org.apache.camel.component.jt400.emptydtaq.uri");
+
+ // Instantiate consumer
+ CamelContext camel = new DefaultCamelContext();
+ Jt400Component component = new Jt400Component();
+
+ component.setCamelContext(camel);
+ consumer = (Jt400DataQueueConsumer)
component.createEndpoint(endpointURI).createPollingConsumer();
+ camel.start();
+ }
+
+ /**
+ * Tests whether <code>receive(long)</code> honours the
<code>timeout</code> parameter.
+ */
+ @Test(timeout = TIMEOUT_VALUE + TIMEOUT_TOLERANCE)
+ public void testReceiveLong() {
+ consumer.receive(TIMEOUT_VALUE);
+ }
+
+ /**
+ * Tests whether receive() blocks indefinitely.
+ */
+ @Test
+ public void testReceive() throws InterruptedException {
+ new Thread(new Runnable() {
+ public void run() {
+ consumer.receive();
+ receiveFlag = true;
+ }
+ }).start();
+
+ final long startTime = System.currentTimeMillis();
+ while (!receiveFlag) {
+ if ((System.currentTimeMillis() - startTime) > BLOCKING_THRESHOLD)
{
+ /* Passed test. */
+ return;
+ }
+ Thread.sleep(50L);
+ }
+ assertTrue("Method receive() has returned from call.", false);
+ }
+
+}