This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new e659e3b NIFI-5952 Refactor RAW S2S from nio to socket
e659e3b is described below
commit e659e3b606cc3e41816081046f07d9a8d33c88f6
Author: Koji Kawamura <[email protected]>
AuthorDate: Tue Dec 25 11:35:52 2018 +0900
NIFI-5952 Refactor RAW S2S from nio to socket
---
.../client/socket/EndpointConnectionPool.java | 30 +-
...ssion.java => SocketCommunicationsSession.java} | 33 +-
.../{SocketChannelInput.java => SocketInput.java} | 34 +-
...{SocketChannelOutput.java => SocketOutput.java} | 30 +-
.../ssl/SSLSocketChannelCommunicationsSession.java | 114 ------
.../io/socket/ssl/SSLSocketChannelInput.java | 55 ---
.../io/socket/ssl/SSLSocketChannelOutput.java | 44 ---
.../protocol/socket/SocketClientTransaction.java | 2 +-
.../socket/TestSocketClientTransaction.java | 53 +--
.../nifi/remote/SocketRemoteSiteListener.java | 386 +++++++++++----------
.../nifi/remote/TestStandardRemoteGroupPort.java | 100 +++---
.../socket/TestSocketFlowFileServerProtocol.java | 12 +-
12 files changed, 362 insertions(+), 531 deletions(-)
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index e9f2536..17d66da 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -34,11 +34,10 @@ import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
+import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+import org.apache.nifi.security.util.CertificateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,9 +48,8 @@ import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
+import java.net.Socket;
import java.net.URI;
-import java.nio.channels.SocketChannel;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collections;
@@ -450,27 +448,23 @@ public class EndpointConnectionPool implements PeerStatusProvider {
+ " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
}
- final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, localAddress, true);
- socketChannel.connect();
-
- commsSession = new SSLSocketChannelCommunicationsSession(socketChannel);
+ final Socket socket = sslContext.getSocketFactory().createSocket(hostname, port);
+ socket.setSoTimeout(commsTimeout);
+ commsSession = new SocketCommunicationsSession(socket);
try {
- commsSession.setUserDn(socketChannel.getDn());
+ final String dn = CertificateUtils.extractPeerDNFromSSLSocket(socket);
+ commsSession.setUserDn(dn);
} catch (final CertificateException ex) {
throw new IOException(ex);
}
} else {
- final SocketChannel socketChannel = SocketChannel.open();
- if (localAddress != null) {
- final SocketAddress localSocketAddress = new InetSocketAddress(localAddress, 0);
- socketChannel.socket().bind(localSocketAddress);
- }
- socketChannel.socket().connect(new InetSocketAddress(hostname, port), commsTimeout);
- socketChannel.socket().setSoTimeout(commsTimeout);
+ final Socket socket = new Socket();
+ socket.connect(new InetSocketAddress(hostname, port), commsTimeout);
+ socket.setSoTimeout(commsTimeout);
- commsSession = new SocketChannelCommunicationsSession(socketChannel);
+ commsSession = new SocketCommunicationsSession(socket);
}
commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketCommunicationsSession.java
similarity index 76%
rename from
nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
rename to
nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketCommunicationsSession.java
index e19cd3d..7b62f4b 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketCommunicationsSession.java
@@ -16,38 +16,39 @@
*/
package org.apache.nifi.remote.io.socket;
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-
import org.apache.nifi.remote.AbstractCommunicationsSession;
+import org.apache.nifi.remote.protocol.CommunicationsInput;
+import org.apache.nifi.remote.protocol.CommunicationsOutput;
+
+import java.io.IOException;
+import java.net.Socket;
-public class SocketChannelCommunicationsSession extends AbstractCommunicationsSession {
+public class SocketCommunicationsSession extends AbstractCommunicationsSession {
- private final SocketChannel channel;
- private final SocketChannelInput request;
- private final SocketChannelOutput response;
+ private final Socket socket;
+ private final SocketInput request;
+ private final SocketOutput response;
private int timeout = 30000;
- public SocketChannelCommunicationsSession(final SocketChannel socketChannel) throws IOException {
+ public SocketCommunicationsSession(final Socket socket) throws IOException {
super();
- request = new SocketChannelInput(socketChannel);
- response = new SocketChannelOutput(socketChannel);
- channel = socketChannel;
- socketChannel.configureBlocking(false);
+ this.socket = socket;
+ request = new SocketInput(socket);
+ response = new SocketOutput(socket);
}
@Override
public boolean isClosed() {
- return !channel.isConnected();
+ return socket.isClosed();
}
@Override
- public SocketChannelInput getInput() {
+ public CommunicationsInput getInput() {
return request;
}
@Override
- public SocketChannelOutput getOutput() {
+ public CommunicationsOutput getOutput() {
return response;
}
@@ -74,7 +75,7 @@ public class SocketChannelCommunicationsSession extends AbstractCommunicationsSe
}
try {
- channel.close();
+ socket.close();
} catch (final IOException ioe) {
if (suppressed != null) {
ioe.addSuppressed(suppressed);
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketInput.java
similarity index 70%
rename from
nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
rename to
nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketInput.java
index 5cf2a62..1048073 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketInput.java
@@ -19,21 +19,28 @@ package org.apache.nifi.remote.io.socket;
import org.apache.nifi.remote.io.InterruptableInputStream;
import org.apache.nifi.remote.protocol.CommunicationsInput;
import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.channels.SocketChannel;
+import java.net.Socket;
+import java.net.SocketException;
-public class SocketChannelInput implements CommunicationsInput {
+public class SocketInput implements CommunicationsInput {
- private final SocketChannelInputStream socketIn;
+ private static final Logger LOG = LoggerFactory.getLogger(SocketInput.class);
+
+ private final Socket socket;
+ private final InputStream socketIn;
private final ByteCountingInputStream countingIn;
private final InputStream bufferedIn;
private final InterruptableInputStream interruptableIn;
- public SocketChannelInput(final SocketChannel socketChannel) throws IOException {
- this.socketIn = new SocketChannelInputStream(socketChannel);
+ public SocketInput(final Socket socket) throws IOException {
+ this.socket = socket;
+ socketIn = socket.getInputStream();
countingIn = new ByteCountingInputStream(socketIn);
bufferedIn = new BufferedInputStream(countingIn);
interruptableIn = new InterruptableInputStream(bufferedIn);
@@ -45,7 +52,11 @@ public class SocketChannelInput implements CommunicationsInput {
}
public void setTimeout(final int millis) {
- socketIn.setTimeout(millis);
+ try {
+ socket.setSoTimeout(millis);
+ } catch (SocketException e) {
+ LOG.warn("Failed to set socket timeout.", e);
+ }
}
public boolean isDataAvailable() {
@@ -63,11 +74,18 @@ public class SocketChannelInput implements CommunicationsInput {
public void interrupt() {
interruptableIn.interrupt();
- socketIn.interrupt();
}
@Override
public void consume() throws IOException {
- socketIn.consume();
+ if (interruptableIn == null || !isDataAvailable()) {
+ return;
+ }
+
+ final byte[] b = new byte[4096];
+ int bytesRead;
+ do {
+ bytesRead = interruptableIn.read(b);
+ } while (bytesRead > 0);
}
}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketOutput.java
similarity index 73%
rename from
nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
rename to
nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketOutput.java
index 859b88c..6676239 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketOutput.java
@@ -16,24 +16,30 @@
*/
package org.apache.nifi.remote.io.socket;
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.channels.SocketChannel;
import org.apache.nifi.remote.io.InterruptableOutputStream;
import org.apache.nifi.remote.protocol.CommunicationsOutput;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketException;
+
+public class SocketOutput implements CommunicationsOutput {
-public class SocketChannelOutput implements CommunicationsOutput {
+ private static final Logger LOG = LoggerFactory.getLogger(SocketOutput.class);
- private final SocketChannelOutputStream socketOutStream;
+ private final Socket socket;
private final ByteCountingOutputStream countingOut;
private final OutputStream bufferedOut;
private final InterruptableOutputStream interruptableOut;
- public SocketChannelOutput(final SocketChannel socketChannel) throws IOException {
- socketOutStream = new SocketChannelOutputStream(socketChannel);
- countingOut = new ByteCountingOutputStream(socketOutStream);
+ public SocketOutput(final Socket socket) throws IOException {
+ this.socket = socket;
+ countingOut = new ByteCountingOutputStream(socket.getOutputStream());
bufferedOut = new BufferedOutputStream(countingOut);
interruptableOut = new InterruptableOutputStream(bufferedOut);
}
@@ -44,7 +50,11 @@ public class SocketChannelOutput implements CommunicationsOutput {
}
public void setTimeout(final int timeout) {
- socketOutStream.setTimeout(timeout);
+ try {
+ socket.setSoTimeout(timeout);
+ } catch (SocketException e) {
+ LOG.warn("Failed to set socket timeout.", e);
+ }
}
@Override
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
deleted file mode 100644
index 73f5a90..0000000
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-
-import org.apache.nifi.remote.AbstractCommunicationsSession;
-
-public class SSLSocketChannelCommunicationsSession extends
AbstractCommunicationsSession {
-
- private final SSLSocketChannel channel;
- private final SSLSocketChannelInput request;
- private final SSLSocketChannelOutput response;
-
- public SSLSocketChannelCommunicationsSession(final SSLSocketChannel
channel) {
- super();
- request = new SSLSocketChannelInput(channel);
- response = new SSLSocketChannelOutput(channel);
- this.channel = channel;
- }
-
- @Override
- public SSLSocketChannelInput getInput() {
- return request;
- }
-
- @Override
- public SSLSocketChannelOutput getOutput() {
- return response;
- }
-
- @Override
- public void setTimeout(final int millis) throws IOException {
- channel.setTimeout(millis);
- }
-
- @Override
- public int getTimeout() throws IOException {
- return channel.getTimeout();
- }
-
- @Override
- public void close() throws IOException {
- IOException suppressed = null;
-
- try {
- request.consume();
- } catch (final IOException ioe) {
- suppressed = ioe;
- }
-
- try {
- channel.close();
- } catch (final IOException ioe) {
- if (suppressed != null) {
- ioe.addSuppressed(suppressed);
- }
-
- throw ioe;
- }
-
- if (suppressed != null) {
- throw suppressed;
- }
- }
-
- @Override
- public boolean isClosed() {
- return channel.isClosed();
- }
-
- @Override
- public boolean isDataAvailable() {
- try {
- return request.isDataAvailable();
- } catch (final Exception e) {
- return false;
- }
- }
-
- @Override
- public long getBytesWritten() {
- return response.getBytesWritten();
- }
-
- @Override
- public long getBytesRead() {
- return request.getBytesRead();
- }
-
- @Override
- public void interrupt() {
- channel.interrupt();
- }
-
- @Override
- public String toString() {
- return super.toString() + "[SSLSocketChannel=" + channel + "]";
- }
-}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
deleted file mode 100644
index 842b7f2..0000000
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import org.apache.nifi.remote.protocol.CommunicationsInput;
-import org.apache.nifi.stream.io.ByteCountingInputStream;
-
-public class SSLSocketChannelInput implements CommunicationsInput {
-
- private final SSLSocketChannelInputStream in;
- private final ByteCountingInputStream countingIn;
- private final InputStream bufferedIn;
-
- public SSLSocketChannelInput(final SSLSocketChannel socketChannel) {
- in = new SSLSocketChannelInputStream(socketChannel);
- countingIn = new ByteCountingInputStream(in);
- this.bufferedIn = new BufferedInputStream(countingIn);
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- return bufferedIn;
- }
-
- public boolean isDataAvailable() throws IOException {
- return bufferedIn.available() > 0;
- }
-
- @Override
- public long getBytesRead() {
- return countingIn.getBytesRead();
- }
-
- @Override
- public void consume() throws IOException {
- in.consume();
- }
-}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
deleted file mode 100644
index d413d0b..0000000
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import org.apache.nifi.remote.protocol.CommunicationsOutput;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
-
-public class SSLSocketChannelOutput implements CommunicationsOutput {
-
- private final OutputStream out;
- private final ByteCountingOutputStream countingOut;
-
- public SSLSocketChannelOutput(final SSLSocketChannel channel) {
- countingOut = new ByteCountingOutputStream(new
SSLSocketChannelOutputStream(channel));
- out = new BufferedOutputStream(countingOut);
- }
-
- @Override
- public OutputStream getOutputStream() throws IOException {
- return out;
- }
-
- @Override
- public long getBytesWritten() {
- return countingOut.getBytesWritten();
- }
-}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index 5b1eb1c..85d6c1a 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -63,7 +63,7 @@ public class SocketClientTransaction extends AbstractTransaction {
this.dataAvailable = true;
break;
case NO_MORE_DATA:
- logger.debug("{} No data available from {}", peer);
+ logger.debug("{} No data available from {}", this, peer);
this.dataAvailable = false;
return;
default:
diff --git
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
index ec86dff..048d612 100644
---
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
+++
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
@@ -16,25 +16,6 @@
*/
package org.apache.nifi.remote.protocol.socket;
-import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.createDataPacket;
-import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveOneFlowFile;
-import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveTwoFlowFiles;
-import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveWithInvalidChecksum;
-import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendButDestinationFull;
-import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendOneFlowFile;
-import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendTwoFlowFiles;
-import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendWithInvalidChecksum;
-import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendZeroFlowFile;
-import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.readContents;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
@@ -43,18 +24,38 @@ import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
import org.apache.nifi.remote.exception.NoContentException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.SocketChannelInput;
-import org.apache.nifi.remote.io.socket.SocketChannelOutput;
+import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
+import org.apache.nifi.remote.io.socket.SocketInput;
+import org.apache.nifi.remote.io.socket.SocketOutput;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.Response;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.junit.Test;
-import static org.junit.Assert.fail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.createDataPacket;
+import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveOneFlowFile;
+import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveTwoFlowFiles;
+import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveWithInvalidChecksum;
+import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendButDestinationFull;
+import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendOneFlowFile;
+import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendTwoFlowFiles;
+import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendWithInvalidChecksum;
+import static
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendZeroFlowFile;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.readContents;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class TestSocketClientTransaction {
private Logger logger =
LoggerFactory.getLogger(TestSocketClientTransaction.class);
@@ -63,9 +64,9 @@ public class TestSocketClientTransaction {
private SocketClientTransaction getClientTransaction(ByteArrayInputStream bis, ByteArrayOutputStream bos, TransferDirection direction) throws IOException {
PeerDescription description = null;
String peerUrl = "";
- SocketChannelCommunicationsSession commsSession = mock(SocketChannelCommunicationsSession.class);
- SocketChannelInput socketIn = mock(SocketChannelInput.class);
- SocketChannelOutput socketOut = mock(SocketChannelOutput.class);
+ SocketCommunicationsSession commsSession = mock(SocketCommunicationsSession.class);
+ SocketInput socketIn = mock(SocketInput.class);
+ SocketOutput socketOut = mock(SocketOutput.class);
when(commsSession.getInput()).thenReturn(socketIn);
when(commsSession.getOutput()).thenReturn(socketOut);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index 6fa86ff..830d043 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -24,17 +24,17 @@ import org.apache.nifi.remote.exception.BadRequestException;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.NotAuthorizedException;
import org.apache.nifi.remote.exception.RequestExpiredException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
+import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.ServerProtocol;
+import org.apache.nifi.security.util.CertificateUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
@@ -42,12 +42,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -90,9 +87,6 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
final boolean secure = (sslContext != null);
final List<Thread> threads = new ArrayList<>();
- final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(true);
- serverSocketChannel.bind(new InetSocketAddress(socketPort));
stopped.set(false);
final Thread listenerThread = new Thread(new Runnable() {
@@ -100,220 +94,248 @@ public class SocketRemoteSiteListener implements
RemoteSiteListener {
@Override
public void run() {
- while (!stopped.get()) {
- LOG.trace("Accepting Connection...");
- Socket acceptedSocket = null;
- try {
- serverSocketChannel.configureBlocking(false);
- final ServerSocket serverSocket =
serverSocketChannel.socket();
- serverSocket.setSoTimeout(2000);
- while (!stopped.get() && acceptedSocket == null) {
- try {
- acceptedSocket = serverSocket.accept();
- } catch (final SocketTimeoutException ste) {
- continue;
- }
+
+ try (final ServerSocket serverSocket = createServerSocket()) {
+ serverSocket.setSoTimeout(2000);
+
+ while (!stopped.get()) {
+
+ final Socket acceptedSocket =
acceptConnection(serverSocket);
+
+ if (acceptedSocket == null) {
+ continue;
}
- } catch (final IOException e) {
- LOG.error("RemoteSiteListener Unable to accept
connection due to {}", e.toString());
- if (LOG.isDebugEnabled()) {
- LOG.error("", e);
+
+ if (stopped.get()) {
+ break;
}
- continue;
+
+ final Thread thread =
createWorkerThread(acceptedSocket);
+ thread.setName("Site-to-Site Worker Thread-" +
(threadCount++));
+ LOG.debug("Handing connection to {}", thread);
+ thread.start();
+ threads.add(thread);
+ threads.removeIf(t -> !t.isAlive());
+
}
- LOG.trace("Got connection");
- if (stopped.get()) {
- break;
+ } catch (final IOException e) {
+ LOG.error("Unable to open server socket due to {}",
e.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.error("", e);
}
+ }
- final Socket socket = acceptedSocket;
- final SocketChannel socketChannel = socket.getChannel();
- final Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- LOG.debug("{} Determining URL of connection",
this);
- final InetAddress inetAddress =
socket.getInetAddress();
- String clientHostName = inetAddress.getHostName();
- final int slashIndex = clientHostName.indexOf("/");
- if (slashIndex == 0) {
- clientHostName = clientHostName.substring(1);
- } else if (slashIndex > 0) {
- clientHostName = clientHostName.substring(0,
slashIndex);
- }
- final int clientPort = socket.getPort();
- final String peerUri = "nifi://" + clientHostName
+ ":" + clientPort;
- LOG.debug("{} Connection URL is {}", this,
peerUri);
+ for(Thread thread : threads) {
+ if(thread != null) {
+ thread.interrupt();
+ }
+ }
+ }
- final CommunicationsSession commsSession;
- final String dn;
- try {
- if (secure) {
- final SSLSocketChannel sslSocketChannel =
new SSLSocketChannel(sslContext, socketChannel, false);
- LOG.trace("Channel is secure;
connecting...");
- sslSocketChannel.connect();
- LOG.trace("Channel connected");
-
- commsSession = new
SSLSocketChannelCommunicationsSession(sslSocketChannel);
- dn = sslSocketChannel.getDn();
- commsSession.setUserDn(dn);
- } else {
- LOG.trace("{} Channel is not secure",
this);
- commsSession = new
SocketChannelCommunicationsSession(socketChannel);
- dn = null;
- }
- } catch (final Exception e) {
- LOG.error("RemoteSiteListener Unable to accept
connection from {} due to {}", socket, e.toString());
- if (LOG.isDebugEnabled()) {
- LOG.error("", e);
- }
- try {
- socketChannel.close();
- } catch (IOException swallow) {
- }
- return;
- }
+ private Thread createWorkerThread(Socket socket) {
+ return new Thread(new Runnable() {
+ @Override
+ public void run() {
+ LOG.debug("{} Determining URL of connection", this);
+ final InetAddress inetAddress =
socket.getInetAddress();
+ String clientHostName = inetAddress.getHostName();
+ final int slashIndex = clientHostName.indexOf("/");
+ if (slashIndex == 0) {
+ clientHostName = clientHostName.substring(1);
+ } else if (slashIndex > 0) {
+ clientHostName = clientHostName.substring(0,
slashIndex);
+ }
- LOG.info("Received connection from {}, User DN:
{}", socket.getInetAddress(), dn);
+ final int clientPort = socket.getPort();
+ final String peerUri = "nifi://" + clientHostName +
":" + clientPort;
+ LOG.debug("{} Connection URL is {}", this, peerUri);
- final InputStream socketIn;
- final OutputStream socketOut;
+ final CommunicationsSession commsSession;
+ final String dn;
+ try {
+ if (secure) {
+ LOG.trace("{} Connection is secure", this);
+ dn =
CertificateUtils.extractPeerDNFromSSLSocket(socket);
- try {
- socketIn =
commsSession.getInput().getInputStream();
- socketOut =
commsSession.getOutput().getOutputStream();
- } catch (final IOException e) {
- LOG.error("Connection dropped from {} before
any data was transmitted", peerUri);
- try {
- commsSession.close();
- } catch (final IOException ioe) {
- }
+ commsSession = new
SocketCommunicationsSession(socket);
+ commsSession.setUserDn(dn);
- return;
+ } else {
+ LOG.trace("{} Connection is not secure", this);
+ commsSession = new
SocketCommunicationsSession(socket);
+ dn = null;
+ }
+ } catch (final Exception e) {
+ LOG.error("RemoteSiteListener Unable to accept
connection from {} due to {}", socket, e.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.error("", e);
}
+ return;
+ }
- final DataInputStream dis = new
DataInputStream(socketIn);
- final DataOutputStream dos = new
DataOutputStream(socketOut);
+ LOG.info("Received connection from {}, User DN: {}",
socket.getInetAddress(), dn);
- ServerProtocol protocol = null;
- Peer peer = null;
+ final InputStream socketIn;
+ final OutputStream socketOut;
+
+ try {
+ socketIn =
commsSession.getInput().getInputStream();
+ socketOut =
commsSession.getOutput().getOutputStream();
+ } catch (final IOException e) {
+ LOG.error("Connection dropped from {} before any
data was transmitted", peerUri);
try {
- // ensure that we are communicating with
another NiFi
- LOG.debug("Verifying magic bytes...");
- verifyMagicBytes(dis, peerUri);
-
- LOG.debug("Receiving Server Protocol
Negotiation");
- protocol =
RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos);
- protocol.setRootProcessGroup(rootGroup.get());
- protocol.setNodeInformant(nodeInformant);
- if (protocol instanceof
PeerDescriptionModifiable) {
-
((PeerDescriptionModifiable)protocol).setPeerDescriptionModifier(peerDescriptionModifier);
- }
+ commsSession.close();
+ } catch (final IOException ioe) {
+ }
- final PeerDescription description = new
PeerDescription(clientHostName, clientPort, sslContext != null);
- peer = new Peer(description, commsSession,
peerUri, "nifi://localhost:" + getPort());
- LOG.debug("Handshaking....");
- protocol.handshake(peer);
-
- if (!protocol.isHandshakeSuccessful()) {
- LOG.error("Handshake failed with {};
closing connection", peer);
- try {
- peer.close();
- } catch (final IOException e) {
- LOG.warn("Failed to close {} due to
{}", peer, e);
- }
+ return;
+ }
+
+ final DataInputStream dis = new
DataInputStream(socketIn);
+ final DataOutputStream dos = new
DataOutputStream(socketOut);
+
+ ServerProtocol protocol = null;
+ Peer peer = null;
+ try {
+ // ensure that we are communicating with another
NiFi
+ LOG.debug("Verifying magic bytes...");
+ verifyMagicBytes(dis, peerUri);
+
+ LOG.debug("Receiving Server Protocol Negotiation");
+ protocol =
RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos);
+ protocol.setRootProcessGroup(rootGroup.get());
+ protocol.setNodeInformant(nodeInformant);
+ if (protocol instanceof PeerDescriptionModifiable)
{
+ ((PeerDescriptionModifiable)
protocol).setPeerDescriptionModifier(peerDescriptionModifier);
+ }
- // no need to shutdown protocol because we
failed to perform handshake
- return;
+ final PeerDescription description = new
PeerDescription(clientHostName, clientPort, sslContext != null);
+ peer = new Peer(description, commsSession,
peerUri, "nifi://localhost:" + getPort());
+ LOG.debug("Handshaking....");
+ protocol.handshake(peer);
+
+ if (!protocol.isHandshakeSuccessful()) {
+ LOG.error("Handshake failed with {}; closing
connection", peer);
+ try {
+ peer.close();
+ } catch (final IOException e) {
+ LOG.warn("Failed to close {} due to {}",
peer, e);
}
- commsSession.setTimeout((int)
protocol.getRequestExpiration());
+ // no need to shutdown protocol because we
failed to perform handshake
+ return;
+ }
- LOG.info("Successfully negotiated
ServerProtocol {} Version {} with {}", protocol.getResourceName(),
protocol.getVersionNegotiator().getVersion(), peer);
+ commsSession.setTimeout((int)
protocol.getRequestExpiration());
- try {
- while (!protocol.isShutdown()) {
- LOG.trace("Getting Protocol Request
Type...");
-
- int timeoutCount = 0;
- RequestType requestType = null;
-
- while (requestType == null) {
- try {
- requestType =
protocol.getRequestType(peer);
- } catch (final
SocketTimeoutException e) {
- // Give the timeout a bit
longer (twice as long) to receive the Request Type,
- // in order to attempt to
receive more data without shutting down the socket if we don't
- // have to.
- LOG.debug("{} Timed out
waiting to receive RequestType using {} with {}", this, protocol, peer);
- timeoutCount++;
- requestType = null;
-
- if (timeoutCount >= 2) {
- throw e;
- }
+ LOG.info("Successfully negotiated ServerProtocol
{} Version {} with {}",
+ protocol.getResourceName(),
protocol.getVersionNegotiator().getVersion(), peer);
+
+ try {
+ while (!protocol.isShutdown()) {
+ LOG.trace("Getting Protocol Request
Type...");
+
+ int timeoutCount = 0;
+ RequestType requestType = null;
+
+ while (requestType == null) {
+ try {
+ requestType =
protocol.getRequestType(peer);
+ } catch (final SocketTimeoutException
e) {
+ // Give the timeout a bit longer
(twice as long) to receive the Request Type,
+ // in order to attempt to receive
more data without shutting down the socket if we don't
+ // have to.
+ LOG.debug("{} Timed out waiting to
receive RequestType using {} with {}", this, protocol, peer);
+ timeoutCount++;
+ requestType = null;
+
+ if (timeoutCount >= 2) {
+ throw e;
}
}
-
- handleRequest(protocol, peer,
requestType);
- }
- LOG.debug("Finished communicating with {}
({})", peer, protocol);
- } catch (final Exception e) {
- LOG.error("Unable to communicate with
remote instance {} ({}) due to {}; closing connection", peer, protocol,
e.toString());
- if (LOG.isDebugEnabled()) {
- LOG.error("", e);
}
+
+ handleRequest(protocol, peer, requestType);
}
- } catch (final IOException e) {
- LOG.error("Unable to communicate with remote
instance {} due to {}; closing connection", peer, e.toString());
+ LOG.debug("Finished communicating with {}
({})", peer, protocol);
+ } catch (final Exception e) {
+ LOG.error("Unable to communicate with remote
instance {} ({}) due to {}; closing connection", peer, protocol, e.toString());
if (LOG.isDebugEnabled()) {
LOG.error("", e);
}
- } catch (final Throwable t) {
- LOG.error("Handshake failed when communicating
with {}; closing connection. Reason for failure: {}", peerUri, t.toString());
- if (LOG.isDebugEnabled()) {
- LOG.error("", t);
- }
- } finally {
- LOG.trace("Cleaning up");
- try {
- if (protocol != null && peer != null) {
- protocol.shutdown(peer);
- }
- } catch (final Exception protocolException) {
- LOG.warn("Failed to shutdown protocol due
to {}", protocolException.toString());
+ }
+ } catch (final IOException e) {
+ LOG.error("Unable to communicate with remote
instance {} due to {}; closing connection", peer, e.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.error("", e);
+ }
+ } catch (final Throwable t) {
+ LOG.error("Handshake failed when communicating
with {}; closing connection. Reason for failure: {}", peerUri, t.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.error("", t);
+ }
+ } finally {
+ LOG.trace("Cleaning up");
+ try {
+ if (protocol != null && peer != null) {
+ protocol.shutdown(peer);
}
+ } catch (final Exception protocolException) {
+ LOG.warn("Failed to shutdown protocol due to
{}", protocolException.toString());
+ }
- try {
- if (peer != null) {
- peer.close();
- }
- } catch (final Exception peerException) {
- LOG.warn("Failed to close peer due to {};
some resources may not be appropriately cleaned up", peerException.toString());
+ try {
+ if (peer != null) {
+ peer.close();
}
- LOG.trace("Finished cleaning up");
+ } catch (final Exception peerException) {
+ LOG.warn("Failed to close peer due to {}; some
resources may not be appropriately cleaned up", peerException.toString());
}
+ LOG.trace("Finished cleaning up");
}
- });
- thread.setName("Site-to-Site Worker Thread-" +
(threadCount++));
- LOG.debug("Handing connection to {}", thread);
- thread.start();
- threads.add(thread);
- threads.removeIf(t -> !t.isAlive());
- }
-
- for(Thread thread : threads) {
- if(thread != null) {
- thread.interrupt();
}
- }
+ });
}
});
+
listenerThread.setName("Site-to-Site Listener");
listenerThread.start();
}
+ private ServerSocket createServerSocket() throws IOException {
+ if (sslContext != null) {
+ final ServerSocket serverSocket =
sslContext.getServerSocketFactory().createServerSocket(socketPort);
+ ((SSLServerSocket) serverSocket).setNeedClientAuth(true);
+ return serverSocket;
+ } else {
+ return new ServerSocket(socketPort);
+ }
+ }
+
+ private Socket acceptConnection(ServerSocket serverSocket) {
+ LOG.trace("Accepting Connection...");
+ Socket acceptedSocket = null;
+ try {
+ while (!stopped.get() && acceptedSocket == null) {
+ try {
+ acceptedSocket = serverSocket.accept();
+ } catch (final SocketTimeoutException ste) {
+ LOG.trace("SocketTimeoutException occurred. {}",
ste.getMessage());
+ }
+ }
+ } catch (final IOException e) {
+ LOG.error("RemoteSiteListener Unable to accept connection due to
{}", e.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.error("", e);
+ }
+ return acceptedSocket;
+ }
+ LOG.trace("Got connection");
+ return acceptedSocket;
+ }
+
private void handleRequest(final ServerProtocol protocol, final Peer peer,
final RequestType requestType)
throws IOException, NotAuthorizedException, BadRequestException,
RequestExpiredException {
LOG.debug("Request type from {} is {}", protocol, requestType);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
index 0831f51..d7b423d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
@@ -31,7 +31,7 @@ import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
+import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
@@ -44,7 +44,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayInputStream;
-import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -61,6 +60,7 @@ import org.apache.nifi.util.NiFiProperties;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -154,28 +154,27 @@ public class TestStandardRemoteGroupPort {
final String peerUrl = "nifi://node1.example.com:9090";
final PeerDescription peerDescription = new
PeerDescription("node1.example.com", 9090, true);
- try (final SocketChannel socketChannel = SocketChannel.open()) {
- final CommunicationsSession commsSession = new
SocketChannelCommunicationsSession(socketChannel);
- commsSession.setUserDn("nifi.node1.example.com");
- final Peer peer = new Peer(peerDescription, commsSession, peerUrl,
REMOTE_CLUSTER_URL);
+ final CommunicationsSession commsSession =
mock(SocketCommunicationsSession.class);
+ when(commsSession.createTransitUri(anyString(),
anyString())).thenReturn("nifi://node1.example.com:9090/flowfile-uuid");
+ when(commsSession.getUserDn()).thenReturn("nifi.node1.example.com");
+ final Peer peer = new Peer(peerDescription, commsSession, peerUrl,
REMOTE_CLUSTER_URL);
- doReturn(peer).when(transaction).getCommunicant();
+ doReturn(peer).when(transaction).getCommunicant();
- final MockFlowFile flowFile =
processSession.createFlowFile("0123456789".getBytes());
- sessionState.getFlowFileQueue().offer(flowFile);
+ final MockFlowFile flowFile =
processSession.createFlowFile("0123456789".getBytes());
+ sessionState.getFlowFileQueue().offer(flowFile);
- port.onTrigger(processContext, processSession);
+ port.onTrigger(processContext, processSession);
- // Assert provenance.
- final List<ProvenanceEventRecord> provenanceEvents =
sessionState.getProvenanceEvents();
- assertEquals(1, provenanceEvents.size());
- final ProvenanceEventRecord provenanceEvent =
provenanceEvents.get(0);
- assertEquals(ProvenanceEventType.SEND,
provenanceEvent.getEventType());
- assertEquals(peerUrl + "/" +
flowFile.getAttribute(CoreAttributes.UUID.key()),
provenanceEvent.getTransitUri());
- assertEquals("Remote DN=nifi.node1.example.com",
provenanceEvent.getDetails());
- assertEquals("remote-group-port-id",
provenanceEvent.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key()));
+ // Assert provenance.
+ final List<ProvenanceEventRecord> provenanceEvents =
sessionState.getProvenanceEvents();
+ assertEquals(1, provenanceEvents.size());
+ final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
+ assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
+ assertEquals("nifi://node1.example.com:9090/flowfile-uuid",
provenanceEvent.getTransitUri());
+ assertEquals("Remote DN=nifi.node1.example.com",
provenanceEvent.getDetails());
+ assertEquals("remote-group-port-id",
provenanceEvent.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key()));
- }
}
@Test
@@ -186,43 +185,42 @@ public class TestStandardRemoteGroupPort {
final String peerUrl = "nifi://node1.example.com:9090";
final PeerDescription peerDescription = new
PeerDescription("node1.example.com", 9090, true);
- try (final SocketChannel socketChannel = SocketChannel.open()) {
- final CommunicationsSession commsSession = new
SocketChannelCommunicationsSession(socketChannel);
- commsSession.setUserDn("nifi.node1.example.com");
- final Peer peer = new Peer(peerDescription, commsSession, peerUrl,
REMOTE_CLUSTER_URL);
+ final CommunicationsSession commsSession =
mock(SocketCommunicationsSession.class);
+ when(commsSession.createTransitUri(anyString(),
anyString())).thenReturn("nifi://node1.example.com:9090/flowfile-uuid");
+ when(commsSession.getUserDn()).thenReturn("nifi.node1.example.com");
+ final Peer peer = new Peer(peerDescription, commsSession, peerUrl,
REMOTE_CLUSTER_URL);
- doReturn(peer).when(transaction).getCommunicant();
+ doReturn(peer).when(transaction).getCommunicant();
+
+ final String sourceFlowFileUuid = "flowfile-uuid";
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.UUID.key(), sourceFlowFileUuid);
+ final byte[] dataPacketContents = "DataPacket Contents".getBytes();
+ final ByteArrayInputStream dataPacketInputStream = new
ByteArrayInputStream(dataPacketContents);
+ final DataPacket dataPacket = new StandardDataPacket(attributes,
+ dataPacketInputStream, dataPacketContents.length);
- final String sourceFlowFileUuid = "flowfile-uuid";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.UUID.key(), sourceFlowFileUuid);
- final byte[] dataPacketContents = "DataPacket Contents".getBytes();
- final ByteArrayInputStream dataPacketInputStream = new
ByteArrayInputStream(dataPacketContents);
- final DataPacket dataPacket = new StandardDataPacket(attributes,
- dataPacketInputStream, dataPacketContents.length);
+ // Return null when it gets called second time.
+ doReturn(dataPacket).doReturn(null).when(this.transaction).receive();
- // Return null when it gets called second time.
-
doReturn(dataPacket).doReturn(null).when(this.transaction).receive();
+ port.onTrigger(processContext, processSession);
- port.onTrigger(processContext, processSession);
+ // Assert provenance.
+ final List<ProvenanceEventRecord> provenanceEvents =
sessionState.getProvenanceEvents();
+ assertEquals(1, provenanceEvents.size());
+ final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
+ assertEquals(ProvenanceEventType.RECEIVE,
provenanceEvent.getEventType());
+ assertEquals("nifi://node1.example.com:9090/flowfile-uuid",
provenanceEvent.getTransitUri());
+ assertEquals("Remote DN=nifi.node1.example.com",
provenanceEvent.getDetails());
- // Assert provenance.
- final List<ProvenanceEventRecord> provenanceEvents =
sessionState.getProvenanceEvents();
- assertEquals(1, provenanceEvents.size());
- final ProvenanceEventRecord provenanceEvent =
provenanceEvents.get(0);
- assertEquals(ProvenanceEventType.RECEIVE,
provenanceEvent.getEventType());
- assertEquals(peerUrl + "/" + sourceFlowFileUuid,
provenanceEvent.getTransitUri());
- assertEquals("Remote DN=nifi.node1.example.com",
provenanceEvent.getDetails());
-
- // Assert received flow files.
-
processSession.assertAllFlowFilesTransferred(Relationship.ANONYMOUS);
- final List<MockFlowFile> flowFiles =
processSession.getFlowFilesForRelationship(Relationship.ANONYMOUS);
- assertEquals(1, flowFiles.size());
- final MockFlowFile flowFile = flowFiles.get(0);
-
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(),
peer.getHost());
-
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(),
peer.getHost() + ":" + peer.getPort());
-
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_PORT_ID.key(),
"remote-group-port-id");
- }
+ // Assert received flow files.
+ processSession.assertAllFlowFilesTransferred(Relationship.ANONYMOUS);
+ final List<MockFlowFile> flowFiles =
processSession.getFlowFilesForRelationship(Relationship.ANONYMOUS);
+ assertEquals(1, flowFiles.size());
+ final MockFlowFile flowFile = flowFiles.get(0);
+ flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(),
peer.getHost());
+ flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(),
peer.getHost() + ":" + peer.getPort());
+ flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_PORT_ID.key(),
"remote-group-port-id");
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketFlowFileServerProtocol.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketFlowFileServerProtocol.java
index 77c39bc..dbc9c91 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketFlowFileServerProtocol.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketFlowFileServerProtocol.java
@@ -21,9 +21,9 @@ import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.remote.cluster.NodeInformation;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.SocketChannelInput;
-import org.apache.nifi.remote.io.socket.SocketChannelOutput;
+import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
+import org.apache.nifi.remote.io.socket.SocketInput;
+import org.apache.nifi.remote.io.socket.SocketOutput;
import org.apache.nifi.remote.protocol.HandshakeProperties;
import org.apache.nifi.remote.protocol.HandshakeProperty;
import org.apache.nifi.remote.protocol.Response;
@@ -75,9 +75,9 @@ public class TestSocketFlowFileServerProtocol {
final InputStream inputStream = new ByteArrayInputStream(inputBytes);
- final SocketChannelCommunicationsSession commsSession =
mock(SocketChannelCommunicationsSession.class);
- final SocketChannelInput channelInput = mock(SocketChannelInput.class);
- final SocketChannelOutput channelOutput =
mock(SocketChannelOutput.class);
+ final SocketCommunicationsSession commsSession =
mock(SocketCommunicationsSession.class);
+ final SocketInput channelInput = mock(SocketInput.class);
+ final SocketOutput channelOutput = mock(SocketOutput.class);
when(commsSession.getInput()).thenReturn(channelInput);
when(commsSession.getOutput()).thenReturn(channelOutput);