This is an automated email from the ASF dual-hosted git repository.
joshfischer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new ee1c44a Improve concurrency for needed parts. (#3107)
ee1c44a is described below
commit ee1c44aefa8a5a5dd4e1effcd736014af70ec3b6
Author: choi se <[email protected]>
AuthorDate: Thu Mar 5 00:21:08 2020 +0900
Improve concurrency for needed parts. (#3107)
* Change concurrent Map
* Change concurrent Map
* HashMap changes for unneeded parts.
* HashMap changes for unneeded parts.
* Review changes
* Changes HashMap for unneeded parts.
* Improve concurrency for needed parts.
* Remove unused imports.
* Remove unused imports.
* Remove unused imports.
* Fix NPE
(cherry picked from commit 545d3814b315c29d3e396309a2ededaad193ec32)
* Fix WhitespaceAround
* Add dummy Object
* Fix ConstantName
(cherry picked from commit 8d6d5067072e92d6e276f93e18297ddedc647c6c)
---
.../apache/heron/api/metric/MultiAssignableMetric.java | 3 ++-
.../org/apache/heron/api/metric/MultiCountMetric.java | 3 ++-
.../apache/heron/api/metric/MultiReducedMetric.java | 3 ++-
.../src/java/org/apache/heron/api/tuple/Fields.java | 4 ++--
.../org/apache/heron/common/network/HeronClient.java | 18 ++++++++++--------
.../org/apache/heron/common/network/HeronServer.java | 8 ++++----
.../heron/common/utils/metrics/MetricsCollector.java | 6 +++---
.../heron/common/utils/misc/PhysicalPlanHelper.java | 3 ++-
.../utils/topology/GeneralTopologyContextImpl.java | 5 +++--
.../backtype/storm/metric/api/MultiCountMetric.java | 3 ++-
.../backtype/storm/metric/api/MultiReducedMetric.java | 3 ++-
.../org/apache/storm/metric/api/MultiCountMetric.java | 3 ++-
.../apache/storm/metric/api/MultiReducedMetric.java | 3 ++-
13 files changed, 38 insertions(+), 27 deletions(-)
diff --git
a/heron/api/src/java/org/apache/heron/api/metric/MultiAssignableMetric.java
b/heron/api/src/java/org/apache/heron/api/metric/MultiAssignableMetric.java
index d00a93c..c34bc28 100644
--- a/heron/api/src/java/org/apache/heron/api/metric/MultiAssignableMetric.java
+++ b/heron/api/src/java/org/apache/heron/api/metric/MultiAssignableMetric.java
@@ -21,9 +21,10 @@ package org.apache.heron.api.metric;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class MultiAssignableMetric<T extends Number> implements
IMetric<Map<String, T>> {
- private final Map<String, AssignableMetric<T>> value = new HashMap<>();
+ private final Map<String, AssignableMetric<T>> value = new
ConcurrentHashMap<>();
private T initialValue;
public MultiAssignableMetric(T initialValue) {
diff --git
a/heron/api/src/java/org/apache/heron/api/metric/MultiCountMetric.java
b/heron/api/src/java/org/apache/heron/api/metric/MultiCountMetric.java
index 2f2fee0..979037a 100644
--- a/heron/api/src/java/org/apache/heron/api/metric/MultiCountMetric.java
+++ b/heron/api/src/java/org/apache/heron/api/metric/MultiCountMetric.java
@@ -21,9 +21,10 @@ package org.apache.heron.api.metric;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class MultiCountMetric implements IMetric<Map<String, Long>> {
- private Map<String, CountMetric> value = new HashMap<>();
+ private Map<String, CountMetric> value = new ConcurrentHashMap<>();
public MultiCountMetric() {
}
diff --git
a/heron/api/src/java/org/apache/heron/api/metric/MultiReducedMetric.java
b/heron/api/src/java/org/apache/heron/api/metric/MultiReducedMetric.java
index 9512793..39ed090 100644
--- a/heron/api/src/java/org/apache/heron/api/metric/MultiReducedMetric.java
+++ b/heron/api/src/java/org/apache/heron/api/metric/MultiReducedMetric.java
@@ -21,6 +21,7 @@ package org.apache.heron.api.metric;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/*
* A reduce metric that can hold multiple scoped values.
@@ -29,7 +30,7 @@ import java.util.Map;
* @param <V> type of reduced value
*/
public class MultiReducedMetric<T, U, V> implements IMetric<Map<String, V>> {
- private Map<String, ReducedMetric<T, U, V>> value = new HashMap<>();
+ private Map<String, ReducedMetric<T, U, V>> value = new
ConcurrentHashMap<>();
private IReducer<T, U, V> reducer;
public MultiReducedMetric(IReducer<T, U, V> reducer) {
diff --git a/heron/api/src/java/org/apache/heron/api/tuple/Fields.java
b/heron/api/src/java/org/apache/heron/api/tuple/Fields.java
index 50e9905..8526c14 100644
--- a/heron/api/src/java/org/apache/heron/api/tuple/Fields.java
+++ b/heron/api/src/java/org/apache/heron/api/tuple/Fields.java
@@ -22,16 +22,16 @@ package org.apache.heron.api.tuple;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class Fields implements Iterable<String>, Serializable {
private static final long serialVersionUID = -1045737418722082345L;
private List<String> fields;
- private Map<String, Integer> mIndex = new HashMap<String, Integer>();
+ private Map<String, Integer> mIndex = new ConcurrentHashMap<String,
Integer>();
public Fields(String... pFields) {
this(Arrays.asList(pFields));
diff --git
a/heron/common/src/java/org/apache/heron/common/network/HeronClient.java
b/heron/common/src/java/org/apache/heron/common/network/HeronClient.java
index ff30b21..58d2d42 100644
--- a/heron/common/src/java/org/apache/heron/common/network/HeronClient.java
+++ b/heron/common/src/java/org/apache/heron/common/network/HeronClient.java
@@ -24,9 +24,10 @@ import java.net.InetSocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.time.Duration;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -57,6 +58,7 @@ import org.apache.heron.common.basics.NIOLooper;
*/
public abstract class HeronClient implements ISelectHandler {
private static final Logger LOG =
Logger.getLogger(HeronClient.class.getName());
+ private static final Object DUMMY = new Object();
// When we send a request, we need to:
// record the the context for this particular RID, and prepare the response
for that RID
@@ -99,9 +101,9 @@ public abstract class HeronClient implements ISelectHandler {
socketOptions = options;
isConnected = false;
- contextMap = new HashMap<REQID, Object>();
- responseMessageMap = new HashMap<REQID, Message.Builder>();
- messageMap = new HashMap<String, Message.Builder>();
+ contextMap = new ConcurrentHashMap<REQID, Object>();
+ responseMessageMap = new ConcurrentHashMap<REQID, Message.Builder>();
+ messageMap = new ConcurrentHashMap<String, Message.Builder>();
}
// Register the protobuf Message's name with protobuf Message
@@ -193,7 +195,7 @@ public abstract class HeronClient implements ISelectHandler
{
Duration timeout) {
// Pack it as a no-timeout request and send it!
final REQID rid = REQID.generate();
- contextMap.put(rid, context);
+ contextMap.put(rid, Objects.nonNull(context) ? context : DUMMY); // Fix NPE
responseMessageMap.put(rid, responseBuilder);
// Add timeout for this request if necessary
@@ -402,15 +404,15 @@ public abstract class HeronClient implements
ISelectHandler {
// Following protected methods are just used for testing
/////////////////////////////////////////////////////////
protected Map<String, Message.Builder> getMessageMap() {
- return new HashMap<String, Message.Builder>(messageMap);
+ return new ConcurrentHashMap<String, Message.Builder>(messageMap);
}
protected Map<REQID, Message.Builder> getResponseMessageMap() {
- return new HashMap<REQID, Message.Builder>(responseMessageMap);
+ return new ConcurrentHashMap<REQID, Message.Builder>(responseMessageMap);
}
protected Map<REQID, Object> getContextMap() {
- return new HashMap<REQID, Object>(contextMap);
+ return new ConcurrentHashMap<>(contextMap);
}
protected SocketChannelHelper getSocketChannelHelper() {
diff --git
a/heron/common/src/java/org/apache/heron/common/network/HeronServer.java
b/heron/common/src/java/org/apache/heron/common/network/HeronServer.java
index 4ee87f9..8589f58 100644
--- a/heron/common/src/java/org/apache/heron/common/network/HeronServer.java
+++ b/heron/common/src/java/org/apache/heron/common/network/HeronServer.java
@@ -26,9 +26,9 @@ import java.nio.channels.SelectableChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.Duration;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -71,9 +71,9 @@ public abstract class HeronServer implements ISelectHandler {
nioLooper = s;
endpoint = new InetSocketAddress(host, port);
socketOptions = options;
- requestMap = new HashMap<String, Message.Builder>();
- messageMap = new HashMap<String, Message.Builder>();
- activeConnections = new HashMap<SocketChannel, SocketChannelHelper>();
+ requestMap = new ConcurrentHashMap<String, Message.Builder>();
+ messageMap = new ConcurrentHashMap<String, Message.Builder>();
+ activeConnections = new ConcurrentHashMap<SocketChannel,
SocketChannelHelper>();
}
public InetSocketAddress getEndpoint() {
diff --git
a/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java
b/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java
index 0a2bfc9..f0b1166 100644
---
a/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java
+++
b/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java
@@ -22,10 +22,10 @@ package org.apache.heron.common.utils.metrics;
import java.time.Duration;
import java.util.Collection;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
import org.apache.heron.api.metric.CumulativeCountMetric;
@@ -53,8 +53,8 @@ public class MetricsCollector implements IMetricsRegister {
public MetricsCollector(WakeableLooper runnableToGatherMetrics,
Communicator<Metrics.MetricPublisherPublishMessage>
queue) {
- metrics = new HashMap<>();
- timeBucketToMetricNames = new HashMap<>();
+ metrics = new ConcurrentHashMap<>();
+ timeBucketToMetricNames = new ConcurrentHashMap<>();
this.queue = queue;
this.runnableToGatherMetrics = runnableToGatherMetrics;
metricCollectionCount = new CumulativeCountMetric();
diff --git
a/heron/common/src/java/org/apache/heron/common/utils/misc/PhysicalPlanHelper.java
b/heron/common/src/java/org/apache/heron/common/utils/misc/PhysicalPlanHelper.java
index 485e821..6b4d504 100644
---
a/heron/common/src/java/org/apache/heron/common/utils/misc/PhysicalPlanHelper.java
+++
b/heron/common/src/java/org/apache/heron/common/utils/misc/PhysicalPlanHelper.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
import org.apache.heron.api.Config;
@@ -101,7 +102,7 @@ public class PhysicalPlanHelper {
}
// setup outputSchema
- outputSchema = new HashMap<String, Integer>();
+ outputSchema = new ConcurrentHashMap<String, Integer>();
List<TopologyAPI.OutputStream> outputs;
if (mySpout != null) {
outputs = mySpout.getOutputsList();
diff --git
a/heron/common/src/java/org/apache/heron/common/utils/topology/GeneralTopologyContextImpl.java
b/heron/common/src/java/org/apache/heron/common/utils/topology/GeneralTopologyContextImpl.java
index 9432871..e06c18c 100644
---
a/heron/common/src/java/org/apache/heron/common/utils/topology/GeneralTopologyContextImpl.java
+++
b/heron/common/src/java/org/apache/heron/common/utils/topology/GeneralTopologyContextImpl.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.heron.api.Config;
import org.apache.heron.api.generated.TopologyAPI;
@@ -63,8 +64,8 @@ public class GeneralTopologyContextImpl implements
GeneralTopologyContext {
this.topology = topology;
this.topologyConfig = new HashMap<>(clusterConfig);
this.taskToComponentMap = taskToComponentMap;
- this.inputs = new HashMap<>();
- this.outputs = new HashMap<>();
+ this.inputs = new ConcurrentHashMap<>();
+ this.outputs = new ConcurrentHashMap<>();
this.componentsOutputFields = new HashMap<>();
for (int i = 0; i < this.topology.getSpoutsCount(); ++i) {
diff --git
a/storm-compatibility/src/java/backtype/storm/metric/api/MultiCountMetric.java
b/storm-compatibility/src/java/backtype/storm/metric/api/MultiCountMetric.java
index 6e126fe..7f2abf8 100644
---
a/storm-compatibility/src/java/backtype/storm/metric/api/MultiCountMetric.java
+++
b/storm-compatibility/src/java/backtype/storm/metric/api/MultiCountMetric.java
@@ -20,9 +20,10 @@ package backtype.storm.metric.api;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class MultiCountMetric implements IMetric {
- private Map<String, CountMetric> value = new HashMap<>();
+ private Map<String, CountMetric> value = new ConcurrentHashMap<>();
public MultiCountMetric() {
}
diff --git
a/storm-compatibility/src/java/backtype/storm/metric/api/MultiReducedMetric.java
b/storm-compatibility/src/java/backtype/storm/metric/api/MultiReducedMetric.java
index 91c1eea..4f0f855 100644
---
a/storm-compatibility/src/java/backtype/storm/metric/api/MultiReducedMetric.java
+++
b/storm-compatibility/src/java/backtype/storm/metric/api/MultiReducedMetric.java
@@ -20,10 +20,11 @@ package backtype.storm.metric.api;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
@SuppressWarnings("rawtypes")
public class MultiReducedMetric implements IMetric {
- private Map<String, ReducedMetric> value = new HashMap<>();
+ private Map<String, ReducedMetric> value = new ConcurrentHashMap<>();
private IReducer reducer;
public MultiReducedMetric(IReducer reducer) {
diff --git
a/storm-compatibility/src/java/org/apache/storm/metric/api/MultiCountMetric.java
b/storm-compatibility/src/java/org/apache/storm/metric/api/MultiCountMetric.java
index f55eab5..c911947 100644
---
a/storm-compatibility/src/java/org/apache/storm/metric/api/MultiCountMetric.java
+++
b/storm-compatibility/src/java/org/apache/storm/metric/api/MultiCountMetric.java
@@ -20,9 +20,10 @@ package org.apache.storm.metric.api;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class MultiCountMetric implements IMetric {
- private Map<String, CountMetric> value = new HashMap<>();
+ private Map<String, CountMetric> value = new ConcurrentHashMap<>();
public MultiCountMetric() {
}
diff --git
a/storm-compatibility/src/java/org/apache/storm/metric/api/MultiReducedMetric.java
b/storm-compatibility/src/java/org/apache/storm/metric/api/MultiReducedMetric.java
index 15e05a2..0866773 100644
---
a/storm-compatibility/src/java/org/apache/storm/metric/api/MultiReducedMetric.java
+++
b/storm-compatibility/src/java/org/apache/storm/metric/api/MultiReducedMetric.java
@@ -20,10 +20,11 @@ package org.apache.storm.metric.api;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
@SuppressWarnings("rawtypes")
public class MultiReducedMetric implements IMetric {
- private Map<String, ReducedMetric> value = new HashMap<>();
+ private Map<String, ReducedMetric> value = new ConcurrentHashMap<>();
private IReducer reducer;
public MultiReducedMetric(IReducer reducer) {