Repository: asterixdb Updated Branches: refs/heads/master 356f6c6a0 -> ffd6e4ac5
Fixed race condition during ncMap lookup Change-Id: I1bfbe712c100f48011a516c373ac8994028dc3dd Reviewed-on: https://asterix-gerrit.ics.uci.edu/1792 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Yingyi Bu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/ffd6e4ac Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/ffd6e4ac Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/ffd6e4ac Branch: refs/heads/master Commit: ffd6e4ac51ded48ee5e538fe4a2334d74068d414 Parents: 356f6c6 Author: Ildar Absalyamov <[email protected]> Authored: Tue May 30 18:26:45 2017 -0700 Committer: Ildar Absalyamov <[email protected]> Committed: Tue May 30 22:44:36 2017 -0700 ---------------------------------------------------------------------- .../asterix/external/api/INodeResolver.java | 7 ++- .../factory/LocalFSInputStreamFactory.java | 8 ++- .../asterix/external/util/IdentityResolver.java | 7 ++- .../asterix/external/util/NodeResolver.java | 58 +++++--------------- .../asterix/runtime/utils/RuntimeUtils.java | 4 +- 5 files changed, 35 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ffd6e4ac/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java index 99ffdf1..c6e87bd 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java @@ -18,6 +18,10 @@ */ package org.apache.asterix.external.api; +import java.net.InetAddress; +import java.util.Map; +import java.util.Set; + import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; @@ -35,5 +39,6 @@ public interface INodeResolver { * @return resolved result (a node controller id) * @throws AsterixException */ - String resolveNode(ICcApplicationContext appCtx, String value) throws AsterixException; + String resolveNode(ICcApplicationContext appCtx, String value, Map<InetAddress, Set<String>> ncMap, Set<String> ncs) + throws AsterixException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ffd6e4ac/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java index 44b0b43..d7afa13 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java @@ -18,13 +18,16 @@ */ package org.apache.asterix.external.input.stream.factory; +import java.net.InetAddress; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; @@ -37,6 +40,7 @@ import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.FileSystemWatcher; import org.apache.asterix.external.util.NodeResolverFactory; +import org.apache.asterix.runtime.utils.RuntimeUtils; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -103,6 +107,8 @@ public class LocalFSInputStreamFactory implements IInputStreamFactory { private void configureFileSplits(ICcApplicationContext appCtx, String[] splits) throws AsterixException { INodeResolver resolver = getNodeResolver(); + Map<InetAddress, Set<String>> ncMap = RuntimeUtils.getForcedNodeControllerMap(appCtx); + Set<String> ncs = ncMap.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()); inputFileSplits = new UnmanagedFileSplit[splits.length]; String node; String path; @@ -114,7 +120,7 @@ public class LocalFSInputStreamFactory implements IInputStreamFactory { throw new AsterixException( "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\""); } - node = resolver.resolveNode(appCtx, trimmedValue.split(":")[0]); + node = resolver.resolveNode(appCtx, trimmedValue.split(":")[0], ncMap, ncs); path = trimmedValue.split("://")[1]; inputFileSplits[count++] = new UnmanagedFileSplit(node, path); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ffd6e4ac/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java index 9a4ddff..651833f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java @@ -18,6 +18,10 @@ */ package org.apache.asterix.external.util; +import java.net.InetAddress; +import java.util.Map; +import java.util.Set; + import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.api.INodeResolver; @@ -28,7 +32,8 @@ import org.apache.asterix.external.api.INodeResolver; public class IdentityResolver implements INodeResolver { @Override - public String resolveNode(ICcApplicationContext appCtx, String value) { + public String resolveNode(ICcApplicationContext appCtx, String value, Map<InetAddress, Set<String>> ncMap, + Set<String> ncs) { return value; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ffd6e4ac/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java index 5b15e9e..c180dea 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java @@ -20,10 +20,7 @@ package org.apache.asterix.external.util; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Map.Entry; import java.util.Random; import java.util.Set; @@ -31,57 +28,30 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.external.api.INodeResolver; -import org.apache.asterix.runtime.utils.RuntimeUtils; /** * Resolves a value (DNS/IP Address) or a (Node Controller Id) to the id of a Node Controller running at the location. */ public class NodeResolver implements INodeResolver { //TODO: change this call and replace by calling AsterixClusterProperties - private static final Random random = new Random(); - private static final Map<InetAddress, Set<String>> ncMap = new HashMap<>(); - private static final Set<String> ncs = new HashSet<>(); + private final Random random = new Random(); @Override - public String resolveNode(ICcApplicationContext appCtx, String value) throws AsterixException { + public String resolveNode(ICcApplicationContext appCtx, String value, Map<InetAddress, Set<String>> ncMap, + Set<String> ncs) throws AsterixException { + if (ncs.contains(value)) { + return value; + } + InetAddress ipAddress = null; try { - if (ncMap.isEmpty()) { - NodeResolver.updateNCs(appCtx); - } - if (ncs.contains(value)) { - return value; - } else { - NodeResolver.updateNCs(appCtx); - if (ncs.contains(value)) { - return value; - } - } - InetAddress ipAddress = null; - try { - ipAddress = InetAddress.getByName(value); - } catch (UnknownHostException e) { - throw new AsterixException(ErrorCode.NODE_RESOLVER_UNABLE_RESOLVE_HOST, e, value); - } - Set<String> nodeControllers = ncMap.get(ipAddress); - if (nodeControllers == null || nodeControllers.isEmpty()) { - throw new AsterixException(ErrorCode.NODE_RESOLVER_NO_NODE_CONTROLLERS, value); - } - return nodeControllers.toArray(new String[] {})[random.nextInt(nodeControllers.size())]; - } catch (Exception e) { - throw new AsterixException(e); + ipAddress = InetAddress.getByName(value); + } catch (UnknownHostException e) { + throw new AsterixException(ErrorCode.NODE_RESOLVER_UNABLE_RESOLVE_HOST, e, value); } - } - - private static void updateNCs(ICcApplicationContext appCtx) throws Exception { - synchronized (ncMap) { - ncMap.clear(); - RuntimeUtils.getNodeControllerMap(appCtx, ncMap); - synchronized (ncs) { - ncs.clear(); - for (Entry<InetAddress, Set<String>> entry : ncMap.entrySet()) { - ncs.addAll(entry.getValue()); - } - } + Set<String> nodeControllers = ncMap.get(ipAddress); + if (nodeControllers == null || nodeControllers.isEmpty()) { + throw new AsterixException(ErrorCode.NODE_RESOLVER_NO_NODE_CONTROLLERS, value); } + return nodeControllers.toArray(new String[] {})[random.nextInt(nodeControllers.size())]; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ffd6e4ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java index 85e93b8..6970af5 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java @@ -60,10 +60,10 @@ public class RuntimeUtils { return map; } - public static void getNodeControllerMap(ICcApplicationContext appCtx, Map<InetAddress, Set<String>> map) { + public static Map<InetAddress, Set<String>> getForcedNodeControllerMap(ICcApplicationContext appCtx) { ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService(); INodeManager nodeManager = ccs.getNodeManager(); - map.putAll(nodeManager.getIpAddressNodeNameMap()); + return nodeManager.getIpAddressNodeNameMap(); } public static JobSpecification createJobSpecification(ICcApplicationContext appCtx) {
