Copilot commented on code in PR #16666:
URL: https://github.com/apache/pinot/pull/16666#discussion_r2322539020


##########
pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala:
##########
@@ -179,10 +188,16 @@ private[pinot] object PinotClusterClient extends Logging {
    *
    * @return InstanceInfo
    */
-  def getInstanceInfo(controllerUrl: String, instance: String): InstanceInfo = 
{
+  def getInstanceInfo(controllerUrl: String, instance: String, useHttps: 
Boolean = false, authHeader: Option[String] = None, authToken: Option[String] = 
None, proxyEnabled: Boolean = false): InstanceInfo = {
     Try {
-      val uri = new URI(String.format(INSTANCES_API_TEMPLATE, controllerUrl, 
instance))
-      val response = HttpUtils.sendGetRequest(uri)
+      val scheme = if (useHttps) "https" else "http"
+      val targetUrl = if (proxyEnabled) controllerUrl else controllerUrl

Review Comment:
   This conditional assignment is redundant: both branches yield the same
value (controllerUrl). Either the proxy branch is missing its intended logic
(e.g. substituting the proxy endpoint for the controller URL) or the
conditional should be removed entirely.
   ```suggestion
         val targetUrl = controllerUrl
   ```
   ```suggestion
         val targetUrl = controllerUrl
   ```



##########
pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/HttpUtils.scala:
##########
@@ -41,17 +47,151 @@ private[pinot] object HttpUtils extends Logging {
       .setResponseTimeout(Timeout.of(GET_REQUEST_SOCKET_TIMEOUT_MS, 
TimeUnit.MILLISECONDS))
       .build()
 
+  // Thread-safe clients for HTTP and HTTPS
   private val httpClient = HttpClients.custom()
     .setDefaultRequestConfig(requestConfig)
     .build()
 
+  private var httpsClient: 
Option[org.apache.hc.client5.http.impl.classic.CloseableHttpClient] = None
+
+  /**
+   * Configure HTTPS client with SSL/TLS settings
+   */
+  def configureHttpsClient(keystorePath: Option[String],
+                          keystorePassword: Option[String],
+                          truststorePath: Option[String],
+                          truststorePassword: Option[String]): Unit = {
+    try {
+      val sslContextBuilder = SSLContexts.custom()
+
+      // Configure keystore if provided
+      (keystorePath, keystorePassword) match {
+        case (Some(ksPath), Some(ksPassword)) =>
+          val keystore = KeyStore.getInstance(KeyStore.getDefaultType)
+          keystore.load(new FileInputStream(new File(ksPath)), 
ksPassword.toCharArray)
+          sslContextBuilder.loadKeyMaterial(keystore, ksPassword.toCharArray)
+          logInfo(s"Configured keystore: $ksPath")
+        case (Some(_), None) =>
+          throw new IllegalArgumentException("Keystore password is required 
when keystore path is provided")
+        case _ => // No keystore configuration
+      }
+
+      // Configure truststore if provided, otherwise trust all certificates
+      (truststorePath, truststorePassword) match {
+        case (Some(tsPath), Some(tsPassword)) =>
+          val truststore = KeyStore.getInstance(KeyStore.getDefaultType)
+          truststore.load(new FileInputStream(new File(tsPath)), 
tsPassword.toCharArray)
+          sslContextBuilder.loadTrustMaterial(truststore, null)
+          logInfo(s"Configured truststore: $tsPath")
+        case (Some(_), None) =>
+          throw new IllegalArgumentException("Truststore password is required 
when truststore path is provided")
+        case _ =>
+          // If no truststore is provided, trust all certificates (not 
recommended for production)
+          sslContextBuilder.loadTrustMaterial(null, TrustAllStrategy.INSTANCE)
+          logWarning("No truststore configured, trusting all certificates (not 
recommended for production)")
+      }
+
+      val sslContext = sslContextBuilder.build()
+      val sslSocketFactory = new SSLConnectionSocketFactory(sslContext)
+
+      // Create a connection manager with SSL socket factory
+      val connectionManager = 
org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder.create()
+        .setSSLSocketFactory(sslSocketFactory)
+        .build()
+
+      httpsClient = Some(HttpClients.custom()
+        .setDefaultRequestConfig(requestConfig)
+        .setConnectionManager(connectionManager)
+        .build())
+      
+      logInfo("HTTPS client configured successfully")
+    } catch {
+      case e: Exception =>
+        logError("Failed to configure HTTPS client", e)
+        throw new RuntimeException("HTTPS configuration failed", e)
+    }
+  }
+
   def sendGetRequest(uri: URI): String = {
+    sendGetRequest(uri, None, None)
+  }
+
+  def sendGetRequest(uri: URI, authHeader: Option[String], authToken: 
Option[String]): String = {
+    val requestBuilder = ClassicRequestBuilder.get(uri)
+    
+    // Add authentication header if provided
+    (authHeader, authToken) match {
+      case (Some(header), Some(token)) =>
+        requestBuilder.addHeader(header, token)
+      case (Some(header), None) =>
+        logWarning(s"Authentication header '$header' provided but no token 
specified")
+      case (None, Some(token)) =>
+        // Default to Authorization Bearer if only token is provided.
+        // If token already starts with 'Bearer ', use as-is to avoid double 
prefixing.
+        val value = if (token.startsWith("Bearer ")) token else s"Bearer 
$token"
+        requestBuilder.addHeader("Authorization", value)

Review Comment:
   The token processing logic is duplicated across multiple methods (lines 131,
159, 94-95) with slight variations in Bearer prefix handling. This duplication
could lead to inconsistent behavior; the Bearer-prefix handling should be
extracted into a single shared utility method used by all call sites.



##########
pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala:
##########
@@ -98,7 +153,29 @@ private[reader] class PinotServerDataFetcher(
     val instanceConfig = new InstanceConfig(nullZkId)
     instanceConfig.setHostName(pinotSplit.serverAndSegments.serverHost)
     instanceConfig.setPort(pinotSplit.serverAndSegments.serverPort)
-    // TODO: support netty-sec
+    
+    // Configure TLS for server instance if HTTPS is enabled
+    if (dataSourceOptions.useHttps) {
+      instanceConfig.getRecord.setSimpleField("TLS_PORT", 
pinotSplit.serverAndSegments.serverPort)

Review Comment:
   The TLS_PORT should be a different port from the regular HTTP port. Setting
TLS_PORT to the same value as serverPort could cause connection issues, since
a TLS listener is typically bound to a separate port.
   ```suggestion
         instanceConfig.getRecord.setSimpleField("TLS_PORT", 
dataSourceOptions.tlsPort.toString)
   ```



##########
pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotGrpcServerDataFetcher.scala:
##########
@@ -18,34 +18,94 @@
  */
 package org.apache.pinot.connector.spark.common.reader
 
-import io.grpc.ManagedChannelBuilder
+import io.grpc.{ManagedChannelBuilder, Metadata}
+import io.grpc.stub.MetadataUtils
 import org.apache.pinot.common.datatable.{DataTable, DataTableFactory}
 import org.apache.pinot.common.proto.PinotQueryServerGrpc
 import org.apache.pinot.common.proto.Server.ServerRequest
-import org.apache.pinot.connector.spark.common.Logging
+import org.apache.pinot.connector.spark.common.{Logging, 
PinotDataSourceReadOptions}
 import org.apache.pinot.connector.spark.common.partition.PinotSplit
 import org.apache.pinot.spi.config.table.TableType
 
 import java.io.Closeable
+import java.net.URI
 import scala.collection.JavaConverters._
 
 /**
  * Data fetcher from Pinot Grpc server for specific segments.
  * Eg: offline-server1: segment1, segment2, segment3
  */
-private[reader] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit)
+private[reader] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit, 
readOptions: PinotDataSourceReadOptions)
   extends Logging with Closeable {
 
-  private val channel = ManagedChannelBuilder
-    .forAddress(pinotSplit.serverAndSegments.serverHost, 
pinotSplit.serverAndSegments.serverGrpcPort)
-    .usePlaintext()
-    .maxInboundMessageSize(Int.MaxValue)
-    .asInstanceOf[ManagedChannelBuilder[_]].build()
-  private val pinotServerBlockingStub = 
PinotQueryServerGrpc.newBlockingStub(channel)
+  private val (channelHost, channelPort) = {
+    if (readOptions.proxyEnabled && readOptions.grpcProxyUri.nonEmpty) {
+      val proxyUri = readOptions.grpcProxyUri.get
+      val (host, port) = proxyUri.split(":", 2) match {
+        case Array(h, p) if p.forall(_.isDigit) =>
+          (h, p.toInt)
+        case Array(h) =>
+          val defaultPort = if (readOptions.useHttps) 443 else 80
+          (h, defaultPort)
+        case _ =>
+          throw new IllegalArgumentException(s"Invalid grpcProxyUri: 
$proxyUri")
+      }

Review Comment:
   Using HTTP/HTTPS default ports (80/443) as fallback for gRPC proxy URI is 
incorrect. gRPC typically uses different ports (like 8094 as mentioned in the 
PR description). This could cause connection failures when no port is specified 
in grpcProxyUri.



##########
pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala:
##########
@@ -98,7 +153,29 @@ private[reader] class PinotServerDataFetcher(
     val instanceConfig = new InstanceConfig(nullZkId)
     instanceConfig.setHostName(pinotSplit.serverAndSegments.serverHost)
     instanceConfig.setPort(pinotSplit.serverAndSegments.serverPort)
-    // TODO: support netty-sec
+    
+    // Configure TLS for server instance if HTTPS is enabled
+    if (dataSourceOptions.useHttps) {
+      instanceConfig.getRecord.setSimpleField("TLS_PORT", 
pinotSplit.serverAndSegments.serverPort)
+    }
+    
+    // Configure gRPC port
+    instanceConfig.getRecord.setSimpleField("GRPC_PORT", 
dataSourceOptions.grpcPort.toString)
+    
+    // Configure proxy forwarding if enabled
+    if (dataSourceOptions.proxyEnabled && 
dataSourceOptions.grpcProxyUri.isDefined) {
+      // When using proxy, the server instance should point to the proxy
+      val proxyUri = dataSourceOptions.grpcProxyUri.get
+      val Array(proxyHost, proxyPort) = proxyUri.split(":")

Review Comment:
   This pattern binding assumes the proxy URI splits into exactly two parts.
If grpcProxyUri contains only a hostname without a port (or more than one
colon, e.g. an IPv6 literal), the extractor `val Array(proxyHost, proxyPort)`
will fail at runtime with a scala.MatchError. The code should explicitly
handle the case where no port is specified.
   ```suggestion
         val proxyParts = proxyUri.split(":")
         val proxyHost = proxyParts(0)
         val proxyPort = if (proxyParts.length > 1) proxyParts(1) else {
           // If no port is specified, use a default or throw an exception
           throw new IllegalArgumentException(s"Proxy URI '$proxyUri' must be 
in the format host:port")
         }
   ```



##########
pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/DataExtractor.scala:
##########
@@ -116,6 +118,13 @@ private[pinot] object DataExtractor {
       dataTable.getFloat(rowIndex, colIndex)
     case ColumnDataType.DOUBLE =>
       dataTable.getDouble(rowIndex, colIndex)
+    case ColumnDataType.BIG_DECIMAL =>
+      val bd = dataTable.getBigDecimal(rowIndex, colIndex)
+      // Map to Spark Decimal(38,18) by rescaling if needed; null-safe
+      if (bd == null) null else Decimal(bd.setScale(18, 
java.math.RoundingMode.HALF_UP), 38, 18)

Review Comment:
   Hardcoding the scale to 18 and precision to 38 for all BigDecimal values may 
cause precision loss or overflow for decimal values that don't fit this format. 
The scale and precision should be configurable or derived from the actual data.
   ```suggestion
         // Map to Spark Decimal using actual precision and scale; null-safe
         if (bd == null) null else Decimal(bd, bd.precision(), bd.scale())
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to