fx19880617 commented on a change in pull request #5787:
URL: https://github.com/apache/incubator-pinot/pull/5787#discussion_r464071943



##########
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
##########
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.net.{URI, URLEncoder}
+import java.util.regex.Pattern
+
+import org.apache.pinot.connector.spark.decodeTo
+import org.apache.pinot.connector.spark.exceptions.{HttpStatusCodeException, 
PinotException}
+import org.apache.pinot.connector.spark.utils.{HttpUtils, Logging}
+
+import scala.util.{Failure, Success, Try}
+import io.circe.generic.auto._
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.connector.query.GeneratedSQLs
+import org.apache.pinot.spi.data.Schema
+
+/**
+ * Client that read/write/prepare required data from/to Pinot.
+ */
+private[pinot] object PinotClusterClient extends Logging {
+
+  def getTableSchema(controllerUrl: String, tableName: String): Schema = {
+    val rawTableName = PinotUtils.getRawTableName(tableName)
+    Try {
+      val uri = new URI(s"http://$controllerUrl/tables/$rawTableName/schema";)
+      val response = HttpUtils.sendGetRequest(uri)
+      Schema.fromString(response)
+    } match {
+      case Success(response) =>
+        logDebug(s"Pinot schema received successfully for table 
'$rawTableName'")
+        response
+      case Failure(exception) =>
+        throw PinotException(
+          s"An error occurred while getting Pinot schema for table 
'$rawTableName'",
+          exception
+        )
+    }
+  }
+
+  /**
+   * Get available broker urls(host:port) for given table.
+   * This method is used when if broker instances not defined in the 
datasource options.
+   */
+  def getBrokerInstances(controllerUrl: String, tableName: String): 
List[String] = {
+    val brokerPattern = Pattern.compile("Broker_(.*)_(\\d+)")
+    val rawTableName = PinotUtils.getRawTableName(tableName)
+    Try {
+      val uri = new 
URI(s"http://$controllerUrl/tables/$rawTableName/instances";)
+      val response = HttpUtils.sendGetRequest(uri)
+      val brokerUrls = decodeTo[PinotInstances](response).brokers
+        .flatMap(_.instances)
+        .distinct
+        .map(brokerPattern.matcher)
+        .filter(matcher => matcher.matches() && matcher.groupCount() == 2)
+        .map { matcher =>
+          val host = matcher.group(1)
+          val port = matcher.group(2)
+          s"$host:$port"
+        }
+
+      if (brokerUrls.isEmpty) {
+        throw new IllegalStateException(s"Not found broker instance for table 
'$rawTableName'")
+      }
+
+      brokerUrls
+    } match {
+      case Success(result) =>
+        logDebug(s"Broker instances received successfully for table 
'$tableName'")
+        result
+      case Failure(exception) =>
+        throw PinotException(
+          s"An error occurred while getting broker instances for table 
'$rawTableName'",
+          exception
+        )
+    }
+  }
+
+  /**
+   * Get time boundary info of specified table.
+   * This method is used when table is hybrid to ensure that the overlap
+   * between realtime and offline segment data is queried exactly once.
+   *
+   * @return time boundary info if table exist and segments push type is 
'append' or None otherwise
+   */
+  def getTimeBoundaryInfo(brokerUrl: String, tableName: String): 
Option[TimeBoundaryInfo] = {
+    val rawTableName = PinotUtils.getRawTableName(tableName)
+    Try {
+      // pinot converts the given table name to the offline table name 
automatically
+      val uri = new URI(s"http://$brokerUrl/debug/timeBoundary/$rawTableName";)

Review comment:
       Suggest making all of those URI patterns constants.

##########
File path: config/.scalafmt.conf
##########
@@ -0,0 +1,17 @@
+version = "2.4.0"

Review comment:
       What's this file for?

##########
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.util.{List => JList, Map => JMap}
+
+import com.yammer.metrics.core.MetricsRegistry
+import org.apache.helix.model.InstanceConfig
+import org.apache.pinot.common.metrics.BrokerMetrics
+import org.apache.pinot.common.request.BrokerRequest
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.utils.Logging
+import org.apache.pinot.core.transport.{AsyncQueryResponse, QueryRouter, 
ServerInstance}
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler
+
+import scala.collection.JavaConverters._
+
+/**
+ * Fetch data from specified Pinot server.
+ */
+private[pinot] class PinotServerDataFetcher(
+    partitionId: Int,
+    pinotSplit: PinotSplit,
+    dataSourceOptions: PinotDataSourceReadOptions)
+  extends Logging {
+  private val sqlCompiler = new CalciteSqlCompiler()
+  private val brokerId = "apache_spark"

Review comment:
       Is it possible to get more granular info about the executor?

##########
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.util.{List => JList, Map => JMap}
+
+import com.yammer.metrics.core.MetricsRegistry
+import org.apache.helix.model.InstanceConfig
+import org.apache.pinot.common.metrics.BrokerMetrics
+import org.apache.pinot.common.request.BrokerRequest
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.utils.Logging
+import org.apache.pinot.core.transport.{AsyncQueryResponse, QueryRouter, 
ServerInstance}
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler
+
+import scala.collection.JavaConverters._
+
+/**
+ * Fetch data from specified Pinot server.
+ */
+private[pinot] class PinotServerDataFetcher(
+    partitionId: Int,
+    pinotSplit: PinotSplit,
+    dataSourceOptions: PinotDataSourceReadOptions)
+  extends Logging {
+  private val sqlCompiler = new CalciteSqlCompiler()

Review comment:
       Since we are using CalciteSqlCompiler here, I assume we are always using
the new SQL endpoint. Could you change all the `PQL` references to `SQL`?

##########
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/query/SQLSelectionQueryGenerator.scala
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector.query
+
+import org.apache.pinot.connector.spark.connector.Constants.{PinotTableType, 
PinotTableTypes}
+import org.apache.pinot.connector.spark.connector.{PinotUtils, 
TimeBoundaryInfo}
+
+/**
+ * Generate realtime and offline SQL queries for specified table with given 
columns and filters.
+ */
+private[pinot] class SQLSelectionQueryGenerator(
+    tableNameWithType: String,
+    timeBoundaryInfo: Option[TimeBoundaryInfo],
+    columns: Array[String],
+    whereClause: Option[String]) {
+  private val columnsExpression = columnsAsExpression()
+  private val rawTableName = PinotUtils.getRawTableName(tableNameWithType)
+  private val tableType = PinotUtils.getTableType(tableNameWithType)
+
+  def generatePQLs(): GeneratedSQLs = {

Review comment:
       `generateSQLs`?

##########
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/query/SQLSelectionQueryGenerator.scala
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector.query
+
+import org.apache.pinot.connector.spark.connector.Constants.{PinotTableType, 
PinotTableTypes}
+import org.apache.pinot.connector.spark.connector.{PinotUtils, 
TimeBoundaryInfo}
+
+/**
+ * Generate realtime and offline SQL queries for specified table with given 
columns and filters.
+ */
+private[pinot] class SQLSelectionQueryGenerator(
+    tableNameWithType: String,
+    timeBoundaryInfo: Option[TimeBoundaryInfo],
+    columns: Array[String],
+    whereClause: Option[String]) {
+  private val columnsExpression = columnsAsExpression()
+  private val rawTableName = PinotUtils.getRawTableName(tableNameWithType)
+  private val tableType = PinotUtils.getTableType(tableNameWithType)
+
+  def generatePQLs(): GeneratedSQLs = {
+    val offlineSelectQuery = buildSelectQuery(PinotTableTypes.OFFLINE)
+    val realtimeSelectQuery = buildSelectQuery(PinotTableTypes.REALTIME)
+    GeneratedSQLs(
+      rawTableName,
+      tableType,
+      offlineSelectQuery,
+      realtimeSelectQuery
+    )
+  }
+
+  /**
+   * Get all columns if selecting columns empty(eg: resultDataFrame.count())
+   */
+  private def columnsAsExpression(): String = {
+    if (columns.isEmpty) "*" else columns.mkString(",")
+  }
+
+  /**
+   * Build realtime or offline PQL selection query.
+   */
+  private def buildSelectQuery(tableType: PinotTableType): String = {
+    val tableNameWithType = s"${rawTableName}_$tableType"
+    val queryBuilder = new StringBuilder(s"SELECT $columnsExpression FROM 
$tableNameWithType")
+
+    // add where clause if exists
+    whereClause.foreach { x =>
+      queryBuilder.append(s" WHERE $x")
+    }
+
+    // add time boundary filter if exists
+    timeBoundaryInfo.foreach { tbi =>
+      val timeBoundaryFilter =
+        if (tableType == PinotTableTypes.OFFLINE) {
+          tbi.getOfflinePredicate
+        } else {
+          tbi.getRealtimePredicate
+        }
+
+      if (whereClause.isEmpty) {
+        queryBuilder.append(s" WHERE $timeBoundaryFilter")
+      } else {
+        queryBuilder.append(s" AND $timeBoundaryFilter")
+      }
+    }
+
+    // query will be converted to Pinot 'BrokerRequest' with PQL compiler
+    // pinot set limit to 10 automatically
+    // to prevent this add limit to query
+    queryBuilder.append(s" LIMIT ${Int.MaxValue}")

Review comment:
       Shall we check whether there is an existing `LIMIT`?

##########
File path: 
pinot-connectors/pinot-spark-connector/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
##########
@@ -0,0 +1 @@
+org.apache.pinot.connector.spark.datasource.PinotDataSourceV2

Review comment:
       Missing newline at the end of the file.

##########
File path: pinot-connectors/pinot-spark-connector/documentation/read_model.md
##########
@@ -0,0 +1,145 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+# Read Model
+
+Connector can scan offline, hybrid and realtime tables. `table` parameter have 
to given like below;

Review comment:
       Worth mentioning that if the table is an OFFLINE-only or REALTIME-only
table, the user can also query it with just `tbl`.

##########
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/Constants.scala
##########
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+private[pinot] object Constants {
+  type PinotTableType = String
+
+  object PinotTableTypes {

Review comment:
       `pinot-spi` has already defined the enum
`org.apache.pinot.spi.config.table.TableType`.
   Also, you may want to check the utils for table names in
`org.apache.pinot.spi.utils.builder.TableNameBuilder`.

##########
File path: pinot-connectors/pinot-spark-connector/documentation/read_model.md
##########
@@ -0,0 +1,145 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+# Read Model
+
+Connector can scan offline, hybrid and realtime tables. `table` parameter have 
to given like below;
+- For offline table `tbl_OFFLINE`
+- For realtime table `tbl_REALTIME`
+- For hybrid table `tbl`
+
+An example scan;
+
+```scala
+val df = spark.read
+      .format("pinot")
+      .option("table", "airlineStats")
+      .load()
+```
+
+Custom schema can be specified directly. If schema is not specified, connector 
read table schema from Pinot controller, and then convert to the Spark schema. 
+
+
+### Architecture
+
+Connector reads data from `Pinot Servers` directly. For this operation, 
firstly, connector creates query with given filters(if filter push down is 
enabled) and columns, then finds routing table for created query. It creates 
pinot splits that contains **ONE PINOT SERVER and ONE OR MORE SEGMENT per spark 
partition**, based on the routing table and `segmentsPerSplit`(detailed explain 
is defined below). Lastly, each partition read data from specified pinot server 
in parallel.
+
+![Spark-Pinot Connector 
Architecture](images/spark-pinot-connector-executor-server-interaction.jpg)
+
+
+Each Spark partition open connection with Pinot server, and read data. For 
example, assume that routing table informations for specified query is like 
that:
+
+```
+- realtime ->
+   - realtimeServer1 -> (segment1, segment2, segment3)
+   - realtimeServer2 -> (segment4)
+- offline ->
+   - offlineServer10 -> (segment10, segment20)
+```
+
+If `segmentsPerSplit` is equal to 3, there will be created 3 Spark partition 
like below;
+
+| Spark Partition  | Queried Pinot Server/Segments |
+| ------------- | ------------- |
+| partition1  | realtimeServer1 / segment1, segment2, segment3  |
+| partition2  | realtimeServer2 / segment4  |
+| partition3  | offlineServer10 / segment10, segment20 |
+
+
+If `segmentsPerSplit` is equal to 1, there will be created 6 Spark partition;
+
+| Spark Partition  | Queried Pinot Server/Segments |
+| ------------- | ------------- |
+| partition1  | realtimeServer1 / segment1 |
+| partition2  | realtimeServer1 / segment2  |
+| partition3  | realtimeServer1 / segment3 |
+| partition4  | realtimeServer2 / segment4 |
+| partition5  | offlineServer10 / segment10 |
+| partition6  | offlineServer10 / segment20 |
+
+
+If `segmentsPerSplit` value is too low, that means more parallelism. But this 
also mean that a lot of connection will be opened with Pinot servers, and will 
increase QPS on the Pinot servers. 
+
+If `segmetnsPerSplit` value is too high, that means less parallelism. Each 
Pinot server will scan more segments per request.  
+
+**Note:** Pinot servers prunes segments based on the segment metadata when 
query comes. In some cases(for example filtering based on the some columns), 
some servers may not return data. Therefore, some Spark partitions will be 
empty. In this cases, `repartition()` may be applied for efficient data 
analysis after loading data to Spark.
+
+
+### Filter And Column Push Down
+Connector supports filter and column push down. Filters and columns are pushed 
to the pinot servers. Filter and column push down improves the performance 
while reading data because of its minimizing data transfer between Pinot and 
Spark. In default, filter push down enabled. If filters are desired to be 
applied in Spark, `usePushDownFilters` should be set as `false`.
+
+Connector supports `Equal, In, LessThan, LessThanOrEqual, Greater, 
GreaterThan, Not, TEXT_MATCH, And, Or` filters for now.

Review comment:
       For filter push down, do we support only simple column filters like
(`columnA > 5`), or can we also support more, such as transform functions on
columns like (`columnA > columnB`, `columnA * 10 > columnB`)?

##########
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
##########
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.net.{URI, URLEncoder}
+import java.util.regex.Pattern
+
+import org.apache.pinot.connector.spark.decodeTo
+import org.apache.pinot.connector.spark.exceptions.{HttpStatusCodeException, 
PinotException}
+import org.apache.pinot.connector.spark.utils.{HttpUtils, Logging}
+
+import scala.util.{Failure, Success, Try}
+import io.circe.generic.auto._
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.connector.query.GeneratedSQLs
+import org.apache.pinot.spi.data.Schema
+
+/**
+ * Client that read/write/prepare required data from/to Pinot.
+ */
+private[pinot] object PinotClusterClient extends Logging {
+
+  def getTableSchema(controllerUrl: String, tableName: String): Schema = {
+    val rawTableName = PinotUtils.getRawTableName(tableName)
+    Try {
+      val uri = new URI(s"http://$controllerUrl/tables/$rawTableName/schema";)
+      val response = HttpUtils.sendGetRequest(uri)
+      Schema.fromString(response)

Review comment:
       If you can get the schema name from the table config, then you can use the
method `getSchema(host, port, schema)` in
`org.apache.pinot.common.utils.SchemaUtils`.

##########
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotUtils.scala
##########
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.connector.Constants.{PinotTableType, 
PinotTableTypes}
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.spi.data.{FieldSpec, Schema}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+
+private[pinot] object PinotUtils {
+  private val OFFLINE_TABLE_SUFFIX = s"_${PinotTableTypes.OFFLINE}"
+  private val REALTIME_TABLE_SUFFIX = s"_${PinotTableTypes.REALTIME}"
+
+  /** Extract raw pinot table name. */
+  def getRawTableName(tableName: String): String = {
+    if (tableName.endsWith(OFFLINE_TABLE_SUFFIX)) {
+      tableName.substring(0, tableName.length - OFFLINE_TABLE_SUFFIX.length)
+    } else if (tableName.endsWith(REALTIME_TABLE_SUFFIX)) {
+      tableName.substring(0, tableName.length - REALTIME_TABLE_SUFFIX.length)
+    } else {
+      tableName
+    }
+  }
+
+  /** Return offline/realtime table type, or None if table is hybrid. */
+  def getTableType(tableName: String): Option[PinotTableType] = {
+    if (tableName.endsWith(OFFLINE_TABLE_SUFFIX)) {
+      Some(PinotTableTypes.OFFLINE)
+    } else if (tableName.endsWith(REALTIME_TABLE_SUFFIX)) {
+      Some(PinotTableTypes.REALTIME)
+    } else {
+      None
+    }
+  }
+
+  /** Convert a Pinot schema to Spark schema. */
+  def pinotSchemaToSparkSchema(schema: Schema): StructType = {
+    val structFields = schema.getAllFieldSpecs.asScala.map { field =>
+      val sparkDataType = pinotDataTypeToSparkDataType(field.getDataType)
+      if (field.isSingleValueField) {
+        StructField(field.getName, sparkDataType)
+      } else {
+        StructField(field.getName, ArrayType(sparkDataType))
+      }
+    }
+    StructType(structFields.toList)
+  }
+
+  private def pinotDataTypeToSparkDataType(dataType: FieldSpec.DataType): 
DataType =
+    dataType match {
+      case FieldSpec.DataType.INT => IntegerType
+      case FieldSpec.DataType.LONG => LongType
+      case FieldSpec.DataType.FLOAT => FloatType
+      case FieldSpec.DataType.DOUBLE => DoubleType
+      case FieldSpec.DataType.STRING => StringType
+      case _ =>
+        throw PinotException(s"Unsupported pinot data type '$dataType")
+    }
+
+  /** Convert Pinot DataTable to Seq of InternalRow */
+  def pinotDataTableToInternalRows(
+      dataTable: DataTable,
+      sparkSchema: StructType): Seq[InternalRow] = {
+    val dataTableColumnNames = dataTable.getDataSchema.getColumnNames
+    (0 until dataTable.getNumberOfRows).map { rowIndex =>
+      // spark schema is used to ensure columns order
+      val columns = sparkSchema.fields.map { field =>
+        val colIndex = dataTableColumnNames.indexOf(field.name)
+        if (colIndex < 0) {
+          throw PinotException(s"'${field.name}' not found in Pinot server 
response")
+        } else {
+          // pinot column data type can be used directly,
+          // because all of them is supported in spark schema
+          val columnDataType = 
dataTable.getDataSchema.getColumnDataType(colIndex)
+          readPinotColumnData(dataTable, columnDataType, rowIndex, colIndex)
+        }
+      }
+      InternalRow.fromSeq(columns)
+    }
+  }
+
+  private def readPinotColumnData(
+      dataTable: DataTable,
+      columnDataType: ColumnDataType,
+      rowIndex: Int,
+      colIndex: Int): Any = columnDataType match {
+    // single column types
+    case ColumnDataType.STRING =>

Review comment:
       what about `BYTES`?

##########
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotUtils.scala
##########
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.connector.Constants.{PinotTableType, 
PinotTableTypes}
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.spi.data.{FieldSpec, Schema}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+
+private[pinot] object PinotUtils {
+  private val OFFLINE_TABLE_SUFFIX = s"_${PinotTableTypes.OFFLINE}"
+  private val REALTIME_TABLE_SUFFIX = s"_${PinotTableTypes.REALTIME}"
+
+  /** Extract raw pinot table name. */
+  def getRawTableName(tableName: String): String = {
+    if (tableName.endsWith(OFFLINE_TABLE_SUFFIX)) {

Review comment:
       Try to see if you can reuse the methods in 
`org.apache.pinot.spi.utils.builder.TableNameBuilder`

##########
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotUtils.scala
##########
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.connector.Constants.{PinotTableType, 
PinotTableTypes}
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.spi.data.{FieldSpec, Schema}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+
+private[pinot] object PinotUtils {
+  private val OFFLINE_TABLE_SUFFIX = s"_${PinotTableTypes.OFFLINE}"
+  private val REALTIME_TABLE_SUFFIX = s"_${PinotTableTypes.REALTIME}"
+
+  /** Extract raw pinot table name. */
+  def getRawTableName(tableName: String): String = {
+    if (tableName.endsWith(OFFLINE_TABLE_SUFFIX)) {
+      tableName.substring(0, tableName.length - OFFLINE_TABLE_SUFFIX.length)
+    } else if (tableName.endsWith(REALTIME_TABLE_SUFFIX)) {
+      tableName.substring(0, tableName.length - REALTIME_TABLE_SUFFIX.length)
+    } else {
+      tableName
+    }
+  }
+
+  /** Return offline/realtime table type, or None if table is hybrid. */
+  def getTableType(tableName: String): Option[PinotTableType] = {

Review comment:
       Use `TableNameBuilder.getTableTypeFromTableName()`

##########
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotSplitter.scala
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.util.regex.{Matcher, Pattern}
+
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableType
+import org.apache.pinot.connector.spark.connector.query.GeneratedSQLs
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.utils.Logging
+
+private[pinot] object PinotSplitter extends Logging {
+  private val PINOT_SERVER_PATTERN = Pattern.compile("Server_(.*)_(\\d+)")
+
+  def generatePinotSplits(
+      generatedSQLs: GeneratedSQLs,
+      routingTable: Map[String, Map[String, List[String]]],
+      segmentsPerSplit: Int): List[PinotSplit] = {
+    routingTable.flatMap {
+      case (tableType, serversToSegments) =>
+        serversToSegments
+          .map { case (server, segments) => parseServerInput(server, segments) 
}
+          .flatMap {
+            case (matcher, segments) =>
+              createPinotSplitsFromSubSplits(
+                tableType,
+                generatedSQLs,
+                matcher,
+                segments,
+                segmentsPerSplit
+              )
+          }
+    }.toList
+  }
+
+  private def parseServerInput(server: String, segments: List[String]): 
(Matcher, List[String]) = {
+    val matcher = PINOT_SERVER_PATTERN.matcher(server)
+    if (matcher.matches() && matcher.groupCount() == 2) matcher -> segments
+    else throw PinotException(s"'$server' did not match!?")
+  }
+
+  private def createPinotSplitsFromSubSplits(
+      tableType: PinotTableType,
+      generatedSQLs: GeneratedSQLs,
+      serverMatcher: Matcher,
+      segments: List[String],
+      segmentsPerSplit: Int): Iterator[PinotSplit] = {
+    val serverHost = serverMatcher.group(1)
+    val serverPort = serverMatcher.group(2)
+    val maxSegmentCount = Math.min(segments.size, segmentsPerSplit)
+    segments.grouped(maxSegmentCount).map { subSegments =>
+      val serverAndSegments =
+        PinotServerAndSegments(serverHost, serverPort, subSegments, tableType)
+      PinotSplit(generatedSQLs, serverAndSegments)
+    }
+  }
+}
+
+private[pinot] case class PinotSplit(
+    generatedSQLs: GeneratedSQLs,
+    serverAndSegments: PinotServerAndSegments)
+
+private[pinot] case class PinotServerAndSegments(
+    serverHost: String,
+    serverPort: String,
+    segments: List[String],
+    serverType: PinotTableType) {

Review comment:
       Why do we need this? Ideally it should just be inside `GeneratedSQLs`, 
so that we send the generated query to the given server for some segments.

##########
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
##########
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import java.net.{URI, URLEncoder}
+import java.util.regex.Pattern
+
+import org.apache.pinot.connector.spark.decodeTo
+import org.apache.pinot.connector.spark.exceptions.{HttpStatusCodeException, 
PinotException}
+import org.apache.pinot.connector.spark.utils.{HttpUtils, Logging}
+
+import scala.util.{Failure, Success, Try}
+import io.circe.generic.auto._
+import org.apache.pinot.connector.spark.connector.Constants.PinotTableTypes
+import org.apache.pinot.connector.spark.connector.query.GeneratedPQLs
+import org.apache.pinot.spi.data.Schema
+
+/**
+ * Client that read/write/prepare required data from/to Pinot.
+ */
+private[pinot] object PinotClusterClient extends Logging {
+
+  def getTableSchema(controllerUrl: String, tableName: String): Schema = {
+    val rawTableName = PinotUtils.getRawTableName(tableName)
+    Try {
+      val uri = new URI(s"http://$controllerUrl/tables/$rawTableName/schema";)
+      val response = HttpUtils.sendGetRequest(uri)
+      Schema.fromString(response)
+    } match {
+      case Success(response) =>
+        logDebug(s"Pinot schema received successfully for table 
'$rawTableName'")
+        response
+      case Failure(exception) =>
+        throw PinotException(
+          s"An error occurred while getting Pinot schema for table 
'$rawTableName'",
+          exception
+        )
+    }
+  }
+
+  /**
+   * Get available broker urls(host:port) for given table.
+   * This method is used when if broker instances not defined in the 
datasource options.
+   */
+  def getBrokerInstances(controllerUrl: String, tableName: String): 
List[String] = {
+    val brokerPattern = Pattern.compile("Broker_(.*)_(\\d+)")
+    val rawTableName = PinotUtils.getRawTableName(tableName)
+    Try {
+      val uri = new 
URI(s"http://$controllerUrl/tables/$rawTableName/instances";)

Review comment:
       here: https://github.com/apache/incubator-pinot/pull/5685

##########
File path: 
pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotUtils.scala
##########
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.connector.Constants.{PinotTableType, 
PinotTableTypes}
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.spi.data.{FieldSpec, Schema}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+
+private[pinot] object PinotUtils {
+  private val OFFLINE_TABLE_SUFFIX = s"_${PinotTableTypes.OFFLINE}"
+  private val REALTIME_TABLE_SUFFIX = s"_${PinotTableTypes.REALTIME}"
+
+  /** Extract raw pinot table name. */
+  def getRawTableName(tableName: String): String = {
+    if (tableName.endsWith(OFFLINE_TABLE_SUFFIX)) {

Review comment:
       This method is already implemented: 
   `TableNameBuilder.extractRawTableName()`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to