This is an automated email from the ASF dual-hosted git repository.

htowaileb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit c77174917bc8de3b3a0d050d06ec1b06ce39bf0f
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Thu Sep 1 13:32:19 2022 -0700

    [ASTERIXDB-3072] Make HDFS scheduler use different types of channels
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    The HDFS scheduler only uses plain connections. This change allows
    it to use different types of connections.
    
    Change-Id: I64f9f4a30a6564aac26e99433ccc695975a22a3b
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17224
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Wail Alkowaileet <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../apache/asterix/external/util/HDFSUtils.java    |  5 ++-
 .../apache/hyracks/hdfs/scheduler/Scheduler.java   | 42 ++++++++++------------
 .../apache/hyracks/hdfs2/scheduler/Scheduler.java  | 17 ++++-----
 3 files changed, 29 insertions(+), 35 deletions(-)

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 9e49d86612..3506216e10 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -62,6 +62,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.network.INetworkSecurityManager;
 import org.apache.hyracks.hdfs.scheduler.Scheduler;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 
@@ -72,10 +73,12 @@ public class HDFSUtils {
 
     public static Scheduler initializeHDFSScheduler(ICCServiceContext 
serviceCtx) throws HyracksDataException {
         ICCContext ccContext = serviceCtx.getCCContext();
+        INetworkSecurityManager networkSecurityManager = 
serviceCtx.getControllerService().getNetworkSecurityManager();
         Scheduler scheduler = null;
         try {
             scheduler = new 
Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
-                    ccContext.getClusterControllerInfo().getClientNetPort());
+                    ccContext.getClusterControllerInfo().getClientNetPort(),
+                    networkSecurityManager.getSocketChannelFactory());
         } catch (HyracksException e) {
             throw new 
RuntimeDataException(ErrorCode.UTIL_HDFS_UTILS_CANNOT_OBTAIN_HDFS_SCHEDULER);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
 
b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
index b9d68f7bf3..f5bf07b6b0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
@@ -37,6 +37,7 @@ import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.hdfs.api.INcCollection;
 import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
@@ -56,16 +57,24 @@ public class Scheduler {
 
     private static final Logger LOGGER = LogManager.getLogger();
 
-    /** a list of NCs */
+    /**
+     * a list of NCs
+     */
     private String[] NCs;
 
-    /** a map from ip to NCs */
+    /**
+     * a map from ip to NCs
+     */
     private Map<String, List<String>> ipToNcMapping = new HashMap<>();
 
-    /** a map from the NC name to the index */
+    /**
+     * a map from the NC name to the index
+     */
     private Map<String, Integer> ncNameToIndex = new HashMap<>();
 
-    /** a map from NC name to the NodeControllerInfo */
+    /**
+     * a map from NC name to the NodeControllerInfo
+     */
     private Map<String, NodeControllerInfo> ncNameToNcInfos;
 
     /**
@@ -76,13 +85,15 @@ public class Scheduler {
     /**
      * The constructor of the scheduler.
      *
-     * @param ncNameToNcInfos
+     * @param ipAddress      IP address
+     * @param port           Port
+     * @param channelFactory Channel Factory
      * @throws HyracksException
      */
 
-    public Scheduler(String ipAddress, int port) throws HyracksException {
+    public Scheduler(String ipAddress, int port, ISocketChannelFactory 
channelFactory) throws HyracksException {
         try {
-            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, 
port);
+            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, 
port, channelFactory);
             this.ncNameToNcInfos = hcc.getNodeControllerInfos();
             ClusterTopology topology = hcc.getClusterTopology();
             this.ncCollectionBuilder = topology == null ? new 
IPProximityNcCollectionBuilder()
@@ -93,23 +104,6 @@ public class Scheduler {
         }
     }
 
-    /**
-     * The constructor of the scheduler.
-     *
-     * @param ncNameToNcInfos
-     * @throws HyracksException
-     */
-    public Scheduler(String ipAddress, int port, INcCollectionBuilder 
ncCollectionBuilder) throws HyracksException {
-        try {
-            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, 
port);
-            this.ncNameToNcInfos = hcc.getNodeControllerInfos();
-            this.ncCollectionBuilder = ncCollectionBuilder;
-            loadIPAddressToNCMap(ncNameToNcInfos);
-        } catch (Exception e) {
-            throw HyracksException.create(e);
-        }
-    }
-
     /**
      * The constructor of the scheduler.
      *
diff --git 
a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
 
b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
index ddf140f034..a26a5f7261 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs2/scheduler/Scheduler.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
 
@@ -44,15 +45,14 @@ public class Scheduler {
      * @param ncNameToNcInfos
      * @throws HyracksException
      */
-    public Scheduler(String ipAddress, int port) throws HyracksException {
-        scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ipAddress, 
port);
+    public Scheduler(String ipAddress, int port, ISocketChannelFactory 
channelFactory) throws HyracksException {
+        scheduler = new org.apache.hyracks.hdfs.scheduler.Scheduler(ipAddress, 
port, channelFactory);
     }
 
     /**
      * The constructor of the scheduler.
      *
-     * @param ncNameToNcInfos
-     *            the mapping from nc names to nc infos
+     * @param ncNameToNcInfos the mapping from nc names to nc infos
      * @throws HyracksException
      */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws 
HyracksException {
@@ -62,10 +62,8 @@ public class Scheduler {
     /**
      * The constructor of the scheduler.
      *
-     * @param ncNameToNcInfos
-     *            the mapping from nc names to nc infos
-     * @param topology
-     *            the hyracks cluster toplogy
+     * @param ncNameToNcInfos the mapping from nc names to nc infos
+     * @param topology        the hyracks cluster toplogy
      * @throws HyracksException
      */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, 
ClusterTopology topology)
@@ -76,8 +74,7 @@ public class Scheduler {
     /**
      * The constructor of the scheduler.
      *
-     * @param ncNameToNcInfos
-     *            the mapping from nc names to nc infos
+     * @param ncNameToNcInfos the mapping from nc names to nc infos
      * @throws HyracksException
      */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, 
INcCollectionBuilder builder)

Reply via email to