This is an automated email from the ASF dual-hosted git repository. htowaileb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit c77174917bc8de3b3a0d050d06ec1b06ce39bf0f Author: Wail Alkowaileet <[email protected]> AuthorDate: Thu Sep 1 13:32:19 2022 -0700 [ASTERIXDB-3072] Make HDFS scheduler to use different type of channels - user model changes: no - storage format changes: no - interface changes: no Details: HDFS scheduler only uses plain connection. This change allows it to use different type of connections Change-Id: I64f9f4a30a6564aac26e99433ccc695975a22a3b Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17224 Tested-by: Jenkins <[email protected]> Reviewed-by: Wail Alkowaileet <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- .../apache/asterix/external/util/HDFSUtils.java | 5 ++- .../apache/hyracks/hdfs/scheduler/Scheduler.java | 42 ++++++++++------------ .../apache/hyracks/hdfs2/scheduler/Scheduler.java | 17 ++++----- 3 files changed, 29 insertions(+), 35 deletions(-) diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java index 9e49d86612..3506216e10 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java @@ -62,6 +62,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.exceptions.IWarningCollector; import org.apache.hyracks.api.exceptions.Warning; +import org.apache.hyracks.api.network.INetworkSecurityManager; import org.apache.hyracks.hdfs.scheduler.Scheduler; import org.apache.parquet.hadoop.ParquetInputFormat; @@ -72,10 +73,12 @@ public class HDFSUtils { public static Scheduler initializeHDFSScheduler(ICCServiceContext serviceCtx) throws HyracksDataException { ICCContext ccContext = serviceCtx.getCCContext(); + INetworkSecurityManager networkSecurityManager = serviceCtx.getControllerService().getNetworkSecurityManager(); Scheduler scheduler = null; try { scheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(), - ccContext.getClusterControllerInfo().getClientNetPort()); + ccContext.getClusterControllerInfo().getClientNetPort(), + networkSecurityManager.getSocketChannelFactory()); } catch (HyracksException e) { throw new RuntimeDataException(ErrorCode.UTIL_HDFS_UTILS_CANNOT_OBTAIN_HDFS_SCHEDULER); } diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java index b9d68f7bf3..f5bf07b6b0 100644 --- a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java +++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java @@ -37,6 +37,7 @@ import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.network.ISocketChannelFactory; import org.apache.hyracks.api.topology.ClusterTopology; import org.apache.hyracks.hdfs.api.INcCollection; import org.apache.hyracks.hdfs.api.INcCollectionBuilder; @@ -56,16 +57,24 @@ public class Scheduler { private static final Logger LOGGER = LogManager.getLogger(); - /** a list of NCs */ + /** + * a list of NCs + */ private String[] NCs; - /** a map from ip to NCs */ + /** + * a map from ip to NCs + */ private Map<String, List<String>> ipToNcMapping = new HashMap<>(); - /** a map from the NC name to the index */ + /** + * a map from the NC name to the index + */ private Map<String, Integer> ncNameToIndex = new HashMap<>(); - /** a map from NC name to the NodeControllerInfo */ + /** + * a map from NC name to the NodeControllerInfo + */ private Map<String, NodeControllerInfo> ncNameToNcInfos; /** @@ -76,13 +85,15 @@ public class Scheduler { /** * The constructor of the scheduler. * - * @param ncNameToNcInfos + * @param ipAddress IP address + * @param port Port + * @param channelFactory Channel Factory * @throws HyracksException */ - public Scheduler(String ipAddress, int port) throws HyracksException { + public Scheduler(String ipAddress, int port, ISocketChannelFactory channelFactory) throws HyracksException { try { - IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port); + IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port, channelFactory); this.ncNameToNcInfos = hcc.getNodeControllerInfos(); ClusterTopology topology = hcc.getClusterTopology(); this.ncCollectionBuilder = topology == null ? new IPProximityNcCollectionBuilder() @@ -93,23 +104,6 @@ public class Scheduler { } } - /** - * The constructor of the scheduler. - * - * @param ncNameToNcInfos - * @throws HyracksException - */ - public Scheduler(String ipAddress, int port, INcCollectionBuilder ncCollectionBuilder) throws HyracksException { - try { - IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port); - this.ncNameToNcInfos = hcc.getNodeControllerInfos(); - this.ncCollectionBuilder = ncCollectionBuilder; - loadIPAddressToNCMap(ncNameToNcInfos); - } catch (Exception e) { - throw HyracksException.create(e); - } - } - /** * The constructor of the scheduler. * diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java index ddf140f034..a26a5f7261 100644 --- a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java +++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java @@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.network.ISocketChannelFactory; import org.apache.hyracks.api.topology.ClusterTopology; import org.apache.hyracks.hdfs.api.INcCollectionBuilder; @@ -44,15 +45,14 @@ public class Scheduler { * @param ncNameToNcInfos * @throws HyracksException */ - public Scheduler(String ipAddress, int port) throws HyracksException { - scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ipAddress, port); + public Scheduler(String ipAddress, int port, ISocketChannelFactory channelFactory) throws HyracksException { + scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ipAddress, port, channelFactory); } /** * The constructor of the scheduler. * - * @param ncNameToNcInfos - * the mapping from nc names to nc infos + * @param ncNameToNcInfos the mapping from nc names to nc infos * @throws HyracksException */ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException { @@ -62,10 +62,8 @@ public class Scheduler { /** * The constructor of the scheduler. * - * @param ncNameToNcInfos - * the mapping from nc names to nc infos - * @param topology - * the hyracks cluster toplogy + * @param ncNameToNcInfos the mapping from nc names to nc infos + * @param topology the hyracks cluster toplogy * @throws HyracksException */ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology) @@ -76,8 +74,7 @@ public class Scheduler { /** * The constructor of the scheduler. * - * @param ncNameToNcInfos - * the mapping from nc names to nc infos + * @param ncNameToNcInfos the mapping from nc names to nc infos * @throws HyracksException */ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder builder)
