This is an automated email from the ASF dual-hosted git repository.

tdsilva pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
     new 946106a  PHOENIX-5141 Use HBaseFactoryProvider.getConfigurationFactory to get the config in PhoenixRDD (addendum)
946106a is described below

commit 946106a40375244f7eb3cb1a5a36a6e7b16615a7
Author: Thomas D'Silva <tdsi...@apache.org>
AuthorDate: Thu Feb 28 16:52:36 2019 -0800

    PHOENIX-5141 Use HBaseFactoryProvider.getConfigurationFactory to get the config in PhoenixRDD (addendum)
---
 .../main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala   | 7 +++++--
 .../main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala  | 8 ++++----
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
index d555954..9377986 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
@@ -17,6 +17,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver
 import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
+import org.apache.phoenix.query.HBaseFactoryProvider
 import org.apache.phoenix.util.{ColumnInfo, PhoenixRuntime}
 
 import scala.collection.JavaConversions._
@@ -28,8 +29,8 @@ object ConfigurationUtil extends Serializable {
 
     // Create an HBaseConfiguration object from the passed in config, if present
     val config = conf match {
-      case Some(c) => HBaseConfiguration.create(c)
-      case _ => HBaseConfiguration.create()
+      case Some(c) => HBaseFactoryProvider.getConfigurationFactory.getConfiguration(c)
+      case _ => HBaseFactoryProvider.getConfigurationFactory.getConfiguration()
     }
 
     // Set the tenantId in the config if present
@@ -41,6 +42,8 @@ object ConfigurationUtil extends Serializable {
     // Set the table to save to
     PhoenixConfigurationUtil.setOutputTableName(config, tableName)
     PhoenixConfigurationUtil.setPhysicalTableName(config, tableName)
+    // disable property policy provider evaluation
+    PhoenixConfigurationUtil.setPropertyPolicyProviderDisabled(config);
 
     // Infer column names from the DataFrame schema
     PhoenixConfigurationUtil.setUpsertColumnNames(config, Array(columns : _*))
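
The change above routes Configuration creation through Phoenix's pluggable
ConfigurationFactory rather than calling HBaseConfiguration.create directly.
As a rough sketch (not part of the commit; only HBaseFactoryProvider and its
getConfigurationFactory accessor are taken from the diff), the point is that
any code that installs its own factory now sees the same Configuration on
this code path too:

    import org.apache.hadoop.conf.Configuration
    import org.apache.phoenix.query.HBaseFactoryProvider

    // Before: always a freshly created HBase config, ignoring any
    // factory override installed elsewhere.
    // val config = HBaseConfiguration.create(c)

    // After: whatever ConfigurationFactory is in effect builds the config,
    // so overrides applied elsewhere in Phoenix are picked up here as well.
    val base = new Configuration()
    val config: Configuration =
      HBaseFactoryProvider.getConfigurationFactory.getConfiguration(base)
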
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index 3b0289d..85a6d8a 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -28,7 +28,7 @@ class DataFrameFunctions(data: DataFrame) extends Serializable {
               saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"),
               skipNormalizingIdentifier=parameters.contains("skipNormalizingIdentifier"))
    }
-  def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
+  def saveToPhoenix(tableName: String, conf: Option[Configuration] = None,
                    zkUrl: Option[String] = None, tenantId: Option[String] = None, skipNormalizingIdentifier: Boolean = false): Unit = {
 
     // Retrieve the schema field names and normalize to Phoenix, need to do this outside of mapPartitions
@@ -36,7 +36,7 @@ class DataFrameFunctions(data: DataFrame) extends Serializable {
     
 
     // Create a configuration object to use for saving
-    @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, Some(conf))
+    @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, conf)
 
     // Retrieve the zookeeper URL
     val zkUrlFinal = ConfigurationUtil.getZookeeperURL(outConfig)
@@ -45,9 +45,9 @@ class DataFrameFunctions(data: DataFrame) extends Serializable {
     val phxRDD = data.rdd.mapPartitions{ rows =>
  
        // Create a within-partition config to retrieve the ColumnInfo list
-       @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal, tenantId)
+       @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal, tenantId)
       @transient val columns = PhoenixConfigurationUtil.getUpsertColumnMetadataList(partitionConfig).toList
- 
+
        rows.map { row =>
          val rec = new PhoenixRecordWritable(columns)
          row.toSeq.foreach { e => rec.add(e) }
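
For callers, the saveToPhoenix change above means the Configuration parameter
is now an Option rather than a bare Configuration. A minimal usage sketch
(the table name and setting below are illustrative, not from the commit):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.DataFrame
    import org.apache.phoenix.spark._   // implicit that adds saveToPhoenix

    val df: DataFrame = ???             // some existing DataFrame
    val conf = new Configuration()
    conf.set("hbase.zookeeper.quorum", "zk-host")   // illustrative setting

    // Old signature: df.saveToPhoenix("OUTPUT_TABLE", conf)
    // New signature: the config is an Option, defaulting to None.
    df.saveToPhoenix("OUTPUT_TABLE", conf = Some(conf))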
