PHOENIX-2817 Phoenix-Spark plugin doesn't work in secured env (Sergey Soldatov)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b8427937 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b8427937 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b8427937 Branch: refs/heads/calcite Commit: b8427937df52b4e4156e00a816048a427d82ad33 Parents: 3420a97 Author: Ankit Singhal <[email protected]> Authored: Wed Apr 6 12:07:18 2016 +0530 Committer: Ankit Singhal <[email protected]> Committed: Wed Apr 6 12:07:18 2016 +0530 ---------------------------------------------------------------------- .../apache/phoenix/spark/PhoenixSparkIT.scala | 7 +---- .../phoenix/spark/ConfigurationUtil.scala | 27 ++++++++++++++++---- .../org/apache/phoenix/spark/PhoenixRDD.scala | 13 +++++----- 3 files changed, 30 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8427937/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala index 08c123a..ad4791d 100644 --- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala +++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala @@ -53,16 +53,11 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll { lazy val hbaseConfiguration = { val conf = PhoenixSparkITHelper.getTestClusterConfig - // The zookeeper quorum address defaults to "localhost" which is incorrect, let's fix it - val quorum = conf.get("hbase.zookeeper.quorum") - val clientPort = conf.get("hbase.zookeeper.property.clientPort") - val znodeParent = conf.get("zookeeper.znode.parent") - conf.set(HConstants.ZOOKEEPER_QUORUM, s"$quorum:$clientPort:$znodeParent") conf } lazy val 
quorumAddress = { - hbaseConfiguration.get(HConstants.ZOOKEEPER_QUORUM) + ConfigurationUtil.getZookeeperURL(hbaseConfiguration).get } override def beforeAll() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8427937/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala index 2f306f0..2f4311f 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala @@ -15,6 +15,7 @@ package org.apache.phoenix.spark import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants} +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil} import org.apache.phoenix.util.ColumnInfo import scala.collection.JavaConversions._ @@ -38,20 +39,29 @@ object ConfigurationUtil extends Serializable { // Override the Zookeeper URL if present. Throw exception if no address given. 
zkUrl match { - case Some(url) => config.set(HConstants.ZOOKEEPER_QUORUM, url ) + case Some(url) => setZookeeperURL(config, url) case _ => { - if(config.get(HConstants.ZOOKEEPER_QUORUM) == null) { + if (ConfigurationUtil.getZookeeperURL(config).isEmpty) { throw new UnsupportedOperationException( s"One of zkUrl or '${HConstants.ZOOKEEPER_QUORUM}' config property must be provided" ) } } } - // Return the configuration object config } + def setZookeeperURL(conf: Configuration, zkUrl: String) = { + val info = PhoenixEmbeddedDriver.ConnectionInfo.create(zkUrl) + conf.set(HConstants.ZOOKEEPER_QUORUM, info.getZookeeperQuorum) + if (info.getPort != null) + conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, info.getPort) + if (info.getRootNode != null) + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, info.getRootNode) + + } + // Return a serializable representation of the columns def encodeColumns(conf: Configuration) = { ColumnInfoToStringEncoderDecoder.encode(conf, PhoenixConfigurationUtil.getUpsertColumnMetadataList(conf) @@ -62,8 +72,15 @@ object ConfigurationUtil extends Serializable { def decodeColumns(conf: Configuration): List[ColumnInfo] = { ColumnInfoToStringEncoderDecoder.decode(conf).toList } - + def getZookeeperURL(conf: Configuration): Option[String] = { - Option(conf.get(HConstants.ZOOKEEPER_QUORUM)) + List( + Option(conf.get(HConstants.ZOOKEEPER_QUORUM)), + Option(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)), + Option(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)) + ).flatten match { + case Nil => None + case x: List[String] => Some(x.mkString(":")) + } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8427937/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala index a8877fa..6560fd3 100644 --- 
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala @@ -13,12 +13,12 @@ */ package org.apache.phoenix.spark -import java.sql.{Timestamp, DriverManager} +import java.sql.DriverManager import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants} import org.apache.hadoop.io.NullWritable -import org.apache.phoenix.jdbc.PhoenixDriver +import org.apache.phoenix.jdbc.{PhoenixDriver, PhoenixEmbeddedDriver} import org.apache.phoenix.mapreduce.PhoenixInputFormat import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil import org.apache.phoenix.schema.types._ @@ -26,8 +26,9 @@ import org.apache.phoenix.util.ColumnInfo import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Row, DataFrame, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.sql.types._ + import scala.collection.JavaConverters._ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String], @@ -86,9 +87,9 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String], // Override the Zookeeper URL if present. Throw exception if no address given. 
zkUrl match { - case Some(url) => config.set(HConstants.ZOOKEEPER_QUORUM, url ) + case Some(url) => ConfigurationUtil.setZookeeperURL(config, url) case _ => { - if(config.get(HConstants.ZOOKEEPER_QUORUM) == null) { + if(ConfigurationUtil.getZookeeperURL(config).isEmpty) { throw new UnsupportedOperationException( s"One of zkUrl or '${HConstants.ZOOKEEPER_QUORUM}' config property must be provided" ) @@ -165,4 +166,4 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String], case t if t.isInstanceOf[PDateArray] || t.isInstanceOf[PUnsignedDateArray] => ArrayType(TimestampType, containsNull = true) case t if t.isInstanceOf[PTimeArray] || t.isInstanceOf[PUnsignedTimeArray] => ArrayType(TimestampType, containsNull = true) } -} \ No newline at end of file +}
