This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit a667e0ec643a720a3fc010fdb991b2e89a94d6f4 Author: Sebastian Rühl <sebastian.ru...@codecentric.de> AuthorDate: Thu Dec 21 11:49:27 2017 +0100 Cleanup camel component: - Fixed shutdown - Async call in test - Added path variable for driver --- .../java/org/apache/plc4x/camel/PLC4XEndpoint.java | 9 ++++ .../java/org/apache/plc4x/camel/PLC4XProducer.java | 57 +++++++++++----------- .../org/apache/plc4x/camel/PLC4XComponentTest.java | 6 ++- 3 files changed, 41 insertions(+), 31 deletions(-) diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java index dd800e1..4fd84e4 100644 --- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java +++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java @@ -32,10 +32,19 @@ import org.apache.plc4x.java.PlcDriverManager; public class PLC4XEndpoint extends DefaultEndpoint { /** + * The name of the PLC4X driver + */ + @UriPath + @Metadata(required = "true") + @SuppressWarnings("unused") + String driver; + + /** * The address for the PLC4X driver */ @UriPath @Metadata(required = "true") + @SuppressWarnings("unused") String address; final PlcDriverManager plcDriverManager; diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java index 29331d2..e11d1e5 100644 --- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java +++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java @@ -21,22 +21,22 @@ package org.apache.plc4x.camel; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; -import org.apache.camel.ShutdownRunningTask; import org.apache.camel.impl.DefaultAsyncProducer; -import org.apache.camel.spi.ShutdownAware; import 
org.apache.plc4x.java.api.connection.PlcConnection; import org.apache.plc4x.java.api.connection.PlcWriter; import org.apache.plc4x.java.api.exceptions.PlcException; import org.apache.plc4x.java.api.messages.PlcWriteRequest; +import org.apache.plc4x.java.api.messages.PlcWriteResponse; import org.apache.plc4x.java.api.model.Address; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class PLC4XProducer extends DefaultAsyncProducer implements ShutdownAware { - private static final Logger LOG = LoggerFactory.getLogger(PLC4XProducer.class); +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +public class PLC4XProducer extends DefaultAsyncProducer { + @SuppressWarnings("unused") private PLC4XEndpoint endpoint; private PlcConnection plcConnection; + private AtomicInteger openRequests; public PLC4XProducer(PLC4XEndpoint endpoint) { super(endpoint); @@ -47,6 +47,7 @@ public class PLC4XProducer extends DefaultAsyncProducer implements ShutdownAware } catch (PlcException e) { throw new RuntimeException(e); } + openRequests = new AtomicInteger(); } @SuppressWarnings("unchecked") @@ -57,13 +58,21 @@ public class PLC4XProducer extends DefaultAsyncProducer implements ShutdownAware Object value = in.getBody(Object.class); PlcWriteRequest plcSimpleWriteRequest = new PlcWriteRequest(datatype, address, value); PlcWriter plcWriter = plcConnection.getWriter().orElseThrow(() -> new IllegalArgumentException("Writer for driver not found")); - Object response = plcWriter.write(plcSimpleWriteRequest).get(); - if (exchange.getPattern().isOutCapable()) { - Message out = exchange.getOut(); - out.copyFrom(exchange.getIn()); - out.setBody(response); - } else { - in.setBody(response); + CompletableFuture<PlcWriteResponse> completableFuture = plcWriter.write(plcSimpleWriteRequest); + int currentlyOpenRequests = openRequests.incrementAndGet(); + try { + log.debug("Currently open requests including {}:{}", exchange, 
currentlyOpenRequests); + PlcWriteResponse plcWriteResponse = completableFuture.get(); + if (exchange.getPattern().isOutCapable()) { + Message out = exchange.getOut(); + out.copyFrom(exchange.getIn()); + out.setBody(plcWriteResponse); + } else { + in.setBody(plcWriteResponse); + } + } finally { + int openRequestsAfterFinish = openRequests.decrementAndGet(); + log.trace("Open Requests after {}:{}", exchange, openRequestsAfterFinish); } } @@ -81,27 +90,17 @@ public class PLC4XProducer extends DefaultAsyncProducer implements ShutdownAware } @Override - public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { - switch (shutdownRunningTask) { - case CompleteCurrentTaskOnly: - break; - case CompleteAllTasks: - break; + protected void doStop() throws Exception { + int openRequestsAtStop = openRequests.get(); + log.debug("Stopping with {} open requests", openRequestsAtStop); + if (openRequestsAtStop > 0) { + log.warn("There are still {} open requests", openRequestsAtStop); } try { plcConnection.close(); } catch (Exception ignore) { } - return false; + super.doStop(); } - @Override - public int getPendingExchangesSize() { - return 0; - } - - @Override - public void prepareShutdown(boolean suspendOnly, boolean forced) { - - } } diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java index cc3ae5f..0e3e8ea 100644 --- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java +++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java @@ -25,6 +25,8 @@ import org.apache.plc4x.java.s7.model.S7Address; import org.apache.plc4x.java.s7.netty.model.types.MemoryArea; import org.junit.Test; +import java.util.concurrent.TimeUnit; + public class PLC4XComponentTest extends CamelTestSupport { @Test @@ -32,9 +34,9 @@ public class PLC4XComponentTest extends CamelTestSupport { 
MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMinimumMessageCount(1); - template.sendBody("direct:plc4x", 3); + template.asyncSendBody("direct:plc4x", "irrelevant"); - assertMockEndpointsSatisfied(); + assertMockEndpointsSatisfied(2, TimeUnit.SECONDS); } @Override -- To stop receiving notification emails like this one, please contact "commits@plc4x.apache.org" <commits@plc4x.apache.org>.