This is an automated email from the ASF dual-hosted git repository.

payert pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 4fd9938  AMBARI-25549 NegativeArraySizeException thrown when invoking CurrentCollectorHost (#3225)
4fd9938 is described below

commit 4fd9938a469398fc270b89d5f84133ccab2a4eb0
Author: payert <35402259+pay...@users.noreply.github.com>
AuthorDate: Tue Sep 15 09:40:03 2020 +0200

    AMBARI-25549 NegativeArraySizeException thrown when invoking CurrentCollectorHost (#3225)
    
    * AMBARI-25549 NegativeArraySizeException thrown when invoking CurrentCollectorHost
---
 .../sink/timeline/AbstractTimelineMetricsSink.java | 178 ++++++++++++---------
 ...etricSinkWriteShardHostnameHashingStrategy.java |   1 +
 .../timeline/AbstractTimelineMetricSinkTest.java   |  67 ++++++++
 3 files changed, 170 insertions(+), 76 deletions(-)

diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index f1511f6..d5586d9 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -119,6 +119,8 @@ public abstract class AbstractTimelineMetricsSink {
   // well as timed refresh
   protected Supplier<String> targetCollectorHostSupplier;
 
+  private final CollectorShardSupplier collectorShardSupplier = new 
CollectorShardSupplier();
+
   protected final SortedSet<String> allKnownLiveCollectors = new TreeSet<>();
 
   private volatile boolean isInitializedForHA = false;
@@ -148,6 +150,20 @@ public abstract class AbstractTimelineMetricsSink {
       .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
   }
 
+  private final class CollectorShardSupplier implements Supplier<String> {
+    @Override
+    public String get() {
+      //shardExpired flag is used to determine if the Supplier.get() is 
invoked through the
+      // findPreferredCollectHost method (No need to refresh collector hosts)
+      // OR
+      // through Expiry (Refresh needed to pick up dead collectors that might 
have not become alive).
+      if (shardExpired) {
+        refreshCollectorsFromConfigured(getConfiguredCollectorHosts());
+      }
+      return metricSinkWriteShardStrategy.findCollectorShard(new 
ArrayList<>(allKnownLiveCollectors));
+    }
+  }
+
   public AbstractTimelineMetricsSink() {
     LOG = LogFactory.getLog(this.getClass());
   }
@@ -272,18 +288,22 @@ public abstract class AbstractTimelineMetricsSink {
 
   protected String getCurrentCollectorHost() {
     String collectorHost;
-    // Get cached target
-    if (targetCollectorHostSupplier != null) {
-      collectorHost = targetCollectorHostSupplier.get();
-      // Last X attempts have failed - force refresh
-      if (failedCollectorConnectionsCounter.get() > 
RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) {
-        LOG.debug("Removing collector " + collectorHost + " from 
allKnownLiveCollectors.");
-        allKnownLiveCollectors.remove(collectorHost);
-        targetCollectorHostSupplier = null;
+    /* Using collectorShardSupplier as sync_object since the Supplier.get() is 
using the same
+       object under the hood to provide thread safety. */
+    synchronized (collectorShardSupplier) {
+      // Get cached target
+      if (targetCollectorHostSupplier != null) {
+        collectorHost = targetCollectorHostSupplier.get();
+        // Last X attempts have failed - force refresh
+        if (failedCollectorConnectionsCounter.get() > 
RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) {
+          LOG.debug("Removing collector " + collectorHost + " from 
allKnownLiveCollectors.");
+          allKnownLiveCollectors.remove(collectorHost);
+          targetCollectorHostSupplier = null;
+          collectorHost = findPreferredCollectHost();
+        }
+      } else {
         collectorHost = findPreferredCollectHost();
       }
-    } else {
-      collectorHost = findPreferredCollectHost();
     }
 
     if (collectorHost == null) {
@@ -360,11 +380,15 @@ public abstract class AbstractTimelineMetricsSink {
    *
    * @return the app cookie manager
    */
-  public synchronized AppCookieManager getAppCookieManager() {
-    if (appCookieManager == null) {
-      appCookieManager = new AppCookieManager();
+  public AppCookieManager getAppCookieManager() {
+    /* Using collectorShardSupplier as sync_object since the Supplier.get() is 
using the same
+       object under the hood to provide thread safety. */
+    synchronized (collectorShardSupplier) {
+      if (appCookieManager == null) {
+        appCookieManager = new AppCookieManager();
+      }
+      return appCookieManager;
     }
-    return appCookieManager;
   }
 
   /**
@@ -513,72 +537,59 @@ public abstract class AbstractTimelineMetricsSink {
    *
    * @return String Collector hostname
    */
-  protected synchronized String findPreferredCollectHost() {
-    if (!isInitializedForHA) {
-      init();
-    }
-
-    shardExpired = false;
-    // Auto expire and re-calculate after 1 hour
-    if (targetCollectorHostSupplier != null) {
-      String targetCollector = targetCollectorHostSupplier.get();
-      if (targetCollector != null) {
-        return targetCollector;
+  protected String findPreferredCollectHost() {
+    /* Using collectorShardSupplier as sync_object since the Supplier.get() is 
using the same
+       object under the hood to provide thread safety. */
+    synchronized (collectorShardSupplier) {
+      if (!isInitializedForHA) {
+        init();
       }
-    }
 
-    // Reach out to all configured collectors before Zookeeper
-    Collection<String> collectorHosts = getConfiguredCollectorHosts();
-    refreshCollectorsFromConfigured(collectorHosts);
-
-    // Lookup Zookeeper for live hosts - max 10 seconds wait time
-    long currentTime = System.currentTimeMillis();
-    if (allKnownLiveCollectors.size() == 0 && getZookeeperQuorum() != null
-      && (currentTime - lastFailedZkRequestTime) > zookeeperBackoffTimeMillis) 
{
-
-      LOG.debug("No live collectors from configuration. Requesting 
zookeeper...");
-      
allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode());
-      boolean noNewCollectorFromZk = true;
-      for (String collectorHostFromZk : allKnownLiveCollectors) {
-        if (!collectorHosts.contains(collectorHostFromZk)) {
-          noNewCollectorFromZk = false;
-          break;
+      shardExpired = false;
+      // Auto expire and re-calculate after 1 hour
+      if (targetCollectorHostSupplier != null) {
+        String targetCollector = targetCollectorHostSupplier.get();
+        if (targetCollector != null) {
+          return targetCollector;
         }
       }
-      if (noNewCollectorFromZk) {
-        LOG.debug("No new collector was found from Zookeeper. Will not request 
zookeeper for " + zookeeperBackoffTimeMillis + " millis");
-        lastFailedZkRequestTime = System.currentTimeMillis();
-      }
-    }
 
-    if (allKnownLiveCollectors.size() != 0) {
-      targetCollectorHostSupplier = Suppliers.memoizeWithExpiration(
-        new Supplier<String>() {
-          @Override
-          public String get() {
-            //shardExpired flag is used to determine if the Supplier.get() is 
invoked through the
-            // findPreferredCollectHost method (No need to refresh collector 
hosts
-            // OR
-            // through Expiry (Refresh needed to pick up dead collectors that 
might have not become alive).
-            if (shardExpired) {
-              refreshCollectorsFromConfigured(getConfiguredCollectorHosts());
-            }
-            return metricSinkWriteShardStrategy.findCollectorShard(new 
ArrayList<>(allKnownLiveCollectors));
+      // Reach out to all configured collectors before Zookeeper
+      Collection<String> collectorHosts = getConfiguredCollectorHosts();
+      refreshCollectorsFromConfigured(collectorHosts);
+
+      // Lookup Zookeeper for live hosts - max 10 seconds wait time
+      long currentTime = System.currentTimeMillis();
+      if (allKnownLiveCollectors.size() == 0 && getZookeeperQuorum() != null
+              && (currentTime - lastFailedZkRequestTime) > 
zookeeperBackoffTimeMillis) {
+
+        LOG.debug("No live collectors from configuration. Requesting 
zookeeper...");
+        
allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode());
+        boolean noNewCollectorFromZk = true;
+        for (String collectorHostFromZk : allKnownLiveCollectors) {
+          if (!collectorHosts.contains(collectorHostFromZk)) {
+            noNewCollectorFromZk = false;
+            break;
           }
-        },  // random.nextInt(max - min + 1) + min # (60 to 75 minutes)
-        rand.nextInt(COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES
-          - COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES + 1)
-          + COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES,
-        TimeUnit.MINUTES
-      );
-
-      String collectorHost = targetCollectorHostSupplier.get();
+        }
+        if (noNewCollectorFromZk) {
+          LOG.debug("No new collector was found from Zookeeper. Will not 
request zookeeper for " + zookeeperBackoffTimeMillis + " millis");
+          lastFailedZkRequestTime = System.currentTimeMillis();
+        }
+      }
+
+      if (allKnownLiveCollectors.size() != 0) {
+        SupplierExpiry expiry = getSupplierExpiry();
+        targetCollectorHostSupplier = 
Suppliers.memoizeWithExpiration(collectorShardSupplier, expiry.duration, 
expiry.timeUnit);
+
+        String collectorHost = targetCollectorHostSupplier.get();
+        shardExpired = true;
+        return collectorHost;
+      }
+      LOG.debug("Couldn't find any live collectors. Returning null");
       shardExpired = true;
-      return collectorHost;
+      return null;
     }
-    LOG.debug("Couldn't find any live collectors. Returning null");
-    shardExpired = true;
-    return null;
   }
 
   private void refreshCollectorsFromConfigured(Collection<String> 
collectorHosts) {
@@ -591,14 +602,12 @@ public abstract class AbstractTimelineMetricsSink {
           try {
             Collection<String> liveHosts = 
findLiveCollectorHostsFromKnownCollector(hostStr, getCollectorPort());
             // Update live Hosts - current host will already be a part of this
-            for (String host : liveHosts) {
-              allKnownLiveCollectors.add(host);
-            }
+            allKnownLiveCollectors.addAll(liveHosts);
             break; // Found at least 1 live collector
           } catch (MetricCollectorUnavailableException e) {
             LOG.debug("Collector " + hostStr + " is not longer live. Removing 
" +
               "it from list of know live collector hosts : " + 
allKnownLiveCollectors);
-            allKnownLiveCollectors.remove(hostStr);
+            boolean res = allKnownLiveCollectors.remove(hostStr);
           }
         }
       }
@@ -711,6 +720,23 @@ public abstract class AbstractTimelineMetricsSink {
     return metricsPostCache;
   }
 
+  class SupplierExpiry {
+    final long duration;
+    final TimeUnit timeUnit;
+
+    public SupplierExpiry(long duration, TimeUnit timeUnit) {
+      this.duration = duration;
+      this.timeUnit = timeUnit;
+    }
+  }
+  protected SupplierExpiry getSupplierExpiry() {
+    // random.nextInt(max - min + 1) + min # (60 to 75 minutes)
+    long duration = rand.nextInt(COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES
+            - COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES + 1)
+            + COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES;
+    return new SupplierExpiry(duration, TimeUnit.MINUTES);
+  }
+
   /**
    * Get a pre-formatted URI for the collector
    */
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java
index 25bff54..23bf68c 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java
@@ -36,6 +36,7 @@ public class MetricSinkWriteShardHostnameHashingStrategy implements MetricSinkWr
 
   @Override
   public String findCollectorShard(List<String> collectorHosts) {
+    if (collectorHosts.isEmpty()) return null;
     long index = hostnameHash % collectorHosts.size();
     index = index < 0 ? index + collectorHosts.size() : index;
     String collectorHost = collectorHosts.get((int) index);
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java
index f3f8a62..71b9703 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java
@@ -30,7 +30,13 @@ import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.easymock.EasyMock.anyString;
 import static org.easymock.EasyMock.expect;
@@ -204,6 +210,41 @@ public class AbstractTimelineMetricSinkTest {
     verifyAll();
   }
 
+  @Test
+  public void testGetCurrentCollectorHostMultiThreaded() {
+    final int threadPoolSize = 32;
+    final int numberOfThreads = threadPoolSize * 8;
+    final AtomicBoolean stop = new AtomicBoolean(false);
+    final TestTimelineMetricsThreadingSink sink = new 
TestTimelineMetricsThreadingSink();
+    final ExecutorService executorService = 
Executors.newFixedThreadPool(threadPoolSize);
+    int threadCount = 0;
+
+    try {
+      while (!stop.get() && threadCount < numberOfThreads) {
+        executorService.execute(() -> {
+          try {
+            String host = sink.getCurrentCollectorHost();
+          } catch (Exception e) {
+            e.printStackTrace();
+            stop.set(true);
+            executorService.shutdownNow();
+          }
+        });
+
+        threadCount++;
+      }
+
+      executorService.awaitTermination(30, TimeUnit.SECONDS);
+      if(stop.get()) {
+        Assert.fail("Unexpected exception(s) has been thrown! See the stack 
traces above!");
+      }
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } finally {
+      executorService.shutdown();
+    }
+  }
+
   private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink {
     @Override
     protected String getCollectorUri(String host) {
@@ -254,5 +295,31 @@ public class AbstractTimelineMetricSinkTest {
     protected String getHostInMemoryAggregationProtocol() {
       return "http";
     }
+
+  }
+
+  private class TestTimelineMetricsThreadingSink extends 
TestTimelineMetricsSink {
+    private final AtomicInteger counter = new AtomicInteger(0);
+    private final Collection<String> hosts = 
Collections.singletonList("host1");
+
+    @Override
+    protected Collection<String> 
findLiveCollectorHostsFromKnownCollector(String host, String port) throws 
MetricCollectorUnavailableException {
+      failedCollectorConnectionsCounter.set(4);
+      int c = counter.getAndIncrement();
+      if (c % 2 == 0 || c % 3 == 0) {
+        throw new 
MetricCollectorUnavailableException("MetricCollectorUnavailable");
+      }
+      return hosts;
+    }
+
+    @Override
+    protected Collection<String> getConfiguredCollectorHosts() {
+      return hosts;
+    }
+
+    @Override
+    protected SupplierExpiry getSupplierExpiry() {
+      return new SupplierExpiry(128, TimeUnit.MILLISECONDS);
+    }
   }
 }

Reply via email to