Repository: nifi Updated Branches: refs/heads/master f7f809c3d -> ff0005026
NIFI-5241: Updated EventSumValue to use synchronized methods instead of many atomic values. This is more efficient and uses less heap. Also noticed that the Logger instance in ProcessorNode was not used so removed it, and in testing this also noticed that the default connection pool size for OkHttpReplicationClient was only 5, which can cause a lot of unnecessary HTTP connections to be created so adjusted the pool size NIFI-5241: Extended timeout that Jetty uses before closing an active HTTP connection. Because the UI refreshes every 30 seconds by default, and the Jetty connection pool times out every 30 seconds by default, we very frequently saw new HTTP connections being created for the UI refreshes. This resulted in 4 new connections and 4 SSL handshakes occurring every 30 seconds. By extending the timeout, we now see those connections being reused and SSL Handshakes no longer occurring frequently NIFI-5241: Set Jetty idle timeout to double the amount of time for browser to refresh NIFI-5241: Fixed synchronization issue with EventSumValue This closes #2752 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ff000502 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ff000502 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ff000502 Branch: refs/heads/master Commit: ff00050266e1b607babd957253ecce08ec324c1a Parents: f7f809c Author: Mark Payne <[email protected]> Authored: Fri Jun 1 10:21:17 2018 -0400 Committer: Matt Gilman <[email protected]> Committed: Thu Jun 7 09:15:32 2018 -0400 ---------------------------------------------------------------------- .../okhttp/OkHttpReplicationClient.java | 3 + .../apache/nifi/controller/ProcessorNode.java | 4 - .../repository/metrics/EventSumValue.java | 232 +++++++------------ .../org/apache/nifi/web/server/JettyServer.java | 12 + 4 files changed, 102 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/ff000502/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java index 3df44ec..c4016de 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java @@ -68,6 +68,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; import okhttp3.Call; +import okhttp3.ConnectionPool; import okhttp3.Headers; import okhttp3.HttpUrl; import okhttp3.MediaType; @@ -289,6 +290,8 @@ public class OkHttpReplicationClient implements HttpReplicationClient { okHttpClientBuilder.connectTimeout(connectionTimeoutMs, TimeUnit.MILLISECONDS); okHttpClientBuilder.readTimeout(readTimeoutMs, TimeUnit.MILLISECONDS); okHttpClientBuilder.followRedirects(true); + final int connectionPoolSize = properties.getClusterNodeMaxConcurrentRequests(); + okHttpClientBuilder.connectionPool(new ConnectionPool(connectionPoolSize, 5, TimeUnit.MINUTES)); final Tuple<SSLSocketFactory, X509TrustManager> tuple = createSslSocketFactory(properties); if (tuple != null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/ff000502/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index cfe979a..0da86d3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -38,13 +38,9 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.scheduling.SchedulingStrategy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public abstract class ProcessorNode extends AbstractComponentNode implements Connectable { - private static final Logger logger = LoggerFactory.getLogger(ProcessorNode.class); - protected final AtomicReference<ScheduledState> scheduledState; public ProcessorNode(final String id, http://git-wip-us.apache.org/repos/asf/nifi/blob/ff000502/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java index 3306e2b..90990df 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java @@ -18,34 +18,31 @@ package org.apache.nifi.controller.repository.metrics; import java.util.Collections; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.controller.repository.FlowFileEvent; public class EventSumValue { - private final AtomicInteger flowFilesIn = new AtomicInteger(0); - private final AtomicInteger flowFilesOut = new AtomicInteger(0); - private final AtomicInteger flowFilesRemoved = new AtomicInteger(0); - private final AtomicInteger flowFilesReceived = new AtomicInteger(0); - private final AtomicInteger flowFilesSent = new AtomicInteger(0); - - private final AtomicLong contentSizeIn = new AtomicLong(0L); - private final AtomicLong contentSizeOut = new AtomicLong(0L); - private final AtomicLong contentSizeRemoved = new AtomicLong(0L); - private final AtomicLong bytesRead = new AtomicLong(0L); - private final AtomicLong bytesWritten = new AtomicLong(0L); - - private final AtomicLong bytesReceived = new AtomicLong(0L); - private final AtomicLong bytesSent = new AtomicLong(0L); - private final AtomicLong processingNanos = new AtomicLong(0L); - private final AtomicLong aggregateLineageMillis = new AtomicLong(0L); - private final AtomicInteger invocations = new AtomicInteger(0); - private final ConcurrentMap<String, Long> counters = new ConcurrentHashMap<>(); + private int flowFilesIn = 0; + private int flowFilesOut = 0; + private int flowFilesRemoved = 0; + private int flowFilesReceived = 0; + private int flowFilesSent = 0; + + private long contentSizeIn = 0; + private long contentSizeOut = 0; + private long contentSizeRemoved = 0; + private long bytesRead = 0; + private long bytesWritten = 0; + + private long bytesReceived = 0; + private long bytesSent = 0; + private long processingNanos = 0; + private long aggregateLineageMillis = 0; + private int invocations = 0; + private Map<String, Long> counters; private final long minuteTimestamp; private final long millisecondTimestamp; @@ -56,22 +53,22 @@ public class EventSumValue { this.minuteTimestamp = millisecondTimestamp / 60000; } - public void add(final FlowFileEvent flowFileEvent) { - this.aggregateLineageMillis.addAndGet(flowFileEvent.getAggregateLineageMillis()); - this.bytesRead.addAndGet(flowFileEvent.getBytesRead()); - this.bytesReceived.addAndGet(flowFileEvent.getBytesReceived()); - this.bytesSent.addAndGet(flowFileEvent.getBytesSent()); - this.bytesWritten.addAndGet(flowFileEvent.getBytesWritten()); - this.contentSizeIn.addAndGet(flowFileEvent.getContentSizeIn()); - this.contentSizeOut.addAndGet(flowFileEvent.getContentSizeOut()); - this.contentSizeRemoved.addAndGet(flowFileEvent.getContentSizeRemoved()); - this.flowFilesIn.addAndGet(flowFileEvent.getFlowFilesIn()); - this.flowFilesOut.addAndGet(flowFileEvent.getFlowFilesOut()); - this.flowFilesReceived.addAndGet(flowFileEvent.getFlowFilesReceived()); - this.flowFilesRemoved.addAndGet(flowFileEvent.getFlowFilesRemoved()); - this.flowFilesSent.addAndGet(flowFileEvent.getFlowFilesSent()); - this.invocations.addAndGet(flowFileEvent.getInvocations()); - this.processingNanos.addAndGet(flowFileEvent.getProcessingNanoseconds()); + public synchronized void add(final FlowFileEvent flowFileEvent) { + this.aggregateLineageMillis += flowFileEvent.getAggregateLineageMillis(); + this.bytesRead += flowFileEvent.getBytesRead(); + this.bytesReceived += flowFileEvent.getBytesReceived(); + this.bytesSent += flowFileEvent.getBytesSent(); + this.bytesWritten += flowFileEvent.getBytesWritten(); + this.contentSizeIn += flowFileEvent.getContentSizeIn(); + this.contentSizeOut += flowFileEvent.getContentSizeOut(); + this.contentSizeRemoved += flowFileEvent.getContentSizeRemoved(); + this.flowFilesIn += flowFileEvent.getFlowFilesIn(); + this.flowFilesOut += flowFileEvent.getFlowFilesOut(); + this.flowFilesReceived += flowFileEvent.getFlowFilesReceived(); + this.flowFilesRemoved += flowFileEvent.getFlowFilesRemoved(); + this.flowFilesSent += flowFileEvent.getFlowFilesSent(); + this.invocations += flowFileEvent.getInvocations(); + this.processingNanos += flowFileEvent.getProcessingNanoseconds(); final Map<String, Long> eventCounters = flowFileEvent.getCounters(); if (eventCounters != null) { @@ -79,129 +76,74 @@ public class EventSumValue { final String counterName = entry.getKey(); final Long counterValue = entry.getValue(); + if (counters == null) { + counters = new HashMap<>(); + } counters.compute(counterName, (key, value) -> value == null ? counterValue : value + counterValue); } } } - public FlowFileEvent toFlowFileEvent(final String componentId) { + public synchronized FlowFileEvent toFlowFileEvent(final String componentId) { final StandardFlowFileEvent event = new StandardFlowFileEvent(componentId); - event.setAggregateLineageMillis(getAggregateLineageMillis()); - event.setBytesRead(getBytesRead()); - event.setBytesReceived(getBytesReceived()); - event.setBytesSent(getBytesSent()); - event.setBytesWritten(getBytesWritten()); - event.setContentSizeIn(getContentSizeIn()); - event.setContentSizeOut(getContentSizeOut()); - event.setContentSizeRemoved(getContentSizeRemoved()); - event.setFlowFilesIn(getFlowFilesIn()); - event.setFlowFilesOut(getFlowFilesOut()); - event.setFlowFilesReceived(getFlowFilesReceived()); - event.setFlowFilesRemoved(getFlowFilesRemoved()); - event.setFlowFilesSent(getFlowFilesSent()); - event.setInvocations(getInvocations()); - event.setProcessingNanos(getProcessingNanoseconds()); - event.setCounters(Collections.unmodifiableMap(this.counters)); + event.setAggregateLineageMillis(aggregateLineageMillis); + event.setBytesRead(bytesRead); + event.setBytesReceived(bytesReceived); + event.setBytesSent(bytesSent); + event.setBytesWritten(bytesWritten); + event.setContentSizeIn(contentSizeIn); + event.setContentSizeOut(contentSizeOut); + event.setContentSizeRemoved(contentSizeRemoved); + event.setFlowFilesIn(flowFilesIn); + event.setFlowFilesOut(flowFilesOut); + event.setFlowFilesReceived(flowFilesReceived); + event.setFlowFilesRemoved(flowFilesRemoved); + event.setFlowFilesSent(flowFilesSent); + event.setInvocations(invocations); + event.setProcessingNanos(processingNanos); + event.setCounters(this.counters == null ? Collections.emptyMap() : Collections.unmodifiableMap(this.counters)); return event; } - public void add(final EventSumValue other) { - this.aggregateLineageMillis.addAndGet(other.getAggregateLineageMillis()); - this.bytesRead.addAndGet(other.getBytesRead()); - this.bytesReceived.addAndGet(other.getBytesReceived()); - this.bytesSent.addAndGet(other.getBytesSent()); - this.bytesWritten.addAndGet(other.getBytesWritten()); - this.contentSizeIn.addAndGet(other.getContentSizeIn()); - this.contentSizeOut.addAndGet(other.getContentSizeOut()); - this.contentSizeRemoved.addAndGet(other.getContentSizeRemoved()); - this.flowFilesIn.addAndGet(other.getFlowFilesIn()); - this.flowFilesOut.addAndGet(other.getFlowFilesOut()); - this.flowFilesReceived.addAndGet(other.getFlowFilesReceived()); - this.flowFilesRemoved.addAndGet(other.getFlowFilesRemoved()); - this.flowFilesSent.addAndGet(other.getFlowFilesSent()); - this.invocations.addAndGet(other.getInvocations()); - this.processingNanos.addAndGet(other.getProcessingNanoseconds()); - - final Map<String, Long> eventCounters = other.getCounters(); - if (eventCounters != null) { - for (final Map.Entry<String, Long> entry : eventCounters.entrySet()) { - final String counterName = entry.getKey(); - final Long counterValue = entry.getValue(); - - counters.compute(counterName, (key, value) -> value == null ? counterValue : value + counterValue); + public synchronized void add(final EventSumValue other) { + synchronized (other) { + this.aggregateLineageMillis += other.aggregateLineageMillis; + this.bytesRead += other.bytesRead; + this.bytesReceived += other.bytesReceived; + this.bytesSent += other.bytesSent; + this.bytesWritten += other.bytesWritten; + this.contentSizeIn += other.contentSizeIn; + this.contentSizeOut += other.contentSizeOut; + this.contentSizeRemoved += other.contentSizeRemoved; + this.flowFilesIn += other.flowFilesIn; + this.flowFilesOut += other.flowFilesOut; + this.flowFilesReceived += other.flowFilesReceived; + this.flowFilesRemoved += other.flowFilesRemoved; + this.flowFilesSent += other.flowFilesSent; + this.invocations += other.invocations; + this.processingNanos += other.processingNanos; + + final Map<String, Long> eventCounters = other.counters; + if (eventCounters != null) { + if (counters == null) { + counters = new HashMap<>(); + } + + for (final Map.Entry<String, Long> entry : eventCounters.entrySet()) { + final String counterName = entry.getKey(); + final Long counterValue = entry.getValue(); + + counters.compute(counterName, (key, value) -> value == null ? counterValue : value + counterValue); + } } } } - public long getTimestamp() { - return millisecondTimestamp; - } - public long getMinuteTimestamp() { return minuteTimestamp; } - public long getBytesRead() { - return bytesRead.get(); - } - - public long getBytesWritten() { - return bytesWritten.get(); - } - - public int getFlowFilesIn() { - return flowFilesIn.get(); - } - - public int getFlowFilesOut() { - return flowFilesOut.get(); - } - - public long getContentSizeIn() { - return contentSizeIn.get(); - } - - public long getContentSizeOut() { - return contentSizeOut.get(); - } - - public int getFlowFilesRemoved() { - return flowFilesRemoved.get(); - } - - public long getContentSizeRemoved() { - return contentSizeRemoved.get(); - } - - public long getProcessingNanoseconds() { - return processingNanos.get(); - } - - public int getInvocations() { - return invocations.get(); - } - - public long getAggregateLineageMillis() { - return aggregateLineageMillis.get(); - } - - public int getFlowFilesReceived() { - return flowFilesReceived.get(); - } - - public int getFlowFilesSent() { - return flowFilesSent.get(); - } - - public long getBytesReceived() { - return bytesReceived.get(); - } - - public long getBytesSent() { - return bytesSent.get(); - } - - public Map<String, Long> getCounters() { - return counters; + public long getTimestamp() { + return millisecondTimestamp; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/ff000502/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java index 83b489f..ac6ec90 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.stream.Collectors; @@ -68,6 +69,7 @@ import org.apache.nifi.security.util.KeyStoreUtils; import org.apache.nifi.services.FlowService; import org.apache.nifi.ui.extension.UiExtension; import org.apache.nifi.ui.extension.UiExtensionMapping; +import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.ContentAccess; import org.apache.nifi.web.NiFiWebConfigurationContext; @@ -680,6 +682,13 @@ public class JettyServer implements NiFiServer { final List<Connector> serverConnectors = Lists.newArrayList(); + // Calculate Idle Timeout as twice the auto-refresh interval. This ensures that even with some variance in timing, + // we are able to avoid closing connections from users' browsers most of the time. This can make a significant difference + // in HTTPS connections, as each HTTPS connection that is established must perform the SSL handshake. + final String autoRefreshInterval = props.getAutoRefreshInterval(); + final long autoRefreshMillis = autoRefreshInterval == null ? 30000L : FormatUtils.getTimeDuration(autoRefreshInterval, TimeUnit.MILLISECONDS); + final long idleTimeout = autoRefreshMillis * 2; + // If the interfaces collection is empty or each element is empty if (networkInterfaces.isEmpty() || networkInterfaces.values().stream().filter(value -> !Strings.isNullOrEmpty(value)).collect(Collectors.toList()).isEmpty()) { final ServerConnector serverConnector = serverConnectorCreator.create(server, configuration); @@ -689,6 +698,7 @@ public class JettyServer implements NiFiServer { serverConnector.setHost(hostname); } serverConnector.setPort(port); + serverConnector.setIdleTimeout(idleTimeout); serverConnectors.add(serverConnector); } else { // Add connectors for all IPs from network interfaces @@ -710,6 +720,8 @@ public class JettyServer implements NiFiServer { // Set host and port serverConnector.setHost(inetAddress.getHostAddress()); serverConnector.setPort(port); + serverConnector.setIdleTimeout(idleTimeout); + return serverConnector; }).collect(Collectors.toList()))); }
