This is an automated email from the ASF dual-hosted git repository.
tdsilva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 8be7602 PHOENIX-5141 Use HBaseFactoryProvider.getConfigurationFactory
to get the config in PhoenixRDD (addendum)
8be7602 is described below
commit 8be7602f954c352d255601a54a36e58b0b0f08c1
Author: Thomas D'Silva <[email protected]>
AuthorDate: Thu Feb 28 16:52:36 2019 -0800
PHOENIX-5141 Use HBaseFactoryProvider.getConfigurationFactory to get the
config in PhoenixRDD (addendum)
---
.../main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala | 7 +++++--
.../main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala | 8 ++++----
2 files changed, 9 insertions(+), 6 deletions(-)
diff --git
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
index d555954..9377986 100644
---
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
+++
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
@@ -17,6 +17,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver
import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder,
PhoenixConfigurationUtil}
+import org.apache.phoenix.query.HBaseFactoryProvider
import org.apache.phoenix.util.{ColumnInfo, PhoenixRuntime}
import scala.collection.JavaConversions._
@@ -28,8 +29,8 @@ object ConfigurationUtil extends Serializable {
// Create an HBaseConfiguration object from the passed in config, if
present
val config = conf match {
- case Some(c) => HBaseConfiguration.create(c)
- case _ => HBaseConfiguration.create()
+ case Some(c) =>
HBaseFactoryProvider.getConfigurationFactory.getConfiguration(c)
+ case _ => HBaseFactoryProvider.getConfigurationFactory.getConfiguration()
}
// Set the tenantId in the config if present
@@ -41,6 +42,8 @@ object ConfigurationUtil extends Serializable {
// Set the table to save to
PhoenixConfigurationUtil.setOutputTableName(config, tableName)
PhoenixConfigurationUtil.setPhysicalTableName(config, tableName)
+ // disable property provider evaluation
+ PhoenixConfigurationUtil.setPropertyPolicyProviderDisabled(config);
// Infer column names from the DataFrame schema
PhoenixConfigurationUtil.setUpsertColumnNames(config, Array(columns : _*))
diff --git
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index 3b0289d..85a6d8a 100644
---
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -28,7 +28,7 @@ class DataFrameFunctions(data: DataFrame) extends
Serializable {
saveToPhoenix(parameters("table"), zkUrl =
parameters.get("zkUrl"), tenantId = parameters.get("TenantId"),
skipNormalizingIdentifier=parameters.contains("skipNormalizingIdentifier"))
}
- def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
+ def saveToPhoenix(tableName: String, conf: Option[Configuration] = None,
zkUrl: Option[String] = None, tenantId: Option[String] =
None, skipNormalizingIdentifier: Boolean = false): Unit = {
// Retrieve the schema field names and normalize to Phoenix, need to do
this outside of mapPartitions
@@ -36,7 +36,7 @@ class DataFrameFunctions(data: DataFrame) extends
Serializable {
// Create a configuration object to use for saving
- @transient val outConfig =
ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl,
tenantId, Some(conf))
+ @transient val outConfig =
ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl,
tenantId, conf)
// Retrieve the zookeeper URL
val zkUrlFinal = ConfigurationUtil.getZookeeperURL(outConfig)
@@ -45,9 +45,9 @@ class DataFrameFunctions(data: DataFrame) extends
Serializable {
val phxRDD = data.rdd.mapPartitions{ rows =>
// Create a within-partition config to retrieve the ColumnInfo list
- @transient val partitionConfig =
ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal,
tenantId)
+ @transient val partitionConfig =
ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal, tenantId)
@transient val columns =
PhoenixConfigurationUtil.getUpsertColumnMetadataList(partitionConfig).toList
-
+
rows.map { row =>
val rec = new PhoenixRecordWritable(columns)
row.toSeq.foreach { e => rec.add(e) }