NIFI-1202: Site-to-Site batch settings. - Added batchCount, batchSize, batchDuration to limit the flow files to be included in a single Site-to-Site transaction. - Added batch throttling logic when StandardRemoteGroupPort transfers flow files to a remote input port using the batch limit configurations, so that users can limit batches not only when pulling data, but also when pushing data. - Added destination list shuffle to provide better load distribution. Previously, the load distribution algorithm produced the same host consecutively. - Added new batch settings to FlowConfiguration.xsd. - Added new batch settings to Flow Fingerprint. - Added new batch settings to Audit. - Sort ports by name in the 'Remote Process Group Ports' dialog. - Show 'No value set' when a batch configuration is not set. - Updated batch settings tooltip to clearly explain how the configuration works differently for input and output ports. - Updated DTO by separating batch settings into BatchSettingsDTO to indicate that count, size and duration are a set of configurations. - This closes #1306
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a41a2a9b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a41a2a9b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a41a2a9b Branch: refs/heads/master Commit: a41a2a9b1ab79b558831177e91e8fde49d6e748a Parents: cf0e8bb Author: Koji Kawamura <[email protected]> Authored: Tue Apr 18 13:31:27 2017 +0900 Committer: Matt Gilman <[email protected]> Committed: Thu Apr 27 10:35:07 2017 -0400 ---------------------------------------------------------------------- .../apache/nifi/remote/client/PeerSelector.java | 6 + .../nifi/remote/client/TestPeerSelector.java | 35 +++++ .../nifi/web/api/dto/BatchSettingsDTO.java | 76 +++++++++ .../web/api/dto/RemoteProcessGroupPortDTO.java | 15 ++ .../RemoteProcessGroupPortDescriptor.java | 15 ++ .../org/apache/nifi/remote/RemoteGroupPort.java | 12 ++ .../apache/nifi/controller/FlowController.java | 7 + .../serialization/FlowFromDOMFactory.java | 3 + .../serialization/StandardFlowSerializer.java | 12 ++ .../nifi/fingerprint/FingerprintFactory.java | 2 +- .../nifi/remote/StandardRemoteProcessGroup.java | 19 +++ ...tandardRemoteProcessGroupPortDescriptor.java | 30 ++++ .../src/main/resources/FlowConfiguration.xsd | 3 + .../fingerprint/FingerprintFactoryTest.java | 41 +++++ .../nifi/remote/StandardRemoteGroupPort.java | 76 ++++++++- .../remote/TestStandardRemoteGroupPort.java | 156 ++++++++++++++++++- .../nifi/audit/RemoteProcessGroupAuditor.java | 9 ++ .../org/apache/nifi/web/api/dto/DtoFactory.java | 13 ++ .../dao/impl/StandardRemoteProcessGroupDAO.java | 69 +++++--- .../audit/TestRemoteProcessGroupAuditor.java | 39 +++++ .../impl/TestStandardRemoteProcessGroupDAO.java | 127 +++++++++++++++ .../canvas/remote-port-configuration.jsp | 34 ++++ .../css/remote-process-group-configuration.css | 17 +- .../nf/canvas/nf-remote-process-group-ports.js | 131 +++++++++++++++- 24 files changed, 908 insertions(+), 39 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java index 0ec8951..a7bd094 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java @@ -41,6 +41,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -202,6 +203,11 @@ public class PeerSelector { } } + // Shuffle destinations to provide better distribution. + // Without this, same host will be used continuously, especially when remote peers have the same number of queued files. + // Use Random(0) to provide consistent result for unit testing. Randomness is not important to shuffle destinations. 
+ Collections.shuffle(destinations, new Random(0)); + final StringBuilder distributionDescription = new StringBuilder(); distributionDescription.append("New Weighted Distribution of Nodes:"); for (final Map.Entry<PeerStatus, Integer> entry : entryCountMap.entrySet()) { http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java index c434c7b..6a69fee 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java @@ -39,6 +39,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -60,6 +61,40 @@ public class TestPeerSelector { } @Test + public void testFormulateDestinationListForOutputEven() throws IOException { + final Set<PeerStatus> collection = new HashSet<>(); + collection.add(new PeerStatus(new PeerDescription("Node1", 1111, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("Node2", 2222, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("Node3", 3333, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("Node4", 4444, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("Node5", 5555, true), 4096, true)); + + PeerStatusProvider 
peerStatusProvider = Mockito.mock(PeerStatusProvider.class); + PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null); + + final List<PeerStatus> destinations = peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE); + final Map<String, Integer> selectedCounts = calculateAverageSelectedCount(collection, destinations); + + logger.info("selectedCounts={}", selectedCounts); + + int consecutiveSamePeerCount = 0; + PeerStatus previousPeer = null; + for (PeerStatus peer : destinations) { + if (previousPeer != null && peer.getPeerDescription().equals(previousPeer.getPeerDescription())) { + consecutiveSamePeerCount++; + // The same peer shouldn't be used consecutively (number of nodes - 1) times or more. + if (consecutiveSamePeerCount >= (collection.size() - 1)) { + fail("The same peer is returned consecutively too frequently."); + } + } else { + consecutiveSamePeerCount = 0; + } + previousPeer = peer; + } + + } + + @Test public void testFormulateDestinationListForOutput() throws IOException { final Set<PeerStatus> collection = new HashSet<>(); collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096, true)); http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/BatchSettingsDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/BatchSettingsDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/BatchSettingsDTO.java new file mode 100644 index 0000000..e1d63f8 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/BatchSettingsDTO.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; + +/** + * Details of batch settings of a remote process group port. + */ +@XmlType(name = "batchSettings") +public class BatchSettingsDTO { + + private Integer count; + private String size; + private String duration; + + /** + * @return preferred number of flow files to include in a transaction + */ + @ApiModelProperty( + value = "Preferred number of flow files to include in a transaction." + ) + public Integer getCount() { + return count; + } + + public void setCount(Integer count) { + this.count = count; + } + + /** + * @return preferred number of bytes to include in a transaction + */ + @ApiModelProperty( + value = "Preferred number of bytes to include in a transaction." + ) + public String getSize() { + return size; + } + + public void setSize(String size) { + this.size = size; + } + + /** + * @return preferred amount of time that a transaction should span + */ + @ApiModelProperty( + value = "Preferred amount of time that a transaction should span." 
+ ) + public String getDuration() { + return duration; + } + + public void setDuration(String duration) { + this.duration = duration; + } + + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java index e4a8131..2a34d9c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java @@ -35,6 +35,7 @@ public class RemoteProcessGroupPortDTO { private Boolean exists; private Boolean targetRunning; private Boolean connected; + private BatchSettingsDTO batchSettings; /** * @return comments as configured in the target port @@ -176,6 +177,20 @@ public class RemoteProcessGroupPortDTO { this.connected = connected; } + /** + * @return batch settings for data transmission + */ + @ApiModelProperty( + value = "The batch settings for data transmission." 
+ ) + public BatchSettingsDTO getBatchSettings() { + return batchSettings; + } + + public void setBatchSettings(BatchSettingsDTO batchSettings) { + this.batchSettings = batchSettings; + } + @Override public int hashCode() { return 923847 + String.valueOf(name).hashCode(); http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java index 4d7f774..c330c13 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java @@ -55,6 +55,21 @@ public interface RemoteProcessGroupPortDescriptor { Boolean getUseCompression(); /** + * @return Preferred number of flow files to include in a transaction + */ + Integer getBatchCount(); + + /** + * @return Preferred number of bytes to include in a transaction + */ + String getBatchSize(); + + /** + * @return Preferred amount of for a transaction to span + */ + String getBatchDuration(); + + /** * @return Whether or not the target port exists */ Boolean getExists(); http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java index f8f4b20..07faf42 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java @@ -41,4 +41,16 @@ public abstract class RemoteGroupPort extends AbstractPort implements Port, Remo public abstract boolean getTargetExists(); public abstract boolean isTargetRunning(); + + public abstract Integer getBatchCount(); + + public abstract void setBatchCount(Integer batchCount); + + public abstract String getBatchSize(); + + public abstract void setBatchSize(String batchSize); + + public abstract String getBatchDuration(); + + public abstract void setBatchDuration(String batchDuration); } http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 151640e..b628668 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -195,6 +195,7 @@ import 
org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.api.dto.BatchSettingsDTO; import org.apache.nifi.web.api.dto.BundleDTO; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; @@ -2011,6 +2012,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R descriptor.setConcurrentlySchedulableTaskCount(port.getConcurrentlySchedulableTaskCount()); descriptor.setTransmitting(port.isTransmitting()); descriptor.setUseCompression(port.getUseCompression()); + final BatchSettingsDTO batchSettings = port.getBatchSettings(); + if (batchSettings != null) { + descriptor.setBatchCount(batchSettings.getCount()); + descriptor.setBatchSize(batchSettings.getSize()); + descriptor.setBatchDuration(batchSettings.getDuration()); + } remotePorts.add(descriptor); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index 731d914..3bd037d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -357,6 +357,9 @@ public class FlowFromDOMFactory { 
descriptor.setComments(getString(element, "comments")); descriptor.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks")); descriptor.setUseCompression(getBoolean(element, "useCompression")); + descriptor.setBatchCount(getOptionalInt(element, "batchCount")); + descriptor.setBatchSize(getString(element, "batchSize")); + descriptor.setBatchDuration(getString(element, "batchDuration")); descriptor.setTransmitting("RUNNING".equalsIgnoreCase(getString(element, "scheduledState"))); return descriptor; http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index aa7022b..c5f3f48 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -313,6 +313,18 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "scheduledState", port.getScheduledState().name()); addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks()); addTextElement(element, "useCompression", String.valueOf(port.isUseCompression())); + final Integer batchCount = port.getBatchCount(); + if (batchCount != null && batchCount > 0) { + addTextElement(element, "batchCount", batchCount); + } + final String batchSize = 
port.getBatchSize(); + if (batchSize != null && batchSize.length() > 0) { + addTextElement(element, "batchSize", batchSize); + } + final String batchDuration = port.getBatchDuration(); + if (batchDuration != null && batchDuration.length() > 0) { + addTextElement(element, "batchDuration", batchDuration); + } parentElement.appendChild(element); } http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 1db80fc..d9e048e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -511,7 +511,7 @@ public class FingerprintFactory { } private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final Element remoteGroupPortElement) { - for (final String childName : new String[] {"id", "maxConcurrentTasks", "useCompression"}) { + for (final String childName : new String[] {"id", "maxConcurrentTasks", "useCompression", "batchCount", "batchSize", "batchDuration"}) { appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName)); } http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 7aee480..0cc8433 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -45,6 +45,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.net.ssl.SSLContext; import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; @@ -632,6 +633,15 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { if (descriptor.getUseCompression() != null) { port.setUseCompression(descriptor.getUseCompression()); } + if (descriptor.getBatchCount() != null && descriptor.getBatchCount() > 0) { + port.setBatchCount(descriptor.getBatchCount()); + } + if (!StringUtils.isBlank(descriptor.getBatchSize())) { + port.setBatchSize(descriptor.getBatchSize()); + } + if (!StringUtils.isBlank(descriptor.getBatchDuration())) { + port.setBatchDuration(descriptor.getBatchDuration()); + } } finally { writeLock.unlock(); } @@ -697,6 +707,15 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { if (descriptor.getUseCompression() != null) { port.setUseCompression(descriptor.getUseCompression()); } + if (descriptor.getBatchCount() != null && descriptor.getBatchCount() > 0) { + port.setBatchCount(descriptor.getBatchCount()); + } + if 
(!StringUtils.isBlank(descriptor.getBatchSize())) { + port.setBatchSize(descriptor.getBatchSize()); + } + if (!StringUtils.isBlank(descriptor.getBatchDuration())) { + port.setBatchDuration(descriptor.getBatchDuration()); + } inputPorts.put(descriptor.getId(), port); } finally { http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java index ed90186..c3a8f5e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java @@ -27,6 +27,9 @@ public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGr private Integer concurrentlySchedulableTaskCount; private Boolean transmitting; private Boolean useCompression; + private Integer batchCount; + private String batchSize; + private String batchDuration; private Boolean exists; private Boolean targetRunning; private Boolean connected; @@ -95,6 +98,33 @@ public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGr } @Override + public Integer getBatchCount() { + return batchCount; + } + + public void setBatchCount(Integer batchCount) { + this.batchCount = batchCount; + } + + @Override + public String getBatchSize() { + return batchSize; + } + + public void setBatchSize(String 
batchSize) { + this.batchSize = batchSize; + } + + @Override + public String getBatchDuration() { + return batchDuration; + } + + public void setBatchDuration(String batchDuration) { + this.batchDuration = batchDuration; + } + + @Override public Boolean getExists() { return exists; } http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index 8cf2ad8..30fff1f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -281,6 +281,9 @@ <xs:sequence> <xs:element name="maxConcurrentTasks" type="xs:positiveInteger"></xs:element> <xs:element name="useCompression" type="xs:boolean"></xs:element> + <xs:element name="batchCount" type="xs:positiveInteger" minOccurs="0" maxOccurs="1" /> + <xs:element name="batchSize" type="xs:string" minOccurs="0" maxOccurs="1" /> + <xs:element name="batchDuration" type="xs:string" minOccurs="0" maxOccurs="1" /> </xs:sequence> </xs:extension> </xs:complexContent> http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java index 87a372d..67f1ad4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java @@ -26,14 +26,17 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.lang.reflect.Method; +import java.util.Collections; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.connectable.Position; +import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.serialization.FlowSerializer; import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.util.NiFiProperties; import org.junit.Before; @@ -273,4 +276,42 @@ public class FingerprintFactoryTest { assertEquals(expected.toString(), fingerprint("addRemoteProcessGroupFingerprint", Element.class, componentElement)); } + @Test + public void testRemotePortFingerprint() throws Exception { + + // Fill out every configuration. 
+ final RemoteProcessGroup groupComponent = mock(RemoteProcessGroup.class); + when(groupComponent.getName()).thenReturn("name"); + when(groupComponent.getIdentifier()).thenReturn("id"); + when(groupComponent.getPosition()).thenReturn(new Position(10.5, 20.3)); + when(groupComponent.getTargetUri()).thenReturn("http://node1:8080/nifi"); + when(groupComponent.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW); + + final RemoteGroupPort portComponent = mock(RemoteGroupPort.class); + when(groupComponent.getInputPorts()).thenReturn(Collections.singleton(portComponent)); + when(portComponent.getName()).thenReturn("portName"); + when(portComponent.getIdentifier()).thenReturn("portId"); + when(portComponent.getPosition()).thenReturn(new Position(10.5, 20.3)); + when(portComponent.getComments()).thenReturn("portComment"); + when(portComponent.getScheduledState()).thenReturn(ScheduledState.RUNNING); + when(portComponent.getMaxConcurrentTasks()).thenReturn(3); + when(portComponent.isUseCompression()).thenReturn(true); + when(portComponent.getBatchCount()).thenReturn(1234); + when(portComponent.getBatchSize()).thenReturn("64KB"); + when(portComponent.getBatchDuration()).thenReturn("10sec"); + // Serializer doesn't serialize if a port doesn't have any connection. + when(portComponent.hasIncomingConnection()).thenReturn(true); + + // Assert fingerprints with expected one. 
+ final String expected = "portId" + + "3" + + "true" + + "1234" + + "64KB" + + "10sec"; + + final Element rootElement = serializeElement(encryptor, RemoteProcessGroup.class, groupComponent, "addRemoteProcessGroup"); + final Element componentElement = (Element) rootElement.getElementsByTagName("inputPort").item(0); + assertEquals(expected.toString(), fingerprint("addRemoteGroupPortFingerprint", Element.class, componentElement)); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 92931f2..b1288f3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -46,11 +46,13 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.exception.PortNotRunningException; import 
org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.UnknownPortException; @@ -80,6 +82,9 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { private static final Logger logger = LoggerFactory.getLogger(StandardRemoteGroupPort.class); private final RemoteProcessGroup remoteGroup; private final AtomicBoolean useCompression = new AtomicBoolean(false); + private final AtomicReference<Integer> batchCount = new AtomicReference<>(); + private final AtomicReference<String> batchSize = new AtomicReference<>(); + private final AtomicReference<String> batchDuration = new AtomicReference<>(); private final AtomicBoolean targetExists = new AtomicBoolean(true); private final AtomicBoolean targetRunning = new AtomicBoolean(true); private final SSLContext sslContext; @@ -157,7 +162,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final long penalizationMillis = FormatUtils.getTimeDuration(remoteGroup.getYieldDuration(), TimeUnit.MILLISECONDS); - final SiteToSiteClient client = new SiteToSiteClient.Builder() + final SiteToSiteClient.Builder clientBuilder = new SiteToSiteClient.Builder() .urls(SiteToSiteRestApiClient.parseClusterUrls(remoteGroup.getTargetUris())) .portIdentifier(getIdentifier()) .sslContext(sslContext) @@ -168,9 +173,24 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { .timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) .transportProtocol(remoteGroup.getTransportProtocol()) .httpProxy(new HttpProxy(remoteGroup.getProxyHost(), remoteGroup.getProxyPort(), remoteGroup.getProxyUser(), remoteGroup.getProxyPassword())) - .localAddress(remoteGroup.getLocalAddress()) - .build(); - clientRef.set(client); + .localAddress(remoteGroup.getLocalAddress()); + + final Integer batchCount = getBatchCount(); + if (batchCount != null) { + clientBuilder.requestBatchCount(batchCount); + } + + final String batchSize = getBatchSize(); + if (batchSize != null && 
batchSize.length() > 0) { + clientBuilder.requestBatchSize(DataUnit.parseDataSize(batchSize.trim(), DataUnit.B).intValue()); + } + + final String batchDuration = getBatchDuration(); + if (batchDuration != null && batchDuration.length() > 0) { + clientBuilder.requestBatchDuration(FormatUtils.getTimeDuration(batchDuration.trim(), TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + } + + clientRef.set(clientBuilder.build()); } @Override @@ -278,6 +298,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final StopWatch stopWatch = new StopWatch(true); long bytesSent = 0L; + final SiteToSiteClientConfig siteToSiteClientConfig = getSiteToSiteClient().getConfig(); + final long maxBatchBytes = siteToSiteClientConfig.getPreferredBatchSize(); + final int maxBatchCount = siteToSiteClientConfig.getPreferredBatchCount(); + final long preferredBatchDuration = siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.NANOSECONDS); + final long maxBatchDuration = preferredBatchDuration > 0 ? 
preferredBatchDuration : BATCH_SEND_NANOS; + + final Set<FlowFile> flowFilesSent = new HashSet<>(); boolean continueTransaction = true; while (continueTransaction) { @@ -304,10 +331,15 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { session.remove(flowFile); final long sendingNanos = System.nanoTime() - startSendingNanos; - if (sendingNanos < BATCH_SEND_NANOS) { - flowFile = session.get(); - } else { + + if (maxBatchCount > 0 && flowFilesSent.size() >= maxBatchCount) { + flowFile = null; + } else if (maxBatchBytes > 0 && bytesSent >= maxBatchBytes) { flowFile = null; + } else if (sendingNanos >= maxBatchDuration) { + flowFile = null; + } else { + flowFile = session.get(); } continueTransaction = (flowFile != null); @@ -478,6 +510,36 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { } @Override + public Integer getBatchCount() { + return batchCount.get(); + } + + @Override + public void setBatchCount(Integer batchCount) { + this.batchCount.set(batchCount); + } + + @Override + public String getBatchSize() { + return batchSize.get(); + } + + @Override + public void setBatchSize(String batchSize) { + this.batchSize.set(batchSize); + } + + @Override + public String getBatchDuration() { + return batchDuration.get(); + } + + @Override + public void setBatchDuration(String batchDuration) { + this.batchDuration.set(batchDuration); + } + + @Override public String toString() { return "RemoteGroupPort[name=" + getName() + ",targets=" + remoteGroup.getTargetUris() + "]"; } http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java index 31cd154..f677b88 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java @@ -19,6 +19,7 @@ package org.apache.nifi.remote; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes; import org.apache.nifi.groups.ProcessGroup; @@ -28,6 +29,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.io.http.HttpCommunicationsSession; import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; import org.apache.nifi.remote.protocol.CommunicationsSession; @@ -43,17 +45,23 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.nio.channels.SocketChannel; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; import org.apache.nifi.util.NiFiProperties; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import 
static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -70,7 +78,7 @@ public class TestStandardRemoteGroupPort { private Transaction transaction; private EventReporter eventReporter; private ProcessGroup processGroup; - public static final String REMOTE_CLUSTER_URL = "http://node0.example.com:8080/nifi"; + private static final String REMOTE_CLUSTER_URL = "http://node0.example.com:8080/nifi"; private StandardRemoteGroupPort port; private SharedSessionState sessionState; private MockProcessSession processSession; @@ -84,17 +92,18 @@ public class TestStandardRemoteGroupPort { private void setupMock(final SiteToSiteTransportProtocol protocol, final TransferDirection direction) throws Exception { - setupMock(protocol, direction, mock(Transaction.class)); + final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder().buildConfig(); + setupMock(protocol, direction, siteToSiteClientConfig); } private void setupMock(final SiteToSiteTransportProtocol protocol, final TransferDirection direction, - final Transaction transaction) throws Exception { + final SiteToSiteClientConfig siteToSiteClientConfig) throws Exception { processGroup = null; remoteGroup = mock(RemoteProcessGroup.class); scheduler = null; siteToSiteClient = mock(SiteToSiteClient.class); - this.transaction = transaction; + this.transaction = mock(Transaction.class); eventReporter = mock(EventReporter.class); @@ -119,6 +128,7 @@ public class TestStandardRemoteGroupPort { doReturn(REMOTE_CLUSTER_URL).when(remoteGroup).getTargetUri(); doReturn(siteToSiteClient).when(port).getSiteToSiteClient(); doReturn(transaction).when(siteToSiteClient).createTransaction(eq(direction)); + doReturn(siteToSiteClientConfig).when(siteToSiteClient).getConfig(); doReturn(eventReporter).when(remoteGroup).getEventReporter(); } @@ -246,6 +256,144 @@ public class TestStandardRemoteGroupPort { } @Test + public 
void testSendBatchByCount() throws Exception { + + final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder() + .requestBatchCount(2) + .buildConfig(); + setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND, siteToSiteClientConfig); + + // t1 = {0, 1}, t2 = {2, 3}, t3 = {4} + final int[] expectedNumberOfPackets = {2, 2, 1}; + testSendBatch(expectedNumberOfPackets); + + } + + @Test + public void testSendBatchBySize() throws Exception { + + final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder() + .requestBatchSize(30) + .buildConfig(); + setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND, siteToSiteClientConfig); + + // t1 = {10, 11, 12}, t2 = {13, 14} + final int[] expectedNumberOfPackets = {3, 2}; + testSendBatch(expectedNumberOfPackets); + + } + + @Test + public void testSendBatchByDuration() throws Exception { + + final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder() + .requestBatchDuration(1, TimeUnit.NANOSECONDS) + .buildConfig(); + setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND, siteToSiteClientConfig); + + // t1 = {1}, t2 = {2} .. and so on. + final int[] expectedNumberOfPackets = {1, 1, 1, 1, 1}; + testSendBatch(expectedNumberOfPackets); + + } + + /** + * Generate flow files to be sent, and execute port's onTrigger method. + * Finally, this method verifies whether packets are sent as expected. + * @param expectedNumberOfPackets Specify how many packets should be sent by each transaction. + * E.g. passing {2, 2, 1}, would generate 5 flow files in total. + * Based on the siteToSiteClientConfig batch parameters, + * it's expected to be sent via 3 transactions, + * transaction 0 will send flow file 0 and 1, + * transaction 1 will send flow file 2 and 3, + * and transaction 2 will send flow file 4. + * Each flow file has different content size generated automatically. 
+ * The content size starts with 10, and increases as more flow files are generated. + * E.g. flow file 0 will have 10 bytes, flow file 1 has 11 bytes, flow file 2 has 12 and so on. + * + */ + private void testSendBatch(final int[] expectedNumberOfPackets) throws Exception { + + setupMockProcessSession(); + + final String peerUrl = "http://node1.example.com:8080/nifi"; + final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, false); + final HttpCommunicationsSession commsSession = new HttpCommunicationsSession(); + final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL); + + final String flowFileEndpointUri = "http://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files"; + + doReturn(peer).when(transaction).getCommunicant(); + commsSession.setDataTransferUrl(flowFileEndpointUri); + + // Capture packets being sent to the remote peer + final AtomicInteger totalPacketsSent = new AtomicInteger(0); + final List<List<DataPacket>> sentPackets = new ArrayList<>(expectedNumberOfPackets.length); + final List<DataPacket> sentPacketsPerTransaction = new ArrayList<>(); + doAnswer(invocation -> { + sentPacketsPerTransaction.add((DataPacket)invocation.getArguments()[0]); + totalPacketsSent.incrementAndGet(); + return null; + }).when(transaction).send(any(DataPacket.class)); + doAnswer(invocation -> { + sentPackets.add(new ArrayList<>(sentPacketsPerTransaction)); + sentPacketsPerTransaction.clear(); + return null; + }).when(transaction).confirm(); + + + // Execute onTrigger while offering new flow files.
+ final List<MockFlowFile> flowFiles = new ArrayList<>(); + for (int i = 0; i < expectedNumberOfPackets.length; i++) { + int numOfPackets = expectedNumberOfPackets[i]; + int startF = flowFiles.size(); + int endF = startF + numOfPackets; + IntStream.range(startF, endF).forEach(f -> { + final StringBuilder flowFileContents = new StringBuilder("0123456789"); + for (int c = 0; c < f; c++) { + flowFileContents.append(c); + } + final byte[] bytes = flowFileContents.toString().getBytes(); + final MockFlowFile flowFile = spy(processSession.createFlowFile(bytes)); + when(flowFile.getSize()).then(invocation -> { + Thread.sleep(1); // For testSendBatchByDuration + return bytes.length; + }); + sessionState.getFlowFileQueue().offer(flowFile); + flowFiles.add(flowFile); + }); + port.onTrigger(processContext, processSession); + } + + // Verify transactions, sent packets, and provenance events. + assertEquals(flowFiles.size(), totalPacketsSent.get()); + assertEquals("The number of transactions should match as expected.", expectedNumberOfPackets.length, sentPackets.size()); + final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents(); + assertEquals(flowFiles.size(), provenanceEvents.size()); + + int f = 0; + for (int i = 0; i < expectedNumberOfPackets.length; i++) { + final List<DataPacket> dataPackets = sentPackets.get(i); + assertEquals(expectedNumberOfPackets[i], dataPackets.size()); + + for (int p = 0; p < dataPackets.size(); p++) { + final FlowFile flowFile = flowFiles.get(f); + + // Assert sent packet + final DataPacket dataPacket = dataPackets.get(p); + assertEquals(flowFile.getSize(), dataPacket.getSize()); + + // Assert provenance event + final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(f); + assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType()); + assertEquals(flowFileEndpointUri, provenanceEvent.getTransitUri()); + + f++; + } + } + } + + @Test public void testReceiveHttp() throws Exception { 
setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.RECEIVE); http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java index 5a69cfe..690d404 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java @@ -93,6 +93,15 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor { .setConvertName(PORT_NAME_CONVERT), new ConfigurationRecorder<RemoteGroupPort, RemoteProcessGroupPortDTO>("Compressed", dto -> dto.getUseCompression() != null, RemoteGroupPort::isUseCompression) + .setConvertName(PORT_NAME_CONVERT), + new ConfigurationRecorder<RemoteGroupPort, RemoteProcessGroupPortDTO>("Batch Count", + dto -> dto.getBatchSettings() != null && dto.getBatchSettings().getCount() != null, RemoteGroupPort::getBatchCount) + .setConvertName(PORT_NAME_CONVERT), + new ConfigurationRecorder<RemoteGroupPort, RemoteProcessGroupPortDTO>("Batch Size", + dto -> dto.getBatchSettings() != null && dto.getBatchSettings().getSize() != null, RemoteGroupPort::getBatchSize) + .setConvertName(PORT_NAME_CONVERT), + new ConfigurationRecorder<RemoteGroupPort, RemoteProcessGroupPortDTO>("Batch Duration", + dto -> dto.getBatchSettings() != null && dto.getBatchSettings().getDuration() != null, RemoteGroupPort::getBatchDuration) 
.setConvertName(PORT_NAME_CONVERT) ); http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index aaa33d0..5154811 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -1493,6 +1493,12 @@ public final class DtoFactory { dto.setUseCompression(port.isUseCompression()); dto.setExists(port.getTargetExists()); + final BatchSettingsDTO batchDTO = new BatchSettingsDTO(); + batchDTO.setCount(port.getBatchCount()); + batchDTO.setSize(port.getBatchSize()); + batchDTO.setDuration(port.getBatchDuration()); + dto.setBatchSettings(batchDTO); + // determine if this port is currently connected to another component locally if (ConnectableType.REMOTE_OUTPUT_PORT.equals(port.getConnectableType())) { dto.setConnected(!port.getConnections().isEmpty()); @@ -2962,6 +2968,13 @@ public final class DtoFactory { copy.setConcurrentlySchedulableTaskCount(original.getConcurrentlySchedulableTaskCount()); copy.setUseCompression(original.getUseCompression()); copy.setExists(original.getExists()); + final BatchSettingsDTO batchOrg = original.getBatchSettings(); + if (batchOrg != null) { + final BatchSettingsDTO batchCopy = new BatchSettingsDTO(); + batchCopy.setCount(batchOrg.getCount()); + batchCopy.setSize(batchOrg.getSize()); + batchCopy.setDuration(batchOrg.getDuration()); + } 
return copy; } http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java index a93c410..0392442 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java @@ -29,10 +29,12 @@ import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.exception.ValidationException; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.api.dto.BatchSettingsDTO; import org.apache.nifi.web.api.dto.DtoFactory; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; @@ -203,7 +205,9 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot // verify update when appropriate - if (isAnyNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount(), remoteProcessGroupPortDto.getUseCompression())) { + if 
(isAnyNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount(), + remoteProcessGroupPortDto.getUseCompression(), + remoteProcessGroupPortDto.getBatchSettings())) { port.verifyCanUpdate(); } } @@ -219,6 +223,30 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot validationErrors.add(String.format("Concurrent tasks for port '%s' must be a positive integer.", remoteGroupPort.getName())); } + final BatchSettingsDTO batchSettingsDTO = remoteProcessGroupPortDTO.getBatchSettings(); + if (batchSettingsDTO != null) { + final Integer batchCount = batchSettingsDTO.getCount(); + if (isNotNull(batchCount) && batchCount < 0) { + validationErrors.add(String.format("Batch count for port '%s' must be a positive integer.", remoteGroupPort.getName())); + } + + final String batchSize = batchSettingsDTO.getSize(); + if (isNotNull(batchSize) && batchSize.length() > 0 + && !DataUnit.DATA_SIZE_PATTERN.matcher(batchSize.trim().toUpperCase()).matches()) { + validationErrors.add(String.format("Batch size for port '%s' must be of format <Data Size> <Data Unit>" + + " where <Data Size> is a non-negative integer and <Data Unit> is a supported Data" + + " Unit, such as: B, KB, MB, GB, TB", remoteGroupPort.getName())); + } + + final String batchDuration = batchSettingsDTO.getDuration(); + if (isNotNull(batchDuration) && batchDuration.length() > 0 + && !FormatUtils.TIME_DURATION_PATTERN.matcher(batchDuration.trim().toLowerCase()).matches()) { + validationErrors.add(String.format("Batch duration for port '%s' must be of format <duration> <TimeUnit>" + + " where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such " + + "as: nanos, millis, secs, mins, hrs, days", remoteGroupPort.getName())); + } + } + return validationErrors; } @@ -284,22 +312,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot verifyUpdatePort(port, remoteProcessGroupPortDto); // perform the update - if 
(isNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount())) { - port.setMaxConcurrentTasks(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount()); - } - if (isNotNull(remoteProcessGroupPortDto.getUseCompression())) { - port.setUseCompression(remoteProcessGroupPortDto.getUseCompression()); - } - - final Boolean isTransmitting = remoteProcessGroupPortDto.isTransmitting(); - if (isNotNull(isTransmitting)) { - // start or stop as necessary - if (!port.isRunning() && isTransmitting) { - remoteProcessGroup.startTransmitting(port); - } else if (port.isRunning() && !isTransmitting) { - remoteProcessGroup.stopTransmitting(port); - } - } + updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup); return port; } @@ -318,6 +331,19 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot verifyUpdatePort(port, remoteProcessGroupPortDto); // perform the update + updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup); + + return port; + } + + /** + * + * @param port Port instance to be updated. + * @param remoteProcessGroupPortDto DTO containing updated remote process group port settings. + * @param remoteProcessGroup If remoteProcessGroupPortDto has updated isTransmitting input, + * this method will start or stop the port in this remoteProcessGroup as necessary. 
+ */ + private void updatePort(RemoteGroupPort port, RemoteProcessGroupPortDTO remoteProcessGroupPortDto, RemoteProcessGroup remoteProcessGroup) { if (isNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount())) { port.setMaxConcurrentTasks(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount()); } @@ -325,6 +351,13 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot port.setUseCompression(remoteProcessGroupPortDto.getUseCompression()); } + final BatchSettingsDTO batchSettingsDTO = remoteProcessGroupPortDto.getBatchSettings(); + if (isNotNull(batchSettingsDTO)) { + port.setBatchCount(batchSettingsDTO.getCount()); + port.setBatchSize(batchSettingsDTO.getSize()); + port.setBatchDuration(batchSettingsDTO.getDuration()); + } + final Boolean isTransmitting = remoteProcessGroupPortDto.isTransmitting(); if (isNotNull(isTransmitting)) { // start or stop as necessary @@ -334,8 +367,6 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot remoteProcessGroup.stopTransmitting(port); } } - - return port; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java index 725e4d4..ea7fa7d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java @@ -29,6 +29,7 @@ import org.apache.nifi.authorization.user.StandardNiFiUser; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.web.api.dto.BatchSettingsDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.dao.RemoteProcessGroupDAO; @@ -40,6 +41,7 @@ import org.springframework.security.core.context.SecurityContext; import org.springframework.security.core.context.SecurityContextHolder; import java.util.Collection; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicReference; import static org.apache.nifi.web.api.dto.DtoFactory.SENSITIVE_VALUE_MASK; @@ -434,6 +436,13 @@ public class TestRemoteProcessGroupAuditor { when(updatedRPGPort.getMaxConcurrentTasks()).thenReturn(inputRPGPortDTO.getConcurrentlySchedulableTaskCount()); when(updatedRPGPort.isUseCompression()).thenReturn(inputRPGPortDTO.getUseCompression()); + final BatchSettingsDTO batchSettings = inputRPGPortDTO.getBatchSettings(); + if (batchSettings != null) { + when(updatedRPGPort.getBatchCount()).thenReturn(batchSettings.getCount()); + when(updatedRPGPort.getBatchSize()).thenReturn(batchSettings.getSize()); + when(updatedRPGPort.getBatchDuration()).thenReturn(batchSettings.getDuration()); + } + when(joinPoint.proceed()).thenReturn(updatedRPGPort); // Capture added actions so that those can be asserted later. 
@@ -553,4 +562,34 @@ public class TestRemoteProcessGroupAuditor { assertConfigureDetails(action.getActionDetails(), "input-port-1.Compressed", "false", "true"); } + /** Sets batch count, size and duration on the input port DTO and expects exactly three Configure actions, checked in iteration order: Batch Count ("0", "1234"), Batch Size ("", "64KB") and Batch Duration ("", "10sec"). NOTE(review): each value pair looks like previous/new value - confirm against assertConfigureDetails. */ + @Test + public void testConfigurePortBatchSettings() throws Throwable { + + final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort(); + when(existingRPGPort.getName()).thenReturn("input-port-1"); + + final RemoteProcessGroupPortDTO inputRPGPortDTO = defaultRemoteProcessGroupPortDTO(); + final BatchSettingsDTO batchSettingsDTO = new BatchSettingsDTO(); + batchSettingsDTO.setCount(1234); + batchSettingsDTO.setSize("64KB"); + batchSettingsDTO.setDuration("10sec"); + inputRPGPortDTO.setBatchSettings(batchSettingsDTO); + + final Collection<Action> actions = updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort); + + assertEquals(3, actions.size()); + final Iterator<Action> iterator = actions.iterator(); + Action action = iterator.next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "input-port-1.Batch Count", "0", "1234"); + + action = iterator.next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "input-port-1.Batch Size", "", "64KB"); + + action = iterator.next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "input-port-1.Batch Duration", "", "10sec"); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java new file mode 100644 index 0000000..f68a115 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.web.dao.impl; + +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.exception.ValidationException; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.web.api.dto.BatchSettingsDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** Unit tests for the input-port update validation of StandardRemoteProcessGroupDAO (verifyUpdateInputPort), run against Mockito mocks of the flow components. */ +public class TestStandardRemoteProcessGroupDAO { + /** Calls dao.verifyUpdateInputPort for the DTO and checks the outcome against errMessageKeywords: with no keywords the call must not throw ValidationException; with keywords it must throw one that carries exactly one validation error containing every keyword. */ + private void validate(final StandardRemoteProcessGroupDAO dao, final RemoteProcessGroupPortDTO dto, final String ... errMessageKeywords) { + try { + dao.verifyUpdateInputPort(dto.getGroupId(), dto); + if (errMessageKeywords.length > 0) { + fail("Validation should fail with keywords: " + Arrays.asList(errMessageKeywords)); + } + } catch (ValidationException e) { + if (errMessageKeywords.length == 0) { + fail("Validation should pass, but failed with: " + e); + } + final List<String> validationErrors = e.getValidationErrors(); + assertEquals("Validation should return one validationErrors", 1, validationErrors.size()); + final String validationError = validationErrors.get(0); + for (String errMessageKeyword : errMessageKeywords) { + assertTrue("validation error message should contain " + errMessageKeyword + ", but was: " + validationError, + validationError.contains(errMessageKeyword)); + } + } + } + /** Walks verifyUpdateInputPort through accepted and rejected values: unset batch settings pass; concurrent tasks 0 is rejected and 2 accepted; batch count -1 is rejected, 0 and 1000 accepted; batch size "AB" and "10 days" are rejected, "300MB" accepted; batch duration "AB" and "10 KB" are rejected, "10 secs" accepted. */ + @Test + public void testVerifyUpdateInputPort() { + final StandardRemoteProcessGroupDAO dao = new StandardRemoteProcessGroupDAO(); + + final String remoteProcessGroupId = "remote-process-group-id"; + final String 
remoteProcessGroupInputPortId = "remote-process-group-input-port-id"; + /* Mocks standing in for the flow controller, the process group, the remote process group and its input port. */ + final FlowController flowController = mock(FlowController.class); + final ProcessGroup processGroup = mock(ProcessGroup.class); + final RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class); + final RemoteGroupPort remoteGroupPort = mock(RemoteGroupPort.class); + /* Stub the lookups: any group id yields the process group, which yields the remote process group for its id, which yields the input port for the port id. */ + dao.setFlowController(flowController); + when(flowController.getGroup(any())).thenReturn(processGroup); + when(processGroup.findRemoteProcessGroup(eq(remoteProcessGroupId))).thenReturn(remoteProcessGroup); + when(remoteProcessGroup.getInputPort(remoteProcessGroupInputPortId)).thenReturn(remoteGroupPort); + when(remoteGroupPort.getName()).thenReturn("remote-group-port"); + /* DTO under validation: group id and port id are set, and a BatchSettingsDTO is attached on which this test has not yet called any setter. */ + final RemoteProcessGroupPortDTO dto = new RemoteProcessGroupPortDTO(); + dto.setGroupId(remoteProcessGroupId); + dto.setId(remoteProcessGroupInputPortId); + final BatchSettingsDTO batchSettings = new BatchSettingsDTO(); + dto.setBatchSettings(batchSettings); + + // Empty input values should pass validation. 
+ dao.verifyUpdateInputPort(remoteProcessGroupId, dto); + + // Concurrent tasks + dto.setConcurrentlySchedulableTaskCount(0); + validate(dao, dto, "Concurrent tasks", "positive integer"); + + dto.setConcurrentlySchedulableTaskCount(2); + validate(dao, dto); + + // Batch count + batchSettings.setCount(-1); + validate(dao, dto, "Batch count", "positive integer"); + + batchSettings.setCount(0); + validate(dao, dto); + + batchSettings.setCount(1000); + validate(dao, dto); + + // Batch size + batchSettings.setSize("AB"); + validate(dao, dto, "Batch size", "Data Size"); + + batchSettings.setSize("10 days"); + validate(dao, dto, "Batch size", "Data Size"); + + batchSettings.setSize("300MB"); + validate(dao, dto); + + // Batch duration + batchSettings.setDuration("AB"); + validate(dao, dto, "Batch duration", "Time Unit"); + + batchSettings.setDuration("10 KB"); + validate(dao, dto, "Batch duration", "Time Unit"); + + batchSettings.setDuration("10 secs"); + validate(dao, dto); + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp index 8f38369..a62700c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp @@ -37,6 +37,40 @@ <span 
class="nf-checkbox-label">Compressed</span> </div> </div> + <div class="clear"></div> + </div> + <div class="batch-settings"> + <div class="setting-name"> + Batch Settings: + </div> + <div class="setting batch-setting"> + <div class="setting-name"> + Count + <div class="fa fa-question-circle" alt="Info" title="The preferred number of flow files to include in a transaction for this port."></div> + </div> + <div class="setting-field"> + <input id="remote-port-batch-count" type="text"/> + </div> + </div> + <div class="setting batch-setting"> + <div class="setting-name"> + Size + <div class="fa fa-question-circle" alt="Info" title="The preferred number of bytes to include in a transaction for this port."></div> + </div> + <div class="setting-field"> + <input id="remote-port-batch-size" type="text"/> + </div> + </div> + <div class="setting batch-setting"> + <div class="setting-name"> + Duration + <div class="fa fa-question-circle" alt="Info" title="The preferred amount of time that a transaction should span for this port."></div> + </div> + <div class="setting-field"> + <input id="remote-port-batch-duration" type="text"/> + </div> + </div> + <div class="clear"></div> </div> </div> </div> http://git-wip-us.apache.org/repos/asf/nifi/blob/a41a2a9b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css index 58dcfcd..4a3fad4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css @@ -125,6 +125,7 @@ div.remote-port-description { div.concurrent-task-container { float: left; + width: 200px; } img.concurrent-tasks-info { @@ -133,7 +134,11 @@ img.concurrent-tasks-info { } div.compression-container { - float: right; + float: left; +} + +div.batch-settings-container { + margin-top: 8px; } div.remote-port-transmission-container { @@ -165,7 +170,7 @@ div.disabled-transmission-switch { #remote-port-concurrent-tasks { font-size: 11px !important; float: left; - width: 75%; + width: 266px; } #remote-port-use-compression-container { @@ -179,4 +184,10 @@ div.disabled-transmission-switch { #remote-port-concurrent-task-header { margin-top: 5px; -} \ No newline at end of file +} + +div.batch-setting { + margin-right: 8px; + width: 30%; + float: left; +}
