This is an automated email from the ASF dual-hosted git repository.
cbalci pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 41f36160b08 Use /table/livebrokers/ api in pinot spark connector
(#14802)
41f36160b08 is described below
commit 41f36160b08b39b11201aaeaaec022eac378bc86
Author: rohit <[email protected]>
AuthorDate: Fri Sep 12 23:45:20 2025 +0530
Use /table/livebrokers/ api in pinot spark connector (#14802)
---
pinot-connectors/pinot-spark-common/pom.xml | 5 ++
.../spark/common/PinotClusterClient.scala | 21 +++--
.../pinot/connector/spark/common/package.scala | 11 +++
.../spark/common/PinotClusterClientTest.scala | 91 ++++++++++++++++++++++
4 files changed, 121 insertions(+), 7 deletions(-)
diff --git a/pinot-connectors/pinot-spark-common/pom.xml
b/pinot-connectors/pinot-spark-common/pom.xml
index be5de508e72..8f9ecba2a59 100644
--- a/pinot-connectors/pinot-spark-common/pom.xml
+++ b/pinot-connectors/pinot-spark-common/pom.xml
@@ -63,6 +63,11 @@
<artifactId>scalatest_${scala.compat.version}</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala
b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala
index 03c5ff185d3..eb0939ff50a 100644
---
a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala
+++
b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.connector.spark.common
+import com.fasterxml.jackson.core.`type`.TypeReference
import java.net.{URI, URLEncoder}
import org.apache.pinot.connector.spark.common.query.ScanQuery
import org.apache.pinot.spi.config.table.TableType
@@ -33,7 +34,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties
*/
private[pinot] object PinotClusterClient extends Logging {
private val TABLE_SCHEMA_TEMPLATE = "%s://%s/tables/%s/schema"
- private val TABLE_BROKER_INSTANCES_TEMPLATE = "%s://%s/v2/brokers/tables/%s"
+ private val TABLE_BROKER_INSTANCES_TEMPLATE =
"%s://%s/tables/livebrokers?tables=%s"
private val TIME_BOUNDARY_TEMPLATE = "%s://%s/debug/timeBoundary/%s"
private val ROUTING_TABLE_TEMPLATE =
"%s://%s/debug/routingTable/sql?query=%s"
private val ROUTING_TABLE_SIMPLE_TEMPLATE = "%s://%s/debug/routingTable/%s"
@@ -71,12 +72,16 @@ private[pinot] object PinotClusterClient extends Logging {
val uri = new URI(String.format(TABLE_BROKER_INSTANCES_TEMPLATE, scheme,
targetUrl, tableName))
val response = HttpUtils.sendGetRequest(uri, authHeader, authToken)
- // Decode the JSON response into a list of BrokerEntry objects
- val brokerEntries = decodeTo(response,
classOf[Array[BrokerEntry]]).toList
+    // Decode the JSON response into a map of table name to list of
BrokerInstanceInfo objects
+ val typeRef = new TypeReference[Map[String, List[BrokerInstanceInfo]]] {}
+ val tableToInstancesMap = decodeTo(response, typeRef)
- // Map the broker entries to "host:port" strings
- val brokerUrls = brokerEntries.map { brokerEntry =>
- s"${brokerEntry.host}:${brokerEntry.port}"
+    // Extract the BrokerInstanceInfo list for the requested table
+ val brokerInfos = tableToInstancesMap.getOrElse(tableName,
List.empty[BrokerInstanceInfo])
+
+ // Map the broker info objects to "host:port" strings
+ val brokerUrls = brokerInfos.map { brokerInfo =>
+ s"${brokerInfo.host}:${brokerInfo.port}"
}
if (brokerUrls.isEmpty) {
@@ -262,7 +267,9 @@ private[pinot] case class TimeBoundaryInfo(timeColumn:
String, timeValue: String
}
@JsonIgnoreProperties(ignoreUnknown = true)
-private[pinot] case class BrokerEntry(host: String, port: Int)
+private[pinot] case class BrokerInstanceInfo(host: String,
+ port: Int,
+ instanceName: String)
@JsonIgnoreProperties(ignoreUnknown = true)
private[pinot] case class InstanceInfo(instanceName: String,
diff --git
a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala
b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala
index d253f1673f1..ed93f33197a 100644
---
a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala
+++
b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala
@@ -20,6 +20,7 @@ package org.apache.pinot.connector.spark
import java.util.Optional
+import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -38,6 +39,16 @@ package object common {
}
}
+ /** Parse json string to given model using TypeReference for complex generic
types. */
+ def decodeTo[A](jsonString: String, typeReference: TypeReference[A]): A = {
+ try {
+ objectMapper.readValue(jsonString, typeReference)
+ } catch {
+ case e: Exception =>
+ throw new IllegalStateException(s"Error occurred while parsing JSON
string: ${e.getMessage}", e)
+ }
+ }
+
def scalafyOptional[A](value: Optional[A]): Option[A] = {
if (value.isPresent) Some(value.get()) else None
}
diff --git
a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotClusterClientTest.scala
b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotClusterClientTest.scala
new file mode 100644
index 00000000000..919a1624c13
--- /dev/null
+++
b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotClusterClientTest.scala
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.common
+
+import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
+import java.io.OutputStream
+import java.net.InetSocketAddress
+
+class PinotClusterClientTest extends BaseTest {
+
+ test("getBrokerInstances should successfully parse livebrokers API
response") {
+ // Create a test HTTP server to provide controlled response
+ val testServer = HttpServer.create(new InetSocketAddress(0), 0)
+ val serverPort = testServer.getAddress.getPort
+
+ val tableName = "testTable"
+ val mockResponse = s"""{
+ "$tableName": [
+ {
+ "host": "dca24-3fs",
+ "port": 29041,
+ "instanceName": "Broker_abfa3b77-a1e7-4525-97f9-3114898606e5"
+ },
+ {
+ "host": "dca50-a72",
+ "port": 25607,
+ "instanceName": "Broker_e41eecea-4f3d-4528-9510-7e63e282b441"
+ }
+ ]
+ }"""
+
+ testServer.createContext("/tables/livebrokers", new HttpHandler {
+ override def handle(exchange: HttpExchange): Unit = {
+ val query = exchange.getRequestURI.getQuery
+ if (query != null && query.contains(s"tables=$tableName")) {
+ val response = mockResponse.getBytes("UTF-8")
+ exchange.sendResponseHeaders(200, response.length)
+ val os: OutputStream = exchange.getResponseBody
+ os.write(response)
+ os.close()
+ } else {
+ exchange.sendResponseHeaders(404, 0)
+ exchange.getResponseBody.close()
+ }
+ }
+ })
+
+ testServer.start()
+
+ try {
+ val controllerUrl = s"localhost:$serverPort"
+ val brokerUrls = PinotClusterClient.getBrokerInstances(controllerUrl,
tableName)
+
+ brokerUrls should have size 2
+ brokerUrls should contain("dca24-3fs:29041")
+ brokerUrls should contain("dca50-a72:25607")
+
+ } finally {
+ testServer.stop(0)
+ }
+ }
+
+ test("getBrokerInstances should throw PinotException on HttpUtils failure") {
+ // Test that getBrokerInstances handles failures by using an unreachable
endpoint
+ val tableName = "testTable"
+ val controllerUrl = "localhost:99999" // Unreachable port
+
+ val exception = intercept[PinotException] {
+ PinotClusterClient.getBrokerInstances(controllerUrl, tableName)
+ }
+
+ // Verify the exception message
+ exception.getMessage should include(s"An error occurred while getting
broker instances for table '$tableName'")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]