This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new ec627119b [Bug] Fix impala plugin for client creation (#4776)
ec627119b is described below
commit ec627119bad6d3469cffa5503743f73c80b9c2f0
Author: Knypys <[email protected]>
AuthorDate: Mon Jul 17 16:07:38 2023 +0800
[Bug] Fix impala plugin for client creation (#4776)
* fix usage docs
* fix engine for linkis-storage type change
* fix impala executer bugs
* fix impala client creation
---
.../client/thrift/ImpalaThriftSessionFactory.java | 2 +-
.../impala/executor/ImpalaEngineConnExecutor.scala | 67 ++++++++--------------
2 files changed, 24 insertions(+), 45 deletions(-)
diff --git
a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
index fd6eb0e9f..472564253 100644
---
a/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
+++
b/linkis-engineconn-plugins/impala/src/main/java/org/apache/linkis/engineplugin/impala/client/thrift/ImpalaThriftSessionFactory.java
@@ -90,7 +90,7 @@ public class ImpalaThriftSessionFactory {
}
if (sessions.isEmpty()) {
- throw new IllegalArgumentException("invalid hosts: " +
StringUtils.join(hosts, ','));
+ throw new IllegalArgumentException("Invalid hosts: " +
StringUtils.join(hosts, ','));
}
this.socketFactory = socketFactory;
diff --git
a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
index d9cf13a3c..23cd1a0e6 100644
---
a/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/impala/src/main/scala/org/apache/linkis/engineplugin/impala/executor/ImpalaEngineConnExecutor.scala
@@ -17,73 +17,44 @@
package org.apache.linkis.engineplugin.impala.executor
+import org.apache.commons.collections.MapUtils
+import org.apache.commons.io.IOUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{OverloadUtils, Utils}
-import org.apache.linkis.engineconn.common.password.{
- CommandPasswordCallback,
- StaticPasswordCallback
-}
-import org.apache.linkis.engineconn.computation.executor.execute.{
- ConcurrentComputationExecutor,
- EngineExecutionContext
-}
+import org.apache.linkis.engineconn.common.password.{CommandPasswordCallback,
StaticPasswordCallback}
+import
org.apache.linkis.engineconn.computation.executor.execute.{ConcurrentComputationExecutor,
EngineExecutionContext}
import org.apache.linkis.engineconn.core.EngineConnObject
-import org.apache.linkis.engineplugin.impala.client.{
- ExecutionListener,
- ImpalaClient,
- ImpalaResultSet
-}
import org.apache.linkis.engineplugin.impala.client.ImpalaResultSet.Row
-import org.apache.linkis.engineplugin.impala.client.exception.{
- ImpalaEngineException,
- ImpalaErrorCodeSummary
-}
+import
org.apache.linkis.engineplugin.impala.client.exception.{ImpalaEngineException,
ImpalaErrorCodeSummary}
import org.apache.linkis.engineplugin.impala.client.protocol.{ExecProgress,
ExecStatus}
-import org.apache.linkis.engineplugin.impala.client.thrift.{
- ImpalaThriftClient,
- ImpalaThriftSessionFactory
-}
+import
org.apache.linkis.engineplugin.impala.client.thrift.{ImpalaThriftClient,
ImpalaThriftSessionFactory}
+import org.apache.linkis.engineplugin.impala.client.{ExecutionListener,
ImpalaClient, ImpalaResultSet}
import org.apache.linkis.engineplugin.impala.conf.ImpalaConfiguration._
import org.apache.linkis.engineplugin.impala.conf.ImpalaEngineConfig
import org.apache.linkis.governance.common.paser.SQLCodeParser
-import org.apache.linkis.manager.common.entity.resource.{
- CommonNodeResource,
- LoadResource,
- NodeResource
-}
+import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource,
LoadResource, NodeResource}
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{EngineTypeLabel,
UserCreatorLabel}
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.rpc.Sender
-import org.apache.linkis.scheduler.executer.{
- CompletedExecuteResponse,
- ErrorExecuteResponse,
- ExecuteResponse,
- SuccessExecuteResponse
-}
+import org.apache.linkis.scheduler.executer.{CompletedExecuteResponse,
ErrorExecuteResponse, ExecuteResponse, SuccessExecuteResponse}
import org.apache.linkis.storage.domain.Column
import org.apache.linkis.storage.resultset.ResultSetFactory
import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
-
-import org.apache.commons.io.IOUtils
-import org.apache.commons.lang3.StringUtils
-import org.apache.commons.lang3.exception.ExceptionUtils
-
import org.springframework.util.CollectionUtils
-import javax.net.SocketFactory
-import javax.net.ssl._
-import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback,
PasswordCallback}
-
import java.io.FileInputStream
import java.security.KeyStore
import java.util
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Consumer
-
+import javax.net.SocketFactory
+import javax.net.ssl._
+import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback,
PasswordCallback}
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
class ImpalaEngineConnExecutor(override val outputPrintLimit: Int, val id: Int)
extends ConcurrentComputationExecutor(outputPrintLimit) {
@@ -308,7 +279,7 @@ class ImpalaEngineConnExecutor(override val
outputPrintLimit: Int, val id: Int)
engineExecutionContext.getLabels.find(_.isInstanceOf[EngineTypeLabel]).get
var configMap: util.Map[String, String] = null
if (userCreatorLabel != null && engineTypeLabel != null) {
- configMap = Utils.tryAndError(
+ configMap = Utils.tryAndWarn(
ImpalaEngineConfig.getCacheMap(
(
userCreatorLabel.asInstanceOf[UserCreatorLabel],
@@ -317,6 +288,14 @@ class ImpalaEngineConnExecutor(override val
outputPrintLimit: Int, val id: Int)
)
)
}
+ if (configMap == null) {
+ configMap = new util.HashMap[String, String]()
+ }
+
+ val properties =
engineExecutionContext.getProperties.asInstanceOf[util.Map[String, String]]
+ if (MapUtils.isNotEmpty(properties)) {
+ configMap.putAll(properties)
+ }
val impalaServers = IMPALA_SERVERS.getValue(configMap)
val impalaMaxConnections = IMPALA_MAX_CONNECTIONS.getValue(configMap)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]