This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 15b88ef  [CARBONDATA-3512]Index Server enhancement
15b88ef is described below

commit 15b88ef6a143f626a4969284582403597f4ca083
Author: BJangir <[email protected]>
AuthorDate: Wed Sep 4 23:33:54 2019 +0530

    [CARBONDATA-3512]Index Server enhancement
    
    What changes are proposed
    
    1. Remove the keytab dependency for the IndexServer. Currently the keytab
    and principal have to be configured on both the client side and the server
    side. However, the IndexServer runs as a super user, and keeping the super
    user's keytab and principal on the client (especially for spark-submit) is
    not correct. Since the IndexServer is wrapped inside a Spark application,
    there is no need to ask the user for a keytab.
    
    2. Authentication: this happens in
    org.apache.hadoop.security.SaslRpcClient#createSaslClient, which checks
    the server principal (spark.carbon.indexserver.principal) against the
    server protocol (the UGI of the IndexServer). The user has to configure
    spark.carbon.indexserver.principal correctly.
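
    For reference, the patch binds this principal to the RPC protocol through
    Hadoop's KerberosInfo annotation. A trimmed sketch of the annotated trait
    from the IndexServer.scala change below (only getCount is shown; the other
    methods are omitted):

        import org.apache.carbondata.core.datamap.DistributableDataMapFormat
        import org.apache.hadoop.io.LongWritable
        import org.apache.hadoop.ipc.ProtocolInfo
        import org.apache.hadoop.security.KerberosInfo

        // Client and server resolve the principal from the same configuration
        // key, so only spark.carbon.indexserver.principal has to be set.
        @ProtocolInfo(protocolName = "org.apache.carbondata.indexserver.ServerInterface",
          protocolVersion = 1)
        @KerberosInfo(serverPrincipal = "spark.carbon.indexserver.principal",
          clientPrincipal = "spark.carbon.indexserver.principal")
        trait ServerInterface {
          def getCount(request: DistributableDataMapFormat): LongWritable
        }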
    
    3. Authorization (ACL): control which users may access the IndexServer.
    Authorization is enabled through the hadoop.security.authorization
    parameter. The IndexServer has the following scenarios (see the
    configuration sketch after this list):
        1. spark-submit, spark-shell, spark-sql: these Spark applications have
        a UGI in which the current user and the login user are the same,
        either through kinit or through spark.yarn.principal. Authorization is
        done in org.apache.hadoop.ipc.Server#authorize using the IndexServer
        protocol class and the ACL list prepared by
        org.apache.hadoop.security.authorize.PolicyProvider (generally
        hadoop-policy.xml with the key security.indexserver.protocol.acl,
        default *).
        2. Spark JDBCServer: its UGI is based on a proxy user, for example
        user1 (auth:PROXY) via spark, where user1 is the current user and
        spark is the login user (the UGI that started the JDBCServer). This
        authorization happens in
        org.apache.hadoop.security.authorize.ProxyUsers#authorize with the
        proxy-user ACL list prepared by
        hadoop.proxyuser.<INDEXSERVER_UGI>.users,
        hadoop.proxyuser.<INDEXSERVER_UGI>.hosts and
        hadoop.proxyuser.<INDEXSERVER_UGI>.groups.
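
    The ACL key is wired in through a PolicyProvider, as in the
    IndexServer.scala change below. A minimal sketch; the provider matches the
    diff, while ProxyUserAclExample and the user names "spark" and "user1" are
    only illustrative:

        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.security.authorize.{PolicyProvider, Service}

        // Maps the hadoop-policy.xml key to the IndexServer RPC protocol,
        // similar to HDFSPolicyProvider. ServiceAuthorizationManager#authorize
        // consults it when hadoop.security.authorization is set to true.
        class IndexServerPolicyProvider extends PolicyProvider {
          override def getServices: Array[Service] =
            Array(new Service("security.indexserver.protocol.acl", classOf[ServerInterface]))
        }

        // Proxy-user settings for the JDBCServer scenario; normally these are
        // configured in core-site.xml rather than set programmatically.
        object ProxyUserAclExample {
          def configure(conf: Configuration): Unit = {
            conf.set("hadoop.proxyuser.spark.users", "user1")
            conf.set("hadoop.proxyuser.spark.hosts", "*")
            conf.set("hadoop.proxyuser.spark.groups", "*")
          }
        }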
    
    TokenRenewer: the IndexServer is NOT a token-based Hadoop service. It does
    not require a delegation token, because both the index client and the
    IndexServer live inside a Spark application and therefore do not need to
    connect to the KDC themselves; the change takes advantage of that.
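
    On the client side this means the RPC proxy can be obtained with the UGI
    the Spark application already holds, without any keytab login. A trimmed
    sketch of the new getClient logic (the wrapping object and explicit
    parameters are only for illustration; in the patch serverIp and serverPort
    are fields of the IndexServer object):

        import java.net.InetSocketAddress
        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.ipc.RPC
        import org.apache.hadoop.net.NetUtils
        import org.apache.hadoop.security.UserGroupInformation

        object IndexClientExample {
          // Reuse the login user's UGI; SASL/Kerberos authenticates it against
          // the principal configured in spark.carbon.indexserver.principal.
          def getClient(serverIp: String, serverPort: Int, conf: Configuration): ServerInterface = {
            RPC.getProtocolProxy(classOf[ServerInterface],
              RPC.getProtocolVersion(classOf[ServerInterface]),
              new InetSocketAddress(serverIp, serverPort),
              UserGroupInformation.getLoginUser, conf,
              NetUtils.getDefaultSocketFactory(conf)).getProxy
          }
        }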
    
    This closes #3375
---
 .../carbondata/core/datamap/DataMapUtil.java       | 16 +---
 docs/index-server.md                               | 23 ++----
 .../carbondata/events/IndexServerEvents.scala      |  6 +-
 .../carbondata/indexserver/DataMapJobs.scala       |  2 +-
 .../carbondata/indexserver/IndexServer.scala       | 87 ++++++++++++++--------
 5 files changed, 74 insertions(+), 60 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index 8e63449..a9020e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -113,18 +113,10 @@ public class DataMapUtil {
     for (Segment segment : validAndInvalidSegmentsInfo.getInvalidSegments()) {
       invalidSegment.add(segment.getSegmentNo());
     }
-    DistributableDataMapFormat dataMapFormat = new DistributableDataMapFormat(carbonTable,
-        validAndInvalidSegmentsInfo.getValidSegments(), invalidSegment, true,
-        dataMapToClear);
-    try {
-      dataMapJob.execute(dataMapFormat);
-    } catch (Exception e) {
-      if (dataMapJob.getClass().getName().equalsIgnoreCase(DISTRIBUTED_JOB_NAME)) {
-        LOGGER.warn("Failed to clear distributed cache.", e);
-      } else {
-        throw e;
-      }
-    }
+    DistributableDataMapFormat dataMapFormat =
+        new DistributableDataMapFormat(carbonTable, validAndInvalidSegmentsInfo.getValidSegments(),
+            invalidSegment, true, dataMapToClear);
+    dataMapJob.execute(dataMapFormat);
   }
 
   public static void executeClearDataMapJob(CarbonTable carbonTable, String jobClassName)
diff --git a/docs/index-server.md b/docs/index-server.md
index c743184..7aff6df 100644
--- a/docs/index-server.md
+++ b/docs/index-server.md
@@ -119,16 +119,6 @@ be written to file.
 The user can set the location for these file by using 'carbon.indexserver.temp.path'. By default
 table path would be used to write the files.
 
-## Security
-The security for the index server is controlled through 'spark.carbon.indexserver.keytab' and 'spark
-.carbon.indexserver.principal'. These allow the RPC framework to login using the principal. It is
-recommended that the principal should be a super user, and the user should be exclusive for index
-server so that it does not grant access to any other service. Internally the operations would be
-executed  as a Privileged Action using the login user.
-
-The Index Server is a long running service therefore the 'spark.yarn.keytab' and 'spark.yarn
-.principal' should be configured.
-
 ## Configurations
 
 ##### carbon.properties(JDBCServer) 
@@ -160,8 +150,6 @@ The Index Server is a long running service therefore the 'spark.yarn.keytab' and
 
 | Name     |      Default Value    |  Description |
 |:----------:|:-------------:|:------:       |
-| spark.carbon.indexserver.principal |  NA | Used for authentication, whether a valid service is  trying to connect to the server or not. Set in both IndexServer and JDBCServer.     |
-| spark.carbon.indexserver.keytab |    NA   |   Specify the path to the keytab file through which authentication would happen. Set in both IndexServer and JDBCServer. |
 | spark.dynamicAllocation.enabled | true | Set to false, so that spark does not kill the executor, If executors are killed, cache would be lost. Applicable only for Index Server. |
 | spark.yarn.principal | NA | Should be set to the same user used for JDBCServer. Required only for IndexServer.   |
 |spark.yarn.keytab| NA | Should be set to the same as JDBCServer.   |
@@ -180,6 +168,12 @@ that will authenticate the user to access the index server and no other service.
 | Name     |      Default Value    |  Description |
 |:----------:|:-------------:|:------:       |
 | ipc.client.rpc-timeout.ms |  NA | Set the above property to some appropriate value based on your estimated query time. The best option is to set this to the same value as spark.network.timeout. |
+| hadoop.security.authorization |  false | Property to enable the hadoop security which is required only on the server side. |
+| hadoop.proxyuser.<indexserver_user>.users |  NA | Property to set Proxy User list for which IndexServer permission were to be given ,check https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html|
+| hadoop.proxyuser.<indexserver_user>.hosts |  NA | Property to set hosts list for which IndexServer permission were to be given ,check https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html|
+| hadoop.proxyuser.<indexserver_user>.groups |  NA | Property to set groups list for which IndexServer permission to be given ,check https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html|
+| security.indexserver.protocol.acl |  * | Property to set List of User to be Authorized for Other than proxy Spark Application |
+
 
 ##### dynamic-properties(set command)
 
@@ -206,11 +200,6 @@ A. The exception would show the size of response it is trying to send over the
 network. Use ipc.maximum.response.length to a value bigger than the
 response size.
 
-Q. **Index server is throwing Kerberos principal not set exception**
-
-A. Set spark.carbon.indexserver.principal to the correct principal in both IndexServer and
-JDBCServer configurations.
-
 Q. **Unable to connect to index server**
 
 A. Check whether the carbon.properties configurations are set in JDBCServer as well as the index
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/IndexServerEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/IndexServerEvents.scala
index fb86cd3..a8a8544 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/IndexServerEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/IndexServerEvents.scala
@@ -25,5 +25,9 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 case class IndexServerLoadEvent(sparkSession: SparkSession,
     carbonTable: CarbonTable,
     segment: List[Segment],
-    invalidsegment: List[String])
+    invalidsegment: List[String]) extends Event with IndexServerEventInfo
+
+case class IndexServerEvent(sparkSession: SparkSession,
+    carbonTable: CarbonTable,
+    username: String)
   extends Event with IndexServerEventInfo
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 1fee051..49ef8bb 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -130,7 +130,7 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
       .getCarbonTable.getTablePath, dataMapFormat.getQueryId, dataMapFormat.isCountStarJob)
     // Fire a job to clear the cache from executors as Embedded mode does not maintain the cache.
     IndexServer.invalidateSegmentCache(dataMapFormat.getCarbonTable, dataMapFormat
-      .getValidSegmentIds.asScala.toArray)
+      .getValidSegmentIds.asScala.toArray, isFallBack = true)
     spark.sparkContext.setLocalProperty("spark.job.description", originalJobDesc)
     splits
   }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 926d2bd..b1d0e43 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -25,9 +25,10 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.ipc.{ProtocolInfo, RPC}
+import org.apache.hadoop.ipc.{ProtocolInfo, RPC, Server}
 import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.{KerberosInfo, SecurityUtil, UserGroupInformation}
+import org.apache.hadoop.security.{KerberosInfo, UserGroupInformation}
+import org.apache.hadoop.security.authorize.{PolicyProvider, Service}
 import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.{CarbonSession, SparkSession}
@@ -38,10 +39,13 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DistributableDataMapFormat
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.ExtendedBlockletWrapperContainer
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.{IndexServerEvent, OperationContext, OperationListenerBus}
 
-@ProtocolInfo(protocolName = "Server", protocolVersion = 1)
+@ProtocolInfo(protocolName = "org.apache.carbondata.indexserver.ServerInterface",
+  protocolVersion = 1)
 @KerberosInfo(serverPrincipal = "spark.carbon.indexserver.principal",
   clientPrincipal = "spark.carbon.indexserver.principal")
 trait ServerInterface {
@@ -59,7 +63,7 @@ trait ServerInterface {
   * Invalidate the cache for the specified segments only. Used in case of compaction/Update/Delete.
    */
   def invalidateSegmentCache(carbonTable: CarbonTable,
-      segmentIds: Array[String], jobGroupId: String = ""): Unit
+  segmentIds: Array[String], jobGroupId: String = "", isFallBack: Boolean = false): Unit
 
   def getCount(request: DistributableDataMapFormat): LongWritable
 
@@ -96,7 +100,7 @@ object IndexServer extends ServerInterface {
   private val isExecutorLRUConfigured: Boolean =
     CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE) != null
-
+  private val operationContext: OperationContext = new OperationContext
   /**
   * Getting sparkSession from ActiveSession because in case of embedded mode the session would
   * have already been created whereas in case of distributed mode the session would be
@@ -104,6 +108,9 @@ object IndexServer extends ServerInterface {
    */
   private lazy val sparkSession: SparkSession = SparkSQLUtil.getSparkSession
 
+  /**
+   * Perform the operation 'f' on behalf of the login user.
+   */
   private def doAs[T](f: => T): T = {
     UserGroupInformation.getLoginUser.doAs(new PrivilegedAction[T] {
       override def run(): T = {
@@ -134,6 +141,10 @@ object IndexServer extends ServerInterface {
         }
         new LongWritable(splits.map(_._2.toLong).sum)
       }
+      // Fire Generic Event like ACLCheck..etc
+      val indexServerEvent = IndexServerEvent(sparkSession, request.getCarbonTable,
+        Server.getRemoteUser.getShortUserName)
+      OperationListenerBus.getInstance().fireEvent(indexServerEvent, operationContext)
       if (request.ifAsyncCall) {
         submitAsyncTask(getCountTask)
         new LongWritable(0)
@@ -149,6 +160,10 @@ object IndexServer extends ServerInterface {
         sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", 
request.getTaskGroupId)
         sparkSession.sparkContext
           .setLocalProperty("spark.job.description", request.getTaskGroupDesc)
+        // Fire Generic Event like ACLCheck..etc
+        val indexServerEvent = IndexServerEvent(sparkSession, request.getCarbonTable,
+          Server.getRemoteUser.getShortUserName)
+        OperationListenerBus.getInstance().fireEvent(indexServerEvent, operationContext)
       }
       if (!request.getInvalidSegments.isEmpty) {
         DistributedRDDUtils
@@ -167,18 +182,26 @@ object IndexServer extends ServerInterface {
   }
 
   override def invalidateSegmentCache(carbonTable: CarbonTable,
-      segmentIds: Array[String], jobGroupId: String = ""): Unit = doAs {
-    val databaseName = carbonTable.getDatabaseName
-    val tableName = carbonTable.getTableName
-    val jobgroup: String = " Invalided Segment Cache for " + databaseName + "." + tableName
-    sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
-    sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", jobGroupId)
-    new InvalidateSegmentCacheRDD(sparkSession, carbonTable, segmentIds.toList)
-      .collect()
-    if (segmentIds.nonEmpty) {
-      DistributedRDDUtils
-        .invalidateSegmentMapping(s"${databaseName}_$tableName",
-          segmentIds)
+      segmentIds: Array[String], jobGroupId: String = "", isFallBack: Boolean = false): Unit = {
+    doAs {
+      val databaseName = carbonTable.getDatabaseName
+      val tableName = carbonTable.getTableName
+      val jobgroup: String = " Invalided Segment Cache for " + databaseName + "." + tableName
+      sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
+      sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", jobGroupId)
+      if (!isFallBack) {
+        val indexServerEvent = IndexServerEvent(sparkSession,
+          carbonTable,
+          Server.getRemoteUser.getShortUserName)
+        OperationListenerBus.getInstance().fireEvent(indexServerEvent, operationContext)
+      }
+      new InvalidateSegmentCacheRDD(sparkSession, carbonTable, segmentIds.toList)
+        .collect()
+      if (segmentIds.nonEmpty) {
+        DistributedRDDUtils
+          .invalidateSegmentMapping(s"${databaseName}_$tableName",
+            segmentIds)
+      }
     }
   }
 
@@ -208,8 +231,8 @@ object IndexServer extends ServerInterface {
         .setNumHandlers(numHandlers)
         .setProtocol(classOf[ServerInterface]).build
       server.start()
-      SecurityUtil.login(sparkSession.asInstanceOf[CarbonSession].sessionState.newHadoopConf(),
-        "spark.carbon.indexserver.keytab", "spark.carbon.indexserver.principal")
+      // Define the Authorization Policy provider
+      server.refreshServiceAcl(conf, new IndexServerPolicyProvider)
       sparkSession.sparkContext.addSparkListener(new SparkListener {
         override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
           LOGGER.info("Spark Application has ended. Stopping the Index Server")
@@ -244,16 +267,22 @@ object IndexServer extends ServerInterface {
   def getClient: ServerInterface = {
     val configuration = SparkSQLUtil.sessionState(sparkSession).newHadoopConf()
     import org.apache.hadoop.ipc.RPC
-    val indexServerUser = sparkSession.sparkContext.getConf
-      .get("spark.carbon.indexserver.principal", "")
-    val indexServerKeyTab = sparkSession.sparkContext.getConf
-      .get("spark.carbon.indexserver.keytab", "")
-    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(indexServerUser,
-      indexServerKeyTab)
-    LOGGER.info("Login successful for user " + indexServerUser)
-    RPC.getProxy(classOf[ServerInterface],
+    RPC.getProtocolProxy(classOf[ServerInterface],
       RPC.getProtocolVersion(classOf[ServerInterface]),
-      new InetSocketAddress(serverIp, serverPort), ugi,
-      FileFactory.getConfiguration, NetUtils.getDefaultSocketFactory(configuration))
+      new InetSocketAddress(serverIp, serverPort),
+      UserGroupInformation.getLoginUser, configuration,
+      NetUtils.getDefaultSocketFactory(configuration)).getProxy
+  }
+
+  /**
+   * This class to define the acl for indexserver ,similar to HDFSPolicyProvider.
+   * key in Service can be configured in hadoop-policy.xml or in  Configuration().This ACL
+   * will be used for Authorization in
+   * org.apache.hadoop.security.authorize.ServiceAuthorizationManager#authorize
+   */
+  class IndexServerPolicyProvider extends PolicyProvider {
+    override def getServices: Array[Service] = {
+      Array(new Service("security.indexserver.protocol.acl", classOf[ServerInterface]))
+    }
   }
 }
