NIFI-2028 This closes #714. This closes #751. Fixed typo in class name (HandshakenProperties renamed to HandshakeProperties).
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e23b2356 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e23b2356 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e23b2356 Branch: refs/heads/master Commit: e23b2356172e128086585fe2c425523c3628d0e7 Parents: aae2d27 Author: joewitt <[email protected]> Authored: Tue Aug 2 09:07:16 2016 -0400 Committer: joewitt <[email protected]> Committed: Tue Aug 2 09:13:41 2016 -0400 ---------------------------------------------------------------------- .../nifi/remote/HttpRemoteSiteListener.java | 18 ++-- .../AbstractFlowFileServerProtocol.java | 6 +- .../remote/protocol/HandshakeProperties.java | 96 ++++++++++++++++++++ .../remote/protocol/HandshakenProperties.java | 96 -------------------- .../StandardHttpFlowFileServerProtocol.java | 8 +- .../socket/SocketFlowFileServerProtocol.java | 6 +- .../nifi/remote/TestHttpRemoteSiteListener.java | 4 +- 7 files changed, 117 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e23b2356/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java index e08a3ac..b335f48 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java @@ -18,7 +18,7 @@ package 
org.apache.nifi.remote; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.remote.protocol.FlowFileTransaction; -import org.apache.nifi.remote.protocol.HandshakenProperties; +import org.apache.nifi.remote.protocol.HandshakeProperties; import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; @@ -91,10 +91,10 @@ public class HttpRemoteSiteListener implements RemoteSiteListener { private class TransactionWrapper { private final FlowFileTransaction transaction; - private final HandshakenProperties handshakenProperties; + private final HandshakeProperties handshakenProperties; private long lastCommunicationAt; - private TransactionWrapper(final FlowFileTransaction transaction, final HandshakenProperties handshakenProperties) { + private TransactionWrapper(final FlowFileTransaction transaction, final HandshakeProperties handshakenProperties) { this.transaction = transaction; this.handshakenProperties = handshakenProperties; this.lastCommunicationAt = System.currentTimeMillis(); @@ -191,12 +191,12 @@ public class HttpRemoteSiteListener implements RemoteSiteListener { /** * @param transactionId transactionId to check - * @return Returns a HandshakenProperties instance which is created when this transaction is started, - * only if the transaction is active, - * and it holds a HandshakenProperties, - * otherwise return null + * @return Returns a HandshakeProperties instance which is created when this transaction is started, + only if the transaction is active, + and it holds a HandshakeProperties, + otherwise return null */ - public HandshakenProperties getHandshakenProperties(final String transactionId) { + public HandshakeProperties getHandshakenProperties(final String transactionId) { TransactionWrapper transaction = transactions.get(transactionId); if (isTransactionActive(transaction)) { return transaction.handshakenProperties; @@ -205,7 +205,7 @@ public 
class HttpRemoteSiteListener implements RemoteSiteListener { } public void holdTransaction(final String transactionId, final FlowFileTransaction transaction, - final HandshakenProperties handshakenProperties) throws IllegalStateException { + final HandshakeProperties handshakenProperties) throws IllegalStateException { // We don't check expiration of the transaction here, to support large file transport or slow network. // The availability of current transaction is already checked when the HTTP request was received at SiteToSiteResource. TransactionWrapper currentTransaction = transactions.remove(transactionId); http://git-wip-us.apache.org/repos/asf/nifi/blob/e23b2356/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java index 8e7d2c5..8600368 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java @@ -63,7 +63,7 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol { protected boolean shutdown = false; protected FlowFileCodec negotiatedFlowFileCodec = null; - protected HandshakenProperties handshakenProperties; + protected HandshakeProperties handshakenProperties; protected static final long DEFAULT_BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L); @@ -82,7 +82,7 @@ public abstract class 
AbstractFlowFileServerProtocol implements ServerProtocol { return handshakeCompleted; } - protected void validateHandshakeRequest(HandshakenProperties confirmed, final Peer peer, final Map<String, String> properties) throws HandshakeException { + protected void validateHandshakeRequest(HandshakeProperties confirmed, final Peer peer, final Map<String, String> properties) throws HandshakeException { Boolean useGzip = null; for (final Map.Entry<String, String> entry : properties.entrySet()) { final String propertyName = entry.getKey(); @@ -201,7 +201,7 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol { handshakeCompleted = true; } - abstract protected HandshakenProperties doHandshake(final Peer peer) throws IOException, HandshakeException; + abstract protected HandshakeProperties doHandshake(final Peer peer) throws IOException, HandshakeException; @Override public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { http://git-wip-us.apache.org/repos/asf/nifi/blob/e23b2356/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakeProperties.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakeProperties.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakeProperties.java new file mode 100644 index 0000000..c4538da --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakeProperties.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import org.apache.nifi.remote.exception.HandshakeException; + +public class HandshakeProperties { + + private String commsIdentifier; + private String transitUriPrefix = null; + private boolean useGzip; + private long expirationMillis; + private int batchCount = 0; + private long batchBytes = 0L; + private long batchDurationNanos = 0L; + + + public String getCommsIdentifier() { + return commsIdentifier; + } + + public void setCommsIdentifier(String commsIdentifier) { + this.commsIdentifier = commsIdentifier; + } + + public String getTransitUriPrefix() { + return transitUriPrefix; + } + + public void setTransitUriPrefix(String transitUriPrefix) { + this.transitUriPrefix = transitUriPrefix; + } + + public boolean isUseGzip() { + return useGzip; + } + + public void setUseGzip(Boolean useGzip) { + this.useGzip = useGzip; + } + + public long getExpirationMillis() { + return expirationMillis; + } + + public void setExpirationMillis(long expirationMillis) { + this.expirationMillis = expirationMillis; + } + + public int getBatchCount() { + return batchCount; + } + + public void setBatchCount(int batchCount) throws HandshakeException { + if (batchCount < 0) { + throw new HandshakeException("Cannot request Batch Count less than 1; requested 
value: " + batchCount); + } + this.batchCount = batchCount; + } + + public long getBatchBytes() { + return batchBytes; + } + + public void setBatchBytes(long batchBytes) throws HandshakeException { + if (batchBytes < 0) { + throw new HandshakeException("Cannot request Batch Size less than 1; requested value: " + batchBytes); + } + this.batchBytes = batchBytes; + } + + public long getBatchDurationNanos() { + return batchDurationNanos; + } + + public void setBatchDurationNanos(long batchDurationNanos) throws HandshakeException { + if (batchDurationNanos < 0) { + throw new HandshakeException("Cannot request Batch Duration less than 1; requested value: " + batchDurationNanos); + } + this.batchDurationNanos = batchDurationNanos; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e23b2356/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakenProperties.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakenProperties.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakenProperties.java deleted file mode 100644 index 816689b..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakenProperties.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol; - -import org.apache.nifi.remote.exception.HandshakeException; - -public class HandshakenProperties { - - private String commsIdentifier; - private String transitUriPrefix = null; - private boolean useGzip; - private long expirationMillis; - private int batchCount = 0; - private long batchBytes = 0L; - private long batchDurationNanos = 0L; - - - public String getCommsIdentifier() { - return commsIdentifier; - } - - public void setCommsIdentifier(String commsIdentifier) { - this.commsIdentifier = commsIdentifier; - } - - public String getTransitUriPrefix() { - return transitUriPrefix; - } - - public void setTransitUriPrefix(String transitUriPrefix) { - this.transitUriPrefix = transitUriPrefix; - } - - public boolean isUseGzip() { - return useGzip; - } - - public void setUseGzip(Boolean useGzip) { - this.useGzip = useGzip; - } - - public long getExpirationMillis() { - return expirationMillis; - } - - public void setExpirationMillis(long expirationMillis) { - this.expirationMillis = expirationMillis; - } - - public int getBatchCount() { - return batchCount; - } - - public void setBatchCount(int batchCount) throws HandshakeException { - if (batchCount < 0) { - throw new HandshakeException("Cannot request Batch Count less than 1; requested value: " + batchCount); - } - this.batchCount = batchCount; - } - - public long getBatchBytes() { - return batchBytes; - } - - public void setBatchBytes(long batchBytes) throws HandshakeException { - if (batchBytes < 0) { - throw new HandshakeException("Cannot request Batch 
Size less than 1; requested value: " + batchBytes); - } - this.batchBytes = batchBytes; - } - - public long getBatchDurationNanos() { - return batchDurationNanos; - } - - public void setBatchDurationNanos(long batchDurationNanos) throws HandshakeException { - if (batchDurationNanos < 0) { - throw new HandshakeException("Cannot request Batch Duration less than 1; requested value: " + batchDurationNanos); - } - this.batchDurationNanos = batchDurationNanos; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/e23b2356/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java index 660c498..b2171df 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java @@ -28,7 +28,7 @@ import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession; import org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol; import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.FlowFileTransaction; -import org.apache.nifi.remote.protocol.HandshakenProperties; +import org.apache.nifi.remote.protocol.HandshakeProperties; import org.apache.nifi.remote.protocol.RequestType; import org.apache.nifi.remote.protocol.Response; import 
org.apache.nifi.remote.protocol.ResponseCode; @@ -65,19 +65,19 @@ public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerPr } @Override - protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException { + protected HandshakeProperties doHandshake(Peer peer) throws IOException, HandshakeException { HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); final String transactionId = commsSession.getTransactionId(); - HandshakenProperties confirmed = null; + HandshakeProperties confirmed = null; if (!StringUtils.isEmpty(transactionId)) { // If handshake is already done, use it. confirmed = transactionManager.getHandshakenProperties(transactionId); } if (confirmed == null) { // If it's not, then do handshake. - confirmed = new HandshakenProperties(); + confirmed = new HandshakeProperties(); confirmed.setCommsIdentifier(transactionId); validateHandshakeRequest(confirmed, peer, commsSession.getHandshakeParams()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e23b2356/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java index 6e4b860..574c726 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java @@ -27,7 +27,7 @@ import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol; import org.apache.nifi.remote.protocol.CommunicationsSession; -import org.apache.nifi.remote.protocol.HandshakenProperties; +import org.apache.nifi.remote.protocol.HandshakeProperties; import org.apache.nifi.remote.protocol.RequestType; import org.apache.nifi.remote.protocol.ResponseCode; import org.apache.nifi.util.NiFiProperties; @@ -51,9 +51,9 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(6, 5, 4, 3, 2, 1); @Override - protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException { + protected HandshakeProperties doHandshake(Peer peer) throws IOException, HandshakeException { - HandshakenProperties confirmed = new HandshakenProperties(); + HandshakeProperties confirmed = new HandshakeProperties(); final CommunicationsSession commsSession = peer.getCommunicationsSession(); final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); http://git-wip-us.apache.org/repos/asf/nifi/blob/e23b2356/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java index 6ab8988..e485095 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java @@ -18,7 +18,7 @@ package org.apache.nifi.remote; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.remote.protocol.FlowFileTransaction; -import org.apache.nifi.remote.protocol.HandshakenProperties; +import org.apache.nifi.remote.protocol.HandshakeProperties; import org.apache.nifi.util.NiFiProperties; import org.junit.BeforeClass; import org.junit.Test; @@ -46,7 +46,7 @@ public class TestHttpRemoteSiteListener { ProcessSession processSession = Mockito.mock(ProcessSession.class); FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null); - transactionManager.holdTransaction(transactionId, transaction, new HandshakenProperties()); + transactionManager.holdTransaction(transactionId, transaction, new HandshakeProperties()); assertNotNull(transactionManager.getHandshakenProperties(transactionId));
