Repository: nifi
Updated Branches:
  refs/heads/master ae3db8230 -> f97b3fe45


NIFI-3963: - Ensuring the RemoteGroupPort yields when the details cannot be 
refreshed from any of the configured remote instances.

This closes #1853.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f97b3fe4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f97b3fe4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f97b3fe4

Branch: refs/heads/master
Commit: f97b3fe455e7cf8bd051c7a40f3e3f7ee1d1a16a
Parents: ae3db82
Author: Matt Gilman <[email protected]>
Authored: Wed May 24 14:28:51 2017 -0400
Committer: Bryan Bende <[email protected]>
Committed: Wed May 24 15:58:22 2017 -0400

----------------------------------------------------------------------
 .../client/socket/EndpointConnectionPool.java   | 62 +++++++++++---------
 .../exception/UnreachableClusterException.java  | 33 +++++++++++
 .../nifi/remote/StandardRemoteGroupPort.java    | 39 +++++++-----
 3 files changed, 90 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f97b3fe4/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 926e4b4..a360c87 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -16,9 +16,32 @@
  */
 package org.apache.nifi.remote.client.socket;
 
-import static org.apache.nifi.remote.util.EventReportUtil.error;
-import static org.apache.nifi.remote.util.EventReportUtil.warn;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.RemoteResourceInitiator;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.PeerSelector;
+import org.apache.nifi.remote.client.PeerStatusProvider;
+import org.apache.nifi.remote.client.SiteInfoProvider;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+import org.apache.nifi.remote.exception.UnreachableClusterException;
+import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import 
org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -44,31 +67,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.PeerDescription;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.RemoteDestination;
-import org.apache.nifi.remote.RemoteResourceInitiator;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.PeerSelector;
-import org.apache.nifi.remote.client.PeerStatusProvider;
-import org.apache.nifi.remote.client.SiteInfoProvider;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.PortNotRunningException;
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-import org.apache.nifi.remote.exception.UnknownPortException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import 
org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.nifi.remote.util.EventReportUtil.error;
+import static org.apache.nifi.remote.util.EventReportUtil.warn;
 
 public class EndpointConnectionPool implements PeerStatusProvider {
 
@@ -157,7 +157,13 @@ public class EndpointConnectionPool implements 
PeerStatusProvider {
         SocketClientProtocol protocol = null;
         EndpointConnection connection;
         Peer peer = null;
-        URI clusterUrl = siteInfoProvider.getActiveClusterUrl();
+
+        final URI clusterUrl;
+        try {
+            clusterUrl = siteInfoProvider.getActiveClusterUrl();
+        } catch (final IOException ioe) {
+            throw new UnreachableClusterException("Unable to refresh details 
from any of the configured remote instances.", ioe);
+        }
 
         do {
             final List<EndpointConnection> addBack = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/nifi/blob/f97b3fe4/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnreachableClusterException.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnreachableClusterException.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnreachableClusterException.java
new file mode 100644
index 0000000..5e9452f
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnreachableClusterException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+/**
+ * UnreachableClusterException when none of the target clusterUrls are 
reachable.
+ */
+public class UnreachableClusterException extends ProtocolException {
+
+    private static final long serialVersionUID = 6147433671708846798L;
+
+    public UnreachableClusterException(final String message) {
+        super(message);
+    }
+
+    public UnreachableClusterException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f97b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index b1288f3..1be5b42 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -16,22 +16,6 @@
  */
 package org.apache.nifi.remote;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.net.ssl.SSLContext;
-
 import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
@@ -56,6 +40,7 @@ import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.exception.UnknownPortException;
+import org.apache.nifi.remote.exception.UnreachableClusterException;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.http.HttpProxy;
 import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
@@ -69,6 +54,21 @@ import org.apache.nifi.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
 public class StandardRemoteGroupPort extends RemoteGroupPort {
 
     private static final long BATCH_SEND_NANOS = 
TimeUnit.MILLISECONDS.toNanos(500L); // send batches of up to 500 millis
@@ -239,6 +239,13 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             session.rollback();
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
             return;
+        } catch (final UnreachableClusterException e) {
+            context.yield();
+            final String message = String.format("%s failed to communicate 
with %s due to %s", this, url, e.toString());
+            logger.error(message);
+            session.rollback();
+            remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
+            return;
         } catch (final IOException e) {
             // we do not yield here because the 'peer' will be penalized, and 
we won't communicate with that particular nifi instance
             // for a while due to penalization, but we can continue to talk to 
other nifi instances

Reply via email to