This is an automated email from the ASF dual-hosted git repository.
quinn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new b3468ef CAMEL-12215 - add lenientBind URI option
b3468ef is described below
commit b3468ef4db9d17b137fc26268d83426d20890cf9
Author: Quinn Stevenson <[email protected]>
AuthorDate: Tue Jan 30 16:37:04 2018 -0700
CAMEL-12215 - add lenientBind URI option
---
.../camel-mllp/src/main/docs/mllp-component.adoc | 15 +--
.../camel/component/mllp/MllpConfiguration.java | 23 +++-
.../apache/camel/component/mllp/MllpEndpoint.java | 4 +
.../component/mllp/MllpTcpServerConsumer.java | 103 ++++++----------
.../mllp/internal/TcpServerBindThread.java | 129 +++++++++++++++++++++
.../MllpTcpServerConsumerLenientBindTest.java | 99 ++++++++++++++++
.../mllp/MllpTcpServerConsumerConnectionTest.java | 30 ++---
...rConsumerEndOfDataAndValidationTestSupport.java | 5 +-
.../src/test/resources/log4j2.properties | 2 +-
.../springboot/MllpComponentConfiguration.java | 18 +++
10 files changed, 328 insertions(+), 100 deletions(-)
diff --git a/components/camel-mllp/src/main/docs/mllp-component.adoc
b/components/camel-mllp/src/main/docs/mllp-component.adoc
index 2e1336a..972fa31 100644
--- a/components/camel-mllp/src/main/docs/mllp-component.adoc
+++ b/components/camel-mllp/src/main/docs/mllp-component.adoc
@@ -79,7 +79,7 @@ with the following path and query parameters:
| *port* | *Required* Port number for the TCP connection | | int
|===
-==== Query Parameters (26 parameters):
+==== Query Parameters (27 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
|===
@@ -95,18 +95,19 @@ with the following path and query parameters:
| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer
creates an exchange. | | ExchangePattern
| *synchronous* (advanced) | Sets whether synchronous processing should be
strictly used or Camel is allowed to use asynchronous processing (if
supported). | false | boolean
| *backlog* (tcp) | The maximum queue length for incoming connection
indications (a request to connect) is set to the backlog parameter. If a
connection indication arrives when the queue is full the connection is refused.
| 5 | Integer
+| *lenientBind* (tcp) | TCP Server Only - Allow the endpoint to start before
the TCP ServerSocket is bound. In some environments it may be desirable to
allow the endpoint to start before the TCP ServerSocket is bound. | false |
boolean
| *maxConcurrentConsumers* (tcp) | The maximum number of concurrent MLLP
Consumer connections that will be allowed. If a new connection is received and
the maximum is number are already established the new connection will be reset
immediately. | 5 | int
-| *reuseAddress* (tcp) | Enable/disable the SO_REUSEADDR socket option. | true
| Boolean
+| *receiveBufferSize* (tcp) | Sets the SO_RCVBUF option to the specified value
(in bytes) | 8192 | Integer
+| *reuseAddress* (tcp) | Enable/disable the SO_REUSEADDR socket option. |
false | Boolean
| *acceptTimeout* (timeout) | Timeout (in milliseconds) while waiting for a
TCP connection TCP Server Only | 60000 | int
| *bindRetryInterval* (timeout) | TCP Server Only - The number of milliseconds
to wait between bind attempts | 5000 | int
| *bindTimeout* (timeout) | TCP Server Only - The number of milliseconds to
retry binding to a server port | 30000 | int
-| *keepAlive* (tcp) | Enable/disable the SO_KEEPALIVE socket option. | true |
Boolean
+| *tcpNoDelay* (tcp) | Enable/disable the TCP_NODELAY socket option. | true |
Boolean
| *connectTimeout* (timeout) | Timeout (in milliseconds) for establishing for
a TCP connection TCP Client only | 30000 | int
-| *receiveBufferSize* (tcp) | Sets the SO_RCVBUF option to the specified value
(in bytes) | 8192 | Integer
-| *sendBufferSize* (tcp) | Sets the SO_SNDBUF option to the specified value
(in bytes) | 8192 | Integer
| *idleTimeout* (timeout) | The approximate idle time allowed before the
Client TCP Connection will be reset. A null value or a value less than or equal
to zero will disable the idle timeout. | | Integer
| *maxReceiveTimeouts* (timeout) | *Deprecated* The maximum number of timeouts
(specified by receiveTimeout) allowed before the TCP Connection will be reset.
| | Integer
-| *tcpNoDelay* (tcp) | Enable/disable the TCP_NODELAY socket option. | true |
Boolean
+| *keepAlive* (tcp) | Enable/disable the SO_KEEPALIVE socket option. | true |
Boolean
+| *sendBufferSize* (tcp) | Sets the SO_SNDBUF option to the specified value
(in bytes) | 8192 | Integer
| *readTimeout* (timeout) | The SO_TIMEOUT value (in milliseconds) used after
the start of an MLLP frame has been received | 500 | int
| *receiveTimeout* (timeout) | The SO_TIMEOUT value (in milliseconds) used
when waiting for the start of an MLLP frame | 15000 | int
| *charsetName* (codec) | Set the CamelCharsetName property on the exchange |
| String
@@ -195,4 +196,4 @@ The MLLP Producer adds these headers on the Camel message:
|===================================
All headers are String types. If a header value is missing, its value
-is null.
\ No newline at end of file
+is null.
diff --git
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConfiguration.java
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConfiguration.java
index 70a756b..4d5e46b 100644
---
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConfiguration.java
+++
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpConfiguration.java
@@ -40,6 +40,9 @@ public class MllpConfiguration implements Cloneable {
@UriParam(label = "advanced,consumer,tcp,timeout", defaultValue = "5000")
int bindRetryInterval = 5000;
+ @UriParam(label = "advanced,consumer,tcp", defaultValue = "false")
+ boolean lenientBind;
+
@UriParam(label = "advanced,consumer,tcp,timeout", defaultValue = "60000")
int acceptTimeout = 60000;
@@ -68,8 +71,8 @@ public class MllpConfiguration implements Cloneable {
@UriParam(label = "advanced,producer,tcp", defaultValue = "true")
Boolean tcpNoDelay = true;
- @UriParam(label = "advanced,consumer,tcp", defaultValue = "true")
- Boolean reuseAddress = true;
+ @UriParam(label = "advanced,consumer,tcp", defaultValue = "false")
+ Boolean reuseAddress = false;
@UriParam(label = "advanced,tcp", defaultValue = "8192")
Integer receiveBufferSize = 8192;
@@ -206,6 +209,22 @@ public class MllpConfiguration implements Cloneable {
return acceptTimeout;
}
+ public boolean isLenientBind() {
+ return lenientBind;
+ }
+
+ /**
+ * TCP Server Only - Allow the endpoint to start before the TCP
ServerSocket is bound.
+ *
+ * In some environments, it may be desirable to allow the endpoint to
start before the TCP ServerSocket
+ * is bound.
+ *
+ * @param lenientBind if true, the ServerSocket will be bound
asynchronously; otherwise the ServerSocket will be bound synchronously.
+ */
+ public void setLenientBind(boolean lenientBind) {
+ this.lenientBind = lenientBind;
+ }
+
/**
* Timeout (in milliseconds) while waiting for a TCP connection
* <p/>
diff --git
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index 5c0338a..6bffef5 100644
---
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -278,6 +278,10 @@ public class MllpEndpoint extends DefaultEndpoint {
configuration.setBindRetryInterval(bindRetryInterval);
}
+ public void setLenientBind(boolean lenientBind) {
+ configuration.setLenientBind(lenientBind);
+ }
+
public void setAcceptTimeout(int acceptTimeout) {
configuration.setAcceptTimeout(acceptTimeout);
}
diff --git
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index 72b00bd..b0cdcfa 100644
---
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -18,7 +18,6 @@
package org.apache.camel.component.mllp;
import java.net.BindException;
-import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Date;
@@ -37,8 +36,9 @@ import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.component.mllp.internal.MllpSocketBuffer;
-import
org.apache.camel.component.mllp.internal.TcpServerConsumerValidationRunnable;
import org.apache.camel.component.mllp.internal.TcpServerAcceptThread;
+import org.apache.camel.component.mllp.internal.TcpServerBindThread;
+import
org.apache.camel.component.mllp.internal.TcpServerConsumerValidationRunnable;
import org.apache.camel.component.mllp.internal.TcpSocketConsumerRunnable;
import org.apache.camel.impl.DefaultConsumer;
@@ -49,7 +49,10 @@ import org.apache.camel.impl.DefaultConsumer;
public class MllpTcpServerConsumer extends DefaultConsumer {
final ExecutorService validationExecutor;
final ExecutorService consumerExecutor;
+
+ TcpServerBindThread bindThread;
TcpServerAcceptThread acceptThread;
+
Map<TcpSocketConsumerRunnable, Long> consumerRunnables = new
ConcurrentHashMap<>();
@@ -110,75 +113,51 @@ public class MllpTcpServerConsumer extends
DefaultConsumer {
consumerClientSocketThread.stop();
}
- acceptThread.interrupt();
+ if (acceptThread != null) {
+ acceptThread.interrupt();
+ acceptThread = null;
+ }
- acceptThread = null;
+ if (bindThread != null) {
+ bindThread.interrupt();
+ bindThread = null;
+ }
super.doStop();
}
@Override
protected void doStart() throws Exception {
- log.debug("doStart() - starting acceptor");
-
- ServerSocket serverSocket = new ServerSocket();
- if (getConfiguration().hasReceiveBufferSize()) {
-
serverSocket.setReceiveBufferSize(getConfiguration().getReceiveBufferSize());
- }
-
- if (getConfiguration().hasReuseAddress()) {
- serverSocket.setReuseAddress(getConfiguration().getReuseAddress());
- }
+ if (bindThread == null || !bindThread.isAlive()) {
+ bindThread = new TcpServerBindThread(this);
- // Accept Timeout
- serverSocket.setSoTimeout(getConfiguration().getAcceptTimeout());
-
- InetSocketAddress socketAddress;
- if (null == getEndpoint().getHostname()) {
- socketAddress = new InetSocketAddress(getEndpoint().getPort());
- } else {
- socketAddress = new InetSocketAddress(getEndpoint().getHostname(),
getEndpoint().getPort());
- }
- long startTicks = System.currentTimeMillis();
-
- // Log usage of deprecated URI options
- if (getConfiguration().hasMaxReceiveTimeouts()) {
- if (getConfiguration().hasIdleTimeout()) {
- log.info("Both maxReceivedTimeouts {} and idleTimeout {} URI
options are specified - idleTimeout will be used",
- getConfiguration().getMaxReceiveTimeouts(),
getConfiguration().getIdleTimeout());
+ if (getConfiguration().isLenientBind()) {
+ log.debug("doStart() - starting bind thread");
+ bindThread.start();
} else {
-
getConfiguration().setIdleTimeout(getConfiguration().getMaxReceiveTimeouts() *
getConfiguration().getReceiveTimeout());
- log.info("Deprecated URI option maxReceivedTimeouts {}
specified - idleTimeout {} will be used",
getConfiguration().getMaxReceiveTimeouts(),
getConfiguration().getIdleTimeout());
- }
- }
+ log.debug("doStart() - attempting to bind to port {}",
getEndpoint().getPort());
+ bindThread.run();
- do {
- try {
- if (getConfiguration().hasBacklog()) {
- serverSocket.bind(socketAddress,
getConfiguration().getBacklog());
- } else {
- serverSocket.bind(socketAddress);
- }
- } catch (BindException bindException) {
- if (System.currentTimeMillis() > startTicks +
getConfiguration().getBindTimeout()) {
- log.error("Failed to bind to address {} within timeout
{}", socketAddress, getConfiguration().getBindTimeout());
- throw bindException;
- } else {
- log.warn("Failed to bind to address {} - retrying in {}
milliseconds", socketAddress, getConfiguration().getBindRetryInterval());
- Thread.sleep(getConfiguration().getBindRetryInterval());
+ if (this.acceptThread == null) {
+ throw new BindException("Failed to bind to port " +
getEndpoint().getPort());
}
}
- } while (!serverSocket.isBound());
-
- // acceptRunnable = new TcpServerConsumerValidationRunnable(this,
serverSocket);
- // validationExecutor.submit(acceptRunnable);
- acceptThread = new TcpServerAcceptThread(this, serverSocket);
- acceptThread.start();
+ }
super.doStart();
}
@Override
+ public void handleException(Throwable t) {
+ super.handleException(t);
+ }
+
+ @Override
+ public void handleException(String message, Throwable t) {
+ super.handleException(message, t);
+ }
+
+ @Override
protected void doShutdown() throws Exception {
super.doShutdown();
consumerExecutor.shutdownNow();
@@ -209,6 +188,11 @@ public class MllpTcpServerConsumer extends DefaultConsumer
{
}
}
+ public void startAcceptThread(ServerSocket serverSocket) {
+ acceptThread = new TcpServerAcceptThread(this, serverSocket);
+ acceptThread.start();
+ }
+
public void startConsumer(Socket clientSocket, MllpSocketBuffer
mllpBuffer) {
TcpSocketConsumerRunnable client = new TcpSocketConsumerRunnable(this,
clientSocket, mllpBuffer);
@@ -222,15 +206,4 @@ public class MllpTcpServerConsumer extends DefaultConsumer
{
}
}
-
- @Override
- public void handleException(Throwable t) {
- super.handleException(t);
- }
-
- @Override
- public void handleException(String message, Throwable t) {
- super.handleException(message, t);
- }
}
-
diff --git
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerBindThread.java
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerBindThread.java
new file mode 100644
index 0000000..893cf9e
--- /dev/null
+++
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerBindThread.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.mllp.internal;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+
+import org.apache.camel.Route;
+import org.apache.camel.component.mllp.MllpTcpServerConsumer;
+import org.apache.camel.impl.MDCUnitOfWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Thread that binds the ServerSocket and then starts the accept thread on the consumer
+ */
+public class TcpServerBindThread extends Thread {
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+ private final MllpTcpServerConsumer consumer;
+
+ public TcpServerBindThread(MllpTcpServerConsumer consumer) {
+ this.consumer = consumer;
+
+ // Get the URI without options
+ String fullEndpointKey = consumer.getEndpoint().getEndpointKey();
+ String endpointKey;
+ if (fullEndpointKey.contains("?")) {
+ endpointKey = fullEndpointKey.substring(0,
fullEndpointKey.indexOf('?'));
+ } else {
+ endpointKey = fullEndpointKey;
+ }
+
+ this.setName(String.format("%s - %s", this.getClass().getSimpleName(),
endpointKey));
+ }
+
+
+ /**
+ * Configure and bind the ServerSocket, retrying on BindException until the
bind timeout expires; once bound, start the accept thread on the consumer.
+ */
+ @Override
+ public void run() {
+
+ MDC.put(MDCUnitOfWork.MDC_CAMEL_CONTEXT_ID,
consumer.getEndpoint().getCamelContext().getName());
+
+ Route route = consumer.getRoute();
+ if (route != null) {
+ String routeId = route.getId();
+ if (routeId != null) {
+ MDC.put(MDCUnitOfWork.MDC_ROUTE_ID, route.getId());
+ }
+ }
+
+ try {
+ ServerSocket serverSocket = new ServerSocket();
+ if (consumer.getConfiguration().hasReceiveBufferSize()) {
+
serverSocket.setReceiveBufferSize(consumer.getConfiguration().getReceiveBufferSize());
+ }
+
+ if (consumer.getConfiguration().hasReuseAddress()) {
+
serverSocket.setReuseAddress(consumer.getConfiguration().getReuseAddress());
+ }
+
+ // Accept Timeout
+
serverSocket.setSoTimeout(consumer.getConfiguration().getAcceptTimeout());
+
+ InetSocketAddress socketAddress;
+ if (null == consumer.getEndpoint().getHostname()) {
+ socketAddress = new
InetSocketAddress(consumer.getEndpoint().getPort());
+ } else {
+ socketAddress = new
InetSocketAddress(consumer.getEndpoint().getHostname(),
consumer.getEndpoint().getPort());
+ }
+
+ log.debug("Attempting to bind to {}", socketAddress);
+
+ long startTicks = System.currentTimeMillis();
+ do {
+ try {
+ if (consumer.getConfiguration().hasBacklog()) {
+ serverSocket.bind(socketAddress,
consumer.getConfiguration().getBacklog());
+ } else {
+ serverSocket.bind(socketAddress);
+ }
+ consumer.startAcceptThread(serverSocket);
+ } catch (BindException bindException) {
+ if (System.currentTimeMillis() > startTicks +
consumer.getConfiguration().getBindTimeout()) {
+ log.error("Failed to bind to address {} within timeout
{}", socketAddress, consumer.getConfiguration().getBindTimeout(),
bindException);
+ break;
+ } else {
+ log.warn("Failed to bind to address {} - retrying in
{} milliseconds", socketAddress,
consumer.getConfiguration().getBindRetryInterval());
+ try {
+
Thread.sleep(consumer.getConfiguration().getBindRetryInterval());
+ } catch (InterruptedException interruptedEx) {
+ log.info("Bind to address {} interrupted",
socketAddress, interruptedEx);
+ if (!this.isInterrupted()) {
+ super.interrupt();
+ }
+ break;
+ }
+ }
+ } catch (IOException unexpectedEx) {
+ log.error("Unexpected exception encountered binding to
address {}", socketAddress, unexpectedEx);
+ break;
+ }
+ } while (!this.isInterrupted() && !serverSocket.isBound());
+
+ } catch (IOException ioEx) {
+ log.error("Unexpected exception encountered initializing
ServerSocket before attempting to bind", ioEx);
+ }
+ }
+
+}
diff --git
a/components/camel-mllp/src/test/java/org/apache/camel/MllpTcpServerConsumerLenientBindTest.java
b/components/camel-mllp/src/test/java/org/apache/camel/MllpTcpServerConsumerLenientBindTest.java
new file mode 100644
index 0000000..d0a681a
--- /dev/null
+++
b/components/camel-mllp/src/test/java/org/apache/camel/MllpTcpServerConsumerLenientBindTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel;
+
+import java.net.ServerSocket;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit.rule.mllp.MllpClientResource;
+import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceException;
+import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceTimeoutException;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.test.mllp.Hl7TestMessageGenerator;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class MllpTcpServerConsumerLenientBindTest extends CamelTestSupport {
+ static final int RECEIVE_TIMEOUT = 1000;
+ static final int READ_TIMEOUT = 500;
+
+ @Rule
+ public MllpClientResource mllpClient = new MllpClientResource();
+
+ @EndpointInject(uri = "mock://result")
+ MockEndpoint result;
+
+ ServerSocket portBlocker;
+
+ @Override
+ protected void doPreSetup() throws Exception {
+ mllpClient.setMllpHost("localhost");
+ mllpClient.setMllpPort(AvailablePortFinder.getNextAvailable());
+
+ portBlocker = new ServerSocket(mllpClient.getMllpPort());
+
+ assertTrue(portBlocker.isBound());
+
+ super.doPreSetup();
+ }
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ RouteBuilder builder = new RouteBuilder() {
+ String routeId = "mllp-receiver-with-lenient-bind";
+
+ public void configure() {
+
fromF("mllp://%s:%d?bindTimeout=15000&bindRetryInterval=500&receiveTimeout=%d&readTimeout=%d&reuseAddress=false&lenientBind=true",
+ mllpClient.getMllpHost(), mllpClient.getMllpPort(),
RECEIVE_TIMEOUT, READ_TIMEOUT)
+ .routeId(routeId)
+ .log(LoggingLevel.INFO, routeId, "Receiving: ${body}")
+ .to(result);
+ }
+ };
+
+ return builder;
+ }
+
+ @Test
+ public void testLenientBind() throws Exception {
+ assertEquals(ServiceStatus.Started, context.getStatus());
+
+ mllpClient.connect();
+ try {
+
mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(10001));
+ } catch (MllpJUnitResourceTimeoutException expectedEx) {
+ assertIsInstanceOf(SocketTimeoutException.class,
expectedEx.getCause());
+ }
+ mllpClient.reset();
+
+ portBlocker.close();
+ Thread.sleep(2000);
+ assertEquals(ServiceStatus.Started, context.getStatus());
+
+ mllpClient.connect();
+ String acknowledgement =
mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(10002));
+ assertStringContains(acknowledgement, "10002");
+ }
+
+
+
+}
diff --git
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
index c366465..0a07c1b 100644
---
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
+++
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
@@ -17,8 +17,6 @@
package org.apache.camel.component.mllp;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;
import org.apache.camel.EndpointInject;
@@ -37,7 +35,6 @@ import org.junit.Rule;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.anyOf;
-import static org.hamcrest.CoreMatchers.instanceOf;
public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport {
static final int RECEIVE_TIMEOUT = 1000;
@@ -63,20 +60,6 @@ public class MllpTcpServerConsumerConnectionTest extends
CamelTestSupport {
super.doPreSetup();
}
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- String routeId = "mllp-receiver";
-
- public void configure() {
-
fromF("mllp://%s:%d?receiveTimeout=%d&readTimeout=%d&autoAck=false",
mllpClient.getMllpHost(), mllpClient.getMllpPort(), RECEIVE_TIMEOUT,
READ_TIMEOUT)
- .log(LoggingLevel.INFO, routeId, "Receiving: ${body}")
- .to(result);
- }
- };
-
- }
-
/**
* Simulate a Load Balancer Probe
* <p/>
@@ -97,7 +80,7 @@ public class MllpTcpServerConsumerConnectionTest extends
CamelTestSupport {
result.setExpectedCount(0);
result.setAssertPeriod(1000);
- addTestRoute(-1);
+ addTestRouteWithIdleTimeout(-1);
for (int i = 1; i <= connectionCount; ++i) {
mllpClient.connect();
@@ -121,7 +104,7 @@ public class MllpTcpServerConsumerConnectionTest extends
CamelTestSupport {
result.setExpectedCount(0);
result.setAssertPeriod(1000);
- addTestRoute(-1);
+ addTestRouteWithIdleTimeout(-1);
for (int i = 1; i <= connectionCount; ++i) {
mllpClient.connect();
@@ -150,7 +133,7 @@ public class MllpTcpServerConsumerConnectionTest extends
CamelTestSupport {
result.setExpectedCount(1);
result.setAssertPeriod(1000);
- addTestRoute(idleTimeout);
+ addTestRouteWithIdleTimeout(idleTimeout);
mllpClient.connect();
mllpClient.sendMessageAndWaitForAcknowledgement(testMessage);
@@ -167,12 +150,13 @@ public class MllpTcpServerConsumerConnectionTest extends
CamelTestSupport {
assertMockEndpointsSatisfied(15, TimeUnit.SECONDS);
}
- void addTestRoute(final int idleTimeout) throws Exception {
+ void addTestRouteWithIdleTimeout(final int idleTimeout) throws Exception {
RouteBuilder builder = new RouteBuilder() {
- String routeId = "mllp-receiver";
+ String routeId = "mllp-receiver-with-timeout";
public void configure() {
- fromF("mllp://%s:%d?receiveTimeout=%d&idleTimeout=%d",
mllpClient.getMllpHost(), mllpClient.getMllpPort(), RECEIVE_TIMEOUT,
idleTimeout)
+
fromF("mllp://%s:%d?receiveTimeout=%d&readTimeout=%d&idleTimeout=%d",
mllpClient.getMllpHost(), mllpClient.getMllpPort(), RECEIVE_TIMEOUT,
READ_TIMEOUT, idleTimeout)
+ .routeId(routeId)
.log(LoggingLevel.INFO, routeId, "Receiving: ${body}")
.to(result);
}
diff --git
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java
index fe6e6f7..ed2a56e 100644
---
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java
+++
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java
@@ -17,8 +17,6 @@
package org.apache.camel.component.mllp;
-import static org.hamcrest.CoreMatchers.instanceOf;
-
import java.net.SocketException;
import java.util.concurrent.TimeUnit;
@@ -35,9 +33,12 @@ import
org.apache.camel.test.junit.rule.mllp.MllpClientResource;
import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceException;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.test.mllp.Hl7TestMessageGenerator;
+
import org.junit.Rule;
import org.junit.Test;
+import static org.hamcrest.CoreMatchers.instanceOf;
+
public abstract class TcpServerConsumerEndOfDataAndValidationTestSupport
extends CamelTestSupport {
static final int CONNECT_TIMEOUT = 500;
static final int RECEIVE_TIMEOUT = 1000;
diff --git a/components/camel-mllp/src/test/resources/log4j2.properties
b/components/camel-mllp/src/test/resources/log4j2.properties
index 4179677..7e3ad87 100644
--- a/components/camel-mllp/src/test/resources/log4j2.properties
+++ b/components/camel-mllp/src/test/resources/log4j2.properties
@@ -31,4 +31,4 @@ rootLogger.appenderRef.file.ref = file
loggers = mllp
logger.mllp.name = org.apache.camel.component.mllp
-logger.mllp.level = DEBUG
+# logger.mllp.level = DEBUG
diff --git
a/platforms/spring-boot/components-starter/camel-mllp-starter/src/main/java/org/apache/camel/component/mllp/springboot/MllpComponentConfiguration.java
b/platforms/spring-boot/components-starter/camel-mllp-starter/src/main/java/org/apache/camel/component/mllp/springboot/MllpComponentConfiguration.java
index aeff923..0ff738a 100644
---
a/platforms/spring-boot/components-starter/camel-mllp-starter/src/main/java/org/apache/camel/component/mllp/springboot/MllpComponentConfiguration.java
+++
b/platforms/spring-boot/components-starter/camel-mllp-starter/src/main/java/org/apache/camel/component/mllp/springboot/MllpComponentConfiguration.java
@@ -122,6 +122,16 @@ public class MllpComponentConfiguration
*/
private Integer acceptTimeout = 60000;
/**
+ * TCP Server Only - Allow the endpoint to start before the TCP
+ * ServerSocket is bound. In some environments, it may be desirable to
+ * allow the endpoint to start before the TCP ServerSocket is bound.
+ *
+ * @param lenientBind
+ * if true, the ServerSocket will be bound asynchronously;
+ * otherwise the ServerSocket will be bound synchronously.
+ */
+ private Boolean lenientBind = false;
+ /**
* Timeout (in milliseconds) for establishing for a TCP connection
* <p/>
* TCP Client only
@@ -315,6 +325,14 @@ public class MllpComponentConfiguration
this.acceptTimeout = acceptTimeout;
}
+ public Boolean getLenientBind() {
+ return lenientBind;
+ }
+
+ public void setLenientBind(Boolean lenientBind) {
+ this.lenientBind = lenientBind;
+ }
+
public Integer getConnectTimeout() {
return connectTimeout;
}
--
To stop receiving notification emails like this one, please contact
[email protected].