This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new f01668e NIFI-6530 - HTTP SiteToSite server returns 201 in case no
data is available
f01668e is described below
commit f01668e66ad2e45197915769e966a4be27e1592e
Author: Arpad Boda <[email protected]>
AuthorDate: Mon Aug 12 17:30:30 2019 +0200
NIFI-6530 - HTTP SiteToSite server returns 201 in case no data is available
This closes #3647.
Signed-off-by: Koji Kawamura <[email protected]>
---
.../apache/nifi/remote/client/PeerSelector.java | 22 +++-----
.../apache/nifi/remote/client/http/HttpClient.java | 22 ++++++--
.../http/TransportProtocolVersionNegotiator.java | 1 +
.../client/socket/EndpointConnectionPool.java | 19 ++++---
.../nifi/remote/client/socket/SocketClient.java | 6 +--
.../nifi/remote/exception/NoContentException.java | 39 ++++++++++++++
.../remote/exception/NoValidPeerException.java | 40 ++++++++++++++
.../protocol/socket/SocketClientTransaction.java | 4 ++
.../nifi/remote/util/SiteToSiteRestApiClient.java | 6 ++-
.../nifi/remote/client/TestPeerSelector.java | 31 ++++++-----
.../nifi/remote/client/http/TestHttpClient.java | 63 +++++++++++++++++++---
.../socket/TestSocketClientTransaction.java | 17 +++---
.../java/org/apache/nifi/spark/NiFiReceiver.java | 7 +++
.../nifi/remote/StandardRemoteGroupPort.java | 15 ++++--
.../apache/nifi/web/api/DataTransferResource.java | 19 ++++++-
.../stateless/core/StatelessRemoteOutputPort.java | 8 ++-
.../apache/nifi/toolkit/s2s/SiteToSiteCliMain.java | 3 ++
.../nifi/toolkit/s2s/SiteToSiteReceiver.java | 4 ++
18 files changed, 253 insertions(+), 73 deletions(-)
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
index a443967..14c163b 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
@@ -262,7 +262,7 @@ public class PeerSelector {
* for RECEIVE, a peer with more flow files is preferred
* @return a selected peer, if there is no available peer or all peers are penalized, then return null
*/
- public PeerStatus getNextPeerStatus(final TransferDirection direction) {
+ public ArrayList<PeerStatus> getPeerStatuses(final TransferDirection
direction) {
List<PeerStatus> peerList = peerStatuses;
if (isPeerRefreshNeeded(peerList)) {
peerRefreshLock.lock();
@@ -289,25 +289,15 @@ public class PeerSelector {
}
}
+
if (peerList == null || peerList.isEmpty()) {
- return null;
+ return new ArrayList<PeerStatus>();
}
- PeerStatus peerStatus;
- for (int i = 0; i < peerList.size(); i++) {
- final long idx = peerIndex.getAndIncrement();
- final int listIndex = (int) (idx % peerList.size());
- peerStatus = peerList.get(listIndex);
-
- if (isPenalized(peerStatus)) {
- logger.debug("{} {} is penalized; will not communicate with
this peer", this, peerStatus);
- } else {
- return peerStatus;
- }
- }
+ ArrayList<PeerStatus> retVal = new ArrayList<>(peerList);
+ retVal.removeIf(p -> isPenalized(p));
- logger.debug("{} All peers appear to be penalized; returning null",
this);
- return null;
+ return retVal;
}
private List<PeerStatus> createPeerStatusList(final TransferDirection
direction) throws IOException {
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
index 4213dac..e1516d2 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
@@ -27,6 +27,8 @@ import org.apache.nifi.remote.client.PeerSelector;
import org.apache.nifi.remote.client.PeerStatusProvider;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.NoContentException;
+import org.apache.nifi.remote.exception.NoValidPeerException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
@@ -40,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -124,9 +127,11 @@ public class HttpClient extends AbstractSiteToSiteClient
implements PeerStatusPr
@Override
public Transaction createTransaction(final TransferDirection direction)
throws HandshakeException, PortNotRunningException, ProtocolException,
UnknownPortException, IOException {
final int timeoutMillis = (int)
config.getTimeout(TimeUnit.MILLISECONDS);
+ Integer peersWithNoContent = 0;
- PeerStatus peerStatus;
- while ((peerStatus = peerSelector.getNextPeerStatus(direction)) !=
null) {
+ ArrayList<PeerStatus> peers = peerSelector.getPeerStatuses(direction);
+
+ for (PeerStatus peerStatus : peers) {
logger.debug("peerStatus={}", peerStatus);
final CommunicationsSession commSession = new
HttpCommunicationsSession();
@@ -168,6 +173,11 @@ public class HttpClient extends AbstractSiteToSiteClient
implements PeerStatusPr
try {
transactionUrl = apiClient.initiateTransaction(direction,
portId);
commSession.setUserDn(apiClient.getTrustedPeerDn());
+ } catch (final NoContentException e) {
+ apiClient.close();
+ peersWithNoContent++;
+ logger.debug("Peer {} has no flowfiles to provide", peer);
+ continue;
} catch (final Exception e) {
apiClient.close();
logger.warn("Penalizing a peer {} due to {}", peer,
e.toString());
@@ -210,8 +220,12 @@ public class HttpClient extends AbstractSiteToSiteClient
implements PeerStatusPr
return transaction;
}
- logger.info("Couldn't find a valid peer to communicate with.");
- return null;
+ if(peersWithNoContent > 0) {
+ return null;
+ }
+ String error = new String("Couldn't find a valid peer to communicate
with.");
+ logger.info(error);
+ throw new NoValidPeerException(error);
}
private String resolveNodeApiUrl(final PeerDescription description) {
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java
index d0a6368..844a92e 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java
@@ -32,6 +32,7 @@ public class TransportProtocolVersionNegotiator extends
StandardVersionNegotiato
public int getTransactionProtocolVersion() {
switch (getVersion()) {
case 1:
+ case 2:
return 5;
default:
throw new RuntimeException("Transport protocol version " +
getVersion()
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index a360c87..e9f2536 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -29,6 +29,7 @@ import org.apache.nifi.remote.client.SiteInfoProvider;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.NoValidPeerException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
@@ -165,14 +166,9 @@ public class EndpointConnectionPool implements
PeerStatusProvider {
throw new UnreachableClusterException("Unable to refresh details
from any of the configured remote instances.", ioe);
}
- do {
+ for (PeerStatus peerStatus : peerSelector.getPeerStatuses(direction))
{
final List<EndpointConnection> addBack = new ArrayList<>();
- logger.debug("{} getting next peer status", this);
- final PeerStatus peerStatus =
peerSelector.getNextPeerStatus(direction);
logger.debug("{} next peer status = {}", this, peerStatus);
- if (peerStatus == null) {
- return null;
- }
final PeerDescription peerDescription =
peerStatus.getPeerDescription();
BlockingQueue<EndpointConnection> connectionQueue =
connectionQueueMap.get(peerDescription);
@@ -192,7 +188,7 @@ public class EndpointConnectionPool implements
PeerStatusProvider {
if (connection == null && !addBack.isEmpty()) {
// all available connections have been penalized.
logger.debug("{} all Connections for {} are penalized;
returning no Connection", this, portId);
- return null;
+ throw new NoValidPeerException("All peers are penalized");
}
if (connection != null &&
connection.getPeer().isPenalized(portId)) {
@@ -318,10 +314,13 @@ public class EndpointConnectionPool implements
PeerStatusProvider {
}
}
- } while (connection == null || codec == null || commsSession == null
|| protocol == null);
+ if( connection != null && codec != null && commsSession != null &&
protocol != null) {
+ activeConnections.add(connection);
+ return connection;
+ }
+ }
+ throw new NoValidPeerException("Didn't find any valid peer to connect
to");
- activeConnections.add(connection);
- return connection;
}
public boolean offer(final EndpointConnection endpointConnection) {
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index ba6839c..0999d57 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -23,6 +23,7 @@ import org.apache.nifi.remote.TransactionCompletion;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.AbstractSiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.exception.NoContentException;
import org.apache.nifi.remote.protocol.DataPacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,14 +126,13 @@ public class SocketClient extends
AbstractSiteToSiteClient {
}
final EndpointConnection connectionState =
pool.getEndpointConnection(direction, getConfig());
- if (connectionState == null) {
- return null;
- }
final Transaction transaction;
try {
transaction =
connectionState.getSocketClientProtocol().startTransaction(
connectionState.getPeer(), connectionState.getCodec(),
direction);
+ } catch (final NoContentException e) {
+ return null;
} catch (final Throwable t) {
pool.terminate(connectionState);
throw new IOException("Unable to create Transaction to communicate
with " + connectionState.getPeer(), t);
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoContentException.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoContentException.java
new file mode 100644
index 0000000..a0dd23d
--- /dev/null
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoContentException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+import java.io.IOException;
+
+/**
+ * A NoContentException occurs when the remote peer has no flowfiles to provide
+ */
+public class NoContentException extends IOException {
+
+ private static final long serialVersionUID = -689032011082690815L;
+
+ public NoContentException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ public NoContentException(final String message) {
+ super(message);
+ }
+
+ public NoContentException(final Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoValidPeerException.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoValidPeerException.java
new file mode 100644
index 0000000..30a51a0
--- /dev/null
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoValidPeerException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+import java.io.IOException;
+
+
+/**
+ * A NoValidPeerException occurs when all the remote peers are penalized or none exists
+ */
+public class NoValidPeerException extends IOException {
+
+ private static final long serialVersionUID = 8421102798129193880L;
+
+ public NoValidPeerException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ public NoValidPeerException(final String message) {
+ super(message);
+ }
+
+ public NoValidPeerException(final Throwable cause) {
+ super(cause);
+ }
+}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index e29f045..5b1eb1c 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -21,6 +21,7 @@ import org.apache.nifi.remote.AbstractTransaction;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.NoContentException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.Response;
@@ -43,6 +44,9 @@ public class SocketClientTransaction extends
AbstractTransaction {
this.dos = new
DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
initialize();
+ if (direction == TransferDirection.RECEIVE && !this.dataAvailable){
+ throw new NoContentException("Remote side has no flowfiles to
provide");
+ }
}
private void initialize() throws IOException {
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
index 249325d..3270708 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
@@ -64,6 +64,7 @@ import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.NoContentException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
@@ -147,6 +148,7 @@ public class SiteToSiteRestApiClient implements Closeable {
private static final int RESPONSE_CODE_OK = 200;
private static final int RESPONSE_CODE_CREATED = 201;
private static final int RESPONSE_CODE_ACCEPTED = 202;
+ private static final int RESPONSE_CODE_NO_CONTENT = 204;
private static final int RESPONSE_CODE_BAD_REQUEST = 400;
private static final int RESPONSE_CODE_FORBIDDEN = 403;
private static final int RESPONSE_CODE_NOT_FOUND = 404;
@@ -171,7 +173,7 @@ public class SiteToSiteRestApiClient implements Closeable {
private int batchCount = 0;
private long batchSize = 0;
private long batchDurationMillis = 0;
- private TransportProtocolVersionNegotiator
transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1);
+ private TransportProtocolVersionNegotiator
transportProtocolVersionNegotiator = new
TransportProtocolVersionNegotiator(2,1);
private String trustedPeerDn;
private final ScheduledExecutorService ttlExtendTaskExecutor;
@@ -498,6 +500,8 @@ public class SiteToSiteRestApiClient implements Closeable {
}
serverTransactionTtl =
Integer.parseInt(serverTransactionTtlHeader.getValue());
break;
+ case RESPONSE_CODE_NO_CONTENT:
+ throw new NoContentException("Server has no flowfiles to
provide");
default:
try (InputStream content = response.getEntity().getContent()) {
diff --git
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
index 6b6db3c..e29efd8 100644
---
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
+++
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
@@ -25,6 +25,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -36,8 +37,6 @@ import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.reducing;
import static java.util.stream.Collectors.toMap;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -212,10 +211,13 @@ public class TestPeerSelector {
throw new IOException("Connection refused. " +
peerFetchStatusesFrom + " is not running.");
}).when(peerStatusProvider).fetchRemotePeerStatuses(any(PeerDescription.class));
+
+ ArrayList<PeerStatus> peers;
+
// 1st attempt. It uses the bootstrap node.
peerSelector.refreshPeers();
- PeerStatus peerStatus =
peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
- assertNotNull(peerStatus);
+ peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
+ assert(!peers.isEmpty());
// Proceed time so that peer selector refresh statuses.
peerStatuses.remove(bootstrapNodeStatus);
@@ -223,33 +225,34 @@ public class TestPeerSelector {
// 2nd attempt.
peerSelector.refreshPeers();
- peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
- assertNotNull(peerStatus);
- assertEquals("Node2 should be returned since node 2 is the only
available node.", node2, peerStatus.getPeerDescription());
+ peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
+ assert(!peers.isEmpty());
+ assertEquals("Node2 should be returned since node 2 is the only
available node.", node2, peers.get(0).getPeerDescription());
// Proceed time so that peer selector refresh statuses.
systemTime.offset += TimeUnit.MILLISECONDS.convert(1,
TimeUnit.MINUTES) + 1;
// 3rd attempt.
peerSelector.refreshPeers();
- peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
- assertNotNull(peerStatus);
- assertEquals("Node2 should be returned since node 2 is the only
available node.", node2, peerStatus.getPeerDescription());
+ peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
+ assert(!peers.isEmpty());
+ assertEquals("Node2 should be returned since node 2 is the only
available node.", node2, peers.get(0).getPeerDescription());
// Remove node2 to simulate that it goes down. There's no available node at this point.
peerStatuses.remove(node2Status);
systemTime.offset += TimeUnit.MILLISECONDS.convert(1,
TimeUnit.MINUTES) + 1;
peerSelector.refreshPeers();
- peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
- assertNull("PeerSelector should return null as next peer status, since
there's no available peer", peerStatus);
+ peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
+ assertTrue("PeerSelector should return an empty list as next peer
statuses, since there's no available peer", peers.isEmpty());
// Add node1 back. PeerSelector should be able to fetch peer statuses because it always tries to fetch at least from the bootstrap node.
peerStatuses.add(bootstrapNodeStatus);
systemTime.offset += TimeUnit.MILLISECONDS.convert(1,
TimeUnit.MINUTES) + 1;
peerSelector.refreshPeers();
- peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
- assertEquals("Node1 should be returned since node 1 is the only
available node.", bootstrapNode, peerStatus.getPeerDescription());
+ peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
+ assert(!peers.isEmpty());
+ assertEquals("Node1 should be returned since node 1 is the only
available node.", bootstrapNode, peers.get(0).getPeerDescription());
}
}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
index ded1db1..706b4ca 100644
---
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
+++
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
@@ -57,6 +57,7 @@ import org.apache.nifi.remote.client.KeystoreType;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.NoValidPeerException;
import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.DataPacket;
@@ -197,6 +198,21 @@ public class TestHttpClient {
}
+ public static class EmptyPortTransactionsServlet extends
PortTransactionsServlet {
+
+ @Override
+ protected void doPost(HttpServletRequest req, HttpServletResponse
resp) throws ServletException, IOException {
+
+ final int reqProtocolVersion = getReqProtocolVersion(req);
+ if (reqProtocolVersion == 1) {
+ super.doPost(req, resp);
+ } else {
+ respondWithText(resp, "No flowfiles available", 204);
+ }
+ }
+
+ }
+
public static class PortTransactionsAccessDeniedServlet extends
HttpServlet {
@Override
@@ -497,6 +513,8 @@ public class TestHttpClient {
servletHandler.addServletWithMapping(OutputPortTransactionServlet.class,
"/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id");
servletHandler.addServletWithMapping(FlowFilesTimeoutAfterDataExchangeServlet.class,
"/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id/flow-files");
+
servletHandler.addServletWithMapping(EmptyPortTransactionsServlet.class,"/data-transfer/output-ports/empty-output-running-id/transactions");
+
server.start();
logger.info("Starting server on port {} for HTTP, and {} for HTTPS",
httpConnector.getLocalPort(), sslConnector.getLocalPort());
@@ -656,6 +674,13 @@ public class TestHttpClient {
runningOutputPort.setState(ScheduledState.RUNNING.name());
outputPorts.add(runningOutputPort);
+ final PortDTO emptyRunningOutputPort = new PortDTO();
+ emptyRunningOutputPort.setName("empty-output-running");
+ emptyRunningOutputPort.setId("empty-output-running-id");
+ emptyRunningOutputPort.setType("OUTPUT_PORT");
+ emptyRunningOutputPort.setState(ScheduledState.RUNNING.name());
+ outputPorts.add(emptyRunningOutputPort);
+
final PortDTO timeoutOutputPort = new PortDTO();
timeoutOutputPort.setName("output-timeout");
timeoutOutputPort.setId("output-timeout-id");
@@ -718,9 +743,10 @@ public class TestHttpClient {
.build()
) {
final Transaction transaction =
client.createTransaction(TransferDirection.SEND);
+ fail();
- assertNull(transaction);
-
+ } catch (final NoValidPeerException e) {
+ assertNotNull(e.getMessage());
}
}
@@ -737,9 +763,10 @@ public class TestHttpClient {
.build()
) {
final Transaction transaction =
client.createTransaction(TransferDirection.SEND);
+ fail();
- assertNull(transaction);
-
+ } catch (final NoValidPeerException e) {
+ assertNotNull(e.getMessage());
}
}
@@ -755,11 +782,11 @@ public class TestHttpClient {
.build()
) {
final Transaction transaction =
client.createTransaction(TransferDirection.SEND);
+ fail();
- assertNull(transaction);
-
+ } catch (final NoValidPeerException e) {
+ assertNotNull(e.getMessage());
}
-
}
@Test
@@ -854,7 +881,10 @@ public class TestHttpClient {
.build()
) {
final Transaction transaction =
client.createTransaction(TransferDirection.SEND);
- assertNull("createTransaction should fail at peer selection and
return null.", transaction);
+ fail();
+
+ } catch (final NoValidPeerException e) {
+ assertNotNull("createTransaction should fail at peer selection and
return null.", e.getMessage());
}
}
@@ -1224,6 +1254,23 @@ public class TestHttpClient {
}
}
+ @Test
+ public void testReceiveEmptyPort() throws Exception {
+
+ try (
+ SiteToSiteClient client = getDefaultBuilder()
+ .portName("empty-output-running")
+ .build()
+ ) {
+ try {
+ final Transaction transaction =
client.createTransaction(TransferDirection.RECEIVE);
+ assertNull(transaction);
+ } catch (IOException e) {
+ fail();
+ }
+ }
+ }
+
private void testReceive(SiteToSiteClient client) throws IOException {
final Transaction transaction =
client.createTransaction(TransferDirection.RECEIVE);
diff --git
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
index a327313..ec86dff 100644
---
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
+++
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
@@ -20,7 +20,6 @@ import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.createDataPack
import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveOneFlowFile;
import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveTwoFlowFiles;
import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveWithInvalidChecksum;
-import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveZeroFlowFile;
import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendButDestinationFull;
import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendOneFlowFile;
import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendTwoFlowFiles;
@@ -43,6 +42,7 @@ import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
+import org.apache.nifi.remote.exception.NoContentException;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.io.socket.SocketChannelInput;
import org.apache.nifi.remote.io.socket.SocketChannelOutput;
@@ -51,6 +51,7 @@ import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.Response;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.junit.Test;
+import static org.junit.Assert.fail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,14 +92,12 @@ public class TestSocketClientTransaction {
ByteArrayInputStream bis = new
ByteArrayInputStream(serverResponseBos.toByteArray());
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- SocketClientTransaction transaction = getClientTransaction(bis, bos,
TransferDirection.RECEIVE);
-
- execReceiveZeroFlowFile(transaction);
-
- // Verify what client has sent.
- DataInputStream sentByClient = new DataInputStream(new
ByteArrayInputStream(bos.toByteArray()));
- assertEquals(RequestType.RECEIVE_FLOWFILES,
RequestType.readRequestType(sentByClient));
- assertEquals(-1, sentByClient.read());
+ try {
+ SocketClientTransaction transaction = getClientTransaction(bis,
bos, TransferDirection.RECEIVE);
+ fail();
+ } catch (final NoContentException e) {
+ assertEquals("Remote side has no flowfiles to provide",
e.getMessage());
+ }
}
@Test
diff --git
a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
index 83a7e42..278e6b6 100644
---
a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
+++
b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
@@ -147,6 +147,13 @@ public class NiFiReceiver extends Receiver<NiFiDataPacket>
{
try {
while (!isStopped()) {
final Transaction transaction =
client.createTransaction(TransferDirection.RECEIVE);
+ if (transaction == null) {
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ }
+ continue;
+ }
DataPacket dataPacket = transaction.receive();
if (dataPacket == null) {
transaction.confirm();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index e51ea37..3b4d630 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -36,6 +36,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.exception.NoValidPeerException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
@@ -232,9 +233,16 @@ public class StandardRemoteGroupPort extends
RemoteGroupPort {
}
final SiteToSiteClient client = getSiteToSiteClient();
- final Transaction transaction;
+ Transaction transaction = null;
try {
transaction = client.createTransaction(transferDirection);
+ } catch (final NoValidPeerException e) {
+ final String message = String.format("%s Unable to create
transaction to communicate with; all peers must be penalized, so yielding
context", this);
+ logger.debug(message);
+ session.rollback();
+ context.yield();
+ remoteGroup.getEventReporter().reportEvent(Severity.ERROR,
CATEGORY, message);
+ return;
} catch (final PortNotRunningException e) {
context.yield();
this.targetRunning.set(false);
@@ -270,11 +278,10 @@ public class StandardRemoteGroupPort extends
RemoteGroupPort {
remoteGroup.getEventReporter().reportEvent(Severity.ERROR,
CATEGORY, message);
return;
}
-
if (transaction == null) {
- logger.debug("{} Unable to create transaction to communicate with;
all peers must be penalized, so yielding context", this);
- session.rollback();
context.yield();
+ final String message = String.format("%s successfully connected to
%s, but it has no flowfiles to provide, yielding", this, url);
+ logger.debug(message);
return;
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
index c8787d3..f74d66e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
@@ -31,6 +31,7 @@ import org.apache.nifi.authorization.PublicPortAuthorizable;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.connectable.Connection;
import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
@@ -77,6 +78,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.List;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -90,6 +92,7 @@ import static
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERT
import static
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
import static
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
import static
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
+import static
org.apache.nifi.remote.protocol.http.HttpHeaders.PROTOCOL_VERSION;
/**
* RESTful endpoint for managing a SiteToSite connection.
@@ -205,9 +208,21 @@ public class DataTransferResource extends
ApplicationResource {
final int transportProtocolVersion =
validationResult.transportProtocolVersion;
try {
- // Execute handshake.
- initiateServerProtocol(req, peer, transportProtocolVersion);
+ HttpFlowFileServerProtocol serverProtocol =
initiateServerProtocol(req, peer, transportProtocolVersion);
+
+ int protocolVersion =
Integer.parseUnsignedInt(req.getHeader(PROTOCOL_VERSION));
+ if ((protocolVersion >= 2) && PORT_TYPE_OUTPUT.equals(portType)) {
+ List<Connection> connectionList =
serverProtocol.getPort().getIncomingConnections();
+ if (connectionList.stream().allMatch(c ->
c.getFlowFileQueue().isEmpty())) {
+ // Transaction could be created, but there is nothing to
transfer. Just return 204 (No Content).
+ logger.debug("Output port has no flowfiles to transfer,
returning 200");
+ transactionManager.cancelTransaction(transactionId);
+ return
noCache(Response.status(Response.Status.NO_CONTENT)).type(MediaType.TEXT_PLAIN).entity("No
flowfiles available").build();
+ }
+ }
+
+ // Execute handshake.
TransactionResultEntity entity = new TransactionResultEntity();
entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode());
entity.setMessage("Handshake properties are valid, and port is
running. A transaction is created:" + transactionId);
diff --git
a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java
b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java
index cd82fe7..0543706 100644
---
a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java
+++
b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java
@@ -17,6 +17,7 @@
package org.apache.nifi.stateless.core;
import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.exception.NoValidPeerException;
import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
@@ -114,8 +115,8 @@ public class StatelessRemoteOutputPort extends
AbstractStatelessComponent {
try {
final Transaction transaction =
client.createTransaction(TransferDirection.RECEIVE);
if (transaction == null) {
- getLogger().error("Unable to create a transaction for Remote
Process Group {} to pull from port {}", new Object[]{url, name});
- return false;
+ getLogger().debug("No flowfiles to receive");
+ return true;
}
final Queue<StatelessFlowFile> destinationQueue = new
LinkedList<>();
@@ -139,6 +140,9 @@ public class StatelessRemoteOutputPort extends
AbstractStatelessComponent {
transaction.confirm();
transaction.complete();
+ } catch (final NoValidPeerException e) {
+ getLogger().error("Unable to create a transaction for Remote
Process Group {} to pull from port {}", new Object[]{url, name});
+ return false;
} catch (final Exception e) {
getLogger().error("Failed to receive FlowFile via site-to-site",
e);
return false;
diff --git
a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java
index e57dbbd..00bdaf8 100644
---
a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java
+++
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java
@@ -30,6 +30,7 @@ import org.apache.commons.cli.ParseException;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.KeystoreType;
import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.exception.NoContentException;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.util.FormatUtils;
@@ -241,6 +242,8 @@ public class SiteToSiteCliMain {
} else {
new SiteToSiteReceiver(siteToSiteClient,
output).receiveFiles();
}
+ } catch (final NoContentException e) {
+ System.out.println("Remote port has no flowfiles");
}
} catch (Exception e) {
printUsage(e.getMessage(), options);
diff --git
a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java
index 88ad8f3..82d19d2 100644
---
a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java
+++
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java
@@ -22,6 +22,7 @@ import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransactionCompletion;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.exception.NoContentException;
import org.apache.nifi.remote.protocol.DataPacket;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -45,6 +46,9 @@ public class SiteToSiteReceiver {
public TransactionCompletion receiveFiles() throws IOException {
Transaction transaction =
siteToSiteClient.createTransaction(TransferDirection.RECEIVE);
+ if (transaction == null) {
+ throw new NoContentException("Remote side has no flowfiles to
provide");
+ }
JsonGenerator jsonGenerator = new
JsonFactory().createJsonGenerator(output);
jsonGenerator.writeStartArray();
DataPacket dataPacket;