This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new a1c298c  PHOENIX-7377 Add option not to add CF to the Spark Column name in Spark Connector (#134)
a1c298c is described below

commit a1c298c2dc35c2db9f0ebf8235891564e9b88d2b
Author: rejeb ben rejeb <[email protected]>
AuthorDate: Thu Aug 15 17:50:11 2024 +0200

    PHOENIX-7377 Add option not to add CF to the Spark Column name in Spark Connector (#134)
---
 phoenix5-spark/README.md                           | 11 +++++
 phoenix5-spark/src/it/resources/globalSetup.sql    |  3 ++
 .../org/apache/phoenix/spark/PhoenixSparkIT.scala  | 54 +++++++++++++++++++++
 .../v2/reader/PhoenixDataSourceReader.java         |  4 +-
 .../org/apache/phoenix/spark/SparkSchemaUtil.scala | 11 +++--
 phoenix5-spark3/README.md                          | 10 ++++
 .../sql/connector/PhoenixTestingDataSource.java    |  2 +-
 phoenix5-spark3/src/it/resources/globalSetup.sql   |  3 ++
 .../org/apache/phoenix/spark/PhoenixSparkIT.scala  | 55 +++++++++++++++++++++-
 .../spark/sql/connector/PhoenixDataSource.java     |  3 +-
 .../org/apache/phoenix/spark/SparkSchemaUtil.scala | 11 +++--
 11 files changed, 153 insertions(+), 14 deletions(-)
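
For orientation before the diff: the commit adds an optional simplification of column names during schema conversion. Below is a minimal, self-contained sketch of the naming rule the new `doNotMapColumnFamily` flag toggles; it mirrors the `normalizeColumnName` change in the diff, but the object, method, and separator used here are illustrative rather than the connector's actual API:

```scala
// Illustrative sketch (not the connector API): how a full Phoenix column name
// is turned into a Spark column name with and without the new flag.
object ColumnNameSketch {
  // Phoenix reports "0" as the default column family in generateColumnInfo output.
  def normalize(fullColumnName: String, doNotMapColumnFamily: Boolean): String = {
    val tokens = fullColumnName.split("\\.")          // simplified family/column separator
    if (tokens.length < 2) fullColumnName             // no column family prefix (e.g. primary key)
    else if (tokens(0) == "0" || doNotMapColumnFamily) tokens(1) // drop the family prefix
    else fullColumnName                               // keep "FAMILY.COLUMN" (previous behavior)
  }

  def main(args: Array[String]): Unit = {
    println(normalize("DATA.COL1", doNotMapColumnFamily = false)) // DATA.COL1
    println(normalize("DATA.COL1", doNotMapColumnFamily = true))  // COL1
    println(normalize("ID", doNotMapColumnFamily = true))         // ID
  }
}
```

With the flag unset, the previous `FAMILY.COLUMN` mapping is preserved, which the new integration tests below pin down for both the phoenix5-spark and phoenix5-spark3 modules.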

diff --git a/phoenix5-spark/README.md b/phoenix5-spark/README.md
index 73d68c2..3542a19 100644
--- a/phoenix5-spark/README.md
+++ b/phoenix5-spark/README.md
@@ -293,6 +293,17 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho
     Note that the same property values will be used for both the driver and all executors and
     these configurations are used each time a connection is made (both on the driver and executors).
 
+- As of [PHOENIX-7377](https://issues.apache.org/jira/browse/PHOENIX-7377), you can pass a boolean parameter to avoid mapping
+  columns in non-default column families to `columnFamily.columnName` by setting the key `doNotMapColumnFamily` to `true` (default value: `false`), for example:
+    ```scala
+    df = spark
+      .sqlContext
+      .read
+      .format("phoenix")
+      .options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181", "doNotMapColumnFamily" -> "true"))
+      .load;
+    ```
+  
 ## Limitations
 
 - Basic support for column and predicate pushdown using the Data Source API
diff --git a/phoenix5-spark/src/it/resources/globalSetup.sql b/phoenix5-spark/src/it/resources/globalSetup.sql
index 8a3a4c2..aa5a81f 100644
--- a/phoenix5-spark/src/it/resources/globalSetup.sql
+++ b/phoenix5-spark/src/it/resources/globalSetup.sql
@@ -64,3 +64,6 @@ CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT NULL, ORGANIZATION_ID
 CREATE TABLE IF NOT EXISTS GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED [...]
 CREATE TABLE IF NOT EXISTS OUTPUT_GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id U [...]
 UPSERT INTO GIGANTIC_TABLE VALUES(0,2,3,4,-5,6,7,8,9.3,10.4,11.5,12.6,13.7,true,null,null,CURRENT_TIME(),CURRENT_TIME(),CURRENT_DATE(),CURRENT_TIME(),'This is random textA','a','a','a')
+
+CREATE TABLE table_with_col_family (id BIGINT NOT NULL PRIMARY KEY, data.col1 VARCHAR)
+UPSERT INTO table_with_col_family (id, col1) VALUES (1, 'test_row_1')
diff --git a/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 464a588..f1a09d6 100644
--- a/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -913,4 +913,58 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     df.count() shouldEqual 1
   }
 
+  test("Skip column family name when converting schema") {
+    val phoenixSchema = List(new ColumnInfo("columFamily.columnName", PVarchar.INSTANCE.getSqlType))
+
+    val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema, doNotMapColumnFamily = true)
+
+    val expected = new StructType(List(StructField("columnName", StringType, nullable = true)).toArray)
+
+    catalystSchema shouldEqual expected
+  }
+
+  test("Do not skip column family name when converting schema\"") {
+    val phoenixSchema = List(new ColumnInfo("columFamily.columnName", PVarchar.INSTANCE.getSqlType))
+
+    val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema)
+
+    val expected = new StructType(List(StructField("columFamily.columnName", StringType, nullable = true)).toArray)
+
+    catalystSchema shouldEqual expected
+  }
+
+  test("Can read data and map column to columnName") {
+    val df = spark.read.format("phoenix")
+      .options(
+        Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"),
+          "doNotMapColumnFamily" -> "true",
+          PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+
+    val schema = df.schema
+
+    val expected = new StructType(List(
+      StructField("ID", LongType, nullable = true),
+      StructField("COL1", StringType, nullable = true)
+    ).toArray)
+
+    schema shouldEqual expected
+  }
+
+  test("Can read data and map column to colFamily.columnName") {
+    val df = spark.read.format("phoenix")
+      .options(
+        Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"),
+          "doNotMapColumnFamily" -> "false",
+          PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+
+    val schema = df.schema
+
+    val expected = new StructType(List(
+      StructField("ID", LongType, nullable = true),
+      StructField("DATA.COL1", StringType, nullable = true)
+    ).toArray)
+
+    schema shouldEqual expected
+  }
+
 }
diff --git a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
index 84d83f3..22e0cfb 100644
--- a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
+++ b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
@@ -68,6 +68,7 @@ public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDo
     private final String jdbcUrl;
     private final boolean dateAsTimestamp;
     private final Properties overriddenProps;
+    private final boolean doNotMapColumnFamily;
 
     private StructType schema;
     private Filter[] pushedFilters = new Filter[]{};
@@ -83,6 +84,7 @@ public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDo
         this.tableName = options.tableName().get();
         this.jdbcUrl = PhoenixDataSource.getJdbcUrlFromOptions(options);
         this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+        this.doNotMapColumnFamily = options.getBoolean("doNotMapColumnFamily", false);
         this.overriddenProps = PhoenixDataSource.extractPhoenixHBaseConfFromOptions(options);
         setSchema();
     }
@@ -94,7 +96,7 @@ public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDo
         try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) {
             List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
             Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
-            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp, doNotMapColumnFamily);
         }
         catch (SQLException e) {
             throw new RuntimeException(e);
diff --git a/phoenix5-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala b/phoenix5-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
index 26461ac..19535bf 100644
--- a/phoenix5-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
+++ b/phoenix5-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
@@ -33,15 +33,16 @@ StructType, TimestampType}
 
 object SparkSchemaUtil {
 
-  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false) : StructType = {
+  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false, doNotMapColumnFamily: Boolean = false): StructType = {
     val structFields = columnList.map(ci => {
       val structType = phoenixTypeToCatalystType(ci, dateAsTimestamp)
-      StructField(normalizeColumnName(ci.getColumnName), structType)
+      val columnName = normalizeColumnName(ci.getColumnName, doNotMapColumnFamily)
+      StructField(columnName, structType)
     })
     new StructType(structFields.toArray)
   }
 
-  def normalizeColumnName(columnName: String) = {
+  private def normalizeColumnName(columnName: String, doNotMapColumnFamily: Boolean ) = {
     val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName)
     var normalizedColumnName = ""
     if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) {
@@ -50,14 +51,14 @@ object SparkSchemaUtil {
     else {
       // split by separator to get the column family and column name
       val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX)
-      normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName
+      normalizedColumnName = if (tokens(0) == "0" || doNotMapColumnFamily) tokens(1) else unescapedColumnName
     }
     normalizedColumnName
   }
 
 
   // Lookup table for Phoenix types to Spark catalyst types
-  def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
+  private def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
     case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType
     case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType
     case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType
diff --git a/phoenix5-spark3/README.md b/phoenix5-spark3/README.md
index ec7684a..8954e16 100644
--- a/phoenix5-spark3/README.md
+++ b/phoenix5-spark3/README.md
@@ -302,6 +302,16 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho
     Note that the same property values will be used for both the driver and all executors and
     these configurations are used each time a connection is made (both on the driver and executors).
 
+- As of [PHOENIX-7377](https://issues.apache.org/jira/browse/PHOENIX-7377), you can pass a boolean parameter to avoid mapping
+  columns in non-default column families to `columnFamily.columnName` by setting the key `doNotMapColumnFamily` to `true` (default value: `false`), for example:
+    ```scala
+    df = spark
+      .sqlContext
+      .read
+      .format("phoenix")
+      .options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181", "doNotMapColumnFamily" -> "true"))
+      .load;
+    ```
 ## Limitations
 
 - Basic support for column and predicate pushdown using the Data Source API
diff --git a/phoenix5-spark3/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java b/phoenix5-spark3/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java
index 4860154..adfbe73 100644
--- a/phoenix5-spark3/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java
+++ b/phoenix5-spark3/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java
@@ -48,7 +48,7 @@ public class PhoenixTestingDataSource extends PhoenixDataSource {
         try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) {
             List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
             Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
-            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp, false);
         }
         catch (SQLException e) {
             throw new RuntimeException(e);
diff --git a/phoenix5-spark3/src/it/resources/globalSetup.sql b/phoenix5-spark3/src/it/resources/globalSetup.sql
index 8a3a4c2..6082727 100644
--- a/phoenix5-spark3/src/it/resources/globalSetup.sql
+++ b/phoenix5-spark3/src/it/resources/globalSetup.sql
@@ -64,3 +64,6 @@ CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT NULL, ORGANIZATION_ID
 CREATE TABLE IF NOT EXISTS GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED [...]
 CREATE TABLE IF NOT EXISTS OUTPUT_GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id U [...]
 UPSERT INTO GIGANTIC_TABLE VALUES(0,2,3,4,-5,6,7,8,9.3,10.4,11.5,12.6,13.7,true,null,null,CURRENT_TIME(),CURRENT_TIME(),CURRENT_DATE(),CURRENT_TIME(),'This is random textA','a','a','a')
+
+CREATE TABLE table_with_col_family (id BIGINT NOT NULL PRIMARY KEY, data.col1 VARCHAR)
+UPSERT INTO table_with_col_family (id, col1) VALUES (1, 'test_row_1')
\ No newline at end of file
diff --git a/phoenix5-spark3/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix5-spark3/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 85f590d..7a16b00 100644
--- a/phoenix5-spark3/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix5-spark3/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -13,7 +13,6 @@
  */
 package org.apache.phoenix.spark
 
-import org.apache.omid.tso.client.AbortException
 
 import java.sql.DriverManager
 import java.util.Date
@@ -916,4 +915,58 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     df.count() shouldEqual 1
   }
 
+  test("Skip column family name when converting schema") {
+    val phoenixSchema = List(new ColumnInfo("columFamily.columnName", PVarchar.INSTANCE.getSqlType))
+
+    val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema, doNotMapColumnFamily = true)
+
+    val expected = new StructType(List(StructField("columnName", StringType, nullable = true)).toArray)
+
+    catalystSchema shouldEqual expected
+  }
+
+  test("Do not skip column family name when converting schema\"") {
+    val phoenixSchema = List(new ColumnInfo("columFamily.columnName", PVarchar.INSTANCE.getSqlType))
+
+    val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema)
+
+    val expected = new StructType(List(StructField("columFamily.columnName", StringType, nullable = true)).toArray)
+
+    catalystSchema shouldEqual expected
+  }
+
+  test("Can read data and map column to columnName") {
+    val df = spark.read.format("phoenix")
+      .options(
+        Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"),
+          "doNotMapColumnFamily" -> "true",
+          PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+
+    val schema = df.schema
+
+    val expected = new StructType(List(
+      StructField("ID", LongType, nullable = true),
+      StructField("COL1", StringType, nullable = true)
+    ).toArray)
+
+    schema shouldEqual expected
+  }
+
+  test("Can read data and map column to colFamily.columnName") {
+    val df = spark.read.format("phoenix")
+      .options(
+        Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"),
+          "doNotMapColumnFamily" -> "false",
+          PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+
+    val schema = df.schema
+
+    val expected = new StructType(List(
+      StructField("ID", LongType, nullable = true),
+      StructField("DATA.COL1", StringType, nullable = true)
+    ).toArray)
+
+    schema shouldEqual expected
+  }
+
 }
diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java
index c3c0ade..d2a7410 100644
--- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java
+++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java
@@ -64,6 +64,7 @@ public class PhoenixDataSource implements TableProvider, DataSourceRegister {
         String tableName = options.get("table");
         String tenant = options.get(PhoenixRuntime.TENANT_ID_ATTRIB);
         boolean dateAsTimestamp = Boolean.parseBoolean(options.getOrDefault("dateAsTimestamp", Boolean.toString(false)));
+        boolean doNotMapColumnFamily = Boolean.parseBoolean(options.getOrDefault("doNotMapColumnFamily", Boolean.toString(false)));
         Properties overriddenProps = extractPhoenixHBaseConfFromOptions(options);
         if (tenant != null) {
             overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenant);
@@ -75,7 +76,7 @@ public class PhoenixDataSource implements TableProvider, DataSourceRegister {
         try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) {
             List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
             Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
-            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp, doNotMapColumnFamily);
         }
         catch (SQLException e) {
             throw new RuntimeException(e);
diff --git a/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala b/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
index 26461ac..19535bf 100644
--- a/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
+++ b/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
@@ -33,15 +33,16 @@ StructType, TimestampType}
 
 object SparkSchemaUtil {
 
-  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false) : StructType = {
+  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false, doNotMapColumnFamily: Boolean = false): StructType = {
     val structFields = columnList.map(ci => {
       val structType = phoenixTypeToCatalystType(ci, dateAsTimestamp)
-      StructField(normalizeColumnName(ci.getColumnName), structType)
+      val columnName = normalizeColumnName(ci.getColumnName, doNotMapColumnFamily)
+      StructField(columnName, structType)
     })
     new StructType(structFields.toArray)
   }
 
-  def normalizeColumnName(columnName: String) = {
+  private def normalizeColumnName(columnName: String, doNotMapColumnFamily: Boolean ) = {
     val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName)
     var normalizedColumnName = ""
     if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) {
@@ -50,14 +51,14 @@ object SparkSchemaUtil {
     else {
       // split by separator to get the column family and column name
       val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX)
-      normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName
+      normalizedColumnName = if (tokens(0) == "0" || doNotMapColumnFamily) tokens(1) else unescapedColumnName
     }
     normalizedColumnName
   }
 
 
   // Lookup table for Phoenix types to Spark catalyst types
-  def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
+  private def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
     case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType
     case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType
     case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType
