This is an automated email from the ASF dual-hosted git repository.
quinn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new c8f24ca CAMEL-12210 - Improve detection of the end of an MLLP Envelope
c8f24ca is described below
commit c8f24cac46f9c4a02a869597a0e7ba03565aa9fd
Author: Quinn Stevenson <[email protected]>
AuthorDate: Mon Jan 29 15:58:00 2018 -0700
CAMEL-12210 - Improve detection of the end of an MLLP Envelope
---
.../component/mllp/MllpTcpServerConsumer.java | 55 ++++--
.../component/mllp/internal/MllpSocketBuffer.java | 35 +++-
...eptRunnable.java => TcpServerAcceptThread.java} | 24 ++-
.../TcpServerConsumerValidationRunnable.java | 196 +++++++++++++++++++++
.../mllp/internal/TcpSocketConsumerRunnable.java | 45 +++--
...TcpClientProducerIdleConnectionTimeoutTest.java | 8 +-
.../mllp/MllpTcpServerConsumerConnectionTest.java | 5 +-
...onsumerOptionalEndOfDataWithValidationTest.java | 13 +-
...umerOptionalEndOfDataWithoutValidationTest.java | 14 +-
...onsumerRequiredEndOfDataWithValidationTest.java | 15 +-
...umerRequiredEndOfDataWithoutValidationTest.java | 12 +-
...tProducerEndOfDataAndValidationTestSupport.java | 6 +-
...rConsumerEndOfDataAndValidationTestSupport.java | 63 ++++---
.../mllp/internal/MllpSocketBufferTest.java | 4 +-
.../mllp/internal/MllpSocketBufferWriteTest.java | 4 +-
.../src/test/resources/log4j2.properties | 2 +-
16 files changed, 412 insertions(+), 89 deletions(-)
diff --git
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index bc4f41d..72b00bd 100644
---
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -36,29 +37,27 @@ import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.component.mllp.internal.MllpSocketBuffer;
-import org.apache.camel.component.mllp.internal.TcpServerAcceptRunnable;
+import
org.apache.camel.component.mllp.internal.TcpServerConsumerValidationRunnable;
+import org.apache.camel.component.mllp.internal.TcpServerAcceptThread;
import org.apache.camel.component.mllp.internal.TcpSocketConsumerRunnable;
import org.apache.camel.impl.DefaultConsumer;
-import org.apache.camel.processor.mllp.Hl7AcknowledgementGenerator;
/**
* The MLLP consumer.
*/
@ManagedResource(description = "MLLP Producer")
public class MllpTcpServerConsumer extends DefaultConsumer {
- final ExecutorService acceptExecutor;
+ final ExecutorService validationExecutor;
final ExecutorService consumerExecutor;
- TcpServerAcceptRunnable acceptRunnable;
+ TcpServerAcceptThread acceptThread;
Map<TcpSocketConsumerRunnable, Long> consumerRunnables = new
ConcurrentHashMap<>();
public MllpTcpServerConsumer(MllpEndpoint endpoint, Processor processor) {
super(endpoint, processor);
log.trace("MllpTcpServerConsumer(endpoint, processor)");
- // this.endpoint = endpoint;
- // this.configuration = endpoint.getConfiguration();
- acceptExecutor = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new SynchronousQueue<>());
+ validationExecutor = Executors.newCachedThreadPool();
consumerExecutor = new ThreadPoolExecutor(1,
getConfiguration().getMaxConcurrentConsumers(),
getConfiguration().getAcceptTimeout(), TimeUnit.MILLISECONDS, new
SynchronousQueue<>());
}
@@ -111,9 +110,9 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
consumerClientSocketThread.stop();
}
- acceptRunnable.stop();
+ acceptThread.interrupt();
- acceptRunnable = null;
+ acceptThread = null;
super.doStop();
}
@@ -171,8 +170,10 @@ public class MllpTcpServerConsumer extends DefaultConsumer
{
}
} while (!serverSocket.isBound());
- acceptRunnable = new TcpServerAcceptRunnable(this, serverSocket);
- acceptExecutor.submit(acceptRunnable);
+ // acceptRunnable = new TcpServerConsumerValidationRunnable(this,
serverSocket);
+ // validationExecutor.submit(acceptRunnable);
+ acceptThread = new TcpServerAcceptThread(this, serverSocket);
+ acceptThread.start();
super.doStart();
}
@@ -181,7 +182,10 @@ public class MllpTcpServerConsumer extends DefaultConsumer
{
protected void doShutdown() throws Exception {
super.doShutdown();
consumerExecutor.shutdownNow();
- acceptExecutor.shutdownNow();
+ if (acceptThread != null) {
+ acceptThread.interrupt();
+ }
+ validationExecutor.shutdownNow();
}
public MllpConfiguration getConfiguration() {
@@ -192,8 +196,21 @@ public class MllpTcpServerConsumer extends DefaultConsumer
{
return consumerRunnables;
}
- public void startConsumer(Socket clientSocket) {
- TcpSocketConsumerRunnable client = new TcpSocketConsumerRunnable(this,
clientSocket);
+ public void validateConsumer(Socket clientSocket) {
+ MllpSocketBuffer mllpBuffer = new MllpSocketBuffer(getEndpoint());
+ TcpServerConsumerValidationRunnable client = new
TcpServerConsumerValidationRunnable(this, clientSocket, mllpBuffer);
+
+ try {
+ log.info("Validating consumer for Socket {}", clientSocket);
+ validationExecutor.submit(client);
+ } catch (RejectedExecutionException rejectedExecutionEx) {
+ log.warn("Cannot validate consumer - max validations already
active");
+ mllpBuffer.resetSocket(clientSocket);
+ }
+ }
+
+ public void startConsumer(Socket clientSocket, MllpSocketBuffer
mllpBuffer) {
+ TcpSocketConsumerRunnable client = new TcpSocketConsumerRunnable(this,
clientSocket, mllpBuffer);
consumerRunnables.put(client, System.currentTimeMillis());
try {
@@ -205,5 +222,15 @@ public class MllpTcpServerConsumer extends DefaultConsumer
{
}
}
+
+ @Override
+ public void handleException(Throwable t) {
+ super.handleException(t);
+ }
+
+ @Override
+ public void handleException(String message, Throwable t) {
+ super.handleException(message, t);
+ }
}
diff --git
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
index 8839045..faf6d63 100644
---
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
+++
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/MllpSocketBuffer.java
@@ -170,8 +170,11 @@ public class MllpSocketBuffer {
endOfBlockIndex = -1;
}
-
public synchronized void readFrom(Socket socket) throws
MllpSocketException, SocketTimeoutException {
+ readFrom(socket, endpoint.getConfiguration().getReceiveTimeout(),
endpoint.getConfiguration().getReadTimeout());
+ }
+
+ public synchronized void readFrom(Socket socket, int receiveTimeout, int
readTimeout) throws MllpSocketException, SocketTimeoutException {
log.trace("Entering readFrom ...");
if (socket != null && socket.isConnected() && !socket.isClosed()) {
ensureCapacity(MIN_BUFFER_SIZE);
@@ -179,11 +182,11 @@ public class MllpSocketBuffer {
try {
InputStream socketInputStream = socket.getInputStream();
-
socket.setSoTimeout(endpoint.getConfiguration().getReceiveTimeout());
+ socket.setSoTimeout(receiveTimeout);
readSocketInputStream(socketInputStream, socket);
if (!hasCompleteEnvelope()) {
-
socket.setSoTimeout(endpoint.getConfiguration().getReadTimeout());
+ socket.setSoTimeout(readTimeout);
while (!hasCompleteEnvelope()) {
ensureCapacity(Math.max(MIN_BUFFER_SIZE,
socketInputStream.available()));
@@ -249,7 +252,7 @@ public class MllpSocketBuffer {
log.trace("Exiting writeTo ...");
}
- public synchronized byte toByteArray()[] {
+ public synchronized byte[] toByteArray() {
if (availableByteCount > 0) {
return Arrays.copyOf(buffer, availableByteCount);
}
@@ -257,7 +260,7 @@ public class MllpSocketBuffer {
return null;
}
- public synchronized byte toByteArrayAndReset()[] {
+ public synchronized byte[] toByteArrayAndReset() {
byte[] answer = toByteArray();
reset();
@@ -293,6 +296,22 @@ public class MllpSocketBuffer {
return "";
}
+ public synchronized String toStringAndReset() {
+ String answer = toString();
+
+ reset();
+
+ return answer;
+ }
+
+ public synchronized String toStringAndReset(String charsetName) {
+ String answer = toString(charsetName);
+
+ reset();
+
+ return answer;
+ }
+
/**
* Convert the entire contents of the buffer (including enveloping
characters) to a print-friendly
* String representation.
@@ -534,8 +553,10 @@ public class MllpSocketBuffer {
}
void updateIndexes(int b, int indexOffset) {
- if (startOfBlockIndex < 0 && b ==
MllpProtocolConstants.START_OF_BLOCK) {
- startOfBlockIndex = availableByteCount + indexOffset;
+ if (startOfBlockIndex < 0) {
+ if (b == MllpProtocolConstants.START_OF_BLOCK) {
+ startOfBlockIndex = availableByteCount + indexOffset;
+ }
} else if (endOfBlockIndex < 0 && b ==
MllpProtocolConstants.END_OF_BLOCK) {
endOfBlockIndex = availableByteCount + indexOffset;
}
diff --git
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptRunnable.java
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptThread.java
similarity index 88%
rename from
components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptRunnable.java
rename to
components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptThread.java
index 568a56a..f126286 100644
---
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptRunnable.java
+++
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptThread.java
@@ -31,16 +31,16 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
/**
- * Runnable to handle the ServerSocket.accept requests
+ * Thread to handle the ServerSocket.accept requests, and submit the sockets
to the accept executor for validation.
*/
-public class TcpServerAcceptRunnable implements Runnable {
+public class TcpServerAcceptThread extends Thread {
Logger log = LoggerFactory.getLogger(this.getClass());
MllpTcpServerConsumer consumer;
ServerSocket serverSocket;
boolean running;
- public TcpServerAcceptRunnable(MllpTcpServerConsumer consumer,
ServerSocket serverSocket) {
+ public TcpServerAcceptThread(MllpTcpServerConsumer consumer, ServerSocket
serverSocket) {
this.consumer = consumer;
this.serverSocket = serverSocket;
}
@@ -97,7 +97,6 @@ public class TcpServerAcceptRunnable implements Runnable {
Socket socket = null;
try {
socket = serverSocket.accept();
-
consumer.getEndpoint().updateLastConnectionEstablishedTicks();
} catch (SocketTimeoutException timeoutEx) {
// Didn't get a new connection - keep waiting for one
log.debug("Timeout waiting for client connection - keep
listening");
@@ -126,7 +125,7 @@ public class TcpServerAcceptRunnable implements Runnable {
if (MllpSocketBuffer.isConnectionValid(socket)) {
// Try and avoid starting client threads for things like
security scans and load balancer probes
- consumer.startConsumer(socket);
+ consumer.validateConsumer(socket);
}
}
} finally {
@@ -144,7 +143,18 @@ public class TcpServerAcceptRunnable implements Runnable {
}
}
- public void stop() {
- running = false;
+ @Override
+ public void interrupt() {
+ this.running = false;
+ super.interrupt();
+ if (null != serverSocket) {
+ if (serverSocket.isBound()) {
+ try {
+ serverSocket.close();
+ } catch (IOException ioEx) {
+ log.warn("Exception encountered closing ServerSocket in
interrupt() method - ignoring", ioEx);
+ }
+ }
+ }
}
}
diff --git
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerConsumerValidationRunnable.java
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerConsumerValidationRunnable.java
new file mode 100644
index 0000000..066b3db
--- /dev/null
+++
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerConsumerValidationRunnable.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.mllp.internal;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+
+import org.apache.camel.Route;
+import org.apache.camel.component.mllp.MllpSocketException;
+import org.apache.camel.component.mllp.MllpTcpServerConsumer;
+import org.apache.camel.impl.MDCUnitOfWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Runnable to handle the ServerSocket.accept requests
+ */
+public class TcpServerConsumerValidationRunnable implements Runnable {
+ final Socket clientSocket;
+ final MllpSocketBuffer mllpBuffer;
+
+ Logger log = LoggerFactory.getLogger(this.getClass());
+ MllpTcpServerConsumer consumer;
+
+ private final String localAddress;
+ private final String remoteAddress;
+ private final String combinedAddress;
+
+ public TcpServerConsumerValidationRunnable(MllpTcpServerConsumer consumer,
Socket clientSocket, MllpSocketBuffer mllpBuffer) {
+ this.consumer = consumer;
+ // this.setName(createThreadName(clientSocket));
+ this.clientSocket = clientSocket;
+
+ SocketAddress localSocketAddress =
clientSocket.getLocalSocketAddress();
+ if (localSocketAddress != null) {
+ localAddress = localSocketAddress.toString();
+ } else {
+ localAddress = null;
+ }
+
+ SocketAddress remoteSocketAddress =
clientSocket.getRemoteSocketAddress();
+ if (remoteSocketAddress != null) {
+ remoteAddress = remoteSocketAddress.toString();
+ } else {
+ remoteAddress = null;
+ }
+
+ combinedAddress =
MllpSocketBuffer.formatAddressString(remoteSocketAddress, localSocketAddress);
+
+
+ try {
+ if (consumer.getConfiguration().hasKeepAlive()) {
+
this.clientSocket.setKeepAlive(consumer.getConfiguration().getKeepAlive());
+ }
+ if (consumer.getConfiguration().hasTcpNoDelay()) {
+
this.clientSocket.setTcpNoDelay(consumer.getConfiguration().getTcpNoDelay());
+ }
+ if (consumer.getConfiguration().hasReceiveBufferSize()) {
+
this.clientSocket.setReceiveBufferSize(consumer.getConfiguration().getReceiveBufferSize());
+ }
+ if (consumer.getConfiguration().hasSendBufferSize()) {
+
this.clientSocket.setSendBufferSize(consumer.getConfiguration().getSendBufferSize());
+ }
+
+ this.clientSocket.setSoLinger(false, -1);
+
+ // Initial Read Timeout
+
this.clientSocket.setSoTimeout(consumer.getConfiguration().getReceiveTimeout());
+ } catch (IOException initializationException) {
+ throw new IllegalStateException("Failed to initialize " +
this.getClass().getSimpleName(), initializationException);
+ }
+
+ if (mllpBuffer == null) {
+ this.mllpBuffer = new MllpSocketBuffer(consumer.getEndpoint());
+ } else {
+ this.mllpBuffer = mllpBuffer;
+ }
+ }
+
+ /**
+ * derive a thread name from the class name, the component URI and the
connection information
+ * <p/>
+ * The String will in the format <class name>[endpoint key] - [local
socket address] -> [remote socket address]
+ *
+ * @return the thread name
+ */
+ String createThreadName(Socket socket) {
+ // Get the URI without options
+ String fullEndpointKey = consumer.getEndpoint().getEndpointKey();
+ String endpointKey;
+ if (fullEndpointKey.contains("?")) {
+ endpointKey = fullEndpointKey.substring(0,
fullEndpointKey.indexOf('?'));
+ } else {
+ endpointKey = fullEndpointKey;
+ }
+
+ // Now put it all together
+ return String.format("%s[%s] - %s", this.getClass().getSimpleName(),
endpointKey, combinedAddress);
+ }
+
+ /**
+ * Do the initial read on the Socket and try to determine if it has HL7
data, junk, or nothing.
+ */
+ @Override
+ public void run() {
+ String originalThreadName = Thread.currentThread().getName();
+ Thread.currentThread().setName(createThreadName(clientSocket));
+ MDC.put(MDCUnitOfWork.MDC_CAMEL_CONTEXT_ID,
consumer.getEndpoint().getCamelContext().getName());
+
+ Route route = consumer.getRoute();
+ if (route != null) {
+ String routeId = route.getId();
+ if (routeId != null) {
+ MDC.put(MDCUnitOfWork.MDC_ROUTE_ID, route.getId());
+ }
+ }
+
+ log.debug("Checking {} for data", combinedAddress);
+
+ try {
+ mllpBuffer.readFrom(clientSocket, 500, 50);
+ if (mllpBuffer.hasCompleteEnvelope() ||
mllpBuffer.hasStartOfBlock()) {
+ consumer.startConsumer(clientSocket, mllpBuffer);
+ } else if (!mllpBuffer.isEmpty()) {
+ // We have some leading out-of-band data but no START_OF_BLOCK
+ log.info("Ignoring out-of-band data on initial read: {}",
mllpBuffer.toStringAndReset());
+ mllpBuffer.resetSocket(clientSocket);
+ }
+ } catch (MllpSocketException socketEx) {
+ // TODO: The socket is invalid for some reason
+ if (!mllpBuffer.isEmpty()) {
+ log.warn("Exception encountered receiving complete message: ",
mllpBuffer.toStringAndReset());
+ }
+ mllpBuffer.resetSocket(clientSocket);
+ } catch (SocketTimeoutException timeoutEx) {
+ if (mllpBuffer.isEmpty()) {
+ log.debug("Initial read timed-out but no data was read -
starting consumer");
+ consumer.startConsumer(clientSocket, mllpBuffer);
+ } else {
+ log.warn("Timeout receiving complete message: {}",
mllpBuffer.toStringAndReset());
+ mllpBuffer.resetSocket(clientSocket);
+ }
+ } finally {
+ Thread.currentThread().setName(originalThreadName);
+ }
+ }
+
+ public void closeSocket() {
+ mllpBuffer.closeSocket(clientSocket);
+ }
+
+ public void closeSocket(String logMessage) {
+ mllpBuffer.closeSocket(clientSocket, logMessage);
+ }
+
+ public void resetSocket() {
+ mllpBuffer.resetSocket(clientSocket);
+ }
+
+ public void resetSocket(String logMessage) {
+ mllpBuffer.resetSocket(clientSocket, logMessage);
+ }
+
+ public String getLocalAddress() {
+ return localAddress;
+ }
+
+ public String getRemoteAddress() {
+ return remoteAddress;
+ }
+
+ public String getCombinedAddress() {
+ return combinedAddress;
+ }
+
+}
diff --git
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java
index 0f0aa56..8940867 100644
---
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java
+++
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpSocketConsumerRunnable.java
@@ -61,7 +61,7 @@ public class TcpSocketConsumerRunnable implements Runnable {
private final String remoteAddress;
private final String combinedAddress;
- public TcpSocketConsumerRunnable(MllpTcpServerConsumer consumer, Socket
clientSocket) {
+ public TcpSocketConsumerRunnable(MllpTcpServerConsumer consumer, Socket
clientSocket, MllpSocketBuffer mllpBuffer) {
this.consumer = consumer;
// this.setName(createThreadName(clientSocket));
this.clientSocket = clientSocket;
@@ -105,7 +105,11 @@ public class TcpSocketConsumerRunnable implements Runnable
{
throw new IllegalStateException("Failed to initialize " +
this.getClass().getSimpleName(), initializationException);
}
- mllpBuffer = new MllpSocketBuffer(consumer.getEndpoint());
+ if (mllpBuffer == null) {
+ this.mllpBuffer = new MllpSocketBuffer(consumer.getEndpoint());
+ } else {
+ this.mllpBuffer = mllpBuffer;
+ }
}
/**
@@ -448,6 +452,7 @@ public class TcpSocketConsumerRunnable implements Runnable {
throw runtimeEx;
} catch (Exception ex) {
log.error("Unexpected exception processing exchange", ex);
+ exchange.setException(ex);
}
} catch (Exception uowEx) {
// TODO: Handle this correctly
@@ -483,8 +488,15 @@ public class TcpSocketConsumerRunnable implements Runnable
{
log.debug("Starting {} for {}", this.getClass().getSimpleName(),
combinedAddress);
try {
+ byte[] hl7MessageBytes = null;
+ if (mllpBuffer.hasCompleteEnvelope()) {
+ // If we got a complete message on the validation read,
process it
+ hl7MessageBytes = mllpBuffer.toMllpPayload();
+ mllpBuffer.reset();
+ processMessage(hl7MessageBytes);
+ }
+
while (running && null != clientSocket &&
clientSocket.isConnected() && !clientSocket.isClosed()) {
- byte[] hl7MessageBytes = null;
log.debug("Checking for data ....");
try {
mllpBuffer.readFrom(clientSocket);
@@ -519,31 +531,16 @@ public class TcpSocketConsumerRunnable implements
Runnable {
consumer.getEndpoint().doConnectionClose(clientSocket, true, log);
}
}
- log.info("No data received - ignoring timeout");
+ log.debug("No data received - ignoring timeout");
} else {
mllpBuffer.resetSocket(clientSocket);
- if (consumer.getEndpoint().isBridgeErrorHandler()) {
- Exchange exchange =
consumer.getEndpoint().createExchange(ExchangePattern.InOut);
- exchange.setException(new
MllpInvalidMessageException("Timeout receiving complete payload",
mllpBuffer.toByteArray()));
- log.warn("Exception encountered reading payload -
sending exception to route", exchange.getException());
- try {
- consumer.getProcessor().process(exchange);
- } catch (Exception e) {
- log.error("Exception encountered processing
exchange with exception encounter reading payload", e);
- }
- } else {
- log.error("Timeout receiving complete payload",
new MllpInvalidMessageException("Timeout receiving complete payload",
mllpBuffer.toByteArray(), timeoutEx));
- }
+ new MllpInvalidMessageException("Timeout receiving
complete message payload", mllpBuffer.toByteArrayAndReset(), timeoutEx);
+ consumer.handleException(new
MllpInvalidMessageException("Timeout receiving complete message payload",
mllpBuffer.toByteArrayAndReset(), timeoutEx));
}
} catch (MllpSocketException mllpSocketEx) {
+ mllpBuffer.resetSocket(clientSocket);
if (!mllpBuffer.isEmpty()) {
- Exchange exchange =
consumer.getEndpoint().createExchange(ExchangePattern.InOut);
- exchange.setException(new
MllpReceiveException("Exception encountered reading payload",
mllpBuffer.toByteArrayAndReset(), mllpSocketEx));
- try {
- consumer.getProcessor().process(exchange);
- } catch (Exception ignoredEx) {
- log.error("Ingnoring exception encountered
processing exchange with exception encounter reading payload", ignoredEx);
- }
+ consumer.handleException(new
MllpReceiveException("Exception encountered reading payload",
mllpBuffer.toByteArrayAndReset(), mllpSocketEx));
} else {
log.warn("Ignoring exception encountered checking for
data", mllpSocketEx);
}
@@ -558,6 +555,8 @@ public class TcpSocketConsumerRunnable implements Runnable {
Thread.currentThread().setName(originalThreadName);
MDC.remove(MDCUnitOfWork.MDC_ROUTE_ID);
MDC.remove(MDCUnitOfWork.MDC_CAMEL_CONTEXT_ID);
+
+ mllpBuffer.resetSocket(clientSocket);
}
}
diff --git
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java
index 9121136..0539e2e 100644
---
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java
+++
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java
@@ -42,7 +42,10 @@ import org.junit.Rule;
import org.junit.Test;
public class MllpTcpClientProducerIdleConnectionTimeoutTest extends
CamelTestSupport {
- static final int IDLE_TIMEOUT = 10000;
+ static final int CONNECT_TIMEOUT = 500;
+ static final int RECEIVE_TIMEOUT = 1000;
+ static final int READ_TIMEOUT = 500;
+ static final int IDLE_TIMEOUT = RECEIVE_TIMEOUT * 3;
@Rule
public MllpServerResource mllpServer = new MllpServerResource("localhost",
AvailablePortFinder.getNextAvailable());
@@ -90,7 +93,8 @@ public class MllpTcpClientProducerIdleConnectionTimeoutTest
extends CamelTestSup
from(source.getDefaultEndpoint()).routeId(routeId)
.log(LoggingLevel.INFO, routeId, "Sending Message")
- .toF("mllp://%s:%d?idleTimeout=%s",
mllpServer.getListenHost(), mllpServer.getListenPort(), IDLE_TIMEOUT)
+
.toF("mllp://%s:%d?connectTimeout=%d&receiveTimeout=%d&readTimeout=%d&idleTimeout=%s",
mllpServer.getListenHost(), mllpServer.getListenPort(),
+ CONNECT_TIMEOUT, RECEIVE_TIMEOUT, READ_TIMEOUT,
IDLE_TIMEOUT)
.log(LoggingLevel.INFO, routeId, "Received
Acknowledgement")
.to(complete);
}
diff --git
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
index b7b4d99..c366465 100644
---
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
+++
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerConnectionTest.java
@@ -40,7 +40,8 @@ import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.instanceOf;
public class MllpTcpServerConsumerConnectionTest extends CamelTestSupport {
- static final int RECEIVE_TIMEOUT = 500;
+ static final int RECEIVE_TIMEOUT = 1000;
+ static final int READ_TIMEOUT = 500;
@Rule
public MllpClientResource mllpClient = new MllpClientResource();
@@ -68,7 +69,7 @@ public class MllpTcpServerConsumerConnectionTest extends
CamelTestSupport {
String routeId = "mllp-receiver";
public void configure() {
- fromF("mllp://%s:%d?autoAck=false", mllpClient.getMllpHost(),
mllpClient.getMllpPort())
+
fromF("mllp://%s:%d?receiveTimeout=%d&readTimeout=%d&autoAck=false",
mllpClient.getMllpHost(), mllpClient.getMllpPort(), RECEIVE_TIMEOUT,
READ_TIMEOUT)
.log(LoggingLevel.INFO, routeId, "Receiving: ${body}")
.to(result);
}
diff --git
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithValidationTest.java
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithValidationTest.java
index 3645314..62140591 100644
---
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithValidationTest.java
+++
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithValidationTest.java
@@ -17,6 +17,11 @@
package org.apache.camel.component.mllp;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.test.mllp.Hl7TestMessageGenerator;
+
public class MllpTcpServerConsumerOptionalEndOfDataWithValidationTest extends
TcpServerConsumerEndOfDataAndValidationTestSupport {
@Override
@@ -62,7 +67,13 @@ public class
MllpTcpServerConsumerOptionalEndOfDataWithValidationTest extends Tc
public void testMessageContainingEmbeddedEndOfBlock() throws Exception {
expectedInvalidCount = 1;
- runMessageContainingEmbeddedEndOfBlock();
+ setExpectedCounts();
+
+ NotifyBuilder done = new NotifyBuilder(context()).whenDone(1).create();
+
+
mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage().replaceFirst("EVN",
"EVN" + MllpProtocolConstants.END_OF_BLOCK));
+
+ assertTrue("Exchange should have completed", done.matches(5,
TimeUnit.SECONDS));
}
@Override
diff --git
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java
index 66087da..cc1c7d4 100644
---
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java
+++
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest.java
@@ -17,6 +17,9 @@
package org.apache.camel.component.mllp;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.NotifyBuilder;
import org.apache.camel.component.mllp.internal.Hl7Util;
import org.apache.camel.test.mllp.Hl7TestMessageGenerator;
@@ -44,6 +47,8 @@ public class
MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest extends
@Override
public void testNthInvalidMessage() throws Exception {
+ expectedFailedCount = 1;
+
runNthInvalidMessage();
}
@@ -65,8 +70,13 @@ public class
MllpTcpServerConsumerOptionalEndOfDataWithoutValidationTest extends
public void testMessageContainingEmbeddedEndOfBlock() throws Exception {
expectedCompleteCount = 1;
- runMessageContainingEmbeddedEndOfBlock();
- }
+ setExpectedCounts();
+
+ NotifyBuilder done = new NotifyBuilder(context()).whenDone(1).create();
+
+
mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage().replaceFirst("EVN",
"EVN" + MllpProtocolConstants.END_OF_BLOCK));
+
+ assertTrue("Exchange should have completed", done.matches(5,
TimeUnit.SECONDS)); }
@Override
public void testNthMessageContainingEmbeddedEndOfBlock() throws Exception {
diff --git
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithValidationTest.java
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithValidationTest.java
index d8bee66..bb7666f 100644
---
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithValidationTest.java
+++
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithValidationTest.java
@@ -17,6 +17,11 @@
package org.apache.camel.component.mllp;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.test.mllp.Hl7TestMessageGenerator;
+
public class MllpTcpServerConsumerRequiredEndOfDataWithValidationTest extends
TcpServerConsumerEndOfDataAndValidationTestSupport {
@Override
@@ -59,9 +64,15 @@ public class
MllpTcpServerConsumerRequiredEndOfDataWithValidationTest extends Tc
@Override
public void testMessageContainingEmbeddedEndOfBlock() throws Exception {
- expectedInvalidCount = 1;
+ //expectedInvalidCount = 1;
+
+ setExpectedCounts();
+
+ NotifyBuilder done = new NotifyBuilder(context()).whenDone(1).create();
+
+
mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage().replaceFirst("EVN",
"EVN" + MllpProtocolConstants.END_OF_BLOCK));
- runMessageContainingEmbeddedEndOfBlock();
+ assertFalse("Exchange should not have completed", done.matches(5,
TimeUnit.SECONDS));
}
@Override
diff --git
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java
index 458a7bf..4abe439 100644
---
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java
+++
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest.java
@@ -37,11 +37,15 @@ public class
MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest extends
@Override
public void testInvalidMessage() throws Exception {
+ expectedFailedCount = 1;
+
runNthInvalidMessage();
}
@Override
public void testNthInvalidMessage() throws Exception {
+ expectedFailedCount = 1;
+
runNthInvalidMessage();
}
@@ -61,9 +65,13 @@ public class
MllpTcpServerConsumerRequiredEndOfDataWithoutValidationTest extends
@Override
public void testMessageContainingEmbeddedEndOfBlock() throws Exception {
- expectedInvalidCount = 1;
+ setExpectedCounts();
+
+ NotifyBuilder done = new NotifyBuilder(context()).whenDone(1).create();
+
+
mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage().replaceFirst("EVN",
"EVN" + MllpProtocolConstants.END_OF_BLOCK));
- runMessageContainingEmbeddedEndOfBlock();
+ assertFalse("Exchange should not have completed", done.matches(5,
TimeUnit.SECONDS));
}
@Override
diff --git
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpClientProducerEndOfDataAndValidationTestSupport.java
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpClientProducerEndOfDataAndValidationTestSupport.java
index f659685..4893f7e 100644
---
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpClientProducerEndOfDataAndValidationTestSupport.java
+++
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpClientProducerEndOfDataAndValidationTestSupport.java
@@ -35,6 +35,9 @@ import org.junit.Rule;
import org.junit.Test;
public abstract class TcpClientProducerEndOfDataAndValidationTestSupport
extends CamelTestSupport {
+ static final int RECEIVE_TIMEOUT = 1000;
+ static final int READ_TIMEOUT = 500;
+
static final String TEST_MESSAGE =
"MSH|^~\\&|ADT|EPIC|JCAPS|CC|20161206193919|RISTECH|ADT^A08|00001|D|2.3^^|||||||"
+ '\r'
+
"EVN|A08|20150107161440||REG_UPDATE_SEND_VISIT_MESSAGES_ON_PATIENT_CHANGES|RISTECH^RADIOLOGY^TECHNOLOGIST^^^^^^UCLA^^^^^RRMC||"
+ '\r'
@@ -158,7 +161,8 @@ public abstract class
TcpClientProducerEndOfDataAndValidationTestSupport extends
from(source.getDefaultEndpoint()).routeId(routeId)
.log(LoggingLevel.INFO, routeId, "Sending Message")
-
.toF("mllp://%s:%d?validatePayload=%b&requireEndOfData=%b",
mllpServer.getListenHost(), mllpServer.getListenPort(), validatePayload(),
requireEndOfData())
+
.toF("mllp://%s:%d?receiveTimeout=%d&readTimeout=%d&validatePayload=%b&requireEndOfData=%b",
mllpServer.getListenHost(), mllpServer.getListenPort(),
+ RECEIVE_TIMEOUT, READ_TIMEOUT, validatePayload(),
requireEndOfData())
.log(LoggingLevel.INFO, routeId, "Received
Acknowledgement")
.to(aa);
}
diff --git
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java
index 094ff08..fe6e6f7 100644
---
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java
+++
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerEndOfDataAndValidationTestSupport.java
@@ -17,6 +17,9 @@
package org.apache.camel.component.mllp;
+import static org.hamcrest.CoreMatchers.instanceOf;
+
+import java.net.SocketException;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
@@ -30,7 +33,6 @@ import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit.rule.mllp.MllpClientResource;
import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceException;
-import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceTimeoutException;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.test.mllp.Hl7TestMessageGenerator;
import org.junit.Rule;
@@ -38,7 +40,8 @@ import org.junit.Test;
public abstract class TcpServerConsumerEndOfDataAndValidationTestSupport
extends CamelTestSupport {
static final int CONNECT_TIMEOUT = 500;
- static final int RESPONSE_TIMEOUT = 5000;
+ static final int RECEIVE_TIMEOUT = 1000;
+ static final int READ_TIMEOUT = 500;
@Rule
public MllpClientResource mllpClient = new MllpClientResource();
@@ -46,10 +49,14 @@ public abstract class
TcpServerConsumerEndOfDataAndValidationTestSupport extends
@EndpointInject(uri = "mock://complete")
MockEndpoint complete;
+ @EndpointInject(uri = "mock://failed")
+ MockEndpoint failed;
+
@EndpointInject(uri = "mock://invalid-ex")
MockEndpoint invalid;
int expectedCompleteCount;
+ int expectedFailedCount;
int expectedInvalidCount;
@Override
@@ -76,8 +83,11 @@ public abstract class
TcpServerConsumerEndOfDataAndValidationTestSupport extends
onException(MllpInvalidMessageException.class)
.to(invalid);
-
fromF("mllp://%s:%d?autoAck=true&connectTimeout=%d&receiveTimeout=%d&validatePayload=%b&requireEndOfData=%b",
- mllpClient.getMllpHost(), mllpClient.getMllpPort(),
CONNECT_TIMEOUT, RESPONSE_TIMEOUT, validatePayload(), requireEndOfData())
+ onCompletion().onFailureOnly()
+ .to(failed);
+
+
fromF("mllp://%s:%d?autoAck=true&connectTimeout=%d&receiveTimeout=%d&readTimeout=%d&validatePayload=%b&requireEndOfData=%b",
+ mllpClient.getMllpHost(), mllpClient.getMllpPort(),
CONNECT_TIMEOUT, RECEIVE_TIMEOUT, READ_TIMEOUT, validatePayload(),
requireEndOfData())
.routeId(routeId)
.log(LoggingLevel.INFO, routeId, "Test route received
message")
.to(complete);
@@ -91,6 +101,7 @@ public abstract class
TcpServerConsumerEndOfDataAndValidationTestSupport extends
protected void setExpectedCounts() {
complete.expectedMessageCount(expectedCompleteCount);
+ failed.expectedMessageCount(expectedFailedCount);
invalid.expectedMessageCount(expectedInvalidCount);
}
@@ -160,7 +171,7 @@ public abstract class
TcpServerConsumerEndOfDataAndValidationTestSupport extends
log.info("Sending TEST_MESSAGE_2");
String acknowledgement2 =
mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(2));
- assertTrue("First two normal exchanges did not complete",
notify1.matches(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS));
+ assertTrue("First two normal exchanges did not complete",
notify1.matches(RECEIVE_TIMEOUT, TimeUnit.MILLISECONDS));
log.info("Sending TEST_MESSAGE_3");
mllpClient.setSendEndOfBlock(false);
@@ -183,7 +194,7 @@ public abstract class
TcpServerConsumerEndOfDataAndValidationTestSupport extends
log.info("Sending TEST_MESSAGE_5");
String acknowledgement5 =
mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(5));
- assertTrue("Remaining exchanges did not complete",
notify2.matches(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS));
+ assertTrue("Remaining exchanges did not complete",
notify2.matches(RECEIVE_TIMEOUT, TimeUnit.MILLISECONDS));
assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
@@ -207,27 +218,47 @@ public abstract class
TcpServerConsumerEndOfDataAndValidationTestSupport extends
// Send one message to establish the connection and start the
ConsumerClientSocketThread
mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage());
- assertTrue("One exchange should have completed",
oneDone.matches(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS));
+ assertTrue("One exchange should have completed",
oneDone.matches(RECEIVE_TIMEOUT, TimeUnit.MILLISECONDS));
mllpClient.setSendEndOfBlock(false);
mllpClient.setSendEndOfData(false);
- mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage());
+
mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage());
- assertTrue("Two exchanges should have completed",
twoDone.matches(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS));
+ assertTrue("Two exchanges should have completed",
twoDone.matches(RECEIVE_TIMEOUT, TimeUnit.MILLISECONDS));
}
@Test
public void testInitialMessageReadTimeout() throws Exception {
- expectedInvalidCount = 1;
+ expectedCompleteCount = 1;
setExpectedCounts();
mllpClient.setSendEndOfBlock(false);
mllpClient.setSendEndOfData(false);
- mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage());
+ log.info("Sending first message");
+
mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage(10001));
+
+ Thread.sleep(RECEIVE_TIMEOUT * 5);
+
+ mllpClient.setSendEndOfBlock(true);
+ mllpClient.setSendEndOfData(true);
+
+ try {
+ log.info("Attempting to send second message");
+ String acknowledgement =
mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(10002));
+ assertEquals("If the send doesn't throw an exception, the
acknowledgement should be empty", "", acknowledgement);
+ } catch (MllpJUnitResourceException expected) {
+ assertThat("If the send throws an exception, the cause should be a
SocketException", expected.getCause(), instanceOf(SocketException.class));
+ }
+
+ mllpClient.disconnect();
+ mllpClient.connect();
+
+ log.info("Sending third message");
+ String acknowledgement =
mllpClient.sendMessageAndWaitForAcknowledgement(Hl7TestMessageGenerator.generateMessage(10003));
}
@Test
@@ -306,16 +337,6 @@ public abstract class
TcpServerConsumerEndOfDataAndValidationTestSupport extends
@Test
public abstract void testMessageContainingEmbeddedEndOfBlock() throws
Exception;
- protected void runMessageContainingEmbeddedEndOfBlock() throws Exception {
- setExpectedCounts();
-
- NotifyBuilder done = new NotifyBuilder(context()).whenDone(1).create();
-
-
mllpClient.sendFramedData(Hl7TestMessageGenerator.generateMessage().replaceFirst("EVN",
"EVN" + MllpProtocolConstants.END_OF_BLOCK));
-
- assertTrue("Exchange should have completed", done.matches(15,
TimeUnit.SECONDS));
- }
-
@Test
public abstract void testInvalidMessageContainingEmbeddedEndOfBlock()
throws Exception;
diff --git
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java
index 092be01..cd87fc6 100644
---
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java
+++
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferTest.java
@@ -403,7 +403,7 @@ public class MllpSocketBufferTest extends
SocketBufferTestSupport {
assertFalse("Unexpected initial value", instance.hasEndOfBlock());
instance.write(MllpProtocolConstants.END_OF_BLOCK);
- assertTrue(instance.hasEndOfBlock());
+ assertFalse("START_OF_BLOCK before an END_OF_BLOCK",
instance.hasEndOfBlock());
instance.reset();
assertFalse(instance.hasEndOfBlock());
@@ -457,7 +457,7 @@ public class MllpSocketBufferTest extends
SocketBufferTestSupport {
assertFalse(instance.hasEndOfData());
instance.write(MllpProtocolConstants.END_OF_DATA);
- assertTrue(instance.hasEndOfData());
+ assertFalse("Need a START_OF_BLOCK before the END_OF_DATA",
instance.hasEndOfData());
instance.reset();
assertFalse(instance.hasEndOfData());
diff --git
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java
index 1f21bbf..903669e 100644
---
a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java
+++
b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/internal/MllpSocketBufferWriteTest.java
@@ -57,7 +57,7 @@ public class MllpSocketBufferWriteTest extends
SocketBufferTestSupport {
assertEquals(1, instance.size());
assertEquals(-1, instance.startOfBlockIndex);
- assertEquals(0, instance.endOfBlockIndex);
+ assertEquals(-1, instance.endOfBlockIndex);
}
/**
@@ -142,7 +142,7 @@ public class MllpSocketBufferWriteTest extends
SocketBufferTestSupport {
assertEquals(6, instance.size());
assertEquals(-1, instance.startOfBlockIndex);
- assertEquals(4, instance.endOfBlockIndex);
+ assertEquals(-1, instance.endOfBlockIndex);
}
/**
diff --git a/components/camel-mllp/src/test/resources/log4j2.properties
b/components/camel-mllp/src/test/resources/log4j2.properties
index 5c96df2..4179677 100644
--- a/components/camel-mllp/src/test/resources/log4j2.properties
+++ b/components/camel-mllp/src/test/resources/log4j2.properties
@@ -31,4 +31,4 @@ rootLogger.appenderRef.file.ref = file
loggers = mllp
logger.mllp.name = org.apache.camel.component.mllp
-# logger.mllp.level = TRACE
+logger.mllp.level = DEBUG
--
To stop receiving notification emails like this one, please contact
[email protected].