NIFI-1202: Site-to-Site batch settings.

- Added batchCount, batchSize, batchDuration to limit flow files to be
  included in a single Site-to-Site transaction.
- Added batch throttling logic when StandardRemoteGroupPort transfers
  flow files to a remote input port using the batch limit configurations,
  so that users can limit batch not only for pulling data, but also pushing 
data.
- Added destination list shuffle to provide better load distribution.
  Previously, the load distribution algorithm produced the same host 
consecutively.
- Added new batch settings to FlowConfiguration.xsd.
- Added new batch settings to Flow Fingerprint.
- Added new batch settings to Audit.
- Sort ports by name at 'Remote Process Group Ports' dialog.
- Show 'No value set' when a batch configuration is not set
- Updated batch settings tooltip to clearly explain how it works the 
configuration works differently for input and output ports.
- Updated DTO by separating batch settings to BatchSettingsDTO to indicate 
count, size and duration are a set of configurations.
- This closes #1306


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a41a2a9b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a41a2a9b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a41a2a9b

Branch: refs/heads/master
Commit: a41a2a9b1ab79b558831177e91e8fde49d6e748a
Parents: cf0e8bb
Author: Koji Kawamura <[email protected]>
Authored: Tue Apr 18 13:31:27 2017 +0900
Committer: Matt Gilman <[email protected]>
Committed: Thu Apr 27 10:35:07 2017 -0400

----------------------------------------------------------------------
 .../apache/nifi/remote/client/PeerSelector.java |   6 +
 .../nifi/remote/client/TestPeerSelector.java    |  35 +++++
 .../nifi/web/api/dto/BatchSettingsDTO.java      |  76 +++++++++
 .../web/api/dto/RemoteProcessGroupPortDTO.java  |  15 ++
 .../RemoteProcessGroupPortDescriptor.java       |  15 ++
 .../org/apache/nifi/remote/RemoteGroupPort.java |  12 ++
 .../apache/nifi/controller/FlowController.java  |   7 +
 .../serialization/FlowFromDOMFactory.java       |   3 +
 .../serialization/StandardFlowSerializer.java   |  12 ++
 .../nifi/fingerprint/FingerprintFactory.java    |   2 +-
 .../nifi/remote/StandardRemoteProcessGroup.java |  19 +++
 ...tandardRemoteProcessGroupPortDescriptor.java |  30 ++++
 .../src/main/resources/FlowConfiguration.xsd    |   3 +
 .../fingerprint/FingerprintFactoryTest.java     |  41 +++++
 .../nifi/remote/StandardRemoteGroupPort.java    |  76 ++++++++-
 .../remote/TestStandardRemoteGroupPort.java     | 156 ++++++++++++++++++-
 .../nifi/audit/RemoteProcessGroupAuditor.java   |   9 ++
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  13 ++
 .../dao/impl/StandardRemoteProcessGroupDAO.java |  69 +++++---
 .../audit/TestRemoteProcessGroupAuditor.java    |  39 +++++
 .../impl/TestStandardRemoteProcessGroupDAO.java | 127 +++++++++++++++
 .../canvas/remote-port-configuration.jsp        |  34 ++++
 .../css/remote-process-group-configuration.css  |  17 +-
 .../nf/canvas/nf-remote-process-group-ports.js  | 131 +++++++++++++++-
 24 files changed, 908 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
index 0ec8951..a7bd094 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
@@ -41,6 +41,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -202,6 +203,11 @@ public class PeerSelector {
             }
         }
 
+        // Shuffle destinations to provide better distribution.
+        // Without this, same host will be used continuously, especially when 
remote peers have the same number of queued files.
+        // Use Random(0) to provide consistent result for unit testing. 
Randomness is not important to shuffle destinations.
+        Collections.shuffle(destinations, new Random(0));
+
         final StringBuilder distributionDescription = new StringBuilder();
         distributionDescription.append("New Weighted Distribution of Nodes:");
         for (final Map.Entry<PeerStatus, Integer> entry : 
entryCountMap.entrySet()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
index c434c7b..6a69fee 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
@@ -39,6 +39,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -60,6 +61,40 @@ public class TestPeerSelector {
     }
 
     @Test
+    public void testFormulateDestinationListForOutputEven() throws IOException 
{
+        final Set<PeerStatus> collection = new HashSet<>();
+        collection.add(new PeerStatus(new PeerDescription("Node1", 1111, 
true), 4096, true));
+        collection.add(new PeerStatus(new PeerDescription("Node2", 2222, 
true), 4096, true));
+        collection.add(new PeerStatus(new PeerDescription("Node3", 3333, 
true), 4096, true));
+        collection.add(new PeerStatus(new PeerDescription("Node4", 4444, 
true), 4096, true));
+        collection.add(new PeerStatus(new PeerDescription("Node5", 5555, 
true), 4096, true));
+
+        PeerStatusProvider peerStatusProvider = 
Mockito.mock(PeerStatusProvider.class);
+        PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
+
+        final List<PeerStatus> destinations = 
peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE);
+        final Map<String, Integer> selectedCounts = 
calculateAverageSelectedCount(collection, destinations);
+
+        logger.info("selectedCounts={}", selectedCounts);
+
+        int consecutiveSamePeerCount = 0;
+        PeerStatus previousPeer = null;
+        for (PeerStatus peer : destinations) {
+            if (previousPeer != null && 
peer.getPeerDescription().equals(previousPeer.getPeerDescription())) {
+                consecutiveSamePeerCount++;
+                // The same peer shouldn't be used consecutively (number of 
nodes - 1) times or more.
+                if (consecutiveSamePeerCount >= (collection.size() - 1)) {
+                    fail("The same peer is returned consecutively too 
frequently.");
+                }
+            } else {
+                consecutiveSamePeerCount = 0;
+            }
+            previousPeer = peer;
+        }
+
+    }
+
+    @Test
     public void testFormulateDestinationListForOutput() throws IOException {
         final Set<PeerStatus> collection = new HashSet<>();
         collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, 
true), 4096, true));

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/BatchSettingsDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/BatchSettingsDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/BatchSettingsDTO.java
new file mode 100644
index 0000000..e1d63f8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/BatchSettingsDTO.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto;
+
+import com.wordnik.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlType;
+
+/**
+ * Details of batch settings of a remote process group port.
+ */
+@XmlType(name = "batchSettings")
+public class BatchSettingsDTO {
+
+    private Integer count;
+    private String size;
+    private String duration;
+
+    /**
+     * @return preferred number of flow files to include in a transaction
+     */
+    @ApiModelProperty(
+            value = "Preferred number of flow files to include in a 
transaction."
+    )
+    public Integer getCount() {
+        return count;
+    }
+
+    public void setCount(Integer count) {
+        this.count = count;
+    }
+
+    /**
+     * @return preferred number of bytes to include in a transaction
+     */
+    @ApiModelProperty(
+            value = "Preferred number of bytes to include in a transaction."
+    )
+    public String getSize() {
+        return size;
+    }
+
+    public void setSize(String size) {
+        this.size = size;
+    }
+
+    /**
+     * @return preferred amount of time that a transaction should span
+     */
+    @ApiModelProperty(
+            value = "Preferred amount of time that a transaction should span."
+    )
+    public String getDuration() {
+        return duration;
+    }
+
+    public void setDuration(String duration) {
+        this.duration = duration;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java
index e4a8131..2a34d9c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java
@@ -35,6 +35,7 @@ public class RemoteProcessGroupPortDTO {
     private Boolean exists;
     private Boolean targetRunning;
     private Boolean connected;
+    private BatchSettingsDTO batchSettings;
 
     /**
      * @return comments as configured in the target port
@@ -176,6 +177,20 @@ public class RemoteProcessGroupPortDTO {
         this.connected = connected;
     }
 
+    /**
+     * @return batch settings for data transmission
+     */
+    @ApiModelProperty(
+            value = "The batch settings for data transmission."
+    )
+    public BatchSettingsDTO getBatchSettings() {
+        return batchSettings;
+    }
+
+    public void setBatchSettings(BatchSettingsDTO batchSettings) {
+        this.batchSettings = batchSettings;
+    }
+
     @Override
     public int hashCode() {
         return 923847 + String.valueOf(name).hashCode();

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
index 4d7f774..c330c13 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
@@ -55,6 +55,21 @@ public interface RemoteProcessGroupPortDescriptor {
     Boolean getUseCompression();
 
     /**
+     * @return Preferred number of flow files to include in a transaction
+     */
+    Integer getBatchCount();
+
+    /**
+     * @return Preferred number of bytes to include in a transaction
+     */
+    String getBatchSize();
+
+    /**
+     * @return Preferred amount of for a transaction to span
+     */
+    String getBatchDuration();
+
+    /**
      * @return Whether or not the target port exists
      */
     Boolean getExists();

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
index f8f4b20..07faf42 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
@@ -41,4 +41,16 @@ public abstract class RemoteGroupPort extends AbstractPort 
implements Port, Remo
     public abstract boolean getTargetExists();
 
     public abstract boolean isTargetRunning();
+
+    public abstract Integer getBatchCount();
+
+    public abstract void setBatchCount(Integer batchCount);
+
+    public abstract String getBatchSize();
+
+    public abstract void setBatchSize(String batchSize);
+
+    public abstract String getBatchDuration();
+
+    public abstract void setBatchDuration(String batchDuration);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 151640e..b628668 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -195,6 +195,7 @@ import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.ResourceNotFoundException;
+import org.apache.nifi.web.api.dto.BatchSettingsDTO;
 import org.apache.nifi.web.api.dto.BundleDTO;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
@@ -2011,6 +2012,12 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 
descriptor.setConcurrentlySchedulableTaskCount(port.getConcurrentlySchedulableTaskCount());
                 descriptor.setTransmitting(port.isTransmitting());
                 descriptor.setUseCompression(port.getUseCompression());
+                final BatchSettingsDTO batchSettings = port.getBatchSettings();
+                if (batchSettings != null) {
+                    descriptor.setBatchCount(batchSettings.getCount());
+                    descriptor.setBatchSize(batchSettings.getSize());
+                    descriptor.setBatchDuration(batchSettings.getDuration());
+                }
                 remotePorts.add(descriptor);
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
index 731d914..3bd037d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
@@ -357,6 +357,9 @@ public class FlowFromDOMFactory {
         descriptor.setComments(getString(element, "comments"));
         descriptor.setConcurrentlySchedulableTaskCount(getInt(element, 
"maxConcurrentTasks"));
         descriptor.setUseCompression(getBoolean(element, "useCompression"));
+        descriptor.setBatchCount(getOptionalInt(element, "batchCount"));
+        descriptor.setBatchSize(getString(element, "batchSize"));
+        descriptor.setBatchDuration(getString(element, "batchDuration"));
         
descriptor.setTransmitting("RUNNING".equalsIgnoreCase(getString(element, 
"scheduledState")));
 
         return descriptor;

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index aa7022b..c5f3f48 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -313,6 +313,18 @@ public class StandardFlowSerializer implements 
FlowSerializer {
         addTextElement(element, "scheduledState", 
port.getScheduledState().name());
         addTextElement(element, "maxConcurrentTasks", 
port.getMaxConcurrentTasks());
         addTextElement(element, "useCompression", 
String.valueOf(port.isUseCompression()));
+        final Integer batchCount = port.getBatchCount();
+        if (batchCount != null && batchCount > 0) {
+            addTextElement(element, "batchCount", batchCount);
+        }
+        final String batchSize = port.getBatchSize();
+        if (batchSize != null && batchSize.length() > 0) {
+            addTextElement(element, "batchSize", batchSize);
+        }
+        final String batchDuration = port.getBatchDuration();
+        if (batchDuration != null && batchDuration.length() > 0) {
+            addTextElement(element, "batchDuration", batchDuration);
+        }
 
         parentElement.appendChild(element);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
index 1db80fc..d9e048e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
@@ -511,7 +511,7 @@ public class FingerprintFactory {
     }
 
     private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder 
builder, final Element remoteGroupPortElement) {
-        for (final String childName : new String[] {"id", 
"maxConcurrentTasks", "useCompression"}) {
+        for (final String childName : new String[] {"id", 
"maxConcurrentTasks", "useCompression", "batchCount", "batchSize", 
"batchDuration"}) {
             appendFirstValue(builder, 
DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName));
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 7aee480..0cc8433 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -45,6 +45,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.net.ssl.SSLContext;
 import javax.ws.rs.core.Response;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
@@ -632,6 +633,15 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
             if (descriptor.getUseCompression() != null) {
                 port.setUseCompression(descriptor.getUseCompression());
             }
+            if (descriptor.getBatchCount() != null && 
descriptor.getBatchCount() > 0) {
+                port.setBatchCount(descriptor.getBatchCount());
+            }
+            if (!StringUtils.isBlank(descriptor.getBatchSize())) {
+                port.setBatchSize(descriptor.getBatchSize());
+            }
+            if (!StringUtils.isBlank(descriptor.getBatchDuration())) {
+                port.setBatchDuration(descriptor.getBatchDuration());
+            }
         } finally {
             writeLock.unlock();
         }
@@ -697,6 +707,15 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
             if (descriptor.getUseCompression() != null) {
                 port.setUseCompression(descriptor.getUseCompression());
             }
+            if (descriptor.getBatchCount() != null && 
descriptor.getBatchCount() > 0) {
+                port.setBatchCount(descriptor.getBatchCount());
+            }
+            if (!StringUtils.isBlank(descriptor.getBatchSize())) {
+                port.setBatchSize(descriptor.getBatchSize());
+            }
+            if (!StringUtils.isBlank(descriptor.getBatchDuration())) {
+                port.setBatchDuration(descriptor.getBatchDuration());
+            }
 
             inputPorts.put(descriptor.getId(), port);
         } finally {

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java
index ed90186..c3a8f5e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java
@@ -27,6 +27,9 @@ public class StandardRemoteProcessGroupPortDescriptor 
implements RemoteProcessGr
     private Integer concurrentlySchedulableTaskCount;
     private Boolean transmitting;
     private Boolean useCompression;
+    private Integer batchCount;
+    private String batchSize;
+    private String batchDuration;
     private Boolean exists;
     private Boolean targetRunning;
     private Boolean connected;
@@ -95,6 +98,33 @@ public class StandardRemoteProcessGroupPortDescriptor 
implements RemoteProcessGr
     }
 
     @Override
+    public Integer getBatchCount() {
+        return batchCount;
+    }
+
+    public void setBatchCount(Integer batchCount) {
+        this.batchCount = batchCount;
+    }
+
+    @Override
+    public String getBatchSize() {
+        return batchSize;
+    }
+
+    public void setBatchSize(String batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    public String getBatchDuration() {
+        return batchDuration;
+    }
+
+    public void setBatchDuration(String batchDuration) {
+        this.batchDuration = batchDuration;
+    }
+
+    @Override
     public Boolean getExists() {
         return exists;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
index 8cf2ad8..30fff1f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
@@ -281,6 +281,9 @@
                 <xs:sequence>
                     <xs:element name="maxConcurrentTasks" 
type="xs:positiveInteger"></xs:element>
                     <xs:element name="useCompression" 
type="xs:boolean"></xs:element>
+                    <xs:element name="batchCount" type="xs:positiveInteger" 
minOccurs="0" maxOccurs="1" />
+                    <xs:element name="batchSize" type="xs:string" 
minOccurs="0" maxOccurs="1" />
+                    <xs:element name="batchDuration" type="xs:string" 
minOccurs="0" maxOccurs="1" />
                 </xs:sequence>
             </xs:extension>
         </xs:complexContent>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java
index 87a372d..67f1ad4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java
@@ -26,14 +26,17 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.lang.reflect.Method;
+import java.util.Collections;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.connectable.Position;
+import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.serialization.FlowSerializer;
 import org.apache.nifi.controller.serialization.StandardFlowSerializer;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.util.NiFiProperties;
 import org.junit.Before;
@@ -273,4 +276,42 @@ public class FingerprintFactoryTest {
         assertEquals(expected.toString(), 
fingerprint("addRemoteProcessGroupFingerprint", Element.class, 
componentElement));
     }
 
+    @Test
+    public void testRemotePortFingerprint() throws Exception {
+
+        // Fill out every configuration.
+        final RemoteProcessGroup groupComponent = 
mock(RemoteProcessGroup.class);
+        when(groupComponent.getName()).thenReturn("name");
+        when(groupComponent.getIdentifier()).thenReturn("id");
+        when(groupComponent.getPosition()).thenReturn(new Position(10.5, 
20.3));
+        
when(groupComponent.getTargetUri()).thenReturn("http://node1:8080/nifi";);
+        
when(groupComponent.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
+
+        final RemoteGroupPort portComponent = mock(RemoteGroupPort.class);
+        
when(groupComponent.getInputPorts()).thenReturn(Collections.singleton(portComponent));
+        when(portComponent.getName()).thenReturn("portName");
+        when(portComponent.getIdentifier()).thenReturn("portId");
+        when(portComponent.getPosition()).thenReturn(new Position(10.5, 20.3));
+        when(portComponent.getComments()).thenReturn("portComment");
+        
when(portComponent.getScheduledState()).thenReturn(ScheduledState.RUNNING);
+        when(portComponent.getMaxConcurrentTasks()).thenReturn(3);
+        when(portComponent.isUseCompression()).thenReturn(true);
+        when(portComponent.getBatchCount()).thenReturn(1234);
+        when(portComponent.getBatchSize()).thenReturn("64KB");
+        when(portComponent.getBatchDuration()).thenReturn("10sec");
+        // Serializer doesn't serialize if a port doesn't have any connection.
+        when(portComponent.hasIncomingConnection()).thenReturn(true);
+
+        // Assert fingerprints with expected one.
+        final String expected = "portId" +
+                "3" +
+                "true" +
+                "1234" +
+                "64KB" +
+                "10sec";
+
+        final Element rootElement = serializeElement(encryptor, 
RemoteProcessGroup.class, groupComponent, "addRemoteProcessGroup");
+        final Element componentElement = (Element) 
rootElement.getElementsByTagName("inputPort").item(0);
+        assertEquals(expected.toString(), 
fingerprint("addRemoteGroupPortFingerprint", Element.class, componentElement));
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 92931f2..b1288f3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -46,11 +46,13 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.exception.UnknownPortException;
@@ -80,6 +82,9 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     private static final Logger logger = 
LoggerFactory.getLogger(StandardRemoteGroupPort.class);
     private final RemoteProcessGroup remoteGroup;
     private final AtomicBoolean useCompression = new AtomicBoolean(false);
+    private final AtomicReference<Integer> batchCount = new 
AtomicReference<>();
+    private final AtomicReference<String> batchSize = new AtomicReference<>();
+    private final AtomicReference<String> batchDuration = new 
AtomicReference<>();
     private final AtomicBoolean targetExists = new AtomicBoolean(true);
     private final AtomicBoolean targetRunning = new AtomicBoolean(true);
     private final SSLContext sslContext;
@@ -157,7 +162,7 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
 
         final long penalizationMillis = 
FormatUtils.getTimeDuration(remoteGroup.getYieldDuration(), 
TimeUnit.MILLISECONDS);
 
-        final SiteToSiteClient client = new SiteToSiteClient.Builder()
+        final SiteToSiteClient.Builder clientBuilder = new 
SiteToSiteClient.Builder()
                 
.urls(SiteToSiteRestApiClient.parseClusterUrls(remoteGroup.getTargetUris()))
                 .portIdentifier(getIdentifier())
                 .sslContext(sslContext)
@@ -168,9 +173,24 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
                 
.timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), 
TimeUnit.MILLISECONDS)
                 .transportProtocol(remoteGroup.getTransportProtocol())
                 .httpProxy(new HttpProxy(remoteGroup.getProxyHost(), 
remoteGroup.getProxyPort(), remoteGroup.getProxyUser(), 
remoteGroup.getProxyPassword()))
-                .localAddress(remoteGroup.getLocalAddress())
-                .build();
-        clientRef.set(client);
+                .localAddress(remoteGroup.getLocalAddress());
+
+        final Integer batchCount = getBatchCount();
+        if (batchCount != null) {
+            clientBuilder.requestBatchCount(batchCount);
+        }
+
+        final String batchSize = getBatchSize();
+        if (batchSize != null && batchSize.length() > 0) {
+            
clientBuilder.requestBatchSize(DataUnit.parseDataSize(batchSize.trim(), 
DataUnit.B).intValue());
+        }
+
+        final String batchDuration = getBatchDuration();
+        if (batchDuration != null && batchDuration.length() > 0) {
+            
clientBuilder.requestBatchDuration(FormatUtils.getTimeDuration(batchDuration.trim(),
 TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+        }
+
+        clientRef.set(clientBuilder.build());
     }
 
     @Override
@@ -278,6 +298,13 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             final StopWatch stopWatch = new StopWatch(true);
             long bytesSent = 0L;
 
+            final SiteToSiteClientConfig siteToSiteClientConfig = 
getSiteToSiteClient().getConfig();
+            final long maxBatchBytes = 
siteToSiteClientConfig.getPreferredBatchSize();
+            final int maxBatchCount = 
siteToSiteClientConfig.getPreferredBatchCount();
+            final long preferredBatchDuration = 
siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.NANOSECONDS);
+            final long maxBatchDuration = preferredBatchDuration > 0 ? 
preferredBatchDuration : BATCH_SEND_NANOS;
+
+
             final Set<FlowFile> flowFilesSent = new HashSet<>();
             boolean continueTransaction = true;
             while (continueTransaction) {
@@ -304,10 +331,15 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
                 session.remove(flowFile);
 
                 final long sendingNanos = System.nanoTime() - 
startSendingNanos;
-                if (sendingNanos < BATCH_SEND_NANOS) {
-                    flowFile = session.get();
-                } else {
+
+                if (maxBatchCount > 0 && flowFilesSent.size() >= 
maxBatchCount) {
+                    flowFile = null;
+                } else if (maxBatchBytes > 0 && bytesSent >= maxBatchBytes) {
                     flowFile = null;
+                } else if (sendingNanos >= maxBatchDuration) {
+                    flowFile = null;
+                } else {
+                    flowFile = session.get();
                 }
 
                 continueTransaction = (flowFile != null);
@@ -478,6 +510,36 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
     }
 
     @Override
+    public Integer getBatchCount() {
+        return batchCount.get();
+    }
+
+    @Override
+    public void setBatchCount(Integer batchCount) {
+        this.batchCount.set(batchCount);
+    }
+
+    @Override
+    public String getBatchSize() {
+        return batchSize.get();
+    }
+
+    @Override
+    public void setBatchSize(String batchSize) {
+        this.batchSize.set(batchSize);
+    }
+
+    @Override
+    public String getBatchDuration() {
+        return batchDuration.get();
+    }
+
+    @Override
+    public void setBatchDuration(String batchDuration) {
+        this.batchDuration.set(batchDuration);
+    }
+
+    @Override
     public String toString() {
         return "RemoteGroupPort[name=" + getName() + ",targets=" + 
remoteGroup.getTargetUris() + "]";
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
index 31cd154..f677b88 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
@@ -19,6 +19,7 @@ package org.apache.nifi.remote;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
 import org.apache.nifi.groups.ProcessGroup;
@@ -28,6 +29,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
 import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
@@ -43,17 +45,23 @@ import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
 
 import org.apache.nifi.util.NiFiProperties;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -70,7 +78,7 @@ public class TestStandardRemoteGroupPort {
     private Transaction transaction;
     private EventReporter eventReporter;
     private ProcessGroup processGroup;
-    public static final String REMOTE_CLUSTER_URL = 
"http://node0.example.com:8080/nifi";;
+    private static final String REMOTE_CLUSTER_URL = 
"http://node0.example.com:8080/nifi";;
     private StandardRemoteGroupPort port;
     private SharedSessionState sessionState;
     private MockProcessSession processSession;
@@ -84,17 +92,18 @@ public class TestStandardRemoteGroupPort {
 
     private void setupMock(final SiteToSiteTransportProtocol protocol,
             final TransferDirection direction) throws Exception {
-        setupMock(protocol, direction, mock(Transaction.class));
+        final SiteToSiteClientConfig siteToSiteClientConfig = new 
SiteToSiteClient.Builder().buildConfig();
+        setupMock(protocol, direction, siteToSiteClientConfig);
     }
 
     private void setupMock(final SiteToSiteTransportProtocol protocol,
             final TransferDirection direction,
-            final Transaction transaction) throws Exception {
+           final SiteToSiteClientConfig siteToSiteClientConfig) throws 
Exception {
         processGroup = null;
         remoteGroup = mock(RemoteProcessGroup.class);
         scheduler = null;
         siteToSiteClient = mock(SiteToSiteClient.class);
-        this.transaction = transaction;
+        this.transaction = mock(Transaction.class);
 
         eventReporter = mock(EventReporter.class);
 
@@ -119,6 +128,7 @@ public class TestStandardRemoteGroupPort {
         doReturn(REMOTE_CLUSTER_URL).when(remoteGroup).getTargetUri();
         doReturn(siteToSiteClient).when(port).getSiteToSiteClient();
         
doReturn(transaction).when(siteToSiteClient).createTransaction(eq(direction));
+        doReturn(siteToSiteClientConfig).when(siteToSiteClient).getConfig();
         doReturn(eventReporter).when(remoteGroup).getEventReporter();
 
     }
@@ -246,6 +256,144 @@ public class TestStandardRemoteGroupPort {
     }
 
     @Test
+    public void testSendBatchByCount() throws Exception {
+
+        final SiteToSiteClientConfig siteToSiteClientConfig = new 
SiteToSiteClient.Builder()
+                .requestBatchCount(2)
+                .buildConfig();
+        setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND, 
siteToSiteClientConfig);
+
+        // t1 = {0, 1}, t2 = {2, 3}, t3 = {4}
+        final int[] expectedNumberOfPackets = {2, 2, 1};
+        testSendBatch(expectedNumberOfPackets);
+
+    }
+
+    @Test
+    public void testSendBatchBySize() throws Exception {
+
+        final SiteToSiteClientConfig siteToSiteClientConfig = new 
SiteToSiteClient.Builder()
+                .requestBatchSize(30)
+                .buildConfig();
+        setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND, 
siteToSiteClientConfig);
+
+        // t1 = {10, 11, 12}, t2 = {13, 14}
+        final int[] expectedNumberOfPackets = {3, 2};
+        testSendBatch(expectedNumberOfPackets);
+
+    }
+
+    @Test
+    public void testSendBatchByDuration() throws Exception {
+
+        final SiteToSiteClientConfig siteToSiteClientConfig = new 
SiteToSiteClient.Builder()
+                .requestBatchDuration(1, TimeUnit.NANOSECONDS)
+                .buildConfig();
+        setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND, 
siteToSiteClientConfig);
+
+        // t1 = {1}, t2 = {2} .. and so on.
+        final int[] expectedNumberOfPackets = {1, 1, 1, 1, 1};
+        testSendBatch(expectedNumberOfPackets);
+
+    }
+
+    /**
+     * Generate flow files to be sent, and execute port's onTrigger method.
+     * Finally, this method verifies whether packets are sent as expected.
+     * @param expectedNumberOfPackets Specify how many packets should be sent 
by each transaction.
+     *                                E.g. passing {2, 2, 1}, would generate 5 
flow files in total.
+     *                                Based on the siteToSiteClientConfig 
batch parameters,
+     *                                it's expected to be sent via 3 
transactions,
+     *                                transaction 0 will send flow file 0 and 
1,
+     *                                transaction 1 will send flow file 2 and 
3,
+     *                                and transaction 2 will send flow file 4.
+     *                                Each flow file has different content 
size generated automatically.
+     *                                The content size starts with 10, and 
increases as more flow files are generated.
+     *                                E.g. flow file 1 will have 10 bytes, 
flow file 2 has 11 bytes, f3 has 12 and so on.
+     *
+     */
+    private void testSendBatch(final int[] expectedNumberOfPackets) throws 
Exception {
+
+        setupMockProcessSession();
+
+        final String peerUrl = "http://node1.example.com:8080/nifi";;
+        final PeerDescription peerDescription = new 
PeerDescription("node1.example.com", 8080, false);
+        final HttpCommunicationsSession commsSession = new 
HttpCommunicationsSession();
+        final Peer peer = new Peer(peerDescription, commsSession, peerUrl, 
REMOTE_CLUSTER_URL);
+
+        final String flowFileEndpointUri = 
"http://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files";;
+
+        doReturn(peer).when(transaction).getCommunicant();
+        commsSession.setDataTransferUrl(flowFileEndpointUri);
+
+        // Capture packets being sent to the remote peer
+        final AtomicInteger totalPacketsSent = new AtomicInteger(0);
+        final List<List<DataPacket>> sentPackets = new 
ArrayList<>(expectedNumberOfPackets.length);
+        final List<DataPacket> sentPacketsPerTransaction = new ArrayList<>();
+        doAnswer(invocation -> {
+            
sentPacketsPerTransaction.add((DataPacket)invocation.getArguments()[0]);
+            totalPacketsSent.incrementAndGet();
+            return null;
+        }).when(transaction).send(any(DataPacket.class));
+        doAnswer(invocation -> {
+            sentPackets.add(new ArrayList<>(sentPacketsPerTransaction));
+            sentPacketsPerTransaction.clear();
+            return null;
+        }).when(transaction).confirm();
+
+
+        // Execute onTrigger while offering new flow files.
+        final List<MockFlowFile> flowFiles = new ArrayList<>();
+        for (int i = 0; i < expectedNumberOfPackets.length; i++) {
+            int numOfPackets = expectedNumberOfPackets[i];
+            int startF = flowFiles.size();
+            int endF = startF + numOfPackets;
+            IntStream.range(startF, endF).forEach(f -> {
+                final StringBuilder flowFileContents = new 
StringBuilder("0123456789");
+                for (int c = 0; c < f; c++) {
+                    flowFileContents.append(c);
+                }
+                final byte[] bytes = flowFileContents.toString().getBytes();
+                final MockFlowFile flowFile = 
spy(processSession.createFlowFile(bytes));
+                when(flowFile.getSize()).then(invocation -> {
+                    Thread.sleep(1); // For testSendBatchByDuration
+                    return bytes.length;
+                });
+                sessionState.getFlowFileQueue().offer(flowFile);
+                flowFiles.add(flowFile);
+            });
+            port.onTrigger(processContext, processSession);
+        }
+
+        // Verify transactions, sent packets, and provenance events.
+        assertEquals(flowFiles.size(), totalPacketsSent.get());
+        assertEquals("The number of transactions should match as expected.", 
expectedNumberOfPackets.length, sentPackets.size());
+        final List<ProvenanceEventRecord> provenanceEvents = 
sessionState.getProvenanceEvents();
+        assertEquals(flowFiles.size(), provenanceEvents.size());
+
+        int f = 0;
+        for (int i = 0; i < expectedNumberOfPackets.length; i++) {
+            final List<DataPacket> dataPackets = sentPackets.get(i);
+            assertEquals(expectedNumberOfPackets[i], dataPackets.size());
+
+            for (int p = 0; p < dataPackets.size(); p++) {
+                final FlowFile flowFile = flowFiles.get(f);
+
+                // Assert sent packet
+                final DataPacket dataPacket = dataPackets.get(p);
+                assertEquals(flowFile.getSize(), dataPacket.getSize());
+
+                // Assert provenance event
+                final ProvenanceEventRecord provenanceEvent = 
provenanceEvents.get(f);
+                assertEquals(ProvenanceEventType.SEND, 
provenanceEvent.getEventType());
+                assertEquals(flowFileEndpointUri, 
provenanceEvent.getTransitUri());
+
+                f++;
+            }
+        }
+    }
+
+    @Test
     public void testReceiveHttp() throws Exception {
 
         setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.RECEIVE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java
index 5a69cfe..690d404 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java
@@ -93,6 +93,15 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
                     .setConvertName(PORT_NAME_CONVERT),
             new ConfigurationRecorder<RemoteGroupPort, 
RemoteProcessGroupPortDTO>("Compressed",
                     dto -> dto.getUseCompression() != null, 
RemoteGroupPort::isUseCompression)
+                    .setConvertName(PORT_NAME_CONVERT),
+            new ConfigurationRecorder<RemoteGroupPort, 
RemoteProcessGroupPortDTO>("Batch Count",
+                    dto -> dto.getBatchSettings() != null && 
dto.getBatchSettings().getCount() != null, RemoteGroupPort::getBatchCount)
+                    .setConvertName(PORT_NAME_CONVERT),
+            new ConfigurationRecorder<RemoteGroupPort, 
RemoteProcessGroupPortDTO>("Batch Size",
+                    dto -> dto.getBatchSettings() != null && 
dto.getBatchSettings().getSize() != null, RemoteGroupPort::getBatchSize)
+                    .setConvertName(PORT_NAME_CONVERT),
+            new ConfigurationRecorder<RemoteGroupPort, 
RemoteProcessGroupPortDTO>("Batch Duration",
+                    dto -> dto.getBatchSettings() != null && 
dto.getBatchSettings().getDuration() != null, RemoteGroupPort::getBatchDuration)
                     .setConvertName(PORT_NAME_CONVERT)
     );
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index aaa33d0..5154811 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -1493,6 +1493,12 @@ public final class DtoFactory {
         dto.setUseCompression(port.isUseCompression());
         dto.setExists(port.getTargetExists());
 
+        final BatchSettingsDTO batchDTO = new BatchSettingsDTO();
+        batchDTO.setCount(port.getBatchCount());
+        batchDTO.setSize(port.getBatchSize());
+        batchDTO.setDuration(port.getBatchDuration());
+        dto.setBatchSettings(batchDTO);
+
         // determine if this port is currently connected to another component 
locally
         if 
(ConnectableType.REMOTE_OUTPUT_PORT.equals(port.getConnectableType())) {
             dto.setConnected(!port.getConnections().isEmpty());
@@ -2962,6 +2968,13 @@ public final class DtoFactory {
         
copy.setConcurrentlySchedulableTaskCount(original.getConcurrentlySchedulableTaskCount());
         copy.setUseCompression(original.getUseCompression());
         copy.setExists(original.getExists());
+        final BatchSettingsDTO batchOrg = original.getBatchSettings();
+        if (batchOrg != null) {
+            final BatchSettingsDTO batchCopy = new BatchSettingsDTO();
+            batchCopy.setCount(batchOrg.getCount());
+            batchCopy.setSize(batchOrg.getSize());
+            batchCopy.setDuration(batchOrg.getDuration());
+        }
         return copy;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
index a93c410..0392442 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
@@ -29,10 +29,12 @@ import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.exception.ValidationException;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.web.ResourceNotFoundException;
+import org.apache.nifi.web.api.dto.BatchSettingsDTO;
 import org.apache.nifi.web.api.dto.DtoFactory;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
@@ -203,7 +205,9 @@ public class StandardRemoteProcessGroupDAO extends 
ComponentDAO implements Remot
 
 
         // verify update when appropriate
-        if 
(isAnyNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount(), 
remoteProcessGroupPortDto.getUseCompression())) {
+        if 
(isAnyNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount(),
+                remoteProcessGroupPortDto.getUseCompression(),
+                remoteProcessGroupPortDto.getBatchSettings())) {
             port.verifyCanUpdate();
         }
     }
@@ -219,6 +223,30 @@ public class StandardRemoteProcessGroupDAO extends 
ComponentDAO implements Remot
             validationErrors.add(String.format("Concurrent tasks for port '%s' 
must be a positive integer.", remoteGroupPort.getName()));
         }
 
+        final BatchSettingsDTO batchSettingsDTO = 
remoteProcessGroupPortDTO.getBatchSettings();
+        if (batchSettingsDTO != null) {
+            final Integer batchCount = batchSettingsDTO.getCount();
+            if (isNotNull(batchCount) && batchCount < 0) {
+                validationErrors.add(String.format("Batch count for port '%s' 
must be a positive integer.", remoteGroupPort.getName()));
+            }
+
+            final String batchSize = batchSettingsDTO.getSize();
+            if (isNotNull(batchSize) && batchSize.length() > 0
+                    && 
!DataUnit.DATA_SIZE_PATTERN.matcher(batchSize.trim().toUpperCase()).matches()) {
+                validationErrors.add(String.format("Batch size for port '%s' 
must be of format <Data Size> <Data Unit>" +
+                        " where <Data Size> is a non-negative integer and 
<Data Unit> is a supported Data"
+                        + " Unit, such as: B, KB, MB, GB, TB", 
remoteGroupPort.getName()));
+            }
+
+            final String batchDuration = batchSettingsDTO.getDuration();
+            if (isNotNull(batchDuration) && batchDuration.length() > 0
+                    && 
!FormatUtils.TIME_DURATION_PATTERN.matcher(batchDuration.trim().toLowerCase()).matches())
 {
+                validationErrors.add(String.format("Batch duration for port 
'%s' must be of format <duration> <TimeUnit>" +
+                        " where <duration> is a non-negative integer and 
TimeUnit is a supported Time Unit, such "
+                        + "as: nanos, millis, secs, mins, hrs, days", 
remoteGroupPort.getName()));
+            }
+        }
+
         return validationErrors;
     }
 
@@ -284,22 +312,7 @@ public class StandardRemoteProcessGroupDAO extends 
ComponentDAO implements Remot
         verifyUpdatePort(port, remoteProcessGroupPortDto);
 
         // perform the update
-        if 
(isNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount())) {
-            
port.setMaxConcurrentTasks(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount());
-        }
-        if (isNotNull(remoteProcessGroupPortDto.getUseCompression())) {
-            
port.setUseCompression(remoteProcessGroupPortDto.getUseCompression());
-        }
-
-        final Boolean isTransmitting = 
remoteProcessGroupPortDto.isTransmitting();
-        if (isNotNull(isTransmitting)) {
-            // start or stop as necessary
-            if (!port.isRunning() && isTransmitting) {
-                remoteProcessGroup.startTransmitting(port);
-            } else if (port.isRunning() && !isTransmitting) {
-                remoteProcessGroup.stopTransmitting(port);
-            }
-        }
+        updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup);
 
         return port;
     }
@@ -318,6 +331,19 @@ public class StandardRemoteProcessGroupDAO extends 
ComponentDAO implements Remot
         verifyUpdatePort(port, remoteProcessGroupPortDto);
 
         // perform the update
+        updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup);
+
+        return port;
+    }
+
+    /**
+     *
+     * @param port Port instance to be updated.
+     * @param remoteProcessGroupPortDto DTO containing updated remote process 
group port settings.
+     * @param remoteProcessGroup If remoteProcessGroupPortDto has updated 
isTransmitting input,
+     *                           this method will start or stop the port in 
this remoteProcessGroup as necessary.
+     */
+    private void updatePort(RemoteGroupPort port, RemoteProcessGroupPortDTO 
remoteProcessGroupPortDto, RemoteProcessGroup remoteProcessGroup) {
         if 
(isNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount())) {
             
port.setMaxConcurrentTasks(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount());
         }
@@ -325,6 +351,13 @@ public class StandardRemoteProcessGroupDAO extends 
ComponentDAO implements Remot
             
port.setUseCompression(remoteProcessGroupPortDto.getUseCompression());
         }
 
+        final BatchSettingsDTO batchSettingsDTO = 
remoteProcessGroupPortDto.getBatchSettings();
+        if (isNotNull(batchSettingsDTO)) {
+            port.setBatchCount(batchSettingsDTO.getCount());
+            port.setBatchSize(batchSettingsDTO.getSize());
+            port.setBatchDuration(batchSettingsDTO.getDuration());
+        }
+
         final Boolean isTransmitting = 
remoteProcessGroupPortDto.isTransmitting();
         if (isNotNull(isTransmitting)) {
             // start or stop as necessary
@@ -334,8 +367,6 @@ public class StandardRemoteProcessGroupDAO extends 
ComponentDAO implements Remot
                 remoteProcessGroup.stopTransmitting(port);
             }
         }
-
-        return port;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java
index 725e4d4..ea7fa7d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java
@@ -29,6 +29,7 @@ import org.apache.nifi.authorization.user.StandardNiFiUser;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.web.api.dto.BatchSettingsDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
 import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
@@ -40,6 +41,7 @@ import 
org.springframework.security.core.context.SecurityContext;
 import org.springframework.security.core.context.SecurityContextHolder;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.nifi.web.api.dto.DtoFactory.SENSITIVE_VALUE_MASK;
@@ -434,6 +436,13 @@ public class TestRemoteProcessGroupAuditor {
         
when(updatedRPGPort.getMaxConcurrentTasks()).thenReturn(inputRPGPortDTO.getConcurrentlySchedulableTaskCount());
         
when(updatedRPGPort.isUseCompression()).thenReturn(inputRPGPortDTO.getUseCompression());
 
+        final BatchSettingsDTO batchSettings = 
inputRPGPortDTO.getBatchSettings();
+        if (batchSettings != null) {
+            
when(updatedRPGPort.getBatchCount()).thenReturn(batchSettings.getCount());
+            
when(updatedRPGPort.getBatchSize()).thenReturn(batchSettings.getSize());
+            
when(updatedRPGPort.getBatchDuration()).thenReturn(batchSettings.getDuration());
+        }
+
         when(joinPoint.proceed()).thenReturn(updatedRPGPort);
 
         // Capture added actions so that those can be asserted later.
@@ -553,4 +562,34 @@ public class TestRemoteProcessGroupAuditor {
         assertConfigureDetails(action.getActionDetails(), 
"input-port-1.Compressed", "false", "true");
 
     }
+
+    @Test
+    public void testConfigurePortBatchSettings() throws Throwable {
+
+        final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort();
+        when(existingRPGPort.getName()).thenReturn("input-port-1");
+
+        final RemoteProcessGroupPortDTO inputRPGPortDTO = 
defaultRemoteProcessGroupPortDTO();
+        final BatchSettingsDTO batchSettingsDTO = new BatchSettingsDTO();
+        batchSettingsDTO.setCount(1234);
+        batchSettingsDTO.setSize("64KB");
+        batchSettingsDTO.setDuration("10sec");
+        inputRPGPortDTO.setBatchSettings(batchSettingsDTO);
+
+        final Collection<Action> actions = 
updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort);
+
+        assertEquals(3, actions.size());
+        final Iterator<Action> iterator = actions.iterator();
+        Action action = iterator.next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), "input-port-1.Batch 
Count", "0", "1234");
+
+        action = iterator.next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), "input-port-1.Batch 
Size", "", "64KB");
+
+        action = iterator.next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), "input-port-1.Batch 
Duration", "", "10sec");
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java
new file mode 100644
index 0000000..f68a115
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.dao.impl;
+
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.exception.ValidationException;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.web.api.dto.BatchSettingsDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestStandardRemoteProcessGroupDAO {
+
+    private void validate(final StandardRemoteProcessGroupDAO dao, final 
RemoteProcessGroupPortDTO dto, final String ... errMessageKeywords) {
+        try {
+            dao.verifyUpdateInputPort(dto.getGroupId(), dto);
+            if (errMessageKeywords.length > 0) {
+                fail("Validation should fail with keywords: " + 
Arrays.asList(errMessageKeywords));
+            }
+        } catch (ValidationException e) {
+            if (errMessageKeywords.length == 0) {
+                fail("Validation should pass, but failed with: " + e);
+            }
+            final List<String> validationErrors = e.getValidationErrors();
+            assertEquals("Validation should return one validationErrors", 1, 
validationErrors.size());
+            final String validationError = validationErrors.get(0);
+            for (String errMessageKeyword : errMessageKeywords) {
+                assertTrue("validation error message should contain " + 
errMessageKeyword + ", but was: " + validationError,
+                        validationError.contains(errMessageKeyword));
+            }
+        }
+    }
+
+    @Test
+    public void testVerifyUpdateInputPort() {
+        final StandardRemoteProcessGroupDAO dao = new 
StandardRemoteProcessGroupDAO();
+
+        final String remoteProcessGroupId = "remote-process-group-id";
+        final String remoteProcessGroupInputPortId = 
"remote-process-group-input-port-id";
+
+        final FlowController flowController = mock(FlowController.class);
+        final ProcessGroup processGroup = mock(ProcessGroup.class);
+        final RemoteProcessGroup remoteProcessGroup = 
mock(RemoteProcessGroup.class);
+        final RemoteGroupPort remoteGroupPort = mock(RemoteGroupPort.class);
+
+        dao.setFlowController(flowController);
+        when(flowController.getGroup(any())).thenReturn(processGroup);
+        
when(processGroup.findRemoteProcessGroup(eq(remoteProcessGroupId))).thenReturn(remoteProcessGroup);
+        
when(remoteProcessGroup.getInputPort(remoteProcessGroupInputPortId)).thenReturn(remoteGroupPort);
+        when(remoteGroupPort.getName()).thenReturn("remote-group-port");
+
+        final RemoteProcessGroupPortDTO dto = new RemoteProcessGroupPortDTO();
+        dto.setGroupId(remoteProcessGroupId);
+        dto.setId(remoteProcessGroupInputPortId);
+        final BatchSettingsDTO batchSettings = new BatchSettingsDTO();
+        dto.setBatchSettings(batchSettings);
+
+        // Empty input values should pass validation.
+        dao.verifyUpdateInputPort(remoteProcessGroupId, dto);
+
+        // Concurrent tasks
+        dto.setConcurrentlySchedulableTaskCount(0);
+        validate(dao, dto, "Concurrent tasks", "positive integer");
+
+        dto.setConcurrentlySchedulableTaskCount(2);
+        validate(dao, dto);
+
+        // Batch count
+        batchSettings.setCount(-1);
+        validate(dao, dto, "Batch count", "positive integer");
+
+        batchSettings.setCount(0);
+        validate(dao, dto);
+
+        batchSettings.setCount(1000);
+        validate(dao, dto);
+
+        // Batch size
+        batchSettings.setSize("AB");
+        validate(dao, dto, "Batch size", "Data Size");
+
+        batchSettings.setSize("10 days");
+        validate(dao, dto, "Batch size", "Data Size");
+
+        batchSettings.setSize("300MB");
+        validate(dao, dto);
+
+        // Batch duration
+        batchSettings.setDuration("AB");
+        validate(dao, dto, "Batch duration", "Time Unit");
+
+        batchSettings.setDuration("10 KB");
+        validate(dao, dto, "Batch duration", "Time Unit");
+
+        batchSettings.setDuration("10 secs");
+        validate(dao, dto);
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp
index 8f38369..a62700c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp
@@ -37,6 +37,40 @@
                     <span class="nf-checkbox-label">Compressed</span>
                 </div>
             </div>
+            <div class="clear"></div>
+        </div>
+        <div class="batch-settings">
+            <div class="setting-name">
+                Batch Settings:
+            </div>
+            <div class="setting batch-setting">
+                <div class="setting-name">
+                    Count
+                    <div class="fa fa-question-circle" alt="Info" title="The 
preferred number of flow files to include in a transaction for this 
port."></div>
+                </div>
+                <div class="setting-field">
+                    <input id="remote-port-batch-count" type="text"/>
+                </div>
+            </div>
+            <div class="setting batch-setting">
+                <div class="setting-name">
+                    Size
+                    <div class="fa fa-question-circle" alt="Info" title="The 
preferred number of bytes to include in a transaction for this port."></div>
+                </div>
+                <div class="setting-field">
+                    <input id="remote-port-batch-size" type="text"/>
+                </div>
+            </div>
+            <div class="setting batch-setting">
+                <div class="setting-name">
+                    Duration
+                    <div class="fa fa-question-circle" alt="Info" title="The 
preferred amount of time that a transaction should span for this port."></div>
+                </div>
+                <div class="setting-field">
+                    <input id="remote-port-batch-duration" type="text"/>
+                </div>
+            </div>
+            <div class="clear"></div>
         </div>
     </div>
 </div>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css
index 58dcfcd..4a3fad4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css
@@ -125,6 +125,7 @@ div.remote-port-description {
 
 div.concurrent-task-container {
     float: left;
+    width: 200px;
 }
 
 img.concurrent-tasks-info {
@@ -133,7 +134,11 @@ img.concurrent-tasks-info {
 }
 
 div.compression-container {
-    float: right;
+    float: left;
+}
+
+div.batch-settings-container {
+    margin-top: 8px;
 }
 
 div.remote-port-transmission-container {
@@ -165,7 +170,7 @@ div.disabled-transmission-switch {
 #remote-port-concurrent-tasks {
     font-size: 11px !important;
     float: left;
-    width: 75%;
+    width: 266px;
 }
 
 #remote-port-use-compression-container {
@@ -179,4 +184,10 @@ div.disabled-transmission-switch {
 
 #remote-port-concurrent-task-header {
     margin-top: 5px;
-}
\ No newline at end of file
+}
+
+div.batch-setting {
+    margin-right: 8px;
+    width: 30%;
+    float: left;
+}

Reply via email to