This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new 4323c0ea6 [Bug] Fix outdated info method (#3958)
4323c0ea6 is described below
commit 4323c0ea67b08b07c758831b2f8d931665b06da2
Author: GuoPhilipse <[email protected]>
AuthorDate: Mon Dec 5 18:52:21 2022 +0800
[Bug] Fix outdated info method (#3958)
---
.../executor/client/impl/ElasticSearchExecutorImpl.scala | 2 +-
.../executor/SeatunnelFlinkOnceCodeExecutor.scala | 13 +++++++------
.../executor/SeatunnelFlinkSQLOnceCodeExecutor.scala | 13 +++++++------
.../executor/SeatunnelSparkOnceCodeExecutor.scala | 15 ++++++++-------
.../service/impl/JobHistoryQueryServiceImpl.scala | 2 +-
5 files changed, 24 insertions(+), 21 deletions(-)
diff --git
a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.scala
b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.scala
index 8d910c023..86d10660b 100644
---
a/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.scala
+++
b/linkis-engineconn-plugins/elasticsearch/src/main/scala/org/apache/linkis/engineplugin/elasticsearch/executor/client/impl/ElasticSearchExecutorImpl.scala
@@ -67,7 +67,7 @@ class ElasticSearchExecutorImpl(runType: String, properties:
util.Map[String, St
override def executeLine(code: String): ElasticSearchResponse = {
val realCode = code.trim()
- info(s"es client begins to run $runType code:\n ${realCode.trim}")
+ logger.info(s"es client begins to run $runType code:\n ${realCode.trim}")
val countDown = new CountDownLatch(1)
var executeResponse: ElasticSearchResponse =
ElasticSearchErrorResponse("INCOMPLETE")
cancelable = client.execute(
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
index 3e9cb1cda..22538e0a8 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
@@ -80,7 +80,7 @@ class SeatunnelFlinkOnceCodeExecutor(
.asInstanceOf[util.Map[String, String]]
future = Utils.defaultScheduler.submit(new Runnable {
override def run(): Unit = {
- info("Try to execute codes." + code)
+ logger.info("Try to execute codes." + code)
if (runCode(code) != 0) {
isFailed = true
setResponse(
@@ -91,7 +91,7 @@ class SeatunnelFlinkOnceCodeExecutor(
)
tryFailed()
}
- info("All codes completed, now stop SeatunnelEngineConn.")
+ logger.info("All codes completed, now stop SeatunnelEngineConn.")
closeDaemon()
if (!isFailed) {
trySucceed()
@@ -102,7 +102,7 @@ class SeatunnelFlinkOnceCodeExecutor(
}
protected def runCode(code: String): Int = {
- info("Execute SeatunnelFlink Process")
+ logger.info("Execute SeatunnelFlink Process")
var args: Array[String] = Array.empty
val flinkRunMode = LINKIS_FLINK_RUNMODE.getValue
@@ -135,17 +135,17 @@ class SeatunnelFlinkOnceCodeExecutor(
new File(System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue) +
"/seatunnel").toPath,
new File(SeatunnelEnvConfiguration.SEATUNNEL_HOME.getValue).toPath
)
- info(s"Execute SeatunnelFlink Process end args:${args.mkString(" ")}")
+ logger.info(s"Execute SeatunnelFlink Process end args:${args.mkString(" ")}")
LinkisSeatunnelFlinkClient.main(args)
}
override protected def waitToRunning(): Unit = {
- if (!isCompleted)
+ if (!isCompleted) {
daemonThread = Utils.defaultScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = {
if (!(future.isDone || future.isCancelled)) {
- info("The Seatunnel Flink Process In Running")
+ logger.info("The Seatunnel Flink Process In Running")
}
}
},
@@ -153,6 +153,7 @@ class SeatunnelFlinkOnceCodeExecutor(
SeatunnelEnvConfiguration.SEATUNNEL_STATUS_FETCH_INTERVAL.getValue.toLong,
TimeUnit.MILLISECONDS
)
+ }
}
override def getCurrentNodeResource(): NodeResource = {
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
index a61c70648..b5651a5d9 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
@@ -80,7 +80,7 @@ class SeatunnelFlinkSQLOnceCodeExecutor(
.asInstanceOf[util.Map[String, String]]
future = Utils.defaultScheduler.submit(new Runnable {
override def run(): Unit = {
- info("Try to execute codes." + code)
+ logger.info("Try to execute codes." + code)
if (runCode(code) != 0) {
isFailed = true
setResponse(
@@ -91,7 +91,7 @@ class SeatunnelFlinkSQLOnceCodeExecutor(
)
tryFailed()
}
- info("All codes completed, now stop SeatunnelEngineConn.")
+ logger.info("All codes completed, now stop SeatunnelEngineConn.")
closeDaemon()
if (!isFailed) {
trySucceed()
@@ -102,7 +102,7 @@ class SeatunnelFlinkSQLOnceCodeExecutor(
}
protected def runCode(code: String): Int = {
- info("Execute SeatunnelFlink Process")
+ logger.info("Execute SeatunnelFlink Process")
var args: Array[String] = Array.empty
val flinkRunMode = LINKIS_FLINK_RUNMODE.getValue
@@ -135,17 +135,17 @@ class SeatunnelFlinkSQLOnceCodeExecutor(
new File(System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue) +
"/seatunnel").toPath,
new File(SeatunnelEnvConfiguration.SEATUNNEL_HOME.getValue).toPath
)
- info(s"Execute SeatunnelFlinkSQL Process end args:${args.mkString(" ")}")
+ logger.info(s"Execute SeatunnelFlinkSQL Process end args:${args.mkString(" ")}")
LinkisSeatunnelFlinkSQLClient.main(args)
}
override protected def waitToRunning(): Unit = {
- if (!isCompleted)
+ if (!isCompleted) {
daemonThread = Utils.defaultScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = {
if (!(future.isDone || future.isCancelled)) {
- info("The SeatunnelFlinkSQL Process In Running")
+ logger.info("The SeatunnelFlinkSQL Process In Running")
}
}
},
@@ -153,6 +153,7 @@ class SeatunnelFlinkSQLOnceCodeExecutor(
SeatunnelEnvConfiguration.SEATUNNEL_STATUS_FETCH_INTERVAL.getValue.toLong,
TimeUnit.MILLISECONDS
)
+ }
}
override def getCurrentNodeResource(): NodeResource = {
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
index 17728bea4..1e3bdd5d8 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
@@ -69,7 +69,7 @@ class SeatunnelSparkOnceCodeExecutor(
.asInstanceOf[util.Map[String, String]]
future = Utils.defaultScheduler.submit(new Runnable {
override def run(): Unit = {
- info("Try to execute codes." + code)
+ logger.info("Try to execute codes." + code)
if (runCode(code) != 0) {
isFailed = true
setResponse(
@@ -80,7 +80,7 @@ class SeatunnelSparkOnceCodeExecutor(
)
tryFailed()
}
- info("All codes completed, now stop SeatunnelEngineConn.")
+ logger.info("All codes completed, now stop SeatunnelEngineConn.")
closeDaemon()
if (!isFailed) {
trySucceed()
@@ -91,7 +91,7 @@ class SeatunnelSparkOnceCodeExecutor(
}
protected def runCode(code: String): Int = {
- info("Execute SeatunnelSpark Process")
+ logger.info("Execute SeatunnelSpark Process")
val masterKey = LINKIS_SPARK_MASTER.getValue
val deployModeKey = LINKIS_SPARK_DEPLOY_MODE.getValue
@@ -109,22 +109,22 @@ class SeatunnelSparkOnceCodeExecutor(
args = localArray(code)
}
- System.setProperty("SEATUNNEL_HOME", System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue));
+ System.setProperty("SEATUNNEL_HOME", System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue))
Files.createSymbolicLink(
new File(System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue) +
"/seatunnel").toPath,
new File(SeatunnelEnvConfiguration.SEATUNNEL_HOME.getValue).toPath
)
- info(s"Execute SeatunnelSpark Process end args:${args.mkString(" ")}")
+ logger.info(s"Execute SeatunnelSpark Process end args:${args.mkString("
")}")
LinkisSeatunnelSparkClient.main(args)
}
override protected def waitToRunning(): Unit = {
- if (!isCompleted)
+ if (!isCompleted) {
daemonThread = Utils.defaultScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = {
if (!(future.isDone || future.isCancelled)) {
- info("The Seatunnel Spark Process In Running")
+ logger.info("The Seatunnel Spark Process In Running")
}
}
},
@@ -132,6 +132,7 @@ class SeatunnelSparkOnceCodeExecutor(
SeatunnelEnvConfiguration.SEATUNNEL_STATUS_FETCH_INTERVAL.getValue.toLong,
TimeUnit.MILLISECONDS
)
+ }
}
override def getCurrentNodeResource(): NodeResource = {
diff --git
a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
index 73e58b3ea..464d65b2b 100644
---
a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
+++
b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
@@ -286,7 +286,7 @@ class JobHistoryQueryServiceImpl extends
JobHistoryQueryService with Logging {
fakeLabel.setCreator(creator)
val userCreator = fakeLabel.getStringValue
Utils.tryCatch(fakeLabel.valueCheck(userCreator)) { t =>
- info("input user or creator is not correct", t)
+ logger.info("input user or creator is not correct", t)
throw t
}
jobHistoryMapper.searchWithUserCreator(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]