This is an automated email from the ASF dual-hosted git repository.

cbalci pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 41f36160b08 Use /table/livebrokers/ api in pinot spark connector 
(#14802)
41f36160b08 is described below

commit 41f36160b08b39b11201aaeaaec022eac378bc86
Author: rohit <[email protected]>
AuthorDate: Fri Sep 12 23:45:20 2025 +0530

    Use /table/livebrokers/ api in pinot spark connector (#14802)
---
 pinot-connectors/pinot-spark-common/pom.xml        |  5 ++
 .../spark/common/PinotClusterClient.scala          | 21 +++--
 .../pinot/connector/spark/common/package.scala     | 11 +++
 .../spark/common/PinotClusterClientTest.scala      | 91 ++++++++++++++++++++++
 4 files changed, 121 insertions(+), 7 deletions(-)

diff --git a/pinot-connectors/pinot-spark-common/pom.xml 
b/pinot-connectors/pinot-spark-common/pom.xml
index be5de508e72..8f9ecba2a59 100644
--- a/pinot-connectors/pinot-spark-common/pom.xml
+++ b/pinot-connectors/pinot-spark-common/pom.xml
@@ -63,6 +63,11 @@
           <artifactId>scalatest_${scala.compat.version}</artifactId>
           <scope>test</scope>
         </dependency>
+        <dependency>
+          <groupId>org.mockito</groupId>
+          <artifactId>mockito-core</artifactId>
+          <scope>test</scope>
+        </dependency>
       </dependencies>
       <build>
         <plugins>
diff --git 
a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala
 
b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala
index 03c5ff185d3..eb0939ff50a 100644
--- 
a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala
+++ 
b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.connector.spark.common
 
+import com.fasterxml.jackson.core.`type`.TypeReference
 import java.net.{URI, URLEncoder}
 import org.apache.pinot.connector.spark.common.query.ScanQuery
 import org.apache.pinot.spi.config.table.TableType
@@ -33,7 +34,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties
  */
 private[pinot] object PinotClusterClient extends Logging {
   private val TABLE_SCHEMA_TEMPLATE = "%s://%s/tables/%s/schema"
-  private val TABLE_BROKER_INSTANCES_TEMPLATE = "%s://%s/v2/brokers/tables/%s"
+  private val TABLE_BROKER_INSTANCES_TEMPLATE = 
"%s://%s/tables/livebrokers?tables=%s"
   private val TIME_BOUNDARY_TEMPLATE = "%s://%s/debug/timeBoundary/%s"
   private val ROUTING_TABLE_TEMPLATE = 
"%s://%s/debug/routingTable/sql?query=%s"
   private val ROUTING_TABLE_SIMPLE_TEMPLATE = "%s://%s/debug/routingTable/%s"
@@ -71,12 +72,16 @@ private[pinot] object PinotClusterClient extends Logging {
       val uri = new URI(String.format(TABLE_BROKER_INSTANCES_TEMPLATE, scheme, 
targetUrl, tableName))
       val response = HttpUtils.sendGetRequest(uri, authHeader, authToken)
 
-      // Decode the JSON response into a list of BrokerEntry objects
-      val brokerEntries = decodeTo(response, 
classOf[Array[BrokerEntry]]).toList
+      // Decode the JSON response into a map of table name to list of 
BrokerInstanceInfo objects
+      val typeRef = new TypeReference[Map[String, List[BrokerInstanceInfo]]] {}
+      val tableToInstancesMap = decodeTo(response, typeRef)
 
-      // Map the broker entries to "host:port" strings
-      val brokerUrls = brokerEntries.map { brokerEntry =>
-        s"${brokerEntry.host}:${brokerEntry.port}"
+      // Extract the BrokerInstanceInfo list for the requested table
+      val brokerInfos = tableToInstancesMap.getOrElse(tableName, 
List.empty[BrokerInstanceInfo])
+
+      // Map the broker info objects to "host:port" strings
+      val brokerUrls = brokerInfos.map { brokerInfo =>
+        s"${brokerInfo.host}:${brokerInfo.port}"
       }
 
       if (brokerUrls.isEmpty) {
@@ -262,7 +267,9 @@ private[pinot] case class TimeBoundaryInfo(timeColumn: 
String, timeValue: String
 }
 
 @JsonIgnoreProperties(ignoreUnknown = true)
-private[pinot] case class BrokerEntry(host: String, port: Int)
+private[pinot] case class BrokerInstanceInfo(host: String,
+                                             port: Int,
+                                             instanceName: String)
 
 @JsonIgnoreProperties(ignoreUnknown = true)
 private[pinot] case class InstanceInfo(instanceName: String,
diff --git 
a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala
 
b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala
index d253f1673f1..ed93f33197a 100644
--- 
a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala
+++ 
b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala
@@ -20,6 +20,7 @@ package org.apache.pinot.connector.spark
 
 import java.util.Optional
 
+import com.fasterxml.jackson.core.`type`.TypeReference
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
@@ -38,6 +39,16 @@ package object common {
     }
   }
 
+  /** Parse json string to given model using TypeReference for complex generic 
types. */
+  def decodeTo[A](jsonString: String, typeReference: TypeReference[A]): A = {
+    try {
+      objectMapper.readValue(jsonString, typeReference)
+    } catch {
+      case e: Exception =>
+        throw new IllegalStateException(s"Error occurred while parsing JSON 
string: ${e.getMessage}", e)
+    }
+  }
+
   def scalafyOptional[A](value: Optional[A]): Option[A] = {
     if (value.isPresent) Some(value.get()) else None
   }
diff --git 
a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotClusterClientTest.scala
 
b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotClusterClientTest.scala
new file mode 100644
index 00000000000..919a1624c13
--- /dev/null
+++ 
b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotClusterClientTest.scala
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.common
+
+import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
+import java.io.OutputStream
+import java.net.InetSocketAddress
+
+class PinotClusterClientTest extends BaseTest {
+
+  test("getBrokerInstances should successfully parse livebrokers API 
response") {
+    // Create a test HTTP server to provide controlled response
+    val testServer = HttpServer.create(new InetSocketAddress(0), 0)
+    val serverPort = testServer.getAddress.getPort
+
+    val tableName = "testTable"
+    val mockResponse = s"""{
+      "$tableName": [
+        {
+          "host": "dca24-3fs",
+          "port": 29041,
+          "instanceName": "Broker_abfa3b77-a1e7-4525-97f9-3114898606e5"
+        },
+        {
+          "host": "dca50-a72",
+          "port": 25607,
+          "instanceName": "Broker_e41eecea-4f3d-4528-9510-7e63e282b441"
+        }
+      ]
+    }"""
+
+    testServer.createContext("/tables/livebrokers", new HttpHandler {
+      override def handle(exchange: HttpExchange): Unit = {
+        val query = exchange.getRequestURI.getQuery
+        if (query != null && query.contains(s"tables=$tableName")) {
+          val response = mockResponse.getBytes("UTF-8")
+          exchange.sendResponseHeaders(200, response.length)
+          val os: OutputStream = exchange.getResponseBody
+          os.write(response)
+          os.close()
+        } else {
+          exchange.sendResponseHeaders(404, 0)
+          exchange.getResponseBody.close()
+        }
+      }
+    })
+
+    testServer.start()
+
+    try {
+      val controllerUrl = s"localhost:$serverPort"
+      val brokerUrls = PinotClusterClient.getBrokerInstances(controllerUrl, 
tableName)
+
+      brokerUrls should have size 2
+      brokerUrls should contain("dca24-3fs:29041")
+      brokerUrls should contain("dca50-a72:25607")
+
+    } finally {
+      testServer.stop(0)
+    }
+  }
+
+  test("getBrokerInstances should throw PinotException on HttpUtils failure") {
+    // Test that getBrokerInstances handles failures by using an unreachable 
endpoint
+    val tableName = "testTable"
+    val controllerUrl = "localhost:99999"  // Unreachable port
+
+    val exception = intercept[PinotException] {
+      PinotClusterClient.getBrokerInstances(controllerUrl, tableName)
+    }
+
+    // Verify the exception message
+    exception.getMessage should include(s"An error occurred while getting 
broker instances for table '$tableName'")
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to