rejeb commented on code in PR #139:
URL: https://github.com/apache/phoenix-connectors/pull/139#discussion_r1736647542


##########
phoenix5-spark/src/main/scala/org/apache/phoenix/spark/datasource/v2/PhoenixSparkSqlRelation.scala:
##########
@@ -0,0 +1,58 @@
+package org.apache.phoenix.spark.datasource.v2
+
+import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, PrunedFilteredScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class PhoenixSparkSqlRelation(
+                                    @transient sparkSession: SparkSession,
+                                    params: Map[String, String]
+                                  ) extends BaseRelation with PrunedFilteredScan with InsertableRelation {
+
+  override def schema: StructType = dataSourceReader.readSchema()
+
+  override def sqlContext: SQLContext = sparkSession.sqlContext
+
+  private def dataSourceReader: PhoenixDataSourceReader = new PhoenixDataSourceReader(dataSourceOptions)
+
+  private def dataSourceOptions = new DataSourceOptions(params.asJava)
+
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    val requiredSchema = StructType(requiredColumns.flatMap(c => schema.fields.find(_.name == c)))
+
+    val reader: PhoenixDataSourceReader = dataSourceReader
+    reader.pushFilters(filters)
+    reader.pruneColumns(requiredSchema)
+    val rdd = new DataSourceRDD(
+      sqlContext.sparkContext,
+      reader.planInputPartitions().asScala
+    )
+    rdd.map(ir => {
+      new GenericRowWithSchema(ir.toSeq(requiredSchema).toArray, requiredSchema)
+    })
+  }
+
+
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    data
+      .write
+      .format("phoenix")
+      .option(PhoenixDataSource.TABLE, params(PhoenixDataSource.TABLE))
+      .option(PhoenixDataSource.JDBC_URL, PhoenixDataSource.getJdbcUrlFromOptions(dataSourceOptions))
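
For context, the `buildScan` above is what Spark's planner invokes once column pruning and filter pushdown have been decided. A minimal usage sketch of how a query reaches it (the `spark` session, table name, URL, and column names are placeholders; the option keys shown as plain strings correspond to the `PhoenixDataSource.TABLE` and `PhoenixDataSource.JDBC_URL` constants used in the diff):

```scala
// Hypothetical read-path usage; "INPUT_TABLE" and the URL are placeholders.
val df = spark.read
  .format("phoenix")
  .option("table", "INPUT_TABLE")                  // PhoenixDataSource.TABLE
  .option("jdbcUrl", "jdbc:phoenix:zk-host:2181")  // PhoenixDataSource.JDBC_URL
  .load()

// Filtering and selecting a column subset leads Spark to call
// buildScan(requiredColumns = Array("ID", "NAME"),
//           filters = Array(GreaterThan("ID", 100))).
df.where(df("ID") > 100).select("ID", "NAME").show()
```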

Review Comment:
   Yes, the PhoenixDataSource.getJdbcUrlFromOptions method handles the cases where jdbcUrl is passed, where zkUrl is passed, and where neither is specified.
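   
   For readers following along, a minimal sketch of the kind of fallback being described; this is an illustration, not the actual Phoenix implementation, and the option keys, URL prefix, and default value are assumptions:
   
   ```scala
   import org.apache.spark.sql.sources.v2.DataSourceOptions
   
   // Illustrative only: prefer an explicit jdbcUrl, else build a URL from
   // zkUrl, else fall back to a bare default when neither is specified.
   def getJdbcUrlFromOptionsSketch(options: DataSourceOptions): String = {
     val jdbcUrl = Option(options.get("jdbcUrl").orElse(null))
     val zkUrl = Option(options.get("zkUrl").orElse(null))
     jdbcUrl
       .orElse(zkUrl.map(zk => "jdbc:phoenix:" + zk))
       .getOrElse("jdbc:phoenix")
   }
   ```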


