This is an automated email from the ASF dual-hosted git repository.

joshfischer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new ee1c44a  Improve concurrency for needed parts. (#3107)
ee1c44a is described below

commit ee1c44aefa8a5a5dd4e1effcd736014af70ec3b6
Author: choi se <[email protected]>
AuthorDate: Thu Mar 5 00:21:08 2020 +0900

    Improve concurrency for needed parts. (#3107)
    
    * Change concurrent Map
    
    * Change concurrent Map
    
    * HashMap changes for unneeded parts.
    
    * HashMap changes for unneeded parts.
    
    * Review changes
    
    * Changes HashMap for unneeded parts.
    
    * Improve concurrency for needed parts.
    
    * Remove unused imports.
    
    * Remove unused imports.
    
    * Remove unused imports.
    
    * Fix NPE
    
    (cherry picked from commit 545d3814b315c29d3e396309a2ededaad193ec32)
    
    * Fix WhitespaceAround
    
    * Add dummy Object
    
    * Fix ConstantName
    
    (cherry picked from commit 8d6d5067072e92d6e276f93e18297ddedc647c6c)
---
 .../apache/heron/api/metric/MultiAssignableMetric.java |  3 ++-
 .../org/apache/heron/api/metric/MultiCountMetric.java  |  3 ++-
 .../apache/heron/api/metric/MultiReducedMetric.java    |  3 ++-
 .../src/java/org/apache/heron/api/tuple/Fields.java    |  4 ++--
 .../org/apache/heron/common/network/HeronClient.java   | 18 ++++++++++--------
 .../org/apache/heron/common/network/HeronServer.java   |  8 ++++----
 .../heron/common/utils/metrics/MetricsCollector.java   |  6 +++---
 .../heron/common/utils/misc/PhysicalPlanHelper.java    |  3 ++-
 .../utils/topology/GeneralTopologyContextImpl.java     |  5 +++--
 .../backtype/storm/metric/api/MultiCountMetric.java    |  3 ++-
 .../backtype/storm/metric/api/MultiReducedMetric.java  |  3 ++-
 .../org/apache/storm/metric/api/MultiCountMetric.java  |  3 ++-
 .../apache/storm/metric/api/MultiReducedMetric.java    |  3 ++-
 13 files changed, 38 insertions(+), 27 deletions(-)

diff --git a/heron/api/src/java/org/apache/heron/api/metric/MultiAssignableMetric.java b/heron/api/src/java/org/apache/heron/api/metric/MultiAssignableMetric.java
index d00a93c..c34bc28 100644
--- a/heron/api/src/java/org/apache/heron/api/metric/MultiAssignableMetric.java
+++ b/heron/api/src/java/org/apache/heron/api/metric/MultiAssignableMetric.java
@@ -21,9 +21,10 @@ package org.apache.heron.api.metric;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class MultiAssignableMetric<T extends Number> implements IMetric<Map<String, T>> {
-  private final Map<String, AssignableMetric<T>> value = new HashMap<>();
+  private final Map<String, AssignableMetric<T>> value = new ConcurrentHashMap<>();
   private T initialValue;
 
   public MultiAssignableMetric(T initialValue) {
diff --git a/heron/api/src/java/org/apache/heron/api/metric/MultiCountMetric.java b/heron/api/src/java/org/apache/heron/api/metric/MultiCountMetric.java
index 2f2fee0..979037a 100644
--- a/heron/api/src/java/org/apache/heron/api/metric/MultiCountMetric.java
+++ b/heron/api/src/java/org/apache/heron/api/metric/MultiCountMetric.java
@@ -21,9 +21,10 @@ package org.apache.heron.api.metric;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class MultiCountMetric implements IMetric<Map<String, Long>> {
-  private Map<String, CountMetric> value = new HashMap<>();
+  private Map<String, CountMetric> value = new ConcurrentHashMap<>();
 
   public MultiCountMetric() {
   }
diff --git a/heron/api/src/java/org/apache/heron/api/metric/MultiReducedMetric.java b/heron/api/src/java/org/apache/heron/api/metric/MultiReducedMetric.java
index 9512793..39ed090 100644
--- a/heron/api/src/java/org/apache/heron/api/metric/MultiReducedMetric.java
+++ b/heron/api/src/java/org/apache/heron/api/metric/MultiReducedMetric.java
@@ -21,6 +21,7 @@ package org.apache.heron.api.metric;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /*
  * A reduce metric that can hold multiple scoped values.
@@ -29,7 +30,7 @@ import java.util.Map;
  * @param <V> type of reduced value
  */
 public class MultiReducedMetric<T, U, V> implements IMetric<Map<String, V>> {
-  private Map<String, ReducedMetric<T, U, V>> value = new HashMap<>();
+  private Map<String, ReducedMetric<T, U, V>> value = new ConcurrentHashMap<>();
   private IReducer<T, U, V> reducer;
 
   public MultiReducedMetric(IReducer<T, U, V> reducer) {
diff --git a/heron/api/src/java/org/apache/heron/api/tuple/Fields.java b/heron/api/src/java/org/apache/heron/api/tuple/Fields.java
index 50e9905..8526c14 100644
--- a/heron/api/src/java/org/apache/heron/api/tuple/Fields.java
+++ b/heron/api/src/java/org/apache/heron/api/tuple/Fields.java
@@ -22,16 +22,16 @@ package org.apache.heron.api.tuple;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class Fields implements Iterable<String>, Serializable {
   private static final long serialVersionUID = -1045737418722082345L;
 
   private List<String> fields;
-  private Map<String, Integer> mIndex = new HashMap<String, Integer>();
+  private Map<String, Integer> mIndex = new ConcurrentHashMap<String, Integer>();
 
   public Fields(String... pFields) {
     this(Arrays.asList(pFields));
diff --git a/heron/common/src/java/org/apache/heron/common/network/HeronClient.java b/heron/common/src/java/org/apache/heron/common/network/HeronClient.java
index ff30b21..58d2d42 100644
--- a/heron/common/src/java/org/apache/heron/common/network/HeronClient.java
+++ b/heron/common/src/java/org/apache/heron/common/network/HeronClient.java
@@ -24,9 +24,10 @@ import java.net.InetSocketAddress;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SocketChannel;
 import java.time.Duration;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -57,6 +58,7 @@ import org.apache.heron.common.basics.NIOLooper;
  */
 public abstract class HeronClient implements ISelectHandler {
   private static final Logger LOG = Logger.getLogger(HeronClient.class.getName());
+  private static final Object DUMMY = new Object();
 
   // When we send a request, we need to:
   // record the the context for this particular RID, and prepare the response for that RID
@@ -99,9 +101,9 @@ public abstract class HeronClient implements ISelectHandler {
     socketOptions = options;
 
     isConnected = false;
-    contextMap = new HashMap<REQID, Object>();
-    responseMessageMap = new HashMap<REQID, Message.Builder>();
-    messageMap = new HashMap<String, Message.Builder>();
+    contextMap = new ConcurrentHashMap<REQID, Object>();
+    responseMessageMap = new ConcurrentHashMap<REQID, Message.Builder>();
+    messageMap = new ConcurrentHashMap<String, Message.Builder>();
   }
 
   // Register the protobuf Message's name with protobuf Message
@@ -193,7 +195,7 @@ public abstract class HeronClient implements ISelectHandler {
                           Duration timeout) {
     // Pack it as a no-timeout request and send it!
     final REQID rid = REQID.generate();
-    contextMap.put(rid, context);
+    contextMap.put(rid, Objects.nonNull(context) ? context : DUMMY); // Fix NPE
     responseMessageMap.put(rid, responseBuilder);
 
     // Add timeout for this request if necessary
@@ -402,15 +404,15 @@ public abstract class HeronClient implements ISelectHandler {
   // Following protected methods are just used for testing
   /////////////////////////////////////////////////////////
   protected Map<String, Message.Builder> getMessageMap() {
-    return new HashMap<String, Message.Builder>(messageMap);
+    return new ConcurrentHashMap<String, Message.Builder>(messageMap);
   }
 
   protected Map<REQID, Message.Builder> getResponseMessageMap() {
-    return new HashMap<REQID, Message.Builder>(responseMessageMap);
+    return new ConcurrentHashMap<REQID, Message.Builder>(responseMessageMap);
   }
 
   protected Map<REQID, Object> getContextMap() {
-    return new HashMap<REQID, Object>(contextMap);
+    return new ConcurrentHashMap<>(contextMap);
   }
 
   protected SocketChannelHelper getSocketChannelHelper() {
diff --git a/heron/common/src/java/org/apache/heron/common/network/HeronServer.java b/heron/common/src/java/org/apache/heron/common/network/HeronServer.java
index 4ee87f9..8589f58 100644
--- a/heron/common/src/java/org/apache/heron/common/network/HeronServer.java
+++ b/heron/common/src/java/org/apache/heron/common/network/HeronServer.java
@@ -26,9 +26,9 @@ import java.nio.channels.SelectableChannel;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.time.Duration;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -71,9 +71,9 @@ public abstract class HeronServer implements ISelectHandler {
     nioLooper = s;
     endpoint = new InetSocketAddress(host, port);
     socketOptions = options;
-    requestMap = new HashMap<String, Message.Builder>();
-    messageMap = new HashMap<String, Message.Builder>();
-    activeConnections = new HashMap<SocketChannel, SocketChannelHelper>();
+    requestMap = new ConcurrentHashMap<String, Message.Builder>();
+    messageMap = new ConcurrentHashMap<String, Message.Builder>();
+    activeConnections = new ConcurrentHashMap<SocketChannel, SocketChannelHelper>();
   }
 
   public InetSocketAddress getEndpoint() {
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java
index 0a2bfc9..f0b1166 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java
@@ -22,10 +22,10 @@ package org.apache.heron.common.utils.metrics;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.logging.Logger;
 
 import org.apache.heron.api.metric.CumulativeCountMetric;
@@ -53,8 +53,8 @@ public class MetricsCollector implements IMetricsRegister {
 
   public MetricsCollector(WakeableLooper runnableToGatherMetrics,
                           Communicator<Metrics.MetricPublisherPublishMessage> queue) {
-    metrics = new HashMap<>();
-    timeBucketToMetricNames = new HashMap<>();
+    metrics = new ConcurrentHashMap<>();
+    timeBucketToMetricNames = new ConcurrentHashMap<>();
     this.queue = queue;
     this.runnableToGatherMetrics = runnableToGatherMetrics;
     metricCollectionCount = new CumulativeCountMetric();
diff --git a/heron/common/src/java/org/apache/heron/common/utils/misc/PhysicalPlanHelper.java b/heron/common/src/java/org/apache/heron/common/utils/misc/PhysicalPlanHelper.java
index 485e821..6b4d504 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/misc/PhysicalPlanHelper.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/misc/PhysicalPlanHelper.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.logging.Logger;
 
 import org.apache.heron.api.Config;
@@ -101,7 +102,7 @@ public class PhysicalPlanHelper {
     }
 
     // setup outputSchema
-    outputSchema = new HashMap<String, Integer>();
+    outputSchema = new ConcurrentHashMap<String, Integer>();
     List<TopologyAPI.OutputStream> outputs;
     if (mySpout != null) {
       outputs = mySpout.getOutputsList();
diff --git a/heron/common/src/java/org/apache/heron/common/utils/topology/GeneralTopologyContextImpl.java b/heron/common/src/java/org/apache/heron/common/utils/topology/GeneralTopologyContextImpl.java
index 9432871..e06c18c 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/topology/GeneralTopologyContextImpl.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/topology/GeneralTopologyContextImpl.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.heron.api.Config;
 import org.apache.heron.api.generated.TopologyAPI;
@@ -63,8 +64,8 @@ public class GeneralTopologyContextImpl implements GeneralTopologyContext {
     this.topology = topology;
     this.topologyConfig = new HashMap<>(clusterConfig);
     this.taskToComponentMap = taskToComponentMap;
-    this.inputs = new HashMap<>();
-    this.outputs = new HashMap<>();
+    this.inputs = new ConcurrentHashMap<>();
+    this.outputs = new ConcurrentHashMap<>();
     this.componentsOutputFields = new HashMap<>();
 
     for (int i = 0; i < this.topology.getSpoutsCount(); ++i) {
diff --git a/storm-compatibility/src/java/backtype/storm/metric/api/MultiCountMetric.java b/storm-compatibility/src/java/backtype/storm/metric/api/MultiCountMetric.java
index 6e126fe..7f2abf8 100644
--- a/storm-compatibility/src/java/backtype/storm/metric/api/MultiCountMetric.java
+++ b/storm-compatibility/src/java/backtype/storm/metric/api/MultiCountMetric.java
@@ -20,9 +20,10 @@ package backtype.storm.metric.api;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class MultiCountMetric implements IMetric {
-  private Map<String, CountMetric> value = new HashMap<>();
+  private Map<String, CountMetric> value = new ConcurrentHashMap<>();
 
   public MultiCountMetric() {
   }
diff --git a/storm-compatibility/src/java/backtype/storm/metric/api/MultiReducedMetric.java b/storm-compatibility/src/java/backtype/storm/metric/api/MultiReducedMetric.java
index 91c1eea..4f0f855 100644
--- a/storm-compatibility/src/java/backtype/storm/metric/api/MultiReducedMetric.java
+++ b/storm-compatibility/src/java/backtype/storm/metric/api/MultiReducedMetric.java
@@ -20,10 +20,11 @@ package backtype.storm.metric.api;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 @SuppressWarnings("rawtypes")
 public class MultiReducedMetric implements IMetric {
-  private Map<String, ReducedMetric> value = new HashMap<>();
+  private Map<String, ReducedMetric> value = new ConcurrentHashMap<>();
   private IReducer reducer;
 
   public MultiReducedMetric(IReducer reducer) {
diff --git a/storm-compatibility/src/java/org/apache/storm/metric/api/MultiCountMetric.java b/storm-compatibility/src/java/org/apache/storm/metric/api/MultiCountMetric.java
index f55eab5..c911947 100644
--- a/storm-compatibility/src/java/org/apache/storm/metric/api/MultiCountMetric.java
+++ b/storm-compatibility/src/java/org/apache/storm/metric/api/MultiCountMetric.java
@@ -20,9 +20,10 @@ package org.apache.storm.metric.api;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class MultiCountMetric implements IMetric {
-  private Map<String, CountMetric> value = new HashMap<>();
+  private Map<String, CountMetric> value = new ConcurrentHashMap<>();
 
   public MultiCountMetric() {
   }
diff --git a/storm-compatibility/src/java/org/apache/storm/metric/api/MultiReducedMetric.java b/storm-compatibility/src/java/org/apache/storm/metric/api/MultiReducedMetric.java
index 15e05a2..0866773 100644
--- a/storm-compatibility/src/java/org/apache/storm/metric/api/MultiReducedMetric.java
+++ b/storm-compatibility/src/java/org/apache/storm/metric/api/MultiReducedMetric.java
@@ -20,10 +20,11 @@ package org.apache.storm.metric.api;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 @SuppressWarnings("rawtypes")
 public class MultiReducedMetric implements IMetric {
-  private Map<String, ReducedMetric> value = new HashMap<>();
+  private Map<String, ReducedMetric> value = new ConcurrentHashMap<>();
   private IReducer reducer;
 
   public MultiReducedMetric(IReducer reducer) {

Reply via email to