This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4baf65d4ec [spark connector] replace the use of circe with jackson (#15321)
4baf65d4ec is described below

commit 4baf65d4ecc63698cd52bf3c52cf705e5c2c9d44
Author: Tse Kit Yam <[email protected]>
AuthorDate: Fri Mar 21 05:29:45 2025 +0800

    [spark connector] replace the use of circe with jackson (#15321)
---
 LICENSE-binary                                     |  5 ---
 pinot-connectors/pinot-spark-common/pom.xml        |  8 ++---
 .../spark/common/PinotClusterClient.scala          | 37 +++++++++++++---------
 .../pinot/connector/spark/common/package.scala     | 18 +++++++----
 pom.xml                                            | 11 -------
 5 files changed, 35 insertions(+), 44 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index 2e416d7bd4..74fc70e686 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -295,11 +295,6 @@ commons-io:commons-io:2.16.1
 commons-pool:commons-pool:1.6
 info.picocli:picocli:4.7.6
 io.airlift:aircompressor:0.27
-io.circe:circe-core_2.12:0.14.10
-io.circe:circe-generic_2.12:0.14.10
-io.circe:circe-jawn_2.12:0.14.10
-io.circe:circe-numbers_2.12:0.14.10
-io.circe:circe-parser_2.12:0.14.10
 io.confluent:common-utils:7.7.0
 io.confluent:kafka-avro-serializer:7.7.0
 io.confluent:kafka-protobuf-provider:7.7.0
diff --git a/pinot-connectors/pinot-spark-common/pom.xml b/pinot-connectors/pinot-spark-common/pom.xml
index 2f585cfeee..f0bce81dfb 100644
--- a/pinot-connectors/pinot-spark-common/pom.xml
+++ b/pinot-connectors/pinot-spark-common/pom.xml
@@ -49,12 +49,8 @@
           <artifactId>scala-xml_${scala.compat.version}</artifactId>
         </dependency>
         <dependency>
-          <groupId>io.circe</groupId>
-          <artifactId>circe-parser_${scala.compat.version}</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>io.circe</groupId>
-          <artifactId>circe-generic_${scala.compat.version}</artifactId>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
         </dependency>
         <dependency>
           <groupId>org.scala-lang</groupId>
diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala
index 1c5dafe2a5..fb7cb2d9d5 100644
--- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala
+++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala
@@ -19,8 +19,6 @@
 package org.apache.pinot.connector.spark.common
 
 import java.net.{URI, URLEncoder}
-import io.circe.Decoder
-import io.circe.generic.auto._
 import org.apache.pinot.connector.spark.common.query.ScanQuery
 import org.apache.pinot.spi.config.table.TableType
 import org.apache.pinot.spi.data.Schema
@@ -28,6 +26,8 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder
 
 import scala.util.{Failure, Success, Try}
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+
 /**
  * PinotCusterClient reads metadata from Pinot controller.
  */
@@ -64,13 +64,16 @@ private[pinot] object PinotClusterClient extends Logging {
     Try {
       val uri = new URI(String.format(TABLE_BROKER_INSTANCES_TEMPLATE, 
controllerUrl, tableName))
       val response = HttpUtils.sendGetRequest(uri)
-      implicit val decodeIntOrString: Decoder[Either[Int, String]] =
-        Decoder[Int].map(Left(_)).or(Decoder[String].map(Right(_)))
-      val brokerUrls = decodeTo[List[Map[String, Either[Int, 
String]]]](response).map {
-        brokerEntry =>
-          val host = brokerEntry.get("host").get.right.get
-          val port = brokerEntry.get("port").get.left.get
-          s"$host:$port"
+
+      // Define a case class to represent the broker entry
+      case class BrokerEntry(host: String, port: Int)
+
+      // Decode the JSON response into a list of BrokerEntry objects
+      val brokerEntries = decodeTo(response, 
classOf[Array[BrokerEntry]]).toList
+
+      // Map the broker entries to "host:port" strings
+      val brokerUrls = brokerEntries.map { brokerEntry =>
+        s"${brokerEntry.host}:${brokerEntry.port}"
       }
 
       if (brokerUrls.isEmpty) {
@@ -103,7 +106,7 @@ private[pinot] object PinotClusterClient extends Logging {
       // pinot converts the given table name to the offline table name 
automatically
       val uri = new URI(String.format(TIME_BOUNDARY_TEMPLATE, brokerUrl, 
rawTableName))
       val response = HttpUtils.sendGetRequest(uri)
-      decodeTo[TimeBoundaryInfo](response)
+      decodeTo(response, classOf[TimeBoundaryInfo])
     } match {
       case Success(decodedResponse) =>
         logDebug(s"Received time boundary for table $tableName, 
$decodedResponse")
@@ -180,13 +183,15 @@ private[pinot] object PinotClusterClient extends Logging {
     Try {
       val uri = new URI(String.format(INSTANCES_API_TEMPLATE, controllerUrl, 
instance))
       val response = HttpUtils.sendGetRequest(uri)
-      decodeTo[InstanceInfo](response)
+
+      // Use the updated decodeTo function with Jackson
+      decodeTo(response, classOf[InstanceInfo])
     } match {
-      case Success(decodedReponse) =>
-        decodedReponse
+      case Success(decodedResponse) =>
+        decodedResponse
       case Failure(exception) =>
         throw PinotException(
-          s"An error occured while reading instance info for: '$instance'",
+          s"An error occurred while reading instance info for: '$instance'",
           exception
         )
     }
@@ -197,7 +202,8 @@ private[pinot] object PinotClusterClient extends Logging {
       val encodedSqlQueryParam = URLEncoder.encode(sql, "UTF-8")
       val uri = new URI(String.format(ROUTING_TABLE_TEMPLATE, brokerUrl, 
encodedSqlQueryParam))
       val response = HttpUtils.sendGetRequest(uri)
-      decodeTo[Map[String, List[String]]](response)
+
+      decodeTo(response, classOf[Map[String, List[String]]])
     } match {
       case Success(decodedResponse) =>
         logDebug(s"Received routing table for query $sql, $decodedResponse")
@@ -218,6 +224,7 @@ private[pinot] case class TimeBoundaryInfo(timeColumn: String, timeValue: String
   def getRealtimePredicate: String = s""""$timeColumn" >= $timeValue"""
 }
 
+@JsonIgnoreProperties(ignoreUnknown = true)
 private[pinot] case class InstanceInfo(instanceName: String,
                                        hostName: String,
                                        port: String,
diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala
index 0c3fecb80d..d253f1673f 100644
--- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala
+++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala
@@ -20,17 +20,21 @@ package org.apache.pinot.connector.spark
 
 import java.util.Optional
 
-import io.circe.{Decoder, parser}
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
 package object common {
 
+  // Define a singleton ObjectMapper instance
+  private val objectMapper: ObjectMapper = new 
ObjectMapper().registerModule(DefaultScalaModule)
+
   /** Parse json string to given model. */
-  def decodeTo[A: Decoder](jsonString: String): A = {
-    parser.decode[A](jsonString) match {
-      case Left(error) =>
-        throw new IllegalStateException(s"Error occurred while parsing json 
string, $error")
-      case Right(value) =>
-        value
+  def decodeTo[A](jsonString: String, clazz: Class[A]): A = {
+    try {
+      objectMapper.readValue(jsonString, clazz)
+    } catch {
+      case e: Exception =>
+        throw new IllegalStateException(s"Error occurred while parsing JSON 
string: ${e.getMessage}", e)
     }
   }
 
diff --git a/pom.xml b/pom.xml
index e8612834cd..4f1973bbd1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -184,7 +184,6 @@
     <jbcrypt.version>0.4</jbcrypt.version>
     <plexus-classworlds.version>2.8.0</plexus-classworlds.version>
     <scala-xml.version>2.3.0</scala-xml.version>
-    <circe.version>0.14.12</circe.version>
 
     <spark2.version>2.4.8</spark2.version>
     <spark3.version>3.5.3</spark3.version>
@@ -1629,16 +1628,6 @@
         <artifactId>scala-xml_${scala.compat.version}</artifactId>
         <version>${scala-xml.version}</version>
       </dependency>
-      <dependency>
-        <groupId>io.circe</groupId>
-        <artifactId>circe-parser_${scala.compat.version}</artifactId>
-        <version>${circe.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>io.circe</groupId>
-        <artifactId>circe-generic_${scala.compat.version}</artifactId>
-        <version>${circe.version}</version>
-      </dependency>
       <dependency>
         <groupId>org.codehaus.plexus</groupId>
         <artifactId>plexus-classworlds</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to