Repository: carbondata Updated Branches: refs/heads/master 0668e7d71 -> 3262230cb
[CARBONDATA-2260] CarbonThriftServer should support store carbon table on S3 CarbonThriftServer should support store carbon table on S3 Add config for AK,SK,EndPoint when start carbonsession This closes #2073 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3262230c Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3262230c Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3262230c Branch: refs/heads/master Commit: 3262230cb3e099c20b2a14d9d55cad4b9fe91e2e Parents: 0668e7d Author: root <[email protected]> Authored: Sat Mar 17 17:45:34 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Thu Apr 26 21:59:21 2018 +0800 ---------------------------------------------------------------------- .../spark/thriftserver/CarbonThriftServer.scala | 45 +++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/3262230c/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala index 34ac940..ce46af3 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala @@ -19,14 +19,21 @@ package org.apache.carbondata.spark.thriftserver import java.io.File +import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY} import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 
+import org.slf4j.{Logger, LoggerFactory} import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties +/** + * CarbonThriftServer supports different modes: + * 1. read/write data from/to HDFS or local, it only needs to configure storePath + * 2. read/write data from/to S3, it needs to provide access-key, secret-key, s3-endpoint + */ object CarbonThriftServer { def main(args: Array[String]): Unit = { @@ -34,6 +41,13 @@ object CarbonThriftServer { import org.apache.spark.sql.CarbonSession._ val sparkConf = new SparkConf(loadDefaults = true) + + val logger: Logger = LoggerFactory.getLogger(this.getClass) + if (args.length != 0 && args.length != 1 && args.length != 4) { + logger.error("parameters: storePath [access-key] [secret-key] [s3-endpoint]") + System.exit(0) + } + val builder = SparkSession .builder() .config(sparkConf) @@ -55,7 +69,16 @@ object CarbonThriftServer { val storePath = if (args.length > 0) args.head else null - val spark = builder.getOrCreateCarbonSession(storePath) + val spark = if (args.length <= 1) { + builder.getOrCreateCarbonSession(storePath) + } else { + val (accessKey, secretKey, endpoint) = getKeyOnPrefix(args(0)) + builder.config(accessKey, args(1)) + .config(secretKey, args(2)) + .config(endpoint, getS3EndPoint(args)) + .getOrCreateCarbonSession(storePath) + } + val warmUpTime = CarbonProperties.getInstance().getProperty("carbon.spark.warmUpTime", "5000") try { Thread.sleep(Integer.parseInt(warmUpTime)) @@ -70,4 +93,24 @@ object CarbonThriftServer { HiveThriftServer2.startWithContext(spark.sqlContext) } + def getKeyOnPrefix(path: String): (String, String, String) = { + val endPoint = "spark.hadoop." + ENDPOINT + if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) { + ("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint) + } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) { + ("spark.hadoop." 
+ CarbonCommonConstants.S3N_ACCESS_KEY, + "spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint) + } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) { + ("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY, + "spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint) + } else { + throw new Exception("Incorrect Store Path") + } + } + + def getS3EndPoint(args: Array[String]): String = { + if (args.length >= 4 && args(3).contains(".com")) args(3) + else "" + } + }
