This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git
The following commit(s) were added to refs/heads/master by this push:
     new a1c298c  PHOENIX-7377 Add option not to add CF to the Spark Column name in Spark Connector (#134)
a1c298c is described below
commit a1c298c2dc35c2db9f0ebf8235891564e9b88d2b
Author: rejeb ben rejeb <[email protected]>
AuthorDate: Thu Aug 15 17:50:11 2024 +0200
    PHOENIX-7377 Add option not to add CF to the Spark Column name in Spark Connector (#134)
---
phoenix5-spark/README.md | 11 +++++
phoenix5-spark/src/it/resources/globalSetup.sql | 3 ++
.../org/apache/phoenix/spark/PhoenixSparkIT.scala | 54 +++++++++++++++++++++
.../v2/reader/PhoenixDataSourceReader.java | 4 +-
.../org/apache/phoenix/spark/SparkSchemaUtil.scala | 11 +++--
phoenix5-spark3/README.md | 10 ++++
.../sql/connector/PhoenixTestingDataSource.java | 2 +-
phoenix5-spark3/src/it/resources/globalSetup.sql | 3 ++
.../org/apache/phoenix/spark/PhoenixSparkIT.scala | 55 +++++++++++++++++++++-
.../spark/sql/connector/PhoenixDataSource.java | 3 +-
.../org/apache/phoenix/spark/SparkSchemaUtil.scala | 11 +++--
11 files changed, 153 insertions(+), 14 deletions(-)
diff --git a/phoenix5-spark/README.md b/phoenix5-spark/README.md
index 73d68c2..3542a19 100644
--- a/phoenix5-spark/README.md
+++ b/phoenix5-spark/README.md
@@ -293,6 +293,17 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho
Note that the same property values will be used for both the driver and all executors and
these configurations are used each time a connection is made (both on the driver and executors).
+- As of [PHOENIX-7377](https://issues.apache.org/jira/browse/PHOENIX-7377), you can pass a boolean parameter to avoid mapping
+  non-default-family columns to `columnFamily.columnName`: set the key `doNotMapColumnFamily` to `true` (default: `false`). For example:
+  ```scala
+  val df = spark
+    .sqlContext
+    .read
+    .format("phoenix")
+    .options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181", "doNotMapColumnFamily" -> "true"))
+    .load
+  ```
+
## Limitations
- Basic support for column and predicate pushdown using the Data Source API
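To make the effect of the new flag concrete, here is a minimal sketch (not part of the patch) of what the two read modes yield for a table shaped like the integration-test table `TABLE_WITH_COL_FAMILY (ID BIGINT PRIMARY KEY, DATA.COL1 VARCHAR)`; the SparkSession, table name, and quorum address are illustrative placeholders:

```scala
// Assumes an existing SparkSession `spark` and a reachable Phoenix cluster; all values are placeholders.
val reader = spark.read.format("phoenix")
  .option("table", "TABLE_WITH_COL_FAMILY")
  .option("jdbcUrl", "jdbc:phoenix:phoenix-server:2181")

// Default behaviour: columns in a non-default family keep their family prefix.
reader.load().printSchema()
// root
//  |-- ID: long (nullable = true)
//  |-- DATA.COL1: string (nullable = true)

// With doNotMapColumnFamily=true the family prefix is dropped from the Spark column name.
reader.option("doNotMapColumnFamily", "true").load().printSchema()
// root
//  |-- ID: long (nullable = true)
//  |-- COL1: string (nullable = true)
```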
diff --git a/phoenix5-spark/src/it/resources/globalSetup.sql
b/phoenix5-spark/src/it/resources/globalSetup.sql
index 8a3a4c2..aa5a81f 100644
--- a/phoenix5-spark/src/it/resources/globalSetup.sql
+++ b/phoenix5-spark/src/it/resources/globalSetup.sql
@@ -64,3 +64,6 @@ CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT
NULL, ORGANIZATION_ID
CREATE TABLE IF NOT EXISTS GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id
UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id
TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id
UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id
DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id
BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id
UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED [...]
CREATE TABLE IF NOT EXISTS OUTPUT_GIGANTIC_TABLE (ID INTEGER PRIMARY
KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id
TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id
UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id
DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id
BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id
UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id U [...]
UPSERT INTO GIGANTIC_TABLE
VALUES(0,2,3,4,-5,6,7,8,9.3,10.4,11.5,12.6,13.7,true,null,null,CURRENT_TIME(),CURRENT_TIME(),CURRENT_DATE(),CURRENT_TIME(),'This
is random textA','a','a','a')
+
+CREATE TABLE table_with_col_family (id BIGINT NOT NULL PRIMARY KEY, data.col1 VARCHAR)
+UPSERT INTO table_with_col_family (id, col1) VALUES (1, 'test_row_1')
diff --git
a/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
b/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 464a588..f1a09d6 100644
--- a/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -913,4 +913,58 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
df.count() shouldEqual 1
}
+ test("Skip column family name when converting schema") {
+ val phoenixSchema = List(new ColumnInfo("columFamily.columnName",
PVarchar.INSTANCE.getSqlType))
+
+ val catalystSchema =
SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema,
doNotMapColumnFamily = true)
+
+ val expected = new StructType(List(StructField("columnName", StringType,
nullable = true)).toArray)
+
+ catalystSchema shouldEqual expected
+ }
+
+ test("Do not skip column family name when converting schema\"") {
+ val phoenixSchema = List(new ColumnInfo("columFamily.columnName",
PVarchar.INSTANCE.getSqlType))
+
+ val catalystSchema =
SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema)
+
+ val expected = new StructType(List(StructField("columFamily.columnName",
StringType, nullable = true)).toArray)
+
+ catalystSchema shouldEqual expected
+ }
+
+ test("Can read data and map column to columnName") {
+ val df = spark.read.format("phoenix")
+ .options(
+ Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"),
+ "doNotMapColumnFamily" -> "true",
+ PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+
+ val schema = df.schema
+
+ val expected = new StructType(List(
+ StructField("ID", LongType, nullable = true),
+ StructField("COL1", StringType, nullable = true)
+ ).toArray)
+
+ schema shouldEqual expected
+ }
+
+ test("Can read data and map column to colFamily.columnName") {
+ val df = spark.read.format("phoenix")
+ .options(
+ Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"),
+ "doNotMapColumnFamily" -> "false",
+ PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+
+ val schema = df.schema
+
+ val expected = new StructType(List(
+ StructField("ID", LongType, nullable = true),
+ StructField("DATA.COL1", StringType, nullable = true)
+ ).toArray)
+
+ schema shouldEqual expected
+ }
+
}
diff --git
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
index 84d83f3..22e0cfb 100644
---
a/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
+++
b/phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
@@ -68,6 +68,7 @@ public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDo
private final String jdbcUrl;
private final boolean dateAsTimestamp;
private final Properties overriddenProps;
+ private final boolean doNotMapColumnFamily;
private StructType schema;
private Filter[] pushedFilters = new Filter[]{};
@@ -83,6 +84,7 @@ public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDo
this.tableName = options.tableName().get();
this.jdbcUrl = PhoenixDataSource.getJdbcUrlFromOptions(options);
this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+ this.doNotMapColumnFamily = options.getBoolean("doNotMapColumnFamily", false);
this.overriddenProps = PhoenixDataSource.extractPhoenixHBaseConfFromOptions(options);
setSchema();
}
@@ -94,7 +96,7 @@ public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDo
try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) {
    List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
    Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
-   schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+   schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp, doNotMapColumnFamily);
}
catch (SQLException e) {
throw new RuntimeException(e);
diff --git
a/phoenix5-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
b/phoenix5-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
index 26461ac..19535bf 100644
---
a/phoenix5-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
+++
b/phoenix5-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
@@ -33,15 +33,16 @@ StructType, TimestampType}
object SparkSchemaUtil {
-  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false) : StructType = {
+  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false, doNotMapColumnFamily: Boolean = false): StructType = {
     val structFields = columnList.map(ci => {
       val structType = phoenixTypeToCatalystType(ci, dateAsTimestamp)
-      StructField(normalizeColumnName(ci.getColumnName), structType)
+      val columnName = normalizeColumnName(ci.getColumnName, doNotMapColumnFamily)
+      StructField(columnName, structType)
     })
     new StructType(structFields.toArray)
   }
-  def normalizeColumnName(columnName: String) = {
+  private def normalizeColumnName(columnName: String, doNotMapColumnFamily: Boolean) = {
     val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName)
     var normalizedColumnName = ""
     if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) {
@@ -50,14 +51,14 @@ object SparkSchemaUtil {
     else {
       // split by separator to get the column family and column name
       val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX)
-      normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName
+      normalizedColumnName = if (tokens(0) == "0" || doNotMapColumnFamily) tokens(1) else unescapedColumnName
     }
     normalizedColumnName
   }
   // Lookup table for Phoenix types to Spark catalyst types
-  def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
+  private def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
     case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType
     case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType
     case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType
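The core of the change is the extra `doNotMapColumnFamily` check in `normalizeColumnName` above. A minimal, self-contained sketch of that rule follows; it is illustrative only and substitutes plain literals (`"."` as the family separator, `"0"` as Phoenix's default-family marker) for the `SchemaUtil`/`QueryConstants` helpers used in the real code:

```scala
object NormalizeSketch {
  // Simplified stand-ins for QueryConstants.NAME_SEPARATOR(_REGEX) and the default family name.
  private val SeparatorRegex = "\\."
  private val DefaultFamily = "0"

  def normalize(columnName: String, doNotMapColumnFamily: Boolean): String = {
    if (!columnName.contains(".")) {
      columnName // no family qualifier, keep the name as-is
    } else {
      val tokens = columnName.split(SeparatorRegex)
      // Drop the family prefix for the default family, or whenever the new flag is set.
      if (tokens(0) == DefaultFamily || doNotMapColumnFamily) tokens(1) else columnName
    }
  }

  def main(args: Array[String]): Unit = {
    println(normalize("0.ID", doNotMapColumnFamily = false))      // ID
    println(normalize("DATA.COL1", doNotMapColumnFamily = false)) // DATA.COL1
    println(normalize("DATA.COL1", doNotMapColumnFamily = true))  // COL1
  }
}
```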
diff --git a/phoenix5-spark3/README.md b/phoenix5-spark3/README.md
index ec7684a..8954e16 100644
--- a/phoenix5-spark3/README.md
+++ b/phoenix5-spark3/README.md
@@ -302,6 +302,16 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho
Note that the same property values will be used for both the driver and all executors and
these configurations are used each time a connection is made (both on the driver and executors).
+- As of [PHOENIX-7377](https://issues.apache.org/jira/browse/PHOENIX-7377), you can pass a boolean parameter to avoid mapping
+  non-default-family columns to `columnFamily.columnName`: set the key `doNotMapColumnFamily` to `true` (default: `false`). For example:
+  ```scala
+  val df = spark
+    .sqlContext
+    .read
+    .format("phoenix")
+    .options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181", "doNotMapColumnFamily" -> "true"))
+    .load
+  ```
## Limitations
- Basic support for column and predicate pushdown using the Data Source API
diff --git
a/phoenix5-spark3/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java
b/phoenix5-spark3/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java
index 4860154..adfbe73 100644
---
a/phoenix5-spark3/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java
+++
b/phoenix5-spark3/src/it/java/org/apache/phoenix/spark/sql/connector/PhoenixTestingDataSource.java
@@ -48,7 +48,7 @@ public class PhoenixTestingDataSource extends PhoenixDataSource {
try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) {
    List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
    Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
-   schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+   schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp, false);
}
catch (SQLException e) {
throw new RuntimeException(e);
diff --git a/phoenix5-spark3/src/it/resources/globalSetup.sql
b/phoenix5-spark3/src/it/resources/globalSetup.sql
index 8a3a4c2..6082727 100644
--- a/phoenix5-spark3/src/it/resources/globalSetup.sql
+++ b/phoenix5-spark3/src/it/resources/globalSetup.sql
@@ -64,3 +64,6 @@ CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT
NULL, ORGANIZATION_ID
CREATE TABLE IF NOT EXISTS GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id
UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id
TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id
UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id
DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id
BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id
UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED [...]
CREATE TABLE IF NOT EXISTS OUTPUT_GIGANTIC_TABLE (ID INTEGER PRIMARY
KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id
TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id
UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id
DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id
BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id
UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id U [...]
UPSERT INTO GIGANTIC_TABLE
VALUES(0,2,3,4,-5,6,7,8,9.3,10.4,11.5,12.6,13.7,true,null,null,CURRENT_TIME(),CURRENT_TIME(),CURRENT_DATE(),CURRENT_TIME(),'This
is random textA','a','a','a')
+
+CREATE TABLE table_with_col_family (id BIGINT NOT NULL PRIMARY KEY, data.col1 VARCHAR)
+UPSERT INTO table_with_col_family (id, col1) VALUES (1, 'test_row_1')
\ No newline at end of file
diff --git
a/phoenix5-spark3/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
b/phoenix5-spark3/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 85f590d..7a16b00 100644
--- a/phoenix5-spark3/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix5-spark3/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -13,7 +13,6 @@
*/
package org.apache.phoenix.spark
-import org.apache.omid.tso.client.AbortException
import java.sql.DriverManager
import java.util.Date
@@ -916,4 +915,58 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
df.count() shouldEqual 1
}
+ test("Skip column family name when converting schema") {
+ val phoenixSchema = List(new ColumnInfo("columFamily.columnName",
PVarchar.INSTANCE.getSqlType))
+
+ val catalystSchema =
SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema,
doNotMapColumnFamily = true)
+
+ val expected = new StructType(List(StructField("columnName", StringType,
nullable = true)).toArray)
+
+ catalystSchema shouldEqual expected
+ }
+
+ test("Do not skip column family name when converting schema\"") {
+ val phoenixSchema = List(new ColumnInfo("columFamily.columnName",
PVarchar.INSTANCE.getSqlType))
+
+ val catalystSchema =
SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema)
+
+ val expected = new StructType(List(StructField("columFamily.columnName",
StringType, nullable = true)).toArray)
+
+ catalystSchema shouldEqual expected
+ }
+
+ test("Can read data and map column to columnName") {
+ val df = spark.read.format("phoenix")
+ .options(
+ Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"),
+ "doNotMapColumnFamily" -> "true",
+ PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+
+ val schema = df.schema
+
+ val expected = new StructType(List(
+ StructField("ID", LongType, nullable = true),
+ StructField("COL1", StringType, nullable = true)
+ ).toArray)
+
+ schema shouldEqual expected
+ }
+
+ test("Can read data and map column to colFamily.columnName") {
+ val df = spark.read.format("phoenix")
+ .options(
+ Map("table" -> SchemaUtil.getEscapedArgument("TABLE_WITH_COL_FAMILY"),
+ "doNotMapColumnFamily" -> "false",
+ PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+
+ val schema = df.schema
+
+ val expected = new StructType(List(
+ StructField("ID", LongType, nullable = true),
+ StructField("DATA.COL1", StringType, nullable = true)
+ ).toArray)
+
+ schema shouldEqual expected
+ }
+
}
diff --git
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java
index c3c0ade..d2a7410 100644
---
a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java
+++
b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixDataSource.java
@@ -64,6 +64,7 @@ public class PhoenixDataSource implements TableProvider, DataSourceRegister {
String tableName = options.get("table");
String tenant = options.get(PhoenixRuntime.TENANT_ID_ATTRIB);
boolean dateAsTimestamp = Boolean.parseBoolean(options.getOrDefault("dateAsTimestamp", Boolean.toString(false)));
+ boolean doNotMapColumnFamily = Boolean.parseBoolean(options.getOrDefault("doNotMapColumnFamily", Boolean.toString(false)));
Properties overriddenProps = extractPhoenixHBaseConfFromOptions(options);
if (tenant != null) {
overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenant);
@@ -75,7 +76,7 @@ public class PhoenixDataSource implements TableProvider, DataSourceRegister {
try (Connection conn = DriverManager.getConnection(jdbcUrl, overriddenProps)) {
    List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
    Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
-   schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+   schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp, doNotMapColumnFamily);
}
catch (SQLException e) {
throw new RuntimeException(e);
diff --git
a/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
b/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
index 26461ac..19535bf 100644
---
a/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
+++
b/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
@@ -33,15 +33,16 @@ StructType, TimestampType}
object SparkSchemaUtil {
-  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false) : StructType = {
+  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false, doNotMapColumnFamily: Boolean = false): StructType = {
     val structFields = columnList.map(ci => {
       val structType = phoenixTypeToCatalystType(ci, dateAsTimestamp)
-      StructField(normalizeColumnName(ci.getColumnName), structType)
+      val columnName = normalizeColumnName(ci.getColumnName, doNotMapColumnFamily)
+      StructField(columnName, structType)
     })
     new StructType(structFields.toArray)
   }
-  def normalizeColumnName(columnName: String) = {
+  private def normalizeColumnName(columnName: String, doNotMapColumnFamily: Boolean) = {
     val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName)
     var normalizedColumnName = ""
     if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) {
@@ -50,14 +51,14 @@ object SparkSchemaUtil {
     else {
       // split by separator to get the column family and column name
       val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX)
-      normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName
+      normalizedColumnName = if (tokens(0) == "0" || doNotMapColumnFamily) tokens(1) else unescapedColumnName
     }
     normalizedColumnName
   }
   // Lookup table for Phoenix types to Spark catalyst types
-  def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
+  private def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
     case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType
     case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType
     case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType