This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
new 86664ec87 Opt format code (#2458)
86664ec87 is described below
commit 86664ec877ce32bf75e81802ee1be3a758b1936e
Author: Jack Xu <[email protected]>
AuthorDate: Sat Jul 16 21:50:37 2022 +0800
Opt format code (#2458)
* refactor: format some code
---
.../linkis/rpc/message/method/MessageExecutor.java | 2 +-
.../rpc/message/method/MethodExecuteWrapper.java | 2 +-
.../message/utils/LoadBalancerOptionsUtils.java | 3 +-
.../rpc/serializer/ProtostuffSerializeUtil.java | 3 +-
.../apache/linkis/rpc/conf/RPCConfiguration.scala | 44 +++++++++---------
.../linkis/rpc/conf/RPCSpringConfiguration.scala | 8 ++--
.../sender/eureka/EurekaClientRefreshUtils.scala | 4 +-
.../rpc/transform/JavaCollectionSerializer.scala | 4 +-
.../apache/linkis/rpc/transform/RPCConsumer.scala | 9 +---
.../apache/linkis/rpc/transform/RPCProduct.scala | 20 +++------
.../org/apache/linkis/rpc/utils/RPCUtils.scala | 2 +-
linkis-commons/linkis-storage/pom.xml | 1 -
.../org/apache/linkis/storage/FSFactory.scala | 15 +++----
.../org/apache/linkis/storage/LineMetaData.scala | 4 +-
.../org/apache/linkis/storage/LineRecord.scala | 4 +-
.../linkis/storage/conf/LinkisStorageConf.scala | 4 +-
.../linkis/storage/csv/StorageCSVWriter.scala | 11 +++--
.../apache/linkis/storage/domain/DataType.scala | 34 +++++++-------
.../org/apache/linkis/storage/domain/Dolphin.scala | 20 ++++-----
.../linkis/storage/domain/MethodEntity.scala | 12 ++---
.../linkis/storage/excel/StorageExcelWriter.scala | 16 +++----
.../storage/excel/StorageMultiExcelWriter.scala | 2 +-
.../org/apache/linkis/storage/io/IOClient.scala | 20 +++++----
.../resultset/DefaultResultSetFactory.scala | 20 ++++-----
.../storage/resultset/ResultSetFactory.scala | 14 +++---
.../linkis/storage/resultset/ResultSetReader.scala | 7 ++-
.../storage/resultset/StorageResultSet.scala | 9 ++--
.../storage/resultset/StorageResultSetReader.scala | 37 ++++++++-------
.../linkis/storage/resultset/io/IOMetaData.scala | 2 +-
.../linkis/storage/resultset/io/IORecord.scala | 2 +-
.../storage/resultset/io/IOResultSerializer.scala | 2 +-
.../storage/resultset/table/TableRecord.scala | 6 +--
.../resultset/table/TableResultDeserializer.scala | 15 ++++---
.../resultset/table/TableResultSerializer.scala | 16 +++----
.../resultset/txt/TextResultSerializer.scala | 4 +-
.../linkis/storage/utils/FileSystemUtils.scala | 2 -
.../storage/utils/StorageConfiguration.scala | 3 +-
.../linkis/storage/utils/StorageHelper.scala | 22 ++++-----
.../apache/linkis/storage/utils/StorageUtils.scala | 52 +++++++++++-----------
.../computation/monitor/EngineConnMonitor.scala | 2 -
.../physical/CodeLogicalUnitExecTask.scala | 2 +-
41 files changed, 216 insertions(+), 245 deletions(-)
diff --git
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/method/MessageExecutor.java
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/method/MessageExecutor.java
index 24dba552f..4b9750c0b 100644
---
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/method/MessageExecutor.java
+++
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/method/MessageExecutor.java
@@ -72,7 +72,7 @@ public class MessageExecutor {
if (methodExecuteWrappers.size() == 1) {
MethodExecuteWrapper methodWrapper = methodExecuteWrappers.get(0);
try {
- if (!methodWrapper.shouldSkip) {
+ if (!methodWrapper.isShouldSkip()) {
Method method = methodWrapper.getMethod();
Object service = methodWrapper.getService();
if (methodWrapper.isHasSender()) {
diff --git
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/method/MethodExecuteWrapper.java
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/method/MethodExecuteWrapper.java
index b8adde62b..8a609f165 100644
---
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/method/MethodExecuteWrapper.java
+++
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/method/MethodExecuteWrapper.java
@@ -29,7 +29,7 @@ public class MethodExecuteWrapper {
private final ServiceMethod serviceMethod;
- public boolean shouldSkip;
+ private boolean shouldSkip;
public boolean isShouldSkip() {
return shouldSkip;
diff --git
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/utils/LoadBalancerOptionsUtils.java
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/utils/LoadBalancerOptionsUtils.java
index f4d3adfbc..abf2310d1 100644
---
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/utils/LoadBalancerOptionsUtils.java
+++
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/utils/LoadBalancerOptionsUtils.java
@@ -30,7 +30,7 @@ public class LoadBalancerOptionsUtils {
private static Object locker = new Object();
public static Options getDefaultOptions() throws NoSuchFieldException,
IllegalAccessException {
- if (null == DEFAULT_OPTIONS)
+ if (null == DEFAULT_OPTIONS) {
synchronized (locker) {
Class<?> clazz = LoadBalancerFeignClient.class;
Field optionField = clazz.getDeclaredField("DEFAULT_OPTIONS");
@@ -38,6 +38,7 @@ public class LoadBalancerOptionsUtils {
Object o = optionField.get(clazz);
DEFAULT_OPTIONS = (Options) o;
}
+ }
return DEFAULT_OPTIONS;
}
}
diff --git
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/serializer/ProtostuffSerializeUtil.java
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/serializer/ProtostuffSerializeUtil.java
index 0dfed43cc..00bf25296 100644
---
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/serializer/ProtostuffSerializeUtil.java
+++
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/serializer/ProtostuffSerializeUtil.java
@@ -41,8 +41,7 @@ public class ProtostuffSerializeUtil {
idStrategy.registerDelegate(TIMESTAMP_DELEGATE);
}
- private static Map<Class<?>, Schema<?>> schemaCache =
- new ConcurrentHashMap<Class<?>, Schema<?>>();
+ private static Map<Class<?>, Schema<?>> schemaCache = new
ConcurrentHashMap<>();
public static <T> String serialize(T obj) {
diff --git
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala
index c3e7d38ee..ed441cdb2 100644
---
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala
+++
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala
@@ -24,39 +24,39 @@ import org.reflections.scanners.{MethodAnnotationsScanner,
SubTypesScanner, Type
object RPCConfiguration {
- val BDP_RPC_BROADCAST_THREAD_SIZE =
CommonVars("wds.linkis.rpc.broadcast.thread.num", new Integer(25))
+ val BDP_RPC_BROADCAST_THREAD_SIZE: CommonVars[Integer] =
CommonVars("wds.linkis.rpc.broadcast.thread.num", new Integer(25))
- val BDP_RPC_EUREKA_SERVICE_REFRESH_INTERVAL =
CommonVars("wds.linkis.rpc.eureka.client.refresh.interval", new TimeType("1s"))
- val BDP_RPC_EUREKA_SERVICE_REFRESH_MAX_WAIT_TIME =
CommonVars("wds.linkis.rpc.eureka.client.refresh.wait.time.max", new
TimeType("30s"))
- val BDP_RPC_RECEIVER_ASYN_CONSUMER_THREAD_MAX =
CommonVars("wds.linkis.rpc.receiver.asyn.consumer.thread.max", 400)
- val BDP_RPC_RECEIVER_ASYN_CONSUMER_THREAD_FREE_TIME_MAX =
CommonVars("wds.linkis.rpc.receiver.asyn.consumer.freeTime.max", new
TimeType("2m"))
- val BDP_RPC_RECEIVER_ASYN_QUEUE_CAPACITY =
CommonVars("wds.linkis.rpc.receiver.asyn.queue.size.max", 5000)
+ val BDP_RPC_EUREKA_SERVICE_REFRESH_INTERVAL: CommonVars[TimeType] =
CommonVars("wds.linkis.rpc.eureka.client.refresh.interval", new TimeType("1s"))
+ val BDP_RPC_EUREKA_SERVICE_REFRESH_MAX_WAIT_TIME: CommonVars[TimeType] =
CommonVars("wds.linkis.rpc.eureka.client.refresh.wait.time.max", new
TimeType("30s"))
+ val BDP_RPC_RECEIVER_ASYN_CONSUMER_THREAD_MAX: CommonVars[Int] =
CommonVars("wds.linkis.rpc.receiver.asyn.consumer.thread.max", 400)
+ val BDP_RPC_RECEIVER_ASYN_CONSUMER_THREAD_FREE_TIME_MAX:
CommonVars[TimeType] =
CommonVars("wds.linkis.rpc.receiver.asyn.consumer.freeTime.max", new
TimeType("2m"))
+ val BDP_RPC_RECEIVER_ASYN_QUEUE_CAPACITY: CommonVars[Int] =
CommonVars("wds.linkis.rpc.receiver.asyn.queue.size.max", 5000)
- val BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_MAX =
CommonVars("wds.linkis.rpc.sender.asyn.consumer.thread.max", 100)
- val BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_FREE_TIME_MAX =
CommonVars("wds.linkis.rpc.sender.asyn.consumer.freeTime.max", new
TimeType("2m"))
- val BDP_RPC_SENDER_ASYN_QUEUE_CAPACITY =
CommonVars("wds.linkis.rpc.sender.asyn.queue.size.max", 2000)
+ val BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_MAX: CommonVars[Int] =
CommonVars("wds.linkis.rpc.sender.asyn.consumer.thread.max", 100)
+ val BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_FREE_TIME_MAX: CommonVars[TimeType]
= CommonVars("wds.linkis.rpc.sender.asyn.consumer.freeTime.max", new
TimeType("2m"))
+ val BDP_RPC_SENDER_ASYN_QUEUE_CAPACITY: CommonVars[Int] =
CommonVars("wds.linkis.rpc.sender.asyn.queue.size.max", 2000)
- val ENABLE_PUBLIC_SERVICE =
CommonVars("wds.linkis.gateway.conf.enable.publicservice", true)
- val PUBLIC_SERVICE_APPLICATION_NAME =
CommonVars("wds.linkis.gateway.conf.publicservice.name",
"linkis-ps-publicservice")
- val PUBLIC_SERVICE_LIST =
CommonVars("wds.linkis.gateway.conf.publicservice.list",
"query,jobhistory,application,configuration,filesystem,udf,variable,microservice,errorcode,bml,datasource").getValue.split(",")
+ val ENABLE_PUBLIC_SERVICE: CommonVars[Boolean] =
CommonVars("wds.linkis.gateway.conf.enable.publicservice", true)
+ val PUBLIC_SERVICE_APPLICATION_NAME: CommonVars[String] =
CommonVars("wds.linkis.gateway.conf.publicservice.name",
"linkis-ps-publicservice")
+ val PUBLIC_SERVICE_LIST: Array[String] =
CommonVars("wds.linkis.gateway.conf.publicservice.list",
"query,jobhistory,application,configuration,filesystem,udf,variable,microservice,errorcode,bml,datasource").getValue.split(",")
- val METADATAQUERY_SERVICE_APPLICATION_NAME =
CommonVars("wds.linkis.gateway.conf.publicservice.name",
"linkis-ps-metadataquery")
- val METADATAQUERY_SERVICE_LIST =
CommonVars("wds.linkis.gateway.conf.metadataquery.list",
"metadatamanager,metadataquery").getValue.split(",")
+ val METADATAQUERY_SERVICE_APPLICATION_NAME: CommonVars[String] =
CommonVars("wds.linkis.gateway.conf.publicservice.name",
"linkis-ps-metadataquery")
+ val METADATAQUERY_SERVICE_LIST: Array[String] =
CommonVars("wds.linkis.gateway.conf.metadataquery.list",
"metadatamanager,metadataquery").getValue.split(",")
- val PUBLIC_SERVICE_APP_PREFIX =
CommonVars("wds.linkis.gateway.conf.publicservice.name", "linkis-ps-").getValue
- val BDP_RPC_INSTANCE_ALIAS_SERVICE_REFRESH_INTERVAL =
CommonVars("wds.linkis.rpc.instancealias.refresh.interval", new TimeType("3s"))
+ val PUBLIC_SERVICE_APP_PREFIX: String =
CommonVars("wds.linkis.gateway.conf.publicservice.name", "linkis-ps-").getValue
+ val BDP_RPC_INSTANCE_ALIAS_SERVICE_REFRESH_INTERVAL: CommonVars[TimeType] =
CommonVars("wds.linkis.rpc.instancealias.refresh.interval", new TimeType("3s"))
- val CONTEXT_SERVICE_APPLICATION_NAME =
CommonVars("wds.linkis.gateway.conf.contextservice.name", "linkis-ps-cs")
+ val CONTEXT_SERVICE_APPLICATION_NAME: CommonVars[String] =
CommonVars("wds.linkis.gateway.conf.contextservice.name", "linkis-ps-cs")
- val ENABLE_LOCAL_MESSAGE =
CommonVars("wds.linkis.rpc.conf.enable.local.message", false)
- val LOCAL_APP_LIST = CommonVars("wds.linkis.rpc.conf.local.app.list",
"").getValue.split(",")
+ val ENABLE_LOCAL_MESSAGE: CommonVars[Boolean] =
CommonVars("wds.linkis.rpc.conf.enable.local.message", false)
+ val LOCAL_APP_LIST: Array[String] =
CommonVars("wds.linkis.rpc.conf.local.app.list", "").getValue.split(",")
- val SERVICE_SCAN_PACKAGE = CommonVars("wds.linkis.ms.service.scan.package",
"org.apache.linkis").getValue
+ val SERVICE_SCAN_PACKAGE: String =
CommonVars("wds.linkis.ms.service.scan.package", "org.apache.linkis").getValue
- val ENABLE_SPRING_PARAMS = CommonVars("wds.linkis.rpc.spring.params.enable",
false).getValue
+ val ENABLE_SPRING_PARAMS: Boolean =
CommonVars("wds.linkis.rpc.spring.params.enable", false).getValue
val REFLECTIONS = new Reflections(SERVICE_SCAN_PACKAGE, new
MethodAnnotationsScanner(), new TypeAnnotationsScanner(), new SubTypesScanner())
- val BDP_RPC_CACHE_CONF_EXPIRE_TIME =
CommonVars("wds.linkis.rpc.cache.expire.time", 120000L)
+ val BDP_RPC_CACHE_CONF_EXPIRE_TIME: CommonVars[Long] =
CommonVars("wds.linkis.rpc.cache.expire.time", 120000L)
}
diff --git
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCSpringConfiguration.scala
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCSpringConfiguration.scala
index 2d7b072f5..46c54db8c 100644
---
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCSpringConfiguration.scala
+++
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCSpringConfiguration.scala
@@ -24,14 +24,13 @@ import org.apache.linkis.rpc.RPCReceiveRestful
import org.apache.linkis.rpc.interceptor.RPCServerLoader
import org.apache.linkis.rpc.sender.eureka.EurekaRPCServerLoader
import org.apache.linkis.server.conf.ServerConfiguration
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
import org.springframework.boot.autoconfigure.condition.{ConditionalOnClass,
ConditionalOnMissingBean}
import org.springframework.boot.context.event.ApplicationPreparedEvent
import org.springframework.cloud.openfeign.EnableFeignClients
import org.springframework.context.annotation.{Bean, Configuration}
import org.springframework.context.event.EventListener
-
@Configuration
@EnableFeignClients
class RPCSpringConfiguration extends Logging {
@@ -45,11 +44,12 @@ class RPCSpringConfiguration extends Logging {
def completeInitialize(applicationPreparedEvent: ApplicationPreparedEvent):
Unit = {
val restfulClasses =
ServerConfiguration.BDP_SERVER_RESTFUL_REGISTER_CLASSES.getValue
val rpcRestfulName =
applicationPreparedEvent.getApplicationContext.getBean(classOf[RPCReceiveRestful]).getClass.getName
- if(StringUtils.isEmpty(restfulClasses))
+ if (StringUtils.isEmpty(restfulClasses)) {
DataWorkCloudApplication.setProperty(ServerConfiguration.BDP_SERVER_RESTFUL_REGISTER_CLASSES.key,
rpcRestfulName)
- else
+ } else {
DataWorkCloudApplication.setProperty(ServerConfiguration.BDP_SERVER_RESTFUL_REGISTER_CLASSES.key,
restfulClasses +
"," + rpcRestfulName)
+ }
logger.info("DataWorkCloud RPC need register RPCReceiveRestful, now add it
to configuration.")
}
diff --git
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/eureka/EurekaClientRefreshUtils.scala
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/eureka/EurekaClientRefreshUtils.scala
index 7c73d94e9..73eafda99 100644
---
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/eureka/EurekaClientRefreshUtils.scala
+++
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/eureka/EurekaClientRefreshUtils.scala
@@ -30,10 +30,10 @@ class EurekaClientRefreshUtils {
@Autowired
private var eurekaClient: EurekaClient = _
- private var eurekaClientLastRefreshTime = 0l
+ private var eurekaClientLastRefreshTime = 0L
val serviceRefreshInterval =
RPCConfiguration.BDP_RPC_EUREKA_SERVICE_REFRESH_INTERVAL.getValue.toLong
- private[eureka] def refreshEurekaClient(): Unit =
if(System.currentTimeMillis - eurekaClientLastRefreshTime >
serviceRefreshInterval) synchronized {
+ private[eureka] def refreshEurekaClient(): Unit = if
(System.currentTimeMillis - eurekaClientLastRefreshTime >
serviceRefreshInterval) synchronized {
if (System.currentTimeMillis - eurekaClientLastRefreshTime <
serviceRefreshInterval) return
eurekaClientLastRefreshTime = System.currentTimeMillis
eurekaClient match {
diff --git
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/JavaCollectionSerializer.scala
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/JavaCollectionSerializer.scala
index 30b2f1953..85a240e42 100644
---
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/JavaCollectionSerializer.scala
+++
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/JavaCollectionSerializer.scala
@@ -24,7 +24,7 @@ import org.json4s.{CustomSerializer, JArray, JObject}
//TODO is now only the simplest implementation, and there is a need to
optimize it later.(TODO 现在只做最简单的实现,后续有需要再优化)
-object JavaCollectionSerializer extends
CustomSerializer[java.util.List[_]](implicit formats => ( {
+object JavaCollectionSerializer extends
CustomSerializer[java.util.List[_]](implicit formats => ({
case j: JArray => BDPJettyServerHelper.gson.fromJson(write(j),
classOf[java.util.List[_]])
}, {
case list: java.util.List[_] => parse(BDPJettyServerHelper.gson.toJson(list))
@@ -32,7 +32,7 @@ object JavaCollectionSerializer extends
CustomSerializer[java.util.List[_]](impl
)
)
-object JavaMapSerializer extends CustomSerializer[java.util.Map[_,
_]](implicit formats => ( {
+object JavaMapSerializer extends CustomSerializer[java.util.Map[_,
_]](implicit formats => ({
case j: JObject => BDPJettyServerHelper.gson.fromJson(write(j),
classOf[java.util.Map[_, _]])
}, {
case map: java.util.Map[_, _] => parse(BDPJettyServerHelper.gson.toJson(map))
diff --git
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCConsumer.scala
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCConsumer.scala
index bbe5c40f7..5b1af63f6 100644
---
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCConsumer.scala
+++
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCConsumer.scala
@@ -50,14 +50,7 @@ private[linkis] object RPCConsumer {
exception
case t: Throwable => t
}
-// if (null != data.get(IS_REQUEST_PROTOCOL_CLASS) &&
data.get(IS_REQUEST_PROTOCOL_CLASS).toString.toBoolean) {
- ProtostuffSerializeUtil.deserialize(objectStr, clazz)
- /*} else if (data.get(IS_SCALA_CLASS).toString.toBoolean) {
- val realClass = getSerializableScalaClass(clazz)
- Serialization.read(objectStr)(formats,
ManifestFactory.classType(realClass))
- } else {
- BDPJettyServerHelper.gson.fromJson(objectStr, clazz)
- }*/
+ ProtostuffSerializeUtil.deserialize(objectStr, clazz)
case 4 =>
val errorMsg =
message.getData.get(EXCEPTION_MSG).asInstanceOf[JMap[String, Object]]
ExceptionManager.generateException(errorMsg)
diff --git
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCProduct.scala
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCProduct.scala
index 57ea35de2..5297680c6 100644
---
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCProduct.scala
+++
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCProduct.scala
@@ -19,7 +19,6 @@ package org.apache.linkis.rpc.transform
import java.lang.reflect.{ParameterizedType, Type}
import java.util
-
import org.apache.linkis.DataWorkCloudApplication
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.protocol.message.RequestProtocol
@@ -27,10 +26,10 @@ import org.apache.linkis.rpc.errorcode.RPCErrorConstants
import org.apache.linkis.rpc.exception.DWCURIException
import org.apache.linkis.rpc.serializer.ProtostuffSerializeUtil
import org.apache.linkis.server.{EXCEPTION_MSG, Message}
-import org.apache.commons.lang.ClassUtils
+import org.apache.commons.lang3.ClassUtils
import org.json4s.{DefaultFormats, Formats, Serializer}
-import scala.collection.JavaConversions
+import scala.collection.JavaConverters.mapAsScalaMapConverter
private[linkis] trait RPCProduct {
@@ -42,6 +41,7 @@ private[linkis] trait RPCProduct {
def ok(): Message
}
+
private[linkis] object RPCProduct extends Logging {
private[rpc] val IS_REQUEST_PROTOCOL_CLASS = "rpc_is_request_protocol"
@@ -53,7 +53,7 @@ private[linkis] object RPCProduct extends Logging {
private val rpcProduct: RPCProduct = new RPCProduct {
private val rpcFormats =
DataWorkCloudApplication.getApplicationContext.getBeansOfType(classOf[RPCFormats])
if (rpcFormats != null && !rpcFormats.isEmpty) {
- val serializers =
JavaConversions.mapAsScalaMap(rpcFormats).map(_._2.getSerializers).toArray.flatMap(_.iterator)
+ val serializers =
rpcFormats.asScala.map(_._2.getSerializers).toArray.flatMap(_.iterator)
setFormats(serializers)
}
override def toMessage(t: Any): Message = {
@@ -64,14 +64,7 @@ private[linkis] object RPCProduct extends Logging {
} else {
message.data(IS_REQUEST_PROTOCOL_CLASS, "false")
}
- message.data(OBJECT_VALUE, ProtostuffSerializeUtil.serialize(t))
- /*} else if (isScalaClass(t)) {
- message.data(IS_SCALA_CLASS, "true")
- message.data(OBJECT_VALUE, Serialization.write(t.asInstanceOf[AnyRef]))
- } else {
- message.data(IS_SCALA_CLASS, "false")
- message.data(OBJECT_VALUE, BDPJettyServerHelper.gson.toJson(t))
- }*/
+ message.data(OBJECT_VALUE, ProtostuffSerializeUtil.serialize(t))
message.setMethod("/rpc/message")
message.data(CLASS_VALUE, t.getClass.getName)
}
@@ -89,7 +82,8 @@ private[linkis] object RPCProduct extends Logging {
message
}
}
- private[rpc] def setFormats(serializer: Array[Serializer[_]]): Unit ={
+
+ private[rpc] def setFormats(serializer: Array[Serializer[_]]): Unit = {
this.formats = (serializer :+ JavaCollectionSerializer :+
JavaMapSerializer).foldLeft(DefaultFormats.asInstanceOf[Formats])(_ + _)
serializerClasses = formats.customSerializers.map(s =>
getActualTypeClass(s.getClass.getGenericSuperclass))
.filter(_ != null) ++: List(classOf[util.List[_]], classOf[util.Map[_,
_]])
diff --git
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/utils/RPCUtils.scala
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/utils/RPCUtils.scala
index d0a0b19f3..bbcb4281a 100644
---
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/utils/RPCUtils.scala
+++
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/utils/RPCUtils.scala
@@ -58,7 +58,7 @@ object RPCUtils {
val services = SpringCloudFeignConfigurationCache.getDiscoveryClient
.getServices.filter(_.toLowerCase.contains(parsedServiceId.toLowerCase)).toList
if(services.length == 1) Some(services.head)
- else if(services.length > 1) tooManyDeal(services)
+ else if (services.length > 1) tooManyDeal(services)
else None
}
diff --git a/linkis-commons/linkis-storage/pom.xml
b/linkis-commons/linkis-storage/pom.xml
index e9e3d67d4..38ec44e39 100644
--- a/linkis-commons/linkis-storage/pom.xml
+++ b/linkis-commons/linkis-storage/pom.xml
@@ -36,7 +36,6 @@
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-common</artifactId>
<version>${linkis.version}</version>
- <scope>provided</scope>
</dependency>
<dependency>
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/FSFactory.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/FSFactory.scala
index 478987f33..da96bb313 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/FSFactory.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/FSFactory.scala
@@ -23,18 +23,17 @@ import
org.apache.linkis.storage.exception.StorageFatalException
import org.apache.linkis.storage.factory.BuildFactory
import org.apache.linkis.storage.utils.{StorageConfiguration, StorageUtils}
-
object FSFactory extends Logging{
private val buildClasses: Map[String, BuildFactory] =
StorageUtils.loadClass[BuildFactory](StorageConfiguration.STORAGE_BUILD_FS_CLASSES.getValue,
t => t.fsName())
def getBuildFactory(fsName: String): BuildFactory = {
- if(! buildClasses.contains(fsName)) throw new StorageFatalException(50000,
s"Unsupported file system type(不支持的文件系统类型):$fsName" )
+ if (!buildClasses.contains(fsName)) throw new StorageFatalException(50000,
s"Unsupported file system type(不支持的文件系统类型):$fsName" )
buildClasses(fsName)
}
- def getFs(fsType: String, proxyUser:String): Fs = {
+ def getFs(fsType: String, proxyUser: String): Fs = {
val user = StorageUtils.getJvmUser
getBuildFactory(fsType).getFs(user, proxyUser)
}
@@ -46,8 +45,8 @@ object FSFactory extends Logging{
/**
* 1. If this machine has shared storage, the file:// type FS obtained here
is the FS of the process user.
- * 2, if this machine does not have shared storage, then the file:// type FS
obtained is the proxy to the Remote (shared storage machine root) FS
- * 3. If it is HDFS, it returns the FS of the process user.
+ * 2, if this machine does not have shared storage, then the file:// type
FS obtained is the proxy to the Remote (shared storage machine root) FS
+ * 3. If it is HDFS, it returns the FS of the process user.
* 1、如果这台机器装有共享存储则这里获得的file://类型的FS为该进程用户的FS
* 2、如果这台机器没有共享存储则获得的file://类型的FS为代理到Remote(共享存储机器root)的FS
* 3、如果是HDFS则返回的就是该进程用户的FS
@@ -60,9 +59,9 @@ object FSFactory extends Logging{
/**
* 1. If the process user is passed and the proxy user and the process user
are consistent, the file:// type FS is the FS of the process user (the shared
storage exists)
- * * 2, if the process user is passed and the proxy user and the process
user are consistent and there is no shared storage, the file:// type FS is the
proxy to the remote (shared storage machine root) FS
- * * 3. If the passed proxy user and process user are consistent, the
hdfs type is the FS of the process user.
- * * 4. If the proxy user and the process user are inconsistent, the hdfs
type is the FS after the proxy.
+ * 2, if the process user is passed and the proxy user and the process user
are consistent and there is no shared storage, the file:// type FS is the proxy
to the remote (shared storage machine root) FS
+ * 3. If the passed proxy user and process user are consistent, the hdfs
type is the FS of the process user.
+ * 4. If the proxy user and the process user are inconsistent, the hdfs
type is the FS after the proxy.
* 1、如果传了进程用户且代理用户和进程用户一致则file://类型的FS为该进程用户的FS(存在共享存储)
* 2、如果传了进程用户且代理用户和进程用户一致且没有共享存储则file://类型的FS为代理到Remote(共享存储机器root)的FS
* 3、如果传了的代理用户和进程用户一致则hdfs类型为该进程用户的FS
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/LineMetaData.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/LineMetaData.scala
index fd08171e2..e720554d4 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/LineMetaData.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/LineMetaData.scala
@@ -23,9 +23,9 @@ import org.apache.linkis.storage.resultset.ResultMetaData
class LineMetaData(private var metaData: String = null) extends ResultMetaData{
- def getMetaData = metaData
+ def getMetaData: String = metaData
- def setMetaData(metaData: String):Unit ={
+ def setMetaData(metaData: String): Unit = {
this.metaData = metaData
}
override def cloneMeta(): MetaData = new LineMetaData(metaData)
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/LineRecord.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/LineRecord.scala
index 286301a01..a060aaceb 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/LineRecord.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/LineRecord.scala
@@ -23,8 +23,8 @@ import org.apache.linkis.storage.resultset.ResultRecord
class LineRecord(private var line: String) extends ResultRecord{
- def getLine = line
- def setLine(line:String): Unit = {
+ def getLine: String = line
+ def setLine(line: String): Unit = {
this.line = line
}
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
index 9d45a773d..b47103c55 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala
@@ -17,8 +17,8 @@
package org.apache.linkis.storage.conf
-import org.apache.commons.lang.StringUtils
-import org.apache.linkis.common.conf.{ByteType, CommonVars}
+import org.apache.commons.lang3.StringUtils
+import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.common.utils.ByteTimeUtils
object LinkisStorageConf {
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/csv/StorageCSVWriter.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/csv/StorageCSVWriter.scala
index 748d0355b..e44ddeac5 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/csv/StorageCSVWriter.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/csv/StorageCSVWriter.scala
@@ -18,12 +18,12 @@
package org.apache.linkis.storage.csv
import java.io._
-
import org.apache.linkis.common.io.{MetaData, Record}
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.storage.domain.DataType
import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
import org.apache.commons.io.IOUtils
+import org.apache.commons.lang3.StringUtils
class StorageCSVWriter(val charset: String, val separator: String, val
quoteRetouchEnable: Boolean, val outputStream: OutputStream) extends
CSVFsWriter with Logging {
@@ -39,13 +39,12 @@ class StorageCSVWriter(val charset: String, val separator:
String, val quoteReto
override def addMetaData(metaData: MetaData): Unit = {
val head = metaData.asInstanceOf[TableMetaData].columns.map(_.columnName)
write(head)
- //IOUtils.write(compact(head).getBytes(charset),outputStream)
}
private def compact(row: Array[String]): String = {
val quotationMarks: String = "\""
def decorateValue(v: String): String = {
- if (v == null || "".equals(v.trim)) v
+ if (StringUtils.isBlank(v)) v
else {
if (quoteRetouchEnable) {
s"$quotationMarks${v.replaceAll(quotationMarks, "")}$quotationMarks"
@@ -58,12 +57,12 @@ class StorageCSVWriter(val charset: String, val separator:
String, val quoteReto
}
private def write(row: Array[String]) = {
- val cotent: String = compact(row)
- if (buffer.length + cotent.length > 49500) {
+ val content: String = compact(row)
+ if (buffer.length + content.length > 49500) {
IOUtils.write(buffer.toString().getBytes(charset), outputStream)
buffer.clear()
}
- buffer.append(cotent)
+ buffer.append(content)
}
@scala.throws[IOException]
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/DataType.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/DataType.scala
index 84acaf269..de63d5576 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/DataType.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/DataType.scala
@@ -54,12 +54,12 @@ object DataType extends Logging{
case "string" => StringType
case "boolean" => BooleanType
case SHORT_REGEX() => ShortIntType
- case LONG_REGEX() => LongType
+ case LONG_REGEX() => LongType
case BIGINT_REGEX() => BigIntType
case INT_REGEX() | "integer" | "smallint" => IntType
case FLOAT_REGEX() => FloatType
case DOUBLE_REGEX() => DoubleType
- case VARCHAR_REGEX() =>VarcharType
+ case VARCHAR_REGEX() => VarcharType
case CHAR_REGEX() => CharType
case "date" => DateType
case "timestamp" => TimestampType
@@ -75,18 +75,18 @@ object DataType extends Logging{
def toValue(dataType: DataType, value: String): Any =
Utils.tryCatch(dataType match {
case NullType => null
case StringType | CharType | VarcharType | StructType | ListType |
ArrayType | MapType => value
- case BooleanType => if(isNumberNull(value)) null else value.toBoolean
- case ShortIntType => if(isNumberNull(value)) null else value.toShort
- case IntType =>if(isNumberNull(value)) null else value.toInt
- case LongType | BigIntType => if(isNumberNull(value)) null else
value.toLong
- case FloatType => if(isNumberNull(value)) null else value.toFloat
- case DoubleType => if(isNumberNull(value)) null else value.toDouble
- case DecimalType => if(isNumberNull(value)) null else new
JavaBigDecimal(value)
- case DateType => if(isNumberNull(value)) null else Date.valueOf(value)
- case TimestampType => if(isNumberNull(value)) null else
Timestamp.valueOf(value).toString.stripSuffix(".0")
- case BinaryType => if(isNull(value)) null else value.getBytes()
+ case BooleanType => if (isNumberNull(value)) null else value.toBoolean
+ case ShortIntType => if (isNumberNull(value)) null else value.toShort
+ case IntType => if (isNumberNull(value)) null else value.toInt
+ case LongType | BigIntType => if (isNumberNull(value)) null else
value.toLong
+ case FloatType => if (isNumberNull(value)) null else value.toFloat
+ case DoubleType => if (isNumberNull(value)) null else value.toDouble
+ case DecimalType => if (isNumberNull(value)) null else new
JavaBigDecimal(value)
+ case DateType => if (isNumberNull(value)) null else Date.valueOf(value)
+ case TimestampType => if (isNumberNull(value)) null else
Timestamp.valueOf(value).toString.stripSuffix(".0")
+ case BinaryType => if (isNull(value)) null else value.getBytes()
case _ => value
- }){
+ }) {
t =>
logger.debug(s"Failed to $value switch to dataType:", t)
value
@@ -111,10 +111,10 @@ object DataType extends Logging{
}
-abstract class DataType(val typeName:String,
- val javaSQLType: Int){
+abstract class DataType(val typeName: String, val javaSQLType: Int) {
override def toString: String = typeName
}
+
case object NullType extends DataType("void", 0)
case object StringType extends DataType("string", 12)
case object BooleanType extends DataType("boolean", 16)
@@ -137,9 +137,9 @@ case object ListType extends DataType("list", 2001)
case object StructType extends DataType("struct", 2002)
case object BigDecimalType extends DataType("bigdecimal", 3)
-case class Column(columnName: String, dataType: DataType, comment: String){
+case class Column(columnName: String, dataType: DataType, comment: String) {
- def toArray:Array[Any] ={
+ def toArray: Array[Any] = {
Array[Any](columnName, dataType, comment)
}
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/Dolphin.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/Dolphin.scala
index 96ab3924d..7ff52ef3d 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/Dolphin.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/Dolphin.scala
@@ -45,7 +45,7 @@ object Dolphin extends Logging{
val FILE_EMPTY = 31
- def getBytes(value: Any): Array[Byte] ={
+ def getBytes(value: Any): Array[Byte] = {
value.toString.getBytes(CHAR_SET)
}
@@ -57,7 +57,7 @@ object Dolphin extends Logging{
* @param len
* @return
*/
- def getString(bytes: Array[Byte], start:Int, len: Int)= new String(bytes,
start, len, Dolphin.CHAR_SET)
+ def getString(bytes: Array[Byte], start: Int, len: Int): String = new
String(bytes, start, len, Dolphin.CHAR_SET)
/**
* Read an integer value that converts the array to a byte of length 10
bytes
@@ -66,12 +66,13 @@ object Dolphin extends Logging{
* @return
*/
def readInt(inputStream: InputStream): Int = {
- val bytes = new Array[Byte](INT_LEN + 1)
- if(StorageUtils.readBytes(inputStream, bytes, INT_LEN) != INT_LEN) throw
new StorageWarnException(51000, "failed to read integer(读取整数失败)")
+ val bytes = new Array[Byte](INT_LEN + 1)
+ if (StorageUtils.readBytes(inputStream, bytes, INT_LEN) != INT_LEN) {
+ throw new StorageWarnException(51000, "failed to read integer(读取整数失败)")
+ }
getString(bytes, 0, INT_LEN).toInt
}
-
/**
* Print integers at a fixed length(将整数按固定长度打印)
* @param value
@@ -83,17 +84,16 @@ object Dolphin extends Logging{
Dolphin.getBytes(res)
}
-
- def getType(inputStream:InputStream):String = {
+ def getType(inputStream: InputStream): String = {
val bytes = new Array[Byte](100)
- val len = StorageUtils.readBytes(inputStream,bytes, Dolphin.MAGIC_LEN +
INT_LEN)
+ val len = StorageUtils.readBytes(inputStream, bytes, Dolphin.MAGIC_LEN +
INT_LEN)
if(len == -1) return null
getType(Dolphin.getString(bytes, 0, len))
}
def getType(content: String): String = {
- if(content.length < MAGIC.length || content.substring(0, MAGIC.length) !=
MAGIC) throw new IOException(s"File header type must be
dolphin,content:$content is not")
- content.substring(MAGIC.length, MAGIC.length + INT_LEN ).toInt.toString
+ if (content.length < MAGIC.length || content.substring(0, MAGIC.length) !=
MAGIC) throw new IOException(s"File header type must be
dolphin,content:$content is not")
+ content.substring(MAGIC.length, MAGIC.length + INT_LEN).toInt.toString
}
}
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/MethodEntity.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/MethodEntity.scala
index 996f78d23..cda35ee22 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/MethodEntity.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/MethodEntity.scala
@@ -36,8 +36,8 @@ import org.json4s.jackson.Serialization.write
case class MethodEntity(id: Long,
fsType: String,
creatorUser: String,
- proxyUser:String,
- clientIp:String,
+ proxyUser: String,
+ clientIp: String,
methodName: String,
params: Array[AnyRef]){
override def toString: String = {s"id:$id, methodName:$methodName,
fsType:$fsType, " +
@@ -58,7 +58,7 @@ object MethodEntitySerializer{
* @param code
* @return
*/
- def deserializer(code: String):MethodEntity =
parse(code).extract[MethodEntity]
+ def deserializer(code: String): MethodEntity =
parse(code).extract[MethodEntity]
/**
* Serialize MethodEntity to code
@@ -85,12 +85,12 @@ object MethodEntitySerializer{
* @tparam T
* @return
*/
- def deserializerToJavaObject[T](json:String,classType: Class[T]): T ={
+ def deserializerToJavaObject[T](json: String, classType: Class[T]): T = {
gson.fromJson(json, classType)
}
- def deserializerToJavaObject[T](json:String,oType: Type): T ={
- gson.fromJson(json,oType)
+ def deserializerToJavaObject[T](json: String, oType: Type): T = {
+ gson.fromJson(json, oType)
}
}
\ No newline at end of file
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/excel/StorageExcelWriter.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/excel/StorageExcelWriter.scala
index 2abb12a74..db94ee932 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/excel/StorageExcelWriter.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/excel/StorageExcelWriter.scala
@@ -17,20 +17,18 @@
package org.apache.linkis.storage.excel
-import java.io._
-import java.util
-import org.apache.linkis.common.io.{MetaData, Record}
-import org.apache.linkis.storage.domain.{BigIntType, DataType, IntType,
LongType, ShortIntType, TinyIntType}
-import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
import org.apache.commons.io.IOUtils
+import org.apache.linkis.common.io.{MetaData, Record}
import org.apache.linkis.common.utils.{Logging, Utils}
-import org.apache.linkis.storage.domain.DataType.valueToString
+import org.apache.linkis.storage.domain._
+import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
import org.apache.poi.ss.usermodel._
import org.apache.poi.xssf.streaming.{SXSSFCell, SXSSFSheet, SXSSFWorkbook}
-import scala.collection.mutable.ArrayBuffer
-import org.apache.linkis.storage.domain._
+import java.io._
+import java.util
import java.util.Date
+import scala.collection.mutable.ArrayBuffer
class StorageExcelWriter(val charset: String, val sheetName: String, val
dateFormat: String, val outputStream: OutputStream, val autoFormat: Boolean)
extends ExcelFsWriter with Logging {
@@ -47,7 +45,7 @@ class StorageExcelWriter(val charset: String, val sheetName:
String, val dateFor
protected val os = new ByteArrayOutputStream()
protected var is: ByteArrayInputStream = _
- def init = {
+ def init: Unit = {
workBook = new SXSSFWorkbook()
sheet = workBook.createSheet(sheetName)
}
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/excel/StorageMultiExcelWriter.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/excel/StorageMultiExcelWriter.scala
index d30e241c4..78856f592 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/excel/StorageMultiExcelWriter.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/excel/StorageMultiExcelWriter.scala
@@ -27,7 +27,7 @@ class StorageMultiExcelWriter(override val outputStream:
OutputStream, override
private var sheetIndex = 0
- override def init = {
+ override def init: Unit = {
if (workBook == null) workBook = new SXSSFWorkbook()
//1.让表自适应列宽
if (sheet != null) {
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/io/IOClient.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/io/IOClient.scala
index 216b2feb3..502f96a1a 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/io/IOClient.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/io/IOClient.scala
@@ -20,39 +20,41 @@ package org.apache.linkis.storage.io
import java.util.UUID
import org.apache.linkis.storage.domain.MethodEntity
import org.apache.linkis.storage.exception.StorageErrorException
+import org.slf4j.{Logger, LoggerFactory}
/**
* IOClient is used to execute the proxy as the ujes code execution entry in
io and get the return result.
* IOClient用于在io进行代理作为ujes的代码执行入口,并获取返回结果
*/
trait IOClient {
- def execute(user: String, methodEntity: MethodEntity,
params:java.util.Map[String,Any]):String
+ def execute(user: String, methodEntity: MethodEntity, params:
java.util.Map[String, Any]): String
- def executeWithEngine(user: String, methodEntity: MethodEntity,
params:java.util.Map[String,Any]):Array[String]
+ def executeWithEngine(user: String, methodEntity: MethodEntity, params:
java.util.Map[String, Any]): Array[String]
}
object IOClient{
- var ioClient:IOClient = null
+ val logger: Logger = LoggerFactory.getLogger(classOf[IOClient])
+ var ioClient: IOClient = null
val SUCCESS = "SUCCESS"
val FAILED = "FAILED"
- def getIOClient():IOClient = {
- if(ioClient == null) throw new StorageErrorException(52004,"You must
register IOClient before you can use proxy mode.(必须先注册IOClient,才能使用代理模式)")
+ def getIOClient(): IOClient = {
+ if (ioClient == null) throw new StorageErrorException(52004,"You must
register IOClient before you can use proxy mode.(必须先注册IOClient,才能使用代理模式)")
ioClient
}
/**
* This method is called when ioClient is initialized.
* ioClient初始化时会调用该方法
- * @param client
+ * @param client IOClient
*/
- def register(client: IOClient):Unit = {
+ def register(client: IOClient): Unit = {
this.ioClient = client
- println(ioClient)
+ logger.debug(s"IOClient: ${ioClient.toString} registered")
}
- def getFSId():String = {
+ def getFSId(): String = {
UUID.randomUUID().toString
}
}
\ No newline at end of file
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/DefaultResultSetFactory.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/DefaultResultSetFactory.scala
index 5a1582494..02bc85937 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/DefaultResultSetFactory.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/DefaultResultSetFactory.scala
@@ -38,12 +38,12 @@ class DefaultResultSetFactory extends ResultSetFactory with
Logging{
val resultTypes = ResultSetFactory.resultSetType.keys.toArray
override def getResultSetByType(resultSetType: String): ResultSet[_ <:
MetaData, _ <: Record] = {
- if(! resultClasses.contains(resultSetType)) throw new
StorageErrorException(50000, s"Unsupported result
type(不支持的结果类型):$resultSetType" )
+ if (!resultClasses.contains(resultSetType)) throw new
StorageErrorException(50000, s"Unsupported result
type(不支持的结果类型):$resultSetType" )
resultClasses(resultSetType).newInstance()
}
override def getResultSetByPath(fsPath: FsPath): ResultSet[_ <: MetaData, _
<: Record] = {
- getResultSetByPath(fsPath,StorageUtils.getJvmUser)
+ getResultSetByPath(fsPath, StorageUtils.getJvmUser)
}
override def getResultSetByContent(content: String): ResultSet[_ <:
MetaData, _ <: Record] = {
@@ -56,14 +56,14 @@ class DefaultResultSetFactory extends ResultSetFactory with
Logging{
path.endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)
}
- override def isResultSet(content: String): Boolean =
Utils.tryCatch(resultClasses.contains(Dolphin.getType(content))){ t =>
+ override def isResultSet(content: String): Boolean =
Utils.tryCatch(resultClasses.contains(Dolphin.getType(content))) { t =>
logger.info("Wrong result Set: " + t.getMessage)
false
}
- override def getResultSet(output: String): ResultSet[_ <: MetaData, _ <:
Record] = getResultSet(output,StorageUtils.getJvmUser)
+ override def getResultSet(output: String): ResultSet[_ <: MetaData, _ <:
Record] = getResultSet(output, StorageUtils.getJvmUser)
- override def getResultSetType:Array[String] = resultTypes
+ override def getResultSetType: Array[String] = resultTypes
override def getResultSetByPath(fsPath: FsPath, fs: Fs): ResultSet[_ <:
MetaData, _ <: Record] = {
val inputStream = fs.read(fsPath)
@@ -75,20 +75,20 @@ class DefaultResultSetFactory extends ResultSetFactory with
Logging{
}
override def getResultSetByPath(fsPath: FsPath, proxyUser: String):
ResultSet[_ <: MetaData, _ <: Record] = {
- if (fsPath == null ) return null
+ if (fsPath == null) return null
logger.info("Get Result Set By Path:" + fsPath.getPath)
- val fs = FSFactory.getFsByProxyUser(fsPath,proxyUser)
- fs.init(new util.HashMap[String,String]())
+ val fs = FSFactory.getFsByProxyUser(fsPath, proxyUser)
+ fs.init(new util.HashMap[String, String]())
val inputStream = fs.read(fsPath)
val resultSetType = Dolphin.getType(inputStream)
- if(StringUtils.isEmpty(resultSetType)) throw new
StorageWarnException(51000, s"The file (${fsPath.getPath}) is
empty(文件(${fsPath.getPath}) 为空)")
+ if (StringUtils.isEmpty(resultSetType)) throw new
StorageWarnException(51000, s"The file (${fsPath.getPath}) is
empty(文件(${fsPath.getPath}) 为空)")
Utils.tryQuietly(inputStream.close())
Utils.tryQuietly(fs.close())
getResultSetByType(resultSetType)
}
override def getResultSet(output: String, proxyUser: String): ResultSet[_ <:
MetaData, _ <: Record] = {
- if(isResultSetPath(output)) {
+ if (isResultSetPath(output)) {
getResultSetByPath(new FsPath(output), proxyUser)
} else if (isResultSet(output)) {
getResultSetByContent(output)
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetFactory.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetFactory.scala
index 605211196..eb409abc0 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetFactory.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetFactory.scala
@@ -22,31 +22,29 @@ import org.apache.linkis.common.io.{Fs, FsPath, MetaData,
Record}
import scala.collection.mutable
-
-
trait ResultSetFactory extends scala.AnyRef {
- def getResultSetByType(resultSetType : scala.Predef.String) :ResultSet[_ <:
MetaData, _ <: Record]
+ def getResultSetByType(resultSetType: scala.Predef.String): ResultSet[_ <:
MetaData, _ <: Record]
def getResultSetByPath(fsPath : FsPath) : ResultSet[_ <: MetaData, _ <:
Record]
def getResultSetByPath(fsPath: FsPath, fs: Fs) : ResultSet[_ <: MetaData, _
<: Record]
def getResultSetByContent(content : scala.Predef.String) : ResultSet[_ <:
MetaData, _ <: Record]
def exists(resultSetType : scala.Predef.String) : scala.Boolean
def isResultSetPath(path : scala.Predef.String) : scala.Boolean
def isResultSet(content : scala.Predef.String) : scala.Boolean
- def getResultSet(output:String):ResultSet[_ <: MetaData, _ <: Record]
+ def getResultSet(output: String): ResultSet[_ <: MetaData, _ <: Record]
- def getResultSetByPath(fsPath : FsPath, proxyUser:String) : ResultSet[_ <:
MetaData, _ <: Record]
+ def getResultSetByPath(fsPath : FsPath, proxyUser: String) : ResultSet[_ <:
MetaData, _ <: Record]
+
+ def getResultSet(output: String, proxyUser: String): ResultSet[_ <:
MetaData, _ <: Record]
- def getResultSet(output:String, proxyUser:String):ResultSet[_ <: MetaData, _
<: Record]
/**
* The first must-time text(第一个必须时text)
* @return
*/
- def getResultSetType:Array[String]
+ def getResultSetType: Array[String]
}
object ResultSetFactory {
-
val TEXT_TYPE = "1"
val TABLE_TYPE = "2"
val IO_TYPE = "3"
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetReader.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetReader.scala
index 71ff18d05..219d4f601 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetReader.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetReader.scala
@@ -24,7 +24,6 @@ import org.apache.linkis.storage.FSFactory
import org.apache.linkis.storage.exception.StorageErrorException
import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord,
TableResultSet}
-
object ResultSetReader {
def getResultSetReader[K <: MetaData, V <: Record](resultSet: ResultSet[K,
V], inputStream: InputStream): ResultSetReader[K, V] = {
@@ -35,7 +34,7 @@ object ResultSetReader {
new StorageResultSetReader[K, V](resultSet, value)
}
- def getResultSetReader(res: String):ResultSetReader[_ <: MetaData, _ <:
Record]= {
+ def getResultSetReader(res: String): ResultSetReader[_ <: MetaData, _ <:
Record] = {
val rsFactory = ResultSetFactory.getInstance
if (rsFactory.isResultSet(res)) {
val resultSet = rsFactory.getResultSet(res)
@@ -54,7 +53,7 @@ object ResultSetReader {
}
}
- def getTableResultReader(res:
String):ResultSetReader[TableMetaData,TableRecord] = {
+ def getTableResultReader(res: String): ResultSetReader[TableMetaData,
TableRecord] = {
val rsFactory = ResultSetFactory.getInstance
if (rsFactory.isResultSet(res)) {
val resultSet = rsFactory.getResultSet(res)
@@ -62,7 +61,7 @@ object ResultSetReader {
throw new StorageErrorException(52002, "Result sets that are not
tables are not supported(不支持不是表格的结果集)")
}
ResultSetReader.getResultSetReader(resultSet.asInstanceOf[TableResultSet], res)
- }else {
+ } else {
val resPath = new FsPath(res)
val resultSet = rsFactory.getResultSetByPath(resPath)
if (ResultSetFactory.TABLE_TYPE != resultSet.resultSetType()) {
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSet.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSet.scala
index e6892ce83..9aa13ecba 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSet.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSet.scala
@@ -29,13 +29,12 @@ abstract class StorageResultSet[K <: MetaData, V <: Record]
extends ResultSet[K,
val resultHeaderBytes = Dolphin.MAGIC_BYTES ++
Dolphin.getIntBytes(resultSetType().toInt)
override val charset: String =
StorageConfiguration.STORAGE_RS_FILE_TYPE.getValue
-
-
override def getResultSetPath(parentDir: FsPath, fileName: String): FsPath =
{
- val path = if(parentDir.getPath.endsWith("/"))
+ val path = if (parentDir.getPath.endsWith("/")) {
parentDir.toPath + fileName + Dolphin.DOLPHIN_FILE_SUFFIX
- else
+ } else {
parentDir.toPath + "/" + fileName + Dolphin.DOLPHIN_FILE_SUFFIX
+ }
new FsPath(path)
}
@@ -43,7 +42,7 @@ abstract class StorageResultSet[K <: MetaData, V <: Record]
extends ResultSet[K,
override def belongToPath(path: String): Boolean =
path.endsWith(Dolphin.DOLPHIN_FILE_SUFFIX)
- override def belongToResultSet(content: String): Boolean =
Utils.tryCatch(Dolphin.getType(content) == resultSetType()){ t =>
+ override def belongToResultSet(content: String): Boolean =
Utils.tryCatch(Dolphin.getType(content) == resultSetType()) { t =>
logger.info("Wrong result Set: ", t)
false
}
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetReader.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetReader.scala
index 74c30de5b..580c94401 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetReader.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/StorageResultSetReader.scala
@@ -28,8 +28,6 @@ import org.apache.linkis.storage.utils.StorageUtils
import scala.collection.mutable.ArrayBuffer
-
-
class StorageResultSetReader[K <: MetaData, V <: Record](resultSet:
ResultSet[K, V], inputStream: InputStream) extends ResultSetReader[K,
V](resultSet, inputStream) with Logging{
private val deserializer = resultSet.createResultSetDeserializer
@@ -54,21 +52,21 @@ class StorageResultSetReader[K <: MetaData, V <:
Record](resultSet: ResultSet[K,
}
/**
- * Read a row of data
- Read the line length first
- * Get the entire row of data by the length of the line, first obtain the
column length in the entire row of data,
- * and then divide into column length to split the data
- * 读取一行数据
- * 先读取行长度
- * 通过行长度获取整行数据,在整行数据中先获取列长度,进而分割成列长度从而分割数据
- * @return
- */
+ * Read a row of data
+ * Read the line length first
+ * Get the entire row of data by the length of the line, first obtain the
column length in the entire row of data,
+ * and then divide into column length to split the data
+ * 读取一行数据
+ * 先读取行长度
+ * 通过行长度获取整行数据,在整行数据中先获取列长度,进而分割成列长度从而分割数据
+ * @return
+ */
def readLine(): Array[Byte] = {
var rowLen = 0
try rowLen = Dolphin.readInt(inputStream)
catch {
- case t:StorageWarnException => logger.info(s"Read finished(读取完毕)") ;
return null
+ case t: StorageWarnException => logger.info(s"Read finished(读取完毕)", t);
return null
case t: Throwable => throw t
}
@@ -77,10 +75,11 @@ class StorageResultSetReader[K <: MetaData, V <:
Record](resultSet: ResultSet[K,
//Read the entire line, except for the data of the line
length(读取整行,除了行长的数据)
while (rowLen > 0 && len >= 0) {
- if (rowLen > READ_CACHE)
- len = StorageUtils.readBytes(inputStream,bytes, READ_CACHE)
- else
- len = StorageUtils.readBytes(inputStream,bytes, rowLen)
+ if (rowLen > READ_CACHE) {
+ len = StorageUtils.readBytes(inputStream, bytes, READ_CACHE)
+ } else {
+ len = StorageUtils.readBytes(inputStream, bytes, rowLen)
+ }
if (len > 0) {
rowLen -= len
@@ -103,16 +102,16 @@ class StorageResultSetReader[K <: MetaData, V <:
Record](resultSet: ResultSet[K,
@scala.throws[IOException]
override def getMetaData: MetaData = {
- if(metaData == null) init()
+ if (metaData == null) init()
metaData = deserializer.createMetaData(readLine())
metaData
}
@scala.throws[IOException]
override def skip(recordNum: Int): Int = {
- if(recordNum < 0 ) return -1
+ if (recordNum < 0) return -1
- if(metaData == null) getMetaData
+ if (metaData == null) getMetaData
for(i <- recordNum until (0, -1)) {
try inputStream.skip(Dolphin.readInt(inputStream))
catch {
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/io/IOMetaData.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/io/IOMetaData.scala
index 3d09c457e..eeb5b3a40 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/io/IOMetaData.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/io/IOMetaData.scala
@@ -21,6 +21,6 @@ import org.apache.linkis.common.io.MetaData
import org.apache.linkis.storage.resultset.ResultMetaData
-class IOMetaData(val off:Int, val len:Int) extends ResultMetaData {
+class IOMetaData(val off: Int, val len: Int) extends ResultMetaData {
override def cloneMeta(): MetaData = new IOMetaData(off, len)
}
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/io/IORecord.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/io/IORecord.scala
index 5b7801522..c8fb82668 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/io/IORecord.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/io/IORecord.scala
@@ -21,6 +21,6 @@ import org.apache.linkis.common.io.Record
import org.apache.linkis.storage.resultset.ResultRecord
-class IORecord(val value:Array[Byte]) extends ResultRecord{
+class IORecord(val value: Array[Byte]) extends ResultRecord{
override def cloneRecord(): Record = new IORecord(value)
}
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/io/IOResultSerializer.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/io/IOResultSerializer.scala
index d20bdcab1..a05ca5227 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/io/IOResultSerializer.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/io/IOResultSerializer.scala
@@ -36,7 +36,7 @@ class IOResultSerializer extends ResultSerializer{
}
def lineToBytes(value: String): Array[Byte] = {
- val bytes = if(value == null) Dolphin.NULL_BYTES else
Dolphin.getBytes(value)
+ val bytes = if (value == null) Dolphin.NULL_BYTES else
Dolphin.getBytes(value)
Dolphin.getIntBytes(bytes.length) ++ bytes
}
}
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableRecord.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableRecord.scala
index e581da537..91782f73e 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableRecord.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableRecord.scala
@@ -22,15 +22,15 @@ import org.apache.linkis.storage.resultset.ResultRecord
import org.apache.linkis.storage.utils.StorageUtils
-class TableRecord(val row:Array[Any]) extends ResultRecord{
+class TableRecord(val row: Array[Any]) extends ResultRecord{
override def cloneRecord(): Record = {
new TableRecord(row)
}
- def tableRecordToString(nullValue:String = "NULL"):Array[String] = {
+ def tableRecordToString(nullValue: String = "NULL"): Array[String] = {
row.map{ col =>
- StorageUtils.colToString(col,nullValue)
+ StorageUtils.colToString(col, nullValue)
}
}
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableResultDeserializer.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableResultDeserializer.scala
index 2e3cd520a..b45b96866 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableResultDeserializer.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableResultDeserializer.scala
@@ -33,11 +33,11 @@ class TableResultDeserializer extends
ResultDeserializer[TableMetaData, TableRec
override def createMetaData(bytes: Array[Byte]): TableMetaData = {
val colByteLen = Dolphin.getString(bytes, 0, Dolphin.INT_LEN).toInt
val colString = Dolphin.getString(bytes, Dolphin.INT_LEN, colByteLen)
- val colArray = if(colString.endsWith(Dolphin.COL_SPLIT))
colString.substring(0, colString.length -1).split(Dolphin.COL_SPLIT) else
colString.split(Dolphin.COL_SPLIT)
+ val colArray = if (colString.endsWith(Dolphin.COL_SPLIT))
colString.substring(0, colString.length -1).split(Dolphin.COL_SPLIT) else
colString.split(Dolphin.COL_SPLIT)
var index = Dolphin.INT_LEN + colByteLen
- if(colArray.length % 3 != 0) throw new
StorageErrorException(52001,"Parsing metadata failed(解析元数据失败)")
+ if (colArray.length % 3 != 0) throw new
StorageErrorException(52001,"Parsing metadata failed(解析元数据失败)")
val columns = new ArrayBuffer[Column]()
- for(i <- 0 until (colArray.length, 3)){
+ for (i <- 0 until (colArray.length, 3)) {
var len = colArray(i).toInt
val colName = Dolphin.getString(bytes, index, len)
index += len
@@ -64,15 +64,16 @@ class TableResultDeserializer extends
ResultDeserializer[TableMetaData, TableRec
override def createRecord(bytes: Array[Byte]): TableRecord = {
val colByteLen = Dolphin.getString(bytes, 0, Dolphin.INT_LEN).toInt
val colString = Dolphin.getString(bytes, Dolphin.INT_LEN, colByteLen)
- val colArray = if(colString.endsWith(Dolphin.COL_SPLIT))
colString.substring(0, colString.length -1).split(Dolphin.COL_SPLIT) else
colString.split(Dolphin.COL_SPLIT)
+ val colArray = if (colString.endsWith(Dolphin.COL_SPLIT))
colString.substring(0, colString.length -1).split(Dolphin.COL_SPLIT) else
colString.split(Dolphin.COL_SPLIT)
var index = Dolphin.INT_LEN + colByteLen
val data = colArray.indices.map { i =>
val len = colArray(i).toInt
val res = Dolphin.getString(bytes, index, len)
index += len
- if(i >= metaData.columns.length) res
- else
- toValue(metaData.columns(i).dataType,res)
+ if (i >= metaData.columns.length) res
+ else {
+ toValue(metaData.columns(i).dataType, res)
+ }
}.toArray
new TableRecord(data)
}
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableResultSerializer.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableResultSerializer.scala
index ae513fb86..7b38011c8 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableResultSerializer.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/table/TableResultSerializer.scala
@@ -33,16 +33,16 @@ class TableResultSerializer extends ResultSerializer{
}
override def recordToBytes(record: Record): Array[Byte] = {
- val tableRecord = record.asInstanceOf[TableRecord]
+ val tableRecord = record.asInstanceOf[TableRecord]
lineToBytes(tableRecord.row)
}
/**
* Convert a row of data to an array of Bytes
- * Convert the data to byte and get the corresponding total byte length to
write to the file
- * Data write format: line length (fixed length) column length (fixed length)
field index comma segmentation real data
- * For example: 000000004900000000116,10,3,4,5,peace1johnnwang1101true11.51
- * The length of the line does not include its own length
+ * Convert the data to byte and get the corresponding total byte length to
write to the file
+ * Data write format: line length (fixed length) column length (fixed
length) field index comma segmentation real data
+ * For example: 000000004900000000116,10,3,4,5,peace1johnnwang1101true11.51
+ * The length of the line does not include its own length
* 将一行数据转换为Bytes的数组
* 对数据转换为byte,并获取相应的总byte长度写入文件
* 数据写入格式:行长(固定长度) 列长(固定长度) 字段索引逗号分割 真实数据
@@ -58,7 +58,7 @@ class TableResultSerializer extends ResultSerializer{
var colByteLen = 0
var length = 0
line.foreach { data =>
- val bytes = if(data == null ) Dolphin.NULL_BYTES else
Dolphin.getBytes(data)
+ val bytes = if (data == null) Dolphin.NULL_BYTES else
Dolphin.getBytes(data)
dataBytes += bytes
val colBytes = Dolphin.getBytes(bytes.length)
colIndex += colBytes += Dolphin.COL_SPLIT_BYTES
@@ -85,8 +85,4 @@ class TableResultSerializer extends ResultSerializer{
colIndex.foreach(row.appendAll(_))
row.toArray
}
-
-
-
-
}
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/txt/TextResultSerializer.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/txt/TextResultSerializer.scala
index 84b9d85de..98f99128b 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/txt/TextResultSerializer.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/txt/TextResultSerializer.scala
@@ -26,7 +26,7 @@ import org.apache.linkis.storage.{LineMetaData, LineRecord}
class TextResultSerializer extends ResultSerializer{
override def metaDataToBytes(metaData: MetaData): Array[Byte] = {
- if(metaData == null){
+ if (metaData == null) {
lineToBytes(null)
} else {
val textMetaData = metaData.asInstanceOf[LineMetaData]
@@ -40,7 +40,7 @@ class TextResultSerializer extends ResultSerializer{
}
def lineToBytes(value: String): Array[Byte] = {
- val bytes = if(value == null) Dolphin.NULL_BYTES else
Dolphin.getBytes(value)
+ val bytes = if (value == null) Dolphin.NULL_BYTES else
Dolphin.getBytes(value)
Dolphin.getIntBytes(bytes.length) ++ bytes
}
}
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala
index bb6705e3f..ed710f31d 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala
@@ -71,7 +71,6 @@ object FileSystemUtils extends Logging{
case l: LocalFileSystem => fileSystem.setOwner(filePath, user)
case _ => logger.info(s"doesn't need to call setOwner")
}
-
/*fileSystem.setOwner(filePath,user,StorageConfiguration.STORAGE_HDFS_GROUP.getValue)*/
}
}
@@ -102,7 +101,6 @@ object FileSystemUtils extends Logging{
case l: LocalFileSystem => fileSystem.setOwner(path, user)
case _ => logger.info(s"doesn't need to call setOwner")
}
-
//fileSystem.setOwner(path,user,StorageConfiguration.STORAGE_HDFS_GROUP.getValue)
}
true
}
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
index 96cc89113..f9ec54096 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
@@ -36,7 +36,7 @@ object StorageConfiguration {
val STORAGE_RS_FILE_SUFFIX = CommonVars("wds.linkis.storage.rs.file.suffix",
".dolphin")
- val ResultTypes = List( "%TEXT","%TABLE", "%HTML", "%IMG", "%ANGULAR",
"%SVG")
+ val ResultTypes = List("%TEXT", "%TABLE", "%HTML", "%IMG", "%ANGULAR",
"%SVG")
val STORAGE_RESULT_SET_PACKAGE =
CommonVars("wds.linkis.storage.result.set.package",
"org.apache.linkis.storage.resultset")
val STORAGE_RESULT_SET_CLASSES =
CommonVars("wds.linkis.storage.result.set.classes",
"txt.TextResultSet,table.TableResultSet,io.IOResultSet,html.HtmlResultSet,picture.PictureResultSet")
@@ -45,7 +45,6 @@ object StorageConfiguration {
val IS_SHARE_NODE = CommonVars("wds.linkis.storage.is.share.node", true)
-
val ENABLE_IO_PROXY = CommonVars("wds.linkis.storage.enable.io.proxy", false)
val IO_USER = CommonVars("wds.linkis.storage.io.user", "root")
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageHelper.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageHelper.scala
index 5ee3ec3a0..3f2760327 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageHelper.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageHelper.scala
@@ -25,9 +25,9 @@ import org.apache.linkis.storage.resultset.{ResultSetFactory,
ResultSetReader}
object StorageHelper {
def main(args: Array[String]): Unit = {
- if(args.length < 2) println("Usage method params eg:getTableResLines path")
+ if (args.length < 2) println("Usage method params eg:getTableResLines
path")
val method = args(0)
- val params = args.slice(1,args.length)
+ val params = args.slice(1, args.length)
Thread.sleep(10000L)
method match {
@@ -43,18 +43,18 @@ object StorageHelper {
*
* @param args
*/
- def getTableResLines(args: Array[String]) = {
+ def getTableResLines(args: Array[String]): Unit = {
val resPath = StorageUtils.getFsPath(args(0))
val resultSetFactory = ResultSetFactory.getInstance
val resultSet =
resultSetFactory.getResultSetByType(ResultSetFactory.TABLE_TYPE)
val fs = FSFactory.getFs(resPath)
fs.init(null)
- val reader = ResultSetReader.getResultSetReader(resultSet,fs.read(resPath))
+ val reader = ResultSetReader.getResultSetReader(resultSet,
fs.read(resPath))
val rmetaData = reader.getMetaData
rmetaData.asInstanceOf[TableMetaData].columns.foreach(println)
var num = 0
Thread.sleep(10000L)
- while (reader.hasNext){
+ while (reader.hasNext) {
reader.getRecord
num = num + 1
}
@@ -62,7 +62,7 @@ object StorageHelper {
reader.close()
}
- def getTableRes(args: Array[String]): Unit ={
+ def getTableRes(args: Array[String]): Unit = {
val len = Integer.parseInt(args(1))
val max = len + 10
val resPath = StorageUtils.getFsPath(args(0))
@@ -70,15 +70,15 @@ object StorageHelper {
val resultSet =
resultSetFactory.getResultSetByType(ResultSetFactory.TABLE_TYPE)
val fs = FSFactory.getFs(resPath)
fs.init(null)
- val reader = ResultSetReader.getResultSetReader(resultSet,fs.read(resPath))
+ val reader = ResultSetReader.getResultSetReader(resultSet,
fs.read(resPath))
val rmetaData = reader.getMetaData
rmetaData.asInstanceOf[TableMetaData].columns.foreach(println)
rmetaData.asInstanceOf[TableMetaData].columns.map(_.columnName +
",").foreach(print)
var num = 0
- while (reader.hasNext){
+ while (reader.hasNext) {
num = num + 1
if(num > max) return
- if(num > len){
+ if(num > len) {
val record = reader.getRecord
record.asInstanceOf[TableRecord].row.foreach{ value =>
print(value.toString)
@@ -89,10 +89,10 @@ object StorageHelper {
}
}
- def createNewFile(args: Array[String]): Unit ={
+ def createNewFile(args: Array[String]): Unit = {
val resPath = StorageUtils.getFsPath(args(0))
val proxyUser = StorageUtils.getJvmUser
- FileSystemUtils.createNewFile(resPath, proxyUser,true)
+ FileSystemUtils.createNewFile(resPath, proxyUser, true)
println("success")
}
diff --git
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
index 9e002cae8..53a3d22ed 100644
---
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
+++
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
@@ -130,7 +130,7 @@ object StorageUtils extends Logging{
val reader = ResultSetReader.getResultSetReader(resultSet, result)
reader.getMetaData
val sb = new StringBuilder
- while (reader.hasNext){
+ while (reader.hasNext) {
val record = reader.getRecord.asInstanceOf[LineRecord]
sb.append(record.getLine)
}
@@ -141,36 +141,36 @@ object StorageUtils extends Logging{
- def close(outputStream: OutputStream): Unit ={
- close(outputStream,null,null)
+ def close(outputStream: OutputStream): Unit = {
+ close(outputStream, null, null)
}
- def close(inputStream:InputStream): Unit ={
- close(null,inputStream,null)
+ def close(inputStream: InputStream): Unit = {
+ close(null, inputStream, null)
}
- def close(fs:Fs): Unit ={
- close(null,null,fs)
+ def close(fs: Fs): Unit = {
+ close(null, null, fs)
}
- def close(outputStream: OutputStream,inputStream:InputStream,fs:Fs) ={
- Utils.tryFinally(if(outputStream != null) outputStream.close())()
- Utils.tryFinally(if(inputStream != null) inputStream.close())()
- Utils.tryFinally(if(fs != null) fs.close())()
+ def close(outputStream: OutputStream, inputStream: InputStream, fs: Fs):
Unit = {
+ Utils.tryFinally(if (outputStream != null) outputStream.close())()
+ Utils.tryFinally(if (inputStream != null) inputStream.close())()
+ Utils.tryFinally(if (fs != null) fs.close())()
}
- def close(closeable: Closeable) ={
- Utils.tryFinally(if(closeable != null) closeable.close())()
+ def close(closeable: Closeable): Unit = {
+ Utils.tryFinally(if (closeable != null) closeable.close())()
}
- def getJvmUser:String = System.getProperty("user.name")
+ def getJvmUser: String = System.getProperty("user.name")
- def isHDFSNode:Boolean = {
+ def isHDFSNode: Boolean = {
val confPath = new File(HadoopConf.hadoopConfDir)
//TODO IO-client mode need return false
- if(!confPath.exists() || confPath.isFile)
+ if (!confPath.exists() || confPath.isFile) {
throw new StorageFatalException(50001, "HDFS configuration was not read,
please configure hadoop.config.dir or add env:HADOOP_CONF_DIR")
- else true
+ } else true
}
/**
@@ -179,16 +179,16 @@ object StorageUtils extends Logging{
* @param path
* @return
*/
- def getFsPath(path: String):FsPath ={
- if(path.startsWith(FILE_SCHEMA) || path.startsWith(HDFS_SCHEMA)) new
FsPath(path)
- else
+ def getFsPath(path: String): FsPath = {
+ if (path.startsWith(FILE_SCHEMA) || path.startsWith(HDFS_SCHEMA)) new
FsPath(path)
+ else {
new FsPath(FILE_SCHEMA + path)
-
+ }
}
- def readBytes(inputStream: InputStream,bytes:Array[Byte],len:Int):Int = {
+ def readBytes(inputStream: InputStream, bytes: Array[Byte], len: Int): Int =
{
var count = 0
- while ( count < len ){
+ while (count < len) {
val value = inputStream.read()
if(value == -1 && inputStream.available() < 1) return count
bytes(count) = value.toByte
@@ -197,11 +197,11 @@ object StorageUtils extends Logging{
count
}
- def colToString(col:Any,nullValue:String = "NULL"):String ={
- if(null == col) nullValue
+ def colToString(col: Any, nullValue: String = "NULL"): String = {
+ if (null == col) nullValue
else {
col match {
- case value:Double => doubleToString(value)
+ case value: Double => doubleToString(value)
case "NULL" | "" => nullValue
case _ => col.toString
}
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
index 352d0e88c..61a4e73fc 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
@@ -138,8 +138,6 @@ object EngineConnMonitor extends Logging {
endJobByEngineInstance(status._1)
}
}
- case o: Any =>
- logger.error(s"Status of engine ${status._1.toString} is
${status._2}")
}
}
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
index d0e5fd3c6..5c33ed8a3 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
@@ -97,7 +97,7 @@ class CodeLogicalUnitExecTask (parents: Array[ExecTask],
children: Array[ExecTas
getPhysicalContext.pushProgress(event)
getPhysicalContext.pushLog(TaskLogEvent(this,
LogUtils.generateInfo(s"Task submit to ec:
${codeExecutor.getEngineConnExecutor.getServiceInstance} get engineConnExecId
is: ${engineConnExecId}")))
new AsyncTaskResponse {
- override def notifyMe(listener: NotifyListener): Unit = null
+ override def notifyMe(listener: NotifyListener): Unit = {}
override def waitForCompleted(): TaskResponse = throw new
OrchestratorErrorException(OrchestratorErrorCodeSummary.METHOD_NUT_SUPPORT_CODE,
"waitForCompleted method not support")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]