This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git
The following commit(s) were added to refs/heads/master by this push:
     new 5abb5db  PHOENIX-6667 Spark3 connector requires that all columns are specified when writing
5abb5db is described below

commit 5abb5db59a7683f557528bd16e8c12f463e22f2a
Author: attilapiros <piros.attila.zs...@gmail.com>
AuthorDate: Wed Apr 5 16:32:36 2023 -0700

    PHOENIX-6667 Spark3 connector requires that all columns are specified when writing

    also fixes PHOENIX-6668 Spark3 connector cannot distinguish column name cases

    Co-authored-by: Istvan Toth <st...@apache.org>
---
 .../org/apache/phoenix/spark/PhoenixSparkIT.scala  |  2 +-
 .../org/apache/phoenix/spark/PhoenixSparkIT.scala  | 46 +++-------------------
 .../phoenix/spark/sql/connector/PhoenixTable.java  | 17 +++++---
 3 files changed, 18 insertions(+), 47 deletions(-)

diff --git a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index ebfd41c..464a588 100644
--- a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -363,7 +363,7 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     stringValue shouldEqual "test_row_1"
   }
 
-  test("Can save to phoenix table") {
+  test("Can save to phoenix table from Spark without specifying all the columns") {
     val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
 
     val schema = StructType(
diff --git a/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 181b77d..85f590d 100644
--- a/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -363,10 +363,10 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     stringValue shouldEqual "test_row_1"
   }
 
-  // This works with Spark2, but Spark3 enforces specifying every column
-  ignore("Can save to phoenix table from Spark2") {
+  test("Can save to phoenix table from Spark without specifying all the columns") {
     val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
 
+    // COL3 is missing both from the schema and from the dataset
     val schema = StructType(
       Seq(StructField("ID", LongType, nullable = false),
         StructField("COL1", StringType),
@@ -390,42 +390,10 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     while (rs.next()) {
       results.append(Row(rs.getLong(1), rs.getString(2), rs.getInt(3)))
     }
-  }
-
-  test("Can save to phoenix table from Spark3") {
-    //We must specify every column for writing for Spark3
-    val dataSet = List(Row(1L, "1", 1, null), Row(2L, "2", 2, null), Row(3L, "3", 3, null))
-    //But partial reads are OK
-    val dataSetWoCol3 = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
-
-    val schema = StructType(
-      Seq(StructField("ID", LongType, nullable = false),
-        StructField("COL1", StringType),
-        StructField("COL2", IntegerType),
-        StructField("COL3", DateType)))
-
-    val rowRDD = spark.sparkContext.parallelize(dataSet)
-
-    // Apply the schema to the RDD.
-    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
-
-    df.write
-      .format("phoenix")
-      .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
-      .mode(SaveMode.Append)
-      .save()
-
-    // Load the results back
-    val stmt = conn.createStatement()
-    val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE")
-    val results = ListBuffer[Row]()
-    while (rs.next()) {
-      results.append(Row(rs.getLong(1), rs.getString(2), rs.getInt(3)))
-    }
 
     // Verify they match
     (0 to results.size - 1).foreach { i =>
-      dataSetWoCol3(i) shouldEqual results(i)
+      dataSet(i) shouldEqual results(i)
     }
   }
 
@@ -435,15 +403,14 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
 
     val records = new mutable.MutableList[Row]
     for (x <- 1 to totalRecords) {
-      records += Row(x.toLong, x.toString, x, null)
+      records += Row(x.toLong, x.toString, x)
     }
     val dataSet = records.toList
 
     val schema = StructType(
       Seq(StructField("ID", LongType, nullable = false),
         StructField("COL1", StringType),
-        StructField("COL2", IntegerType),
-        StructField("COL3", DateType)))
+        StructField("COL2", IntegerType)))
 
     // Distribute the dataset into an RDD with just 1 partition so we use only 1 executor.
     // This makes it easy to deterministically count the batched commits from that executor
@@ -616,8 +583,7 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     count shouldEqual 1L
   }
 
-  //Spark3 doesn't seem to be able to handle case sensitive column names
-  ignore("Ensure DataFrame field normalization (PHOENIX-2196)") {
+  test("Ensure DataFrame field normalization (PHOENIX-2196)") {
     val rdd1 = spark.sparkContext
       .parallelize(Seq((1L, 1L, "One"), (2L, 2L, "Two")))
       .map(p => Row(p._1, p._2, p._3))
diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixTable.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixTable.java
index b947771..fdd4217 100644
--- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixTable.java
+++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/PhoenixTable.java
@@ -17,9 +17,16 @@
  */
 package org.apache.phoenix.spark.sql.connector;
 
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet;
+import static org.apache.spark.sql.connector.catalog.TableCapability.ACCEPT_ANY_SCHEMA;
+import static org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ;
+import static org.apache.spark.sql.connector.catalog.TableCapability.BATCH_WRITE;
+
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.phoenix.spark.sql.connector.reader.PhoenixScanBuilder;
 import org.apache.phoenix.spark.sql.connector.writer.PhoenixWriteBuilder;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.SupportsWrite;
 import org.apache.spark.sql.connector.catalog.TableCapability;
@@ -29,16 +36,14 @@ import org.apache.spark.sql.connector.write.WriteBuilder;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-import java.util.Map;
-import java.util.Set;
-
 public class PhoenixTable implements SupportsRead, SupportsWrite{
 
     private final Map<String,String> options;
     private final String tableName;
     private final StructType schema;
 
-    private static final Set<TableCapability> capabilities = ImmutableSet.of(TableCapability.BATCH_READ, TableCapability.BATCH_WRITE);
+    private static final Set<TableCapability> CAPABILITIES =
+            ImmutableSet.of(BATCH_READ, BATCH_WRITE, ACCEPT_ANY_SCHEMA);
 
     public PhoenixTable(StructType schema, Map<String,String> options) {
         this.options = options;
@@ -63,7 +68,7 @@ public class PhoenixTable implements SupportsRead, SupportsWrite{
 
     @Override
     public Set<TableCapability> capabilities() {
-        return capabilities;
+        return CAPABILITIES;
    }
 
     public Map<String, String> getOptions() {
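
For context, the practical effect of declaring ACCEPT_ANY_SCHEMA is that a Spark 3 DataFrame no longer has to cover every column of the target Phoenix table, mirroring what the re-enabled integration test exercises. The snippet below is a minimal sketch of such a partial write, not taken verbatim from the patch: it assumes a SparkSession named spark, a Phoenix table OUTPUT_TEST_TABLE with columns ID, COL1, COL2 and a nullable COL3, and a ZooKeeper quorum string in quorumAddress; the table name, column names, quorumAddress, and the PhoenixDataSource import path are illustrative assumptions based on the connector module.

    import org.apache.phoenix.spark.sql.connector.PhoenixDataSource
    import org.apache.spark.sql.{Row, SaveMode}
    import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}

    // COL3 is deliberately left out of the schema; with ACCEPT_ANY_SCHEMA declared,
    // Spark 3 accepts the narrower schema and the unspecified column is simply
    // not included in the UPSERT.
    val schema = StructType(Seq(
      StructField("ID", LongType, nullable = false),
      StructField("COL1", StringType),
      StructField("COL2", IntegerType)))

    val rows = spark.sparkContext.parallelize(Seq(Row(1L, "1", 1), Row(2L, "2", 2)))
    val df = spark.createDataFrame(rows, schema)

    df.write
      .format("phoenix")
      .options(Map("table" -> "OUTPUT_TEST_TABLE",
        PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
      .mode(SaveMode.Append)
      .save()

The re-enabled "Ensure DataFrame field normalization (PHOENIX-2196)" test covers the second part of the change, the PHOENIX-6668 fix for distinguishing column name cases during DataFrame field normalization.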