stoty commented on code in PR #139: URL: https://github.com/apache/phoenix-connectors/pull/139#discussion_r1751341141
##########
phoenix5-spark/README.md:
##########
@@ -45,12 +57,14 @@ val spark = SparkSession
 val df = spark.sqlContext
   .read
   .format("phoenix")
-  .options(Map("table" -> "TABLE1"))
+  .option("table", "TABLE1")

Review Comment:
   Nit: Is this the preferred way to add options in Spark? Are you supposed to call the method multiple times if you want to add multiple options?


##########
phoenix5-spark/README.md:
##########
@@ -118,15 +181,16 @@ val spark = SparkSession
 val df = spark.sqlContext
   .read
   .format("phoenix")
-  .options(Map("table" -> "INPUT_TABLE"))
+  .option("table", "INPUT_TABLE")
   .load

 // Save to OUTPUT_TABLE
 df.write
   .format("phoenix")
-  .mode(SaveMode.Overwrite)
-  .options(Map("table" -> "OUTPUT_TABLE"))
+  .mode(SaveMode.Append)

Review Comment:
   Even if both work now, let's not change this in the example. You may want to add a comment to indicate that either works now.


##########
phoenix5-spark/README.md:
##########
@@ -300,7 +363,7 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho
   .sqlContext
   .read
   .format("phoenix")
-  .options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:phoenix-server:2181", "doNotMapColumnFamily" -> "true"))
+  .options(Map("table" -> "Table1", "jdbcUrl" -> "jdbc:phoenix:zkHost:zkport", "doNotMapColumnFamily" -> "true"))

Review Comment:
   Better remove the explicit URL mention from these examples too.


##########
phoenix5-spark/README.md:
##########
@@ -270,13 +333,13 @@ it falls back to using connection defined by hbase-site.xml.
 - `"jdbcUrl"` expects a full Phoenix JDBC URL, i.e. "jdbc:phoenix" or "jdbc:phoenix:zkHost:zkport", while `"zkUrl"` expects the ZK quorum only, i.e. "zkHost:zkPort"
 - If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
-instead of `"phoenix"`, however this is deprecated as of `connectors-1.0.0`.
-The `"org.apache.phoenix.spark"` datasource does not accept the `"jdbcUrl"` parameter,
-only `"zkUrl"`
+  instead of `"phoenix"`, however this is deprecated as of `connectors-1.0.0`.

Review Comment:
   This was probably my mistake, but there has never been a connectors-1.0.0 release, and there will never be. The next release will be 6.0.0.
   I'd suggest simply not mentioning the version.
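For reference, Spark's `DataFrameReader` and `DataFrameWriter` support both styles: repeated `.option(key, value)` calls accumulate into the same option map, and `.options(Map(...))` sets several entries at once. A minimal sketch with placeholder table name and connection URLs, also contrasting the `jdbcUrl` and `zkUrl` formats described in the hunk above:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only; "TABLE1" and the connection URLs below are placeholders.
val spark = SparkSession.builder().appName("phoenix-options-sketch").getOrCreate()

// Style 1: chained .option(...) calls; each call adds one entry to the reader's option map.
val df1 = spark.read
  .format("phoenix")
  .option("table", "TABLE1")
  .option("jdbcUrl", "jdbc:phoenix:zkHost:zkPort") // full Phoenix JDBC URL form
  .load()

// Style 2: a single .options(Map(...)) call; Spark merges these into the same option map.
val df2 = spark.read
  .format("phoenix")
  .options(Map(
    "table" -> "TABLE1",
    "zkUrl" -> "zkHost:zkPort" // ZK quorum only; deprecated in favour of jdbcUrl per the README
  ))
  .load()
```

Either way the data source receives the same key/value map, so which style the README examples use is mostly a matter of taste.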
##########
phoenix5-spark/src/main/scala/org/apache/phoenix/spark/datasource/v2/PhoenixSparkSqlRelation.scala:
##########
@@ -0,0 +1,58 @@
+package org.apache.phoenix.spark.datasource.v2
+
+import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, PrunedFilteredScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class PhoenixSparkSqlRelation(
+    @transient sparkSession: SparkSession,
+    params: Map[String, String]
+  ) extends BaseRelation with PrunedFilteredScan with InsertableRelation {
+
+  override def schema: StructType = dataSourceReader.readSchema()
+
+  override def sqlContext: SQLContext = sparkSession.sqlContext
+
+  private def dataSourceReader: PhoenixDataSourceReader = new PhoenixDataSourceReader(dataSourceOptions)
+
+  private def dataSourceOptions = new DataSourceOptions(params.asJava)
+
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    val requiredSchema = StructType(requiredColumns.flatMap(c => schema.fields.find(_.name == c)))
+
+    val reader: PhoenixDataSourceReader = dataSourceReader
+    reader.pushFilters(filters)
+    reader.pruneColumns(requiredSchema)
+    val rdd = new DataSourceRDD(
+      sqlContext.sparkContext,
+      reader.planInputPartitions().asScala
+    )
+    rdd.map(ir => {
+      new GenericRowWithSchema(ir.toSeq(requiredSchema).toArray, requiredSchema)
+    })
+  }
+
+
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    data
+      .write
+      .format("phoenix")
+      .option(PhoenixDataSource.TABLE, params(PhoenixDataSource.TABLE))
+      .option(PhoenixDataSource.JDBC_URL, PhoenixDataSource.getJdbcUrlFromOptions(dataSourceOptions))

Review Comment:
   Thanks.


##########
phoenix5-spark/README.md:
##########
@@ -249,14 +312,14 @@ public class PhoenixSparkWriteFromRDDWithSchema {
     }

     // Create a DataFrame from the rows and the specified schema
-    df = spark.createDataFrame(rows, schema);
+    Dataset<Row> df = spark.createDataFrame(rows, schema);

     df.write()
       .format("phoenix")
       .mode(SaveMode.Overwrite)
       .option("table", "OUTPUT_TABLE")
       .save();
-
-    jsc.stop();
+

Review Comment:
   Nit: I can't see it in this editor. Is this extra whitespace? If yes, then please delete it.


##########
phoenix5-spark/README.md:
##########
@@ -18,6 +18,18 @@ limitations under the License.
 phoenix-spark extends Phoenix's MapReduce support to allow Spark to load Phoenix tables as DataFrames,
 and enables persisting DataFrames back to Phoenix.

+## Configuration properties
+
+| Name | Default | Usage | Description |
+| table | empty | R/W | table name as `namespace.table_name` |
+| zrUrl | empty | R/W | (Optional) List of zookeeper hosts. Deprecated, use `jdbcUrl` instead. Recommended not to set, value will be taken from hbase-site.xml |

Review Comment:
   Typo, zkUrl.


##########
phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala:
##########
@@ -66,29 +66,6 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     results.toList shouldEqual checkResults
   }

Review Comment:
   I suggest leaving the disabled insert test in.
   Hopefully it will inspire someone to implement the feature.
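On keeping the disabled test: a generic ScalaTest illustration only (not the actual Phoenix test code; the suite style and test name here are assumptions) of how a disabled test is usually left in the source with `ignore`, so it still shows up in test reports as ignored rather than running and failing:

```scala
import org.scalatest.funsuite.AnyFunSuite // assumes the ScalaTest 3.1+ package layout

class InsertSupportSketch extends AnyFunSuite {

  // Registering the test with ignore(...) keeps it in the source as a reminder
  // of the missing feature without letting it run.
  ignore("SQL INSERT into a Phoenix-backed table is supported") {
    // body of the disabled insert test would go here
  }
}
```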
##########
phoenix5-spark/README.md:
##########
@@ -337,13 +401,13 @@ df.show
 ### Load as an RDD, using a Zookeeper URL

 ```scala
+import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
 import org.apache.phoenix.spark._
 import org.apache.spark.rdd.RDD

-val sparkConf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
-val sc = new SparkContext("local", "phoenix-test", sparkConf)
+val sc = new SparkContext("local", "phoenix-test")

Review Comment:
   Nit: You removed the showConsoleProgress setting here, but kept it below. TBH I don't know what that does, but shouldn't it be consistent?


##########
phoenix5-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkDatasourceV1IT.scala:
##########
@@ -0,0 +1,807 @@
+/*
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
+import org.apache.phoenix.query.QueryServices
+import org.apache.phoenix.schema.types.{PSmallintArray, PVarchar}
+import org.apache.phoenix.spark.datasource.v2.reader.PhoenixTestingInputPartitionReader
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixTestingDataSourceWriter
+import org.apache.phoenix.spark.datasource.v2.{PhoenixDataSource, PhoenixTestingDataSource, SparkSchemaUtil}
+import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
+import org.apache.spark.SparkException
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SaveMode}
+
+import java.sql.DriverManager
+import java.util.Date
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Note: If running directly from an IDE, these are the recommended VM parameters:
+ *   -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ */
+class PhoenixSparkDatasourceV1IT extends AbstractPhoenixSparkIT {

Review Comment:
   thx


##########
phoenix5-spark3/README.md:
##########
@@ -28,6 +28,18 @@ Apart from the shaded connector JAR, you also need to add the hbase mapredcp
 lib (add the exact paths as appropiate to your system)
 Both the `spark.driver.extraClassPath` and `spark.executor.extraClassPath` properties need to be set
 the above classpath. You may add them spark-defaults.conf, or specify them on the
 spark-shell or spark-submit command line.

+## Configuration properties

Review Comment:
   thanks


##########
phoenix5-spark3/README.md:
##########
@@ -28,6 +28,18 @@ Apart from the shaded connector JAR, you also need to add the hbase mapredcp
 lib (add the exact paths as appropiate to your system)
 Both the `spark.driver.extraClassPath` and `spark.executor.extraClassPath` properties need to be set
 the above classpath. You may add them spark-defaults.conf, or specify them on the
 spark-shell or spark-submit command line.

+## Configuration properties
+
+| Name | Default | Usage | Description |
+| table | empty | R/W | table name as `namespace.table_name` |
+| zrUrl | empty | R/W | (Optional) List of zookeeper hosts. Deprecated, use `jdbcUrl` instead. Recommended not to set, value will be taken from hbase-site.xml |

Review Comment:
   Typo here too. I will not repeat the comments for the rest of the duplicated files.


##########
phoenix5-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java:
##########
@@ -207,6 +208,27 @@ public Filter[] pushedFilters() {

     @Override
     public void pruneColumns(StructType schema) {
-        this.schema = schema;
+        if (schema.fields() != null && schema.fields().length != 0)
+            this.schema = schema;
+    }
+
+    //TODO Method PhoenixRuntime.generateColumnInfo skip only salt column, add skip tenant_id column.
+    private List<ColumnInfo> generateColumnInfo(Connection conn, String tableName) throws SQLException {

Review Comment:
   Thanks.


##########
phoenix5-spark/README.md:
##########
@@ -249,14 +307,13 @@ public class PhoenixSparkWriteFromRDDWithSchema {
     }

     // Create a DataFrame from the rows and the specified schema
-    df = spark.createDataFrame(rows, schema);
+    Dataset<Row> df = spark.createDataFrame(rows, schema);

     df.write()
       .format("phoenix")
       .mode(SaveMode.Overwrite)
       .option("table", "OUTPUT_TABLE")
+      .option("jdbcUrl", "jdbc:phoenix:zkHost:zkport")
       .save();
-

Review Comment:
   thx


##########
phoenix5-spark/README.md:
##########
@@ -118,15 +181,16 @@ val spark = SparkSession
 val df = spark.sqlContext
   .read
   .format("phoenix")
-  .options(Map("table" -> "INPUT_TABLE"))
+  .option("table", "INPUT_TABLE")
   .load

 // Save to OUTPUT_TABLE
 df.write
   .format("phoenix")
-  .mode(SaveMode.Overwrite)
-  .options(Map("table" -> "OUTPUT_TABLE"))
+  .mode(SaveMode.Append)

Review Comment:
   Also, the other examples still use Overwrite.
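One possible way to phrase the comment suggested above in the README example, keeping `Overwrite` so it stays consistent with the other examples (a sketch of the suggestion, not wording taken from the PR):

```scala
import org.apache.spark.sql.SaveMode

// df is the DataFrame loaded from INPUT_TABLE in the example above.
// Save to OUTPUT_TABLE
df.write
  .format("phoenix")
  // Either SaveMode.Overwrite or SaveMode.Append works for the "phoenix" source now;
  // Overwrite is kept here to match the other examples.
  .mode(SaveMode.Overwrite)
  .option("table", "OUTPUT_TABLE")
  .save()
```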
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org