This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 15b88ef [CARBONDATA-3512]Index Server enhancement
15b88ef is described below
commit 15b88ef6a143f626a4969284582403597f4ca083
Author: BJangir <[email protected]>
AuthorDate: Wed Sep 4 23:33:54 2019 +0530
[CARBONDATA-3512]Index Server enhancement
What changes are proposed
1. Remove the keytab dependency for the IndexServer. Currently the IndexServer
requires a keytab and principal to be configured on both the client side and
the server side. But the IndexServer is a super user, and keeping the super
user's keytab and principal on the client is not correct (especially for
spark-submit). Since the IndexServer is wrapped inside a Spark application,
there is no need to ask the user for a keytab for the IndexServer.
2. Authentication: this happens in
org.apache.hadoop.security.SaslRpcClient#createSaslClient, which checks
getServerPrincipal (spark.carbon.indexserver.principal) against the server
protocol (the UGI of the IndexServer). The user needs to configure
spark.carbon.indexserver.principal correctly, as sketched below.
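For illustration only, a minimal sketch of this setting (the principal name
below is a placeholder, not part of this commit); it is set for both the
IndexServer application and the JDBCServer/Spark application, for example in
spark-defaults.conf or via --conf:

    # hypothetical principal; use the one the IndexServer application actually logs in with
    spark.carbon.indexserver.principal    indexserver/_HOST@EXAMPLE.COM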
3. Authorization (ACL): restrict which users can access the IndexServer.
Authorization is controlled by the hadoop.security.authorization parameter.
The IndexServer has the scenarios below (a hedged configuration sketch follows
this list).
1. spark-submit, spark-shell, spark-sql: these types of Spark applications
have a UGI in which the login user and the current user are the same, based
either on kinit or on spark.yarn.principal. Authorization is done in
org.apache.hadoop.ipc.Server#authorize using the IndexServer protocol class
and the ACL list prepared by
org.apache.hadoop.security.authorize.PolicyProvider (generally
hadoop-policy.xml with the key security.indexserver.protocol.acl, default *).
2. Spark JDBCServer: it has a proxy-user based UGI such as
user1 (auth:PROXY) via spark, where user1 is the current user and spark is
the login user (the UGI that started the JDBCServer). This type of
authorization happens in
org.apache.hadoop.security.authorize.ProxyUsers#authorize with the proxy-user
ACL list prepared from hadoop.proxyuser.<INDEXSERVER_UGI>.users,
hadoop.proxyuser.<INDEXSERVER_UGI>.hosts and
hadoop.proxyuser.<INDEXSERVER_UGI>.groups.
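For illustration, a minimal sketch of the server-side Hadoop configuration,
assuming a hypothetical IndexServer login user 'indexsrv' and end users
'user1,user2' (these names are placeholders, not part of this commit):

    <!-- core-site.xml: enable service-level authorization (server side only) -->
    <property>
      <name>hadoop.security.authorization</name>
      <value>true</value>
    </property>

    <!-- hadoop-policy.xml: ACL for the non-proxy scenario (default is *) -->
    <property>
      <name>security.indexserver.protocol.acl</name>
      <value>user1,user2</value>
    </property>

    <!-- core-site.xml: proxy-user rules for the JDBCServer scenario -->
    <property>
      <name>hadoop.proxyuser.indexsrv.users</name>
      <value>user1,user2</value>
    </property>
    <property>
      <name>hadoop.proxyuser.indexsrv.hosts</name>
      <value>*</value>
    </property>
    <property>
      <name>hadoop.proxyuser.indexsrv.groups</name>
      <value>*</value>
    </property>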
TokenRenewer: the IndexServer is NOT a token-based Hadoop service. It does not
require a delegation token, as the IndexServer does not connect to the KDC;
both the index client and the IndexServer run inside the Spark application, so
we take advantage of that.
This closes #3375
---
.../carbondata/core/datamap/DataMapUtil.java | 16 +---
docs/index-server.md | 23 ++----
.../carbondata/events/IndexServerEvents.scala | 6 +-
.../carbondata/indexserver/DataMapJobs.scala | 2 +-
.../carbondata/indexserver/IndexServer.scala | 87 ++++++++++++++--------
5 files changed, 74 insertions(+), 60 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index 8e63449..a9020e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -113,18 +113,10 @@ public class DataMapUtil {
for (Segment segment : validAndInvalidSegmentsInfo.getInvalidSegments()) {
invalidSegment.add(segment.getSegmentNo());
}
- DistributableDataMapFormat dataMapFormat = new DistributableDataMapFormat(carbonTable,
- validAndInvalidSegmentsInfo.getValidSegments(), invalidSegment, true,
- dataMapToClear);
- try {
- dataMapJob.execute(dataMapFormat);
- } catch (Exception e) {
- if (dataMapJob.getClass().getName().equalsIgnoreCase(DISTRIBUTED_JOB_NAME)) {
- LOGGER.warn("Failed to clear distributed cache.", e);
- } else {
- throw e;
- }
- }
+ DistributableDataMapFormat dataMapFormat =
+ new DistributableDataMapFormat(carbonTable, validAndInvalidSegmentsInfo.getValidSegments(),
+ invalidSegment, true, dataMapToClear);
+ dataMapJob.execute(dataMapFormat);
}
public static void executeClearDataMapJob(CarbonTable carbonTable, String jobClassName)
diff --git a/docs/index-server.md b/docs/index-server.md
index c743184..7aff6df 100644
--- a/docs/index-server.md
+++ b/docs/index-server.md
@@ -119,16 +119,6 @@ be written to file.
The user can set the location for these file by using
'carbon.indexserver.temp.path'. By default
table path would be used to write the files.
-## Security
-The security for the index server is controlled through 'spark.carbon.indexserver.keytab' and 'spark
-.carbon.indexserver.principal'. These allow the RPC framework to login using the principal. It is
-recommended that the principal should be a super user, and the user should be exclusive for index
-server so that it does not grant access to any other service. Internally the operations would be
-executed as a Privileged Action using the login user.
-
-The Index Server is a long running service therefore the 'spark.yarn.keytab' and 'spark.yarn
-.principal' should be configured.
-
## Configurations
##### carbon.properties(JDBCServer)
@@ -160,8 +150,6 @@ The Index Server is a long running service therefore the 'spark.yarn.keytab' and
| Name | Default Value | Description |
|:----------:|:-------------:|:------: |
-| spark.carbon.indexserver.principal | NA | Used for authentication, whether a valid service is trying to connect to the server or not. Set in both IndexServer and JDBCServer. |
-| spark.carbon.indexserver.keytab | NA | Specify the path to the keytab file through which authentication would happen. Set in both IndexServer and JDBCServer. |
| spark.dynamicAllocation.enabled | true | Set to false, so that spark does not kill the executor, If executors are killed, cache would be lost. Applicable only for Index Server. |
| spark.yarn.principal | NA | Should be set to the same user used for JDBCServer. Required only for IndexServer. |
|spark.yarn.keytab| NA | Should be set to the same as JDBCServer. |
@@ -180,6 +168,12 @@ that will authenticate the user to access the index server and no other service.
| Name | Default Value | Description |
|:----------:|:-------------:|:------: |
| ipc.client.rpc-timeout.ms | NA | Set the above property to some appropriate value based on your estimated query time. The best option is to set this to the same value as spark.network.timeout. |
+| hadoop.security.authorization | false | Property to enable the hadoop security which is required only on the server side. |
+| hadoop.proxyuser.<indexserver_user>.users | NA | Property to set Proxy User list for which IndexServer permission were to be given ,check https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html|
+| hadoop.proxyuser.<indexserver_user>.hosts | NA | Property to set hosts list for which IndexServer permission were to be given ,check https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html|
+| hadoop.proxyuser.<indexserver_user>.groups | NA | Property to set groups list for which IndexServer permission to be given ,check https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html|
+| security.indexserver.protocol.acl | * | Property to set List of User to be Authorized for Other than proxy Spark Application |
+
##### dynamic-properties(set command)
@@ -206,11 +200,6 @@ A. The exception would show the size of response it is trying to send over the
network. Use ipc.maximum.response.length to a value bigger than the response size.
-Q. **Index server is throwing Kerberos principal not set exception**
-
-A. Set spark.carbon.indexserver.principal to the correct principal in both IndexServer and
-JDBCServer configurations.
-
Q. **Unable to connect to index server**
A. Check whether the carbon.properties configurations are set in JDBCServer as well as the index
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/IndexServerEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/IndexServerEvents.scala
index fb86cd3..a8a8544 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/IndexServerEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/IndexServerEvents.scala
@@ -25,5 +25,9 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
case class IndexServerLoadEvent(sparkSession: SparkSession,
carbonTable: CarbonTable,
segment: List[Segment],
- invalidsegment: List[String])
+ invalidsegment: List[String]) extends Event with IndexServerEventInfo
+
+case class IndexServerEvent(sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ username: String)
extends Event with IndexServerEventInfo
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 1fee051..49ef8bb 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -130,7 +130,7 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
.getCarbonTable.getTablePath, dataMapFormat.getQueryId, dataMapFormat.isCountStarJob)
// Fire a job to clear the cache from executors as Embedded mode does not maintain the cache.
IndexServer.invalidateSegmentCache(dataMapFormat.getCarbonTable, dataMapFormat
- .getValidSegmentIds.asScala.toArray)
+ .getValidSegmentIds.asScala.toArray, isFallBack = true)
spark.sparkContext.setLocalProperty("spark.job.description", originalJobDesc)
splits
}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 926d2bd..b1d0e43 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -25,9 +25,10 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.ipc.{ProtocolInfo, RPC}
+import org.apache.hadoop.ipc.{ProtocolInfo, RPC, Server}
import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.{KerberosInfo, SecurityUtil, UserGroupInformation}
+import org.apache.hadoop.security.{KerberosInfo, UserGroupInformation}
+import org.apache.hadoop.security.authorize.{PolicyProvider, Service}
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.{CarbonSession, SparkSession}
@@ -38,10 +39,13 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DistributableDataMapFormat
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.ExtendedBlockletWrapperContainer
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.{IndexServerEvent, OperationContext, OperationListenerBus}
-@ProtocolInfo(protocolName = "Server", protocolVersion = 1)
+@ProtocolInfo(protocolName = "org.apache.carbondata.indexserver.ServerInterface",
+ protocolVersion = 1)
@KerberosInfo(serverPrincipal = "spark.carbon.indexserver.principal",
clientPrincipal = "spark.carbon.indexserver.principal")
trait ServerInterface {
@@ -59,7 +63,7 @@ trait ServerInterface {
* Invalidate the cache for the specified segments only. Used in case of compaction/Update/Delete.
*/
def invalidateSegmentCache(carbonTable: CarbonTable,
- segmentIds: Array[String], jobGroupId: String = ""): Unit
+ segmentIds: Array[String], jobGroupId: String = "", isFallBack: Boolean = false): Unit
def getCount(request: DistributableDataMapFormat): LongWritable
@@ -96,7 +100,7 @@ object IndexServer extends ServerInterface {
private val isExecutorLRUConfigured: Boolean =
CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE)
!= null
-
+ private val operationContext: OperationContext = new OperationContext
/**
* Getting sparkSession from ActiveSession because in case of embedded mode
the session would
* have already been created whereas in case of distributed mode the session
would be
@@ -104,6 +108,9 @@ object IndexServer extends ServerInterface {
*/
private lazy val sparkSession: SparkSession = SparkSQLUtil.getSparkSession
+ /**
+ * Perform the operation 'f' on behalf of the login user.
+ */
private def doAs[T](f: => T): T = {
UserGroupInformation.getLoginUser.doAs(new PrivilegedAction[T] {
override def run(): T = {
@@ -134,6 +141,10 @@ object IndexServer extends ServerInterface {
}
new LongWritable(splits.map(_._2.toLong).sum)
}
+ // Fire Generic Event like ACLCheck..etc
+ val indexServerEvent = IndexServerEvent(sparkSession, request.getCarbonTable,
+ Server.getRemoteUser.getShortUserName)
+ OperationListenerBus.getInstance().fireEvent(indexServerEvent, operationContext)
if (request.ifAsyncCall) {
submitAsyncTask(getCountTask)
new LongWritable(0)
@@ -149,6 +160,10 @@ object IndexServer extends ServerInterface {
sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id",
request.getTaskGroupId)
sparkSession.sparkContext
.setLocalProperty("spark.job.description", request.getTaskGroupDesc)
+ // Fire Generic Event like ACLCheck..etc
+ val indexServerEvent = IndexServerEvent(sparkSession, request.getCarbonTable,
+ Server.getRemoteUser.getShortUserName)
+ OperationListenerBus.getInstance().fireEvent(indexServerEvent, operationContext)
}
if (!request.getInvalidSegments.isEmpty) {
DistributedRDDUtils
@@ -167,18 +182,26 @@ object IndexServer extends ServerInterface {
}
override def invalidateSegmentCache(carbonTable: CarbonTable,
- segmentIds: Array[String], jobGroupId: String = ""): Unit = doAs {
- val databaseName = carbonTable.getDatabaseName
- val tableName = carbonTable.getTableName
- val jobgroup: String = " Invalided Segment Cache for " + databaseName + "." + tableName
- sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
- sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", jobGroupId)
- new InvalidateSegmentCacheRDD(sparkSession, carbonTable, segmentIds.toList)
- .collect()
- if (segmentIds.nonEmpty) {
- DistributedRDDUtils
- .invalidateSegmentMapping(s"${databaseName}_$tableName",
- segmentIds)
+ segmentIds: Array[String], jobGroupId: String = "", isFallBack: Boolean = false): Unit = {
+ doAs {
+ val databaseName = carbonTable.getDatabaseName
+ val tableName = carbonTable.getTableName
+ val jobgroup: String = " Invalided Segment Cache for " + databaseName + "." + tableName
+ sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
+ sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", jobGroupId)
+ if (!isFallBack) {
+ val indexServerEvent = IndexServerEvent(sparkSession,
+ carbonTable,
+ Server.getRemoteUser.getShortUserName)
+ OperationListenerBus.getInstance().fireEvent(indexServerEvent, operationContext)
+ }
+ new InvalidateSegmentCacheRDD(sparkSession, carbonTable, segmentIds.toList)
+ .collect()
+ if (segmentIds.nonEmpty) {
+ DistributedRDDUtils
+ .invalidateSegmentMapping(s"${databaseName}_$tableName",
+ segmentIds)
+ }
}
}
@@ -208,8 +231,8 @@ object IndexServer extends ServerInterface {
.setNumHandlers(numHandlers)
.setProtocol(classOf[ServerInterface]).build
server.start()
- SecurityUtil.login(sparkSession.asInstanceOf[CarbonSession].sessionState.newHadoopConf(),
- "spark.carbon.indexserver.keytab", "spark.carbon.indexserver.principal")
+ // Define the Authorization Policy provider
+ server.refreshServiceAcl(conf, new IndexServerPolicyProvider)
sparkSession.sparkContext.addSparkListener(new SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
LOGGER.info("Spark Application has ended. Stopping the Index Server")
@@ -244,16 +267,22 @@ object IndexServer extends ServerInterface {
def getClient: ServerInterface = {
val configuration = SparkSQLUtil.sessionState(sparkSession).newHadoopConf()
import org.apache.hadoop.ipc.RPC
- val indexServerUser = sparkSession.sparkContext.getConf
- .get("spark.carbon.indexserver.principal", "")
- val indexServerKeyTab = sparkSession.sparkContext.getConf
- .get("spark.carbon.indexserver.keytab", "")
- val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(indexServerUser,
- indexServerKeyTab)
- LOGGER.info("Login successful for user " + indexServerUser)
- RPC.getProxy(classOf[ServerInterface],
+ RPC.getProtocolProxy(classOf[ServerInterface],
RPC.getProtocolVersion(classOf[ServerInterface]),
- new InetSocketAddress(serverIp, serverPort), ugi,
- FileFactory.getConfiguration, NetUtils.getDefaultSocketFactory(configuration))
+ new InetSocketAddress(serverIp, serverPort),
+ UserGroupInformation.getLoginUser, configuration,
+ NetUtils.getDefaultSocketFactory(configuration)).getProxy
+ }
+
+ /**
+ * This class to define the acl for indexserver ,similar to HDFSPolicyProvider.
+ * key in Service can be configured in hadoop-policy.xml or in Configuration().This ACL
+ * will be used for Authorization in
+ * org.apache.hadoop.security.authorize.ServiceAuthorizationManager#authorize
+ */
+ class IndexServerPolicyProvider extends PolicyProvider {
+ override def getServices: Array[Service] = {
+ Array(new Service("security.indexserver.protocol.acl", classOf[ServerInterface]))
+ }
}
}