Repository: nifi
Updated Branches:
  refs/heads/master c2bfc4ef2 -> 8536ad65f


NIFI-2651: Ensure that when we disable transmission on an RPG, we interrupt 
any transactions in progress for http-based site-to-site

This closes #937.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8536ad65
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8536ad65
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8536ad65

Branch: refs/heads/master
Commit: 8536ad65f4b6736b5b27eebd63b346f0f8d064f7
Parents: c2bfc4e
Author: Mark Payne <[email protected]>
Authored: Wed Aug 24 21:06:30 2016 -0400
Committer: Koji Kawamura <[email protected]>
Committed: Thu Aug 25 15:33:10 2016 +0900

----------------------------------------------------------------------
 .../apache/nifi/remote/AbstractTransaction.java |  2 +-
 .../nifi/remote/client/http/HttpClient.java     | 22 ++++++++++++++++++--
 .../io/http/HttpCommunicationsSession.java      |  2 ++
 .../apache/nifi/remote/io/http/HttpInput.java   | 11 +++++++++-
 .../apache/nifi/remote/io/http/HttpOutput.java  | 11 +++++++++-
 5 files changed, 43 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8536ad65/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
index 0752fa1..826cf00 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
@@ -99,7 +99,7 @@ public abstract class AbstractTransaction implements Transaction {
     }
 
     @Override
-    public Communicant getCommunicant() {
+    public Peer getCommunicant() {
         return peer;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/8536ad65/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
index 0f0d4a5..8821edb 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
@@ -42,6 +42,8 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -55,6 +57,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
 
     private final ScheduledExecutorService taskExecutor;
     private final PeerSelector peerSelector;
+    private final Set<HttpClientTransaction> activeTransactions = Collections.synchronizedSet(new HashSet<>());
 
     public HttpClient(final SiteToSiteClientConfig config) {
         super(config);
@@ -177,15 +180,26 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
             // We found a valid peer to communicate with.
             final Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion();
             final HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction,
-                    config.isUseCompression(), portId, penaltyMillis, config.getEventReporter());
+                config.isUseCompression(), portId, penaltyMillis, config.getEventReporter()) {
+
+                @Override
+                protected void close() throws IOException {
+                    try {
+                        super.close();
+                    } finally {
+                        activeTransactions.remove(this);
+                    }
+                }
+            };
+
             transaction.initialize(apiClient, transactionUrl);
 
+            activeTransactions.add(transaction);
             return transaction;
         }
 
         logger.info("Couldn't find a valid peer to communicate with.");
         return null;
-
     }
 
     private String resolveNodeApiUrl(final PeerDescription description) {
@@ -201,5 +215,9 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
     public void close() throws IOException {
         taskExecutor.shutdown();
         peerSelector.clear();
+
+        for (final HttpClientTransaction transaction : activeTransactions) {
+            transaction.getCommunicant().getCommunicationsSession().interrupt();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8536ad65/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java
index 868fb36..95b3314 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java
@@ -74,6 +74,8 @@ public class HttpCommunicationsSession extends AbstractCommunicationsSession {
 
     @Override
     public void interrupt() {
+        input.interrupt();
+        output.interrupt();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/8536ad65/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java
index 5048306..77804cd 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.remote.io.http;
 
+import org.apache.nifi.remote.io.InterruptableInputStream;
 import org.apache.nifi.remote.protocol.CommunicationsInput;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
 
@@ -25,6 +26,7 @@ import java.io.InputStream;
 public class HttpInput implements CommunicationsInput {
 
     private ByteCountingInputStream countingIn;
+    private InterruptableInputStream interruptableIn;
 
     @Override
     public InputStream getInputStream() throws IOException {
@@ -53,6 +55,13 @@ public class HttpInput implements CommunicationsInput {
     }
 
     public void setInputStream(InputStream inputStream) {
-        this.countingIn = new ByteCountingInputStream(inputStream);
+        interruptableIn = new InterruptableInputStream(inputStream);
+        this.countingIn = new ByteCountingInputStream(interruptableIn);
+    }
+
+    public void interrupt() {
+        if (interruptableIn != null) {
+            interruptableIn.interrupt();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8536ad65/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java
index b78be18..54c105f 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.remote.io.http;
 
+import org.apache.nifi.remote.io.InterruptableOutputStream;
 import org.apache.nifi.remote.protocol.CommunicationsOutput;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
 
@@ -25,6 +26,7 @@ import java.io.OutputStream;
 public class HttpOutput implements CommunicationsOutput {
 
     private ByteCountingOutputStream countingOut;
+    private InterruptableOutputStream interruptableOut;
 
     @Override
     public OutputStream getOutputStream() throws IOException {
@@ -40,6 +42,13 @@ public class HttpOutput implements CommunicationsOutput {
     }
 
     public void setOutputStream(OutputStream outputStream) {
-        this.countingOut = new ByteCountingOutputStream(outputStream);
+        interruptableOut = new InterruptableOutputStream(outputStream);
+        this.countingOut = new ByteCountingOutputStream(interruptableOut);
+    }
+
+    public void interrupt() {
+        if (interruptableOut != null) {
+            interruptableOut.interrupt();
+        }
     }
 }

Reply via email to