Repository: nifi
Updated Branches:
  refs/heads/master f7f809c3d -> ff0005026


NIFI-5241: Updated EventSumValue to use synchronized methods instead of many 
atomic values. This is more efficient and uses less heap. Also noticed that the 
Logger instance in ProcessorNode was not used, so removed it. While testing 
this, also noticed that the default connection pool size for 
OkHttpReplicationClient was only 5, which can cause a lot of unnecessary HTTP 
connections to be created, so the pool size is now set to the configured 
maximum number of concurrent cluster node requests.
NIFI-5241: Extended the idle timeout that Jetty uses before closing an open 
HTTP connection. Because the UI refreshes every 30 seconds by default, and 
Jetty's idle timeout is also 30 seconds by default, we very frequently saw 
new HTTP connections being created for the UI refreshes. This resulted in 4 new 
connections and 4 SSL handshakes occurring every 30 seconds. By extending the 
timeout, we now see those connections being reused and SSL handshakes no longer 
occurring frequently.
NIFI-5241: Set Jetty idle timeout to double the browser's auto-refresh 
interval
NIFI-5241: Fixed synchronization issue with EventSumValue
This closes #2752


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ff000502
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ff000502
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ff000502

Branch: refs/heads/master
Commit: ff00050266e1b607babd957253ecce08ec324c1a
Parents: f7f809c
Author: Mark Payne <[email protected]>
Authored: Fri Jun 1 10:21:17 2018 -0400
Committer: Matt Gilman <[email protected]>
Committed: Thu Jun 7 09:15:32 2018 -0400

----------------------------------------------------------------------
 .../okhttp/OkHttpReplicationClient.java         |   3 +
 .../apache/nifi/controller/ProcessorNode.java   |   4 -
 .../repository/metrics/EventSumValue.java       | 232 +++++++------------
 .../org/apache/nifi/web/server/JettyServer.java |  12 +
 4 files changed, 102 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ff000502/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
index 3df44ec..c4016de 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
@@ -68,6 +68,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
 
 import okhttp3.Call;
+import okhttp3.ConnectionPool;
 import okhttp3.Headers;
 import okhttp3.HttpUrl;
 import okhttp3.MediaType;
@@ -289,6 +290,8 @@ public class OkHttpReplicationClient implements 
HttpReplicationClient {
         okHttpClientBuilder.connectTimeout(connectionTimeoutMs, 
TimeUnit.MILLISECONDS);
         okHttpClientBuilder.readTimeout(readTimeoutMs, TimeUnit.MILLISECONDS);
         okHttpClientBuilder.followRedirects(true);
+        final int connectionPoolSize = 
properties.getClusterNodeMaxConcurrentRequests();
+        okHttpClientBuilder.connectionPool(new 
ConnectionPool(connectionPoolSize, 5, TimeUnit.MINUTES));
 
         final Tuple<SSLSocketFactory, X509TrustManager> tuple = 
createSslSocketFactory(properties);
         if (tuple != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/ff000502/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index cfe979a..0da86d3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -38,13 +38,9 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.scheduling.ExecutionNode;
 import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public abstract class ProcessorNode extends AbstractComponentNode implements 
Connectable {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(ProcessorNode.class);
-
     protected final AtomicReference<ScheduledState> scheduledState;
 
     public ProcessorNode(final String id,

http://git-wip-us.apache.org/repos/asf/nifi/blob/ff000502/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
index 3306e2b..90990df 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
@@ -18,34 +18,31 @@
 package org.apache.nifi.controller.repository.metrics;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.controller.repository.FlowFileEvent;
 
 public class EventSumValue {
 
-    private final AtomicInteger flowFilesIn = new AtomicInteger(0);
-    private final AtomicInteger flowFilesOut = new AtomicInteger(0);
-    private final AtomicInteger flowFilesRemoved = new AtomicInteger(0);
-    private final AtomicInteger flowFilesReceived = new AtomicInteger(0);
-    private final AtomicInteger flowFilesSent = new AtomicInteger(0);
-
-    private final AtomicLong contentSizeIn = new AtomicLong(0L);
-    private final AtomicLong contentSizeOut = new AtomicLong(0L);
-    private final AtomicLong contentSizeRemoved = new AtomicLong(0L);
-    private final AtomicLong bytesRead = new AtomicLong(0L);
-    private final AtomicLong bytesWritten = new AtomicLong(0L);
-
-    private final AtomicLong bytesReceived = new AtomicLong(0L);
-    private final AtomicLong bytesSent = new AtomicLong(0L);
-    private final AtomicLong processingNanos = new AtomicLong(0L);
-    private final AtomicLong aggregateLineageMillis = new AtomicLong(0L);
-    private final AtomicInteger invocations = new AtomicInteger(0);
-    private final ConcurrentMap<String, Long> counters = new 
ConcurrentHashMap<>();
+    private int flowFilesIn = 0;
+    private int flowFilesOut = 0;
+    private int flowFilesRemoved = 0;
+    private int flowFilesReceived = 0;
+    private int flowFilesSent = 0;
+
+    private long contentSizeIn = 0;
+    private long contentSizeOut = 0;
+    private long contentSizeRemoved = 0;
+    private long bytesRead = 0;
+    private long bytesWritten = 0;
+
+    private long bytesReceived = 0;
+    private long bytesSent = 0;
+    private long processingNanos = 0;
+    private long aggregateLineageMillis = 0;
+    private int invocations = 0;
+    private Map<String, Long> counters;
 
     private final long minuteTimestamp;
     private final long millisecondTimestamp;
@@ -56,22 +53,22 @@ public class EventSumValue {
         this.minuteTimestamp = millisecondTimestamp / 60000;
     }
 
-    public void add(final FlowFileEvent flowFileEvent) {
-        
this.aggregateLineageMillis.addAndGet(flowFileEvent.getAggregateLineageMillis());
-        this.bytesRead.addAndGet(flowFileEvent.getBytesRead());
-        this.bytesReceived.addAndGet(flowFileEvent.getBytesReceived());
-        this.bytesSent.addAndGet(flowFileEvent.getBytesSent());
-        this.bytesWritten.addAndGet(flowFileEvent.getBytesWritten());
-        this.contentSizeIn.addAndGet(flowFileEvent.getContentSizeIn());
-        this.contentSizeOut.addAndGet(flowFileEvent.getContentSizeOut());
-        
this.contentSizeRemoved.addAndGet(flowFileEvent.getContentSizeRemoved());
-        this.flowFilesIn.addAndGet(flowFileEvent.getFlowFilesIn());
-        this.flowFilesOut.addAndGet(flowFileEvent.getFlowFilesOut());
-        this.flowFilesReceived.addAndGet(flowFileEvent.getFlowFilesReceived());
-        this.flowFilesRemoved.addAndGet(flowFileEvent.getFlowFilesRemoved());
-        this.flowFilesSent.addAndGet(flowFileEvent.getFlowFilesSent());
-        this.invocations.addAndGet(flowFileEvent.getInvocations());
-        
this.processingNanos.addAndGet(flowFileEvent.getProcessingNanoseconds());
+    public synchronized void add(final FlowFileEvent flowFileEvent) {
+        this.aggregateLineageMillis += 
flowFileEvent.getAggregateLineageMillis();
+        this.bytesRead += flowFileEvent.getBytesRead();
+        this.bytesReceived += flowFileEvent.getBytesReceived();
+        this.bytesSent += flowFileEvent.getBytesSent();
+        this.bytesWritten += flowFileEvent.getBytesWritten();
+        this.contentSizeIn += flowFileEvent.getContentSizeIn();
+        this.contentSizeOut += flowFileEvent.getContentSizeOut();
+        this.contentSizeRemoved += flowFileEvent.getContentSizeRemoved();
+        this.flowFilesIn += flowFileEvent.getFlowFilesIn();
+        this.flowFilesOut += flowFileEvent.getFlowFilesOut();
+        this.flowFilesReceived += flowFileEvent.getFlowFilesReceived();
+        this.flowFilesRemoved += flowFileEvent.getFlowFilesRemoved();
+        this.flowFilesSent += flowFileEvent.getFlowFilesSent();
+        this.invocations += flowFileEvent.getInvocations();
+        this.processingNanos += flowFileEvent.getProcessingNanoseconds();
 
         final Map<String, Long> eventCounters = flowFileEvent.getCounters();
         if (eventCounters != null) {
@@ -79,129 +76,74 @@ public class EventSumValue {
                 final String counterName = entry.getKey();
                 final Long counterValue = entry.getValue();
 
+                if (counters == null) {
+                    counters = new HashMap<>();
+                }
                 counters.compute(counterName, (key, value) -> value == null ? 
counterValue : value + counterValue);
             }
         }
     }
 
-    public FlowFileEvent toFlowFileEvent(final String componentId) {
+    public synchronized FlowFileEvent toFlowFileEvent(final String 
componentId) {
         final StandardFlowFileEvent event = new 
StandardFlowFileEvent(componentId);
-        event.setAggregateLineageMillis(getAggregateLineageMillis());
-        event.setBytesRead(getBytesRead());
-        event.setBytesReceived(getBytesReceived());
-        event.setBytesSent(getBytesSent());
-        event.setBytesWritten(getBytesWritten());
-        event.setContentSizeIn(getContentSizeIn());
-        event.setContentSizeOut(getContentSizeOut());
-        event.setContentSizeRemoved(getContentSizeRemoved());
-        event.setFlowFilesIn(getFlowFilesIn());
-        event.setFlowFilesOut(getFlowFilesOut());
-        event.setFlowFilesReceived(getFlowFilesReceived());
-        event.setFlowFilesRemoved(getFlowFilesRemoved());
-        event.setFlowFilesSent(getFlowFilesSent());
-        event.setInvocations(getInvocations());
-        event.setProcessingNanos(getProcessingNanoseconds());
-        event.setCounters(Collections.unmodifiableMap(this.counters));
+        event.setAggregateLineageMillis(aggregateLineageMillis);
+        event.setBytesRead(bytesRead);
+        event.setBytesReceived(bytesReceived);
+        event.setBytesSent(bytesSent);
+        event.setBytesWritten(bytesWritten);
+        event.setContentSizeIn(contentSizeIn);
+        event.setContentSizeOut(contentSizeOut);
+        event.setContentSizeRemoved(contentSizeRemoved);
+        event.setFlowFilesIn(flowFilesIn);
+        event.setFlowFilesOut(flowFilesOut);
+        event.setFlowFilesReceived(flowFilesReceived);
+        event.setFlowFilesRemoved(flowFilesRemoved);
+        event.setFlowFilesSent(flowFilesSent);
+        event.setInvocations(invocations);
+        event.setProcessingNanos(processingNanos);
+        event.setCounters(this.counters == null ? Collections.emptyMap() : 
Collections.unmodifiableMap(this.counters));
         return event;
     }
 
-    public void add(final EventSumValue other) {
-        
this.aggregateLineageMillis.addAndGet(other.getAggregateLineageMillis());
-        this.bytesRead.addAndGet(other.getBytesRead());
-        this.bytesReceived.addAndGet(other.getBytesReceived());
-        this.bytesSent.addAndGet(other.getBytesSent());
-        this.bytesWritten.addAndGet(other.getBytesWritten());
-        this.contentSizeIn.addAndGet(other.getContentSizeIn());
-        this.contentSizeOut.addAndGet(other.getContentSizeOut());
-        this.contentSizeRemoved.addAndGet(other.getContentSizeRemoved());
-        this.flowFilesIn.addAndGet(other.getFlowFilesIn());
-        this.flowFilesOut.addAndGet(other.getFlowFilesOut());
-        this.flowFilesReceived.addAndGet(other.getFlowFilesReceived());
-        this.flowFilesRemoved.addAndGet(other.getFlowFilesRemoved());
-        this.flowFilesSent.addAndGet(other.getFlowFilesSent());
-        this.invocations.addAndGet(other.getInvocations());
-        this.processingNanos.addAndGet(other.getProcessingNanoseconds());
-
-        final Map<String, Long> eventCounters = other.getCounters();
-        if (eventCounters != null) {
-            for (final Map.Entry<String, Long> entry : 
eventCounters.entrySet()) {
-                final String counterName = entry.getKey();
-                final Long counterValue = entry.getValue();
-
-                counters.compute(counterName, (key, value) -> value == null ? 
counterValue : value + counterValue);
+    public synchronized void add(final EventSumValue other) {
+        synchronized (other) {
+            this.aggregateLineageMillis += other.aggregateLineageMillis;
+            this.bytesRead += other.bytesRead;
+            this.bytesReceived += other.bytesReceived;
+            this.bytesSent += other.bytesSent;
+            this.bytesWritten += other.bytesWritten;
+            this.contentSizeIn += other.contentSizeIn;
+            this.contentSizeOut += other.contentSizeOut;
+            this.contentSizeRemoved += other.contentSizeRemoved;
+            this.flowFilesIn += other.flowFilesIn;
+            this.flowFilesOut += other.flowFilesOut;
+            this.flowFilesReceived += other.flowFilesReceived;
+            this.flowFilesRemoved += other.flowFilesRemoved;
+            this.flowFilesSent += other.flowFilesSent;
+            this.invocations += other.invocations;
+            this.processingNanos += other.processingNanos;
+
+            final Map<String, Long> eventCounters = other.counters;
+            if (eventCounters != null) {
+                if (counters == null) {
+                    counters = new HashMap<>();
+                }
+
+                for (final Map.Entry<String, Long> entry : 
eventCounters.entrySet()) {
+                    final String counterName = entry.getKey();
+                    final Long counterValue = entry.getValue();
+
+                    counters.compute(counterName, (key, value) -> value == 
null ? counterValue : value + counterValue);
+                }
             }
         }
     }
 
-    public long getTimestamp() {
-        return millisecondTimestamp;
-    }
-
     public long getMinuteTimestamp() {
         return minuteTimestamp;
     }
 
-    public long getBytesRead() {
-        return bytesRead.get();
-    }
-
-    public long getBytesWritten() {
-        return bytesWritten.get();
-    }
-
-    public int getFlowFilesIn() {
-        return flowFilesIn.get();
-    }
-
-    public int getFlowFilesOut() {
-        return flowFilesOut.get();
-    }
-
-    public long getContentSizeIn() {
-        return contentSizeIn.get();
-    }
-
-    public long getContentSizeOut() {
-        return contentSizeOut.get();
-    }
-
-    public int getFlowFilesRemoved() {
-        return flowFilesRemoved.get();
-    }
-
-    public long getContentSizeRemoved() {
-        return contentSizeRemoved.get();
-    }
-
-    public long getProcessingNanoseconds() {
-        return processingNanos.get();
-    }
-
-    public int getInvocations() {
-        return invocations.get();
-    }
-
-    public long getAggregateLineageMillis() {
-        return aggregateLineageMillis.get();
-    }
-
-    public int getFlowFilesReceived() {
-        return flowFilesReceived.get();
-    }
-
-    public int getFlowFilesSent() {
-        return flowFilesSent.get();
-    }
-
-    public long getBytesReceived() {
-        return bytesReceived.get();
-    }
-
-    public long getBytesSent() {
-        return bytesSent.get();
-    }
-
-    public Map<String, Long> getCounters() {
-        return counters;
+    public long getTimestamp() {
+        return millisecondTimestamp;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ff000502/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
index 83b489f..ac6ec90 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
@@ -39,6 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.stream.Collectors;
@@ -68,6 +69,7 @@ import org.apache.nifi.security.util.KeyStoreUtils;
 import org.apache.nifi.services.FlowService;
 import org.apache.nifi.ui.extension.UiExtension;
 import org.apache.nifi.ui.extension.UiExtensionMapping;
+import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.ContentAccess;
 import org.apache.nifi.web.NiFiWebConfigurationContext;
@@ -680,6 +682,13 @@ public class JettyServer implements NiFiServer {
 
         final List<Connector> serverConnectors = Lists.newArrayList();
 
+        // Calculate Idle Timeout as twice the auto-refresh interval. This 
ensures that even with some variance in timing,
+        // we are able to avoid closing connections from users' browsers most 
of the time. This can make a significant difference
+        // in HTTPS connections, as each HTTPS connection that is established 
must perform the SSL handshake.
+        final String autoRefreshInterval = props.getAutoRefreshInterval();
+        final long autoRefreshMillis = autoRefreshInterval == null ? 30000L : 
FormatUtils.getTimeDuration(autoRefreshInterval, TimeUnit.MILLISECONDS);
+        final long idleTimeout = autoRefreshMillis * 2;
+
         // If the interfaces collection is empty or each element is empty
         if (networkInterfaces.isEmpty() || 
networkInterfaces.values().stream().filter(value -> 
!Strings.isNullOrEmpty(value)).collect(Collectors.toList()).isEmpty()) {
             final ServerConnector serverConnector = 
serverConnectorCreator.create(server, configuration);
@@ -689,6 +698,7 @@ public class JettyServer implements NiFiServer {
                 serverConnector.setHost(hostname);
             }
             serverConnector.setPort(port);
+            serverConnector.setIdleTimeout(idleTimeout);
             serverConnectors.add(serverConnector);
         } else {
             // Add connectors for all IPs from network interfaces
@@ -710,6 +720,8 @@ public class JettyServer implements NiFiServer {
                         // Set host and port
                         serverConnector.setHost(inetAddress.getHostAddress());
                         serverConnector.setPort(port);
+                        serverConnector.setIdleTimeout(idleTimeout);
+
                         return serverConnector;
                     }).collect(Collectors.toList())));
         }

Reply via email to