This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 1d86a67 [Improve] Use Spark's Logging instead of explicit usage of log4j (#84)
1d86a67 is described below
commit 1d86a6794ab9b2fbd94930deb522250fb8e6103a
Author: Bowen Liang <[email protected]>
AuthorDate: Fri Mar 31 18:16:14 2023 +0800
[Improve] Use Spark's Logging instead of explicit usage of log4j (#84)
* replace org.apache.log4j.Logger by using spark's Logging
---
.../apache/doris/spark/rdd/ScalaValueReader.scala | 23 +++++++++++-----------
1 file changed, 11 insertions(+), 12 deletions(-)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index 03643b2..b196355 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -34,7 +34,7 @@ import org.apache.doris.spark.sql.SchemaUtils
import org.apache.doris.spark.util.ErrorMessages
import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE
import org.apache.doris.thrift.{TScanCloseParams, TScanNextBatchParams,
TScanOpenParams, TScanOpenResult}
-import org.apache.log4j.Logger
+import org.apache.spark.internal.Logging
import scala.util.control.Breaks
@@ -43,8 +43,7 @@ import scala.util.control.Breaks
* @param partition Doris RDD partition
* @param settings request configuration
*/
-class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
- private val logger = Logger.getLogger(classOf[ScalaValueReader])
+class ScalaValueReader(partition: PartitionDefinition, settings: Settings)
extends Logging{
protected val client = new BackendClient(new
Routing(partition.getBeAddress), settings)
protected val clientLock =
@@ -57,7 +56,7 @@ class ScalaValueReader(partition: PartitionDefinition,
settings: Settings) {
protected lazy val deserializeArrowToRowBatchAsync: Boolean = Try {
settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC,
DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString).toBoolean
} getOrElse {
- logger.warn(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE,
DORIS_DESERIALIZE_ARROW_ASYNC,
settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC))
+ logWarning(String.format(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE,
DORIS_DESERIALIZE_ARROW_ASYNC,
settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC)))
DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT
}
@@ -65,7 +64,7 @@ class ScalaValueReader(partition: PartitionDefinition,
settings: Settings) {
val blockingQueueSize = Try {
settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE,
DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString).toInt
} getOrElse {
- logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE,
DORIS_DESERIALIZE_QUEUE_SIZE,
settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE))
+ logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE,
DORIS_DESERIALIZE_QUEUE_SIZE,
settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE)))
DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT
}
@@ -89,21 +88,21 @@ class ScalaValueReader(partition: PartitionDefinition,
settings: Settings) {
val batchSize = Try {
settings.getProperty(DORIS_BATCH_SIZE,
DORIS_BATCH_SIZE_DEFAULT.toString).toInt
} getOrElse {
- logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE,
DORIS_BATCH_SIZE, settings.getProperty(DORIS_BATCH_SIZE))
- DORIS_BATCH_SIZE_DEFAULT
+ logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE,
DORIS_BATCH_SIZE, settings.getProperty(DORIS_BATCH_SIZE)))
+ DORIS_BATCH_SIZE_DEFAULT
}
val queryDorisTimeout = Try {
settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S,
DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString).toInt
} getOrElse {
- logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE,
DORIS_REQUEST_QUERY_TIMEOUT_S,
settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S))
+ logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE,
DORIS_REQUEST_QUERY_TIMEOUT_S,
settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S)))
DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT
}
val execMemLimit = Try {
settings.getProperty(DORIS_EXEC_MEM_LIMIT,
DORIS_EXEC_MEM_LIMIT_DEFAULT.toString).toLong
} getOrElse {
- logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE,
DORIS_EXEC_MEM_LIMIT, settings.getProperty(DORIS_EXEC_MEM_LIMIT))
+ logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE,
DORIS_EXEC_MEM_LIMIT, settings.getProperty(DORIS_EXEC_MEM_LIMIT)))
DORIS_EXEC_MEM_LIMIT_DEFAULT
}
@@ -113,7 +112,7 @@ class ScalaValueReader(partition: PartitionDefinition,
settings: Settings) {
params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, ""))
params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, ""))
- logger.debug(s"Open scan params is, " +
+ logDebug(s"Open scan params is, " +
s"cluster: ${params.getCluster}, " +
s"database: ${params.getDatabase}, " +
s"table: ${params.getTable}, " +
@@ -159,7 +158,7 @@ class ScalaValueReader(partition: PartitionDefinition,
settings: Settings) {
started
}
- logger.debug(s"Open scan result is, contextId: $contextId, schema: $schema.")
+ logDebug(s"Open scan result is, contextId: $contextId, schema: $schema.")
/**
* read data and cached in rowBatch.
@@ -213,7 +212,7 @@ class ScalaValueReader(partition: PartitionDefinition,
settings: Settings) {
*/
def next: AnyRef = {
if (!hasNext) {
- logger.error(SHOULD_NOT_HAPPEN_MESSAGE)
+ logError(SHOULD_NOT_HAPPEN_MESSAGE)
throw new ShouldNeverHappenException
}
rowBatch.next
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]