This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4baf65d4ec [spark connector] replace the use of circe with jackson (#15321)
4baf65d4ec is described below
commit 4baf65d4ecc63698cd52bf3c52cf705e5c2c9d44
Author: Tse Kit Yam <[email protected]>
AuthorDate: Fri Mar 21 05:29:45 2025 +0800
[spark connector] replace the use of circe with jackson (#15321)
---
LICENSE-binary | 5 ---
pinot-connectors/pinot-spark-common/pom.xml | 8 ++---
.../spark/common/PinotClusterClient.scala | 37 +++++++++++++---------
.../pinot/connector/spark/common/package.scala | 18 +++++++----
pom.xml | 11 -------
5 files changed, 35 insertions(+), 44 deletions(-)
diff --git a/LICENSE-binary b/LICENSE-binary
index 2e416d7bd4..74fc70e686 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -295,11 +295,6 @@ commons-io:commons-io:2.16.1
commons-pool:commons-pool:1.6
info.picocli:picocli:4.7.6
io.airlift:aircompressor:0.27
-io.circe:circe-core_2.12:0.14.10
-io.circe:circe-generic_2.12:0.14.10
-io.circe:circe-jawn_2.12:0.14.10
-io.circe:circe-numbers_2.12:0.14.10
-io.circe:circe-parser_2.12:0.14.10
io.confluent:common-utils:7.7.0
io.confluent:kafka-avro-serializer:7.7.0
io.confluent:kafka-protobuf-provider:7.7.0
diff --git a/pinot-connectors/pinot-spark-common/pom.xml b/pinot-connectors/pinot-spark-common/pom.xml
index 2f585cfeee..f0bce81dfb 100644
--- a/pinot-connectors/pinot-spark-common/pom.xml
+++ b/pinot-connectors/pinot-spark-common/pom.xml
@@ -49,12 +49,8 @@
<artifactId>scala-xml_${scala.compat.version}</artifactId>
</dependency>
<dependency>
- <groupId>io.circe</groupId>
- <artifactId>circe-parser_${scala.compat.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>io.circe</groupId>
- <artifactId>circe-generic_${scala.compat.version}</artifactId>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala
index 1c5dafe2a5..fb7cb2d9d5 100644
--- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala
+++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala
@@ -19,8 +19,6 @@
package org.apache.pinot.connector.spark.common
import java.net.{URI, URLEncoder}
-import io.circe.Decoder
-import io.circe.generic.auto._
import org.apache.pinot.connector.spark.common.query.ScanQuery
import org.apache.pinot.spi.config.table.TableType
import org.apache.pinot.spi.data.Schema
@@ -28,6 +26,8 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder
import scala.util.{Failure, Success, Try}
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+
/**
* PinotCusterClient reads metadata from Pinot controller.
*/
@@ -64,13 +64,16 @@ private[pinot] object PinotClusterClient extends Logging {
Try {
val uri = new URI(String.format(TABLE_BROKER_INSTANCES_TEMPLATE, controllerUrl, tableName))
val response = HttpUtils.sendGetRequest(uri)
- implicit val decodeIntOrString: Decoder[Either[Int, String]] =
- Decoder[Int].map(Left(_)).or(Decoder[String].map(Right(_)))
- val brokerUrls = decodeTo[List[Map[String, Either[Int, String]]]](response).map {
- brokerEntry =>
- val host = brokerEntry.get("host").get.right.get
- val port = brokerEntry.get("port").get.left.get
- s"$host:$port"
+
+ // Define a case class to represent the broker entry
+ case class BrokerEntry(host: String, port: Int)
+
+ // Decode the JSON response into a list of BrokerEntry objects
+ val brokerEntries = decodeTo(response, classOf[Array[BrokerEntry]]).toList
+
+ // Map the broker entries to "host:port" strings
+ val brokerUrls = brokerEntries.map { brokerEntry =>
+ s"${brokerEntry.host}:${brokerEntry.port}"
}
if (brokerUrls.isEmpty) {
@@ -103,7 +106,7 @@ private[pinot] object PinotClusterClient extends Logging {
// pinot converts the given table name to the offline table name automatically
val uri = new URI(String.format(TIME_BOUNDARY_TEMPLATE, brokerUrl, rawTableName))
val response = HttpUtils.sendGetRequest(uri)
- decodeTo[TimeBoundaryInfo](response)
+ decodeTo(response, classOf[TimeBoundaryInfo])
} match {
case Success(decodedResponse) =>
logDebug(s"Received time boundary for table $tableName, $decodedResponse")
@@ -180,13 +183,15 @@ private[pinot] object PinotClusterClient extends Logging {
Try {
val uri = new URI(String.format(INSTANCES_API_TEMPLATE, controllerUrl, instance))
val response = HttpUtils.sendGetRequest(uri)
- decodeTo[InstanceInfo](response)
+
+ // Use the updated decodeTo function with Jackson
+ decodeTo(response, classOf[InstanceInfo])
} match {
- case Success(decodedReponse) =>
- decodedReponse
+ case Success(decodedResponse) =>
+ decodedResponse
case Failure(exception) =>
throw PinotException(
- s"An error occured while reading instance info for: '$instance'",
+ s"An error occurred while reading instance info for: '$instance'",
exception
)
}
@@ -197,7 +202,8 @@ private[pinot] object PinotClusterClient extends Logging {
val encodedSqlQueryParam = URLEncoder.encode(sql, "UTF-8")
val uri = new URI(String.format(ROUTING_TABLE_TEMPLATE, brokerUrl, encodedSqlQueryParam))
val response = HttpUtils.sendGetRequest(uri)
- decodeTo[Map[String, List[String]]](response)
+
+ decodeTo(response, classOf[Map[String, List[String]]])
} match {
case Success(decodedResponse) =>
logDebug(s"Received routing table for query $sql, $decodedResponse")
@@ -218,6 +224,7 @@ private[pinot] case class TimeBoundaryInfo(timeColumn: String, timeValue: String
def getRealtimePredicate: String = s""""$timeColumn" >= $timeValue"""
}
+@JsonIgnoreProperties(ignoreUnknown = true)
private[pinot] case class InstanceInfo(instanceName: String,
hostName: String,
port: String,
diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala
index 0c3fecb80d..d253f1673f 100644
--- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala
+++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/package.scala
@@ -20,17 +20,21 @@ package org.apache.pinot.connector.spark
import java.util.Optional
-import io.circe.{Decoder, parser}
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
package object common {
+ // Define a singleton ObjectMapper instance
+ private val objectMapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
/** Parse json string to given model. */
- def decodeTo[A: Decoder](jsonString: String): A = {
- parser.decode[A](jsonString) match {
- case Left(error) =>
- throw new IllegalStateException(s"Error occurred while parsing json string, $error")
- case Right(value) =>
- value
+ def decodeTo[A](jsonString: String, clazz: Class[A]): A = {
+ try {
+ objectMapper.readValue(jsonString, clazz)
+ } catch {
+ case e: Exception =>
+ throw new IllegalStateException(s"Error occurred while parsing JSON string: ${e.getMessage}", e)
}
}
diff --git a/pom.xml b/pom.xml
index e8612834cd..4f1973bbd1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -184,7 +184,6 @@
<jbcrypt.version>0.4</jbcrypt.version>
<plexus-classworlds.version>2.8.0</plexus-classworlds.version>
<scala-xml.version>2.3.0</scala-xml.version>
- <circe.version>0.14.12</circe.version>
<spark2.version>2.4.8</spark2.version>
<spark3.version>3.5.3</spark3.version>
@@ -1629,16 +1628,6 @@
<artifactId>scala-xml_${scala.compat.version}</artifactId>
<version>${scala-xml.version}</version>
</dependency>
- <dependency>
- <groupId>io.circe</groupId>
- <artifactId>circe-parser_${scala.compat.version}</artifactId>
- <version>${circe.version}</version>
- </dependency>
- <dependency>
- <groupId>io.circe</groupId>
- <artifactId>circe-generic_${scala.compat.version}</artifactId>
- <version>${circe.version}</version>
- </dependency>
<dependency>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-classworlds</artifactId>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]