This is an automated email from the ASF dual-hosted git repository.
chengjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 43cb5b2e30 The interface of basic data management should be judged by
the administrator. (#5108)
43cb5b2e30 is described below
commit 43cb5b2e307d51a5194135b0295176bb3f3bdc1b
Author: peacewong <[email protected]>
AuthorDate: Tue May 14 15:50:11 2024 +0800
The interface of basic data management should be judged by the
administrator. (#5108)
* Add administrator judgment
* Add jdbc params
* Add file path judgment
* add throw Exception
* code format
code format
* remove no use file
* code optimize
* use feign requestInterceptor to fixed server
* add gateway ip priority load balancer
* Fix Rpc bug
* Support automatic retry for important RPC requests
* Fix build issue
* code format
---
.../apache/linkis/common/utils/SecurityUtils.java | 36 ++++++
.../linkis/protocol/AbstractRetryableProtocol.java | 6 +-
.../apache/linkis/protocol/engine/EngineInfo.java | 47 --------
.../linkis/protocol/IRServiceGroupProtocol.scala | 27 -----
.../apache/linkis/protocol/RetryableProtocol.scala | 6 +-
.../apache/linkis/protocol/UserWithCreator.scala} | 4 +-
.../protocol/callback/LogCallbackProtocol.scala | 24 ----
.../linkis/protocol/engine/EngineCallback.scala | 35 ------
.../engine/EngineStateTransitionRequest.scala | 27 -----
.../protocol/engine/RequestEngineStatus.scala | 32 ------
.../protocol/engine/RequestUserEngineKill.scala | 34 ------
.../linkis/protocol/utils/ProtocolUtils.scala | 44 -------
.../protocol/engine/RequestEngineStatusTest.scala | 44 -------
.../engine/ResponseUserEngineKillTest.scala | 35 ------
.../linkis/protocol/utils/ProtocolUtilsTest.scala | 45 --------
.../linkis/rpc/conf/CacheManualRefresher.java | 22 ----
.../apache/linkis/rpc/conf/DynamicFeignClient.java | 126 ---------------------
.../rpc/conf/EurekaClientCacheManualRefresher.java | 118 -------------------
.../linkis/rpc/conf/FeignRequestInterceptor.java | 64 -----------
.../rpc/conf/NacosClientCacheManualRefresher.java | 40 -------
.../apache/linkis/rpc/constant/RpcConstant.java | 4 -
.../rpc/errorcode/LinkisRpcErrorCodeSummary.java | 2 +
.../org/apache/linkis/rpc/BaseRPCSender.scala | 49 ++------
.../org/apache/linkis/rpc/RPCReceiveRemote.scala | 15 +--
.../org/apache/linkis/rpc/RPCSpringBeanCache.scala | 8 --
.../common/RetryableRPCInterceptor.scala | 10 --
.../linkis/rpc/sender/SpringMVCRPCSender.scala | 52 ++++++---
.../common/protocol/job/JobReqProcotol.scala | 3 +-
.../common/protocol/task/RequestTask.scala | 3 +-
.../protocol/task/ResponseEngineConnPid.scala | 3 +-
.../common/protocol/task/ResponseTaskExecute.scala | 7 +-
.../service/TaskExecutionServiceImpl.scala | 3 +-
.../common/resource/EngineResourceRequest.scala | 2 +-
.../service/impl/EnginePluginAdminServiceImpl.java | 11 ++
.../linkis/manager/am/exception/AMErrorCode.java | 3 +-
.../engineplugin/jdbc/ConnectionManager.java | 1 +
.../query/service/mysql/SqlConnection.java | 5 +-
.../restful/ConfigurationTemplateRestfulApi.java | 15 ++-
.../server/restful/DatasourceAccessRestfulApi.java | 10 +-
.../server/restful/DatasourceEnvRestfulApi.java | 7 +-
.../restful/DatasourceTypeKeyRestfulApi.java | 8 +-
.../server/restful/DatasourceTypeRestfulApi.java | 9 +-
.../server/restful/GatewayAuthTokenRestfulApi.java | 22 +++-
.../StaticAuthenticationStrategy.scala | 20 +++-
.../dws/response/DWSAuthenticationResult.scala | 5 +
.../springcloud/http/IpPriorityLoadBalancer.java | 100 ++++------------
.../LinkisLoadBalancerClientConfiguration.java | 10 +-
47 files changed, 229 insertions(+), 974 deletions(-)
diff --git
a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SecurityUtils.java
b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SecurityUtils.java
index 0278b3337e..af163a6494 100644
---
a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SecurityUtils.java
+++
b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/SecurityUtils.java
@@ -320,4 +320,40 @@ public abstract class SecurityUtils {
return key.toLowerCase().contains(param.toLowerCase())
|| value.toLowerCase().contains(param.toLowerCase());
}
+
+ /**
+ * allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false
+ *
+ * @return
+ */
+ public static Properties getMysqlSecurityParams() {
+ Properties properties = new Properties();
+ properties.setProperty("allowLoadLocalInfile", "false");
+ properties.setProperty("autoDeserialize", "false");
+ properties.setProperty("allowLocalInfile", "false");
+ properties.setProperty("allowUrlInLocalInfile", "false");
+ return properties;
+ }
+
+ /**
+ * Check if the path has a relative path
+ *
+ * @param path
+ * @return
+ */
+ public static boolean containsRelativePath(String path) {
+ if (path.startsWith("./")
+ || path.contains("/./")
+ || path.startsWith("../")
+ || path.contains("/../")) {
+ return true;
+ }
+ if (path.startsWith(".\\")
+ || path.contains("\\.\\")
+ || path.startsWith("..\\")
+ || path.contains("\\..\\")) {
+ return true;
+ }
+ return false;
+ }
}
diff --git
a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java
b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java
index 3dfd166846..aa7ddece50 100644
---
a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java
+++
b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java
@@ -21,7 +21,7 @@ public class AbstractRetryableProtocol implements RetryableProtocol {
@Override
public long maxPeriod() {
- return 3000L;
+ return 30000L;
}
@Override
@@ -31,11 +31,11 @@ public class AbstractRetryableProtocol implements RetryableProtocol {
@Override
public int retryNum() {
- return 2;
+ return 5;
}
@Override
public long period() {
- return 1000L;
+ return 10000L;
}
}
diff --git
a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/engine/EngineInfo.java
b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/engine/EngineInfo.java
deleted file mode 100644
index 0504ee2113..0000000000
---
a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/engine/EngineInfo.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.protocol.engine;
-
-public class EngineInfo {
-
- private Long id;
- private EngineState engineState;
-
- public EngineInfo() {}
-
- public EngineInfo(Long id, EngineState state) {
- this.id = id;
- this.engineState = state;
- }
-
- public Long getId() {
- return id;
- }
-
- public void setId(Long id) {
- this.id = id;
- }
-
- public EngineState getEngineState() {
- return engineState;
- }
-
- public void setEngineState(EngineState engineState) {
- this.engineState = engineState;
- }
-}
diff --git
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/IRServiceGroupProtocol.scala
b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/IRServiceGroupProtocol.scala
deleted file mode 100644
index 675dc0c830..0000000000
---
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/IRServiceGroupProtocol.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.protocol
-
-trait IRServiceGroupProtocol extends IRProtocol with InstanceProtocol {
- val userWithCreator: UserWithCreator
-
- def user: String = userWithCreator.user
- def creator: String = userWithCreator.creator
-}
-
-case class UserWithCreator(user: String, creator: String)
diff --git
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala
b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala
index 51509d6883..6ebee7d0e2 100644
---
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala
+++
b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala
@@ -18,8 +18,8 @@
package org.apache.linkis.protocol
trait RetryableProtocol extends Protocol {
- def retryNum: Int = 2
- def period: Long = 1000L
- def maxPeriod: Long = 3000L
+ def retryNum: Int = 5
+ def period: Long = 10000L
+ def maxPeriod: Long = 30000L
def retryExceptions: Array[Class[_ <: Throwable]] = Array.empty[Class[_ <: Throwable]]
}
diff --git
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/GatewayLoadBalancerConfiguration.java
b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/UserWithCreator.scala
similarity index 89%
rename from
linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/GatewayLoadBalancerConfiguration.java
rename to
linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/UserWithCreator.scala
index 0c9aecf177..cebaf3b9b2 100644
---
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/GatewayLoadBalancerConfiguration.java
+++
b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/UserWithCreator.scala
@@ -15,6 +15,6 @@
* limitations under the License.
*/
-package org.apache.linkis.rpc.loadbalancer;
+package org.apache.linkis.protocol
-public class GatewayLoadBalancerConfiguration {}
+case class UserWithCreator(user: String, creator: String)
diff --git
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/callback/LogCallbackProtocol.scala
b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/callback/LogCallbackProtocol.scala
deleted file mode 100644
index 0109472a90..0000000000
---
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/callback/LogCallbackProtocol.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.protocol.callback
-
-import org.apache.linkis.protocol.message.RequestProtocol
-
-case class YarnAPPIdCallbackProtocol(nodeId: String, applicationId: String) extends RequestProtocol
-
-case class YarnInfoCallbackProtocol(nodeId: String, uri: String) extends RequestProtocol
diff --git
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineCallback.scala
b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineCallback.scala
deleted file mode 100644
index 8856d3a927..0000000000
---
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineCallback.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.protocol.engine
-
-object EngineCallback {
- private val DWC_APPLICATION_NAME = "dwc.application.name"
- private val DWC_INSTANCE = "dwc.application.instance"
-
- def mapToEngineCallback(options: Map[String, String]): EngineCallback =
- EngineCallback(options(DWC_APPLICATION_NAME), options(DWC_INSTANCE))
-
- def callbackToMap(engineCallback: EngineCallback): Map[String, String] =
- Map(
- DWC_APPLICATION_NAME -> engineCallback.applicationName,
- DWC_INSTANCE -> engineCallback.instance
- )
-
-}
-
-case class EngineCallback(applicationName: String, instance: String)
diff --git
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineStateTransitionRequest.scala
b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineStateTransitionRequest.scala
deleted file mode 100644
index 9137001c14..0000000000
---
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineStateTransitionRequest.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.protocol.engine
-
-case class EngineStateTransitionRequest(engineInstance: String, state: String)
-
-case class EngineStateTransitionResponse(
- engineInstance: String,
- state: String,
- result: Boolean,
- message: String
-)
diff --git
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestEngineStatus.scala
b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestEngineStatus.scala
deleted file mode 100644
index a4672aa4e5..0000000000
---
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestEngineStatus.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.protocol.engine
-
-import org.apache.linkis.protocol.RetryableProtocol
-import org.apache.linkis.protocol.message.RequestProtocol
-
-case class RequestEngineStatus(messageType: Int) extends RetryableProtocol with RequestProtocol
-
-object RequestEngineStatus {
- val Status_Only = 1
- val Status_Overload = 2
- val Status_Concurrent = 3
- val Status_Overload_Concurrent = 4
- val Status_BasicInfo = 5
- val ALL = 6
-}
diff --git
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestUserEngineKill.scala
b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestUserEngineKill.scala
deleted file mode 100644
index beb7987b01..0000000000
---
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestUserEngineKill.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.protocol.engine
-
-import org.apache.linkis.protocol.message.RequestProtocol
-
-case class RequestUserEngineKill(
- ticketId: String,
- creator: String,
- user: String,
- properties: Map[String, String]
-) extends RequestProtocol
-
-case class ResponseUserEngineKill(ticketId: String, status: String, message: String)
-
-object ResponseUserEngineKill {
- val Success = "Success"
- val Error = "Error"
-}
diff --git
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/ProtocolUtils.scala
b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/ProtocolUtils.scala
deleted file mode 100644
index 1bb0791be3..0000000000
---
a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/ProtocolUtils.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.protocol.utils
-
-import org.apache.linkis.common.conf.CommonVars
-
-object ProtocolUtils {
-
- val SERVICE_SUFFIX = CommonVars("wds.linkis.service.suffix", "engineManager,entrance,engine")
- val suffixs = SERVICE_SUFFIX.getValue.split(",")
-
- /**
- * Pass in moduleName to return the corresponding appName 传入moduleName返回对应的appName
- * @param moduleName
- * module's name
- * @return
- * application's name
- */
- def getAppName(moduleName: String): Option[String] = {
- val moduleNameLower = moduleName.toLowerCase()
- for (suffix <- suffixs) {
- if (moduleNameLower.contains(suffix.toLowerCase())) {
- return Some(moduleNameLower.replace(suffix.toLowerCase(), ""))
- }
- }
- None
- }
-
-}
diff --git
a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/RequestEngineStatusTest.scala
b/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/RequestEngineStatusTest.scala
deleted file mode 100644
index d9fc07b6c0..0000000000
---
a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/RequestEngineStatusTest.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.protocol.engine
-
-import org.junit.jupiter.api.{Assertions, DisplayName, Test}
-
-class RequestEngineStatusTest {
-
- @Test
- @DisplayName("constTest")
- def constTest(): Unit = {
-
- val statusOnly = RequestEngineStatus.Status_Only
- val statusOverload = RequestEngineStatus.Status_Overload
- val statusConcurrent = RequestEngineStatus.Status_Concurrent
- val statusOverloadConcurrent = RequestEngineStatus.Status_Overload_Concurrent
- val statusBasicInfo = RequestEngineStatus.Status_BasicInfo
- val all = RequestEngineStatus.ALL
-
- Assertions.assertTrue(1 == statusOnly)
- Assertions.assertTrue(2 == statusOverload)
- Assertions.assertTrue(3 == statusConcurrent)
- Assertions.assertTrue(4 == statusOverloadConcurrent)
- Assertions.assertTrue(5 == statusBasicInfo)
- Assertions.assertTrue(6 == all)
-
- }
-
-}
diff --git
a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/ResponseUserEngineKillTest.scala
b/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/ResponseUserEngineKillTest.scala
deleted file mode 100644
index dbf3f5e3b5..0000000000
---
a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/ResponseUserEngineKillTest.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.protocol.engine
-
-import org.junit.jupiter.api.{Assertions, DisplayName, Test}
-
-class ResponseUserEngineKillTest {
-
- @Test
- @DisplayName("constTest")
- def constTest(): Unit = {
-
- val success = ResponseUserEngineKill.Success
- val error = ResponseUserEngineKill.Error
-
- Assertions.assertEquals("Success", success)
- Assertions.assertEquals("Error", error)
- }
-
-}
diff --git
a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/utils/ProtocolUtilsTest.scala
b/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/utils/ProtocolUtilsTest.scala
deleted file mode 100644
index 2435f51497..0000000000
---
a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/utils/ProtocolUtilsTest.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.protocol.utils
-
-import org.junit.jupiter.api.{Assertions, DisplayName, Test}
-
-class ProtocolUtilsTest {
-
- @Test
- @DisplayName("constTest")
- def constTest(): Unit = {
-
- val serviceSuffix = ProtocolUtils.SERVICE_SUFFIX.getValue
- val suffixs = ProtocolUtils.suffixs
-
- Assertions.assertNotNull(serviceSuffix)
- Assertions.assertTrue(suffixs.length == 3)
- }
-
- @Test
- @DisplayName("getAppNameTest")
- def getAppNameTest(): Unit = {
-
- val modeleName = "engineManager"
- val appNameOption = ProtocolUtils.getAppName(modeleName)
- Assertions.assertNotNull(appNameOption.get)
-
- }
-
-}
diff --git
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/CacheManualRefresher.java
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/CacheManualRefresher.java
deleted file mode 100644
index dbdf52a1fc..0000000000
---
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/CacheManualRefresher.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.rpc.conf;
-
-public interface CacheManualRefresher {
- void refresh();
-}
diff --git
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/DynamicFeignClient.java
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/DynamicFeignClient.java
deleted file mode 100644
index dd1f6a7dc9..0000000000
---
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/DynamicFeignClient.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.rpc.conf;
-
-import org.apache.linkis.DataWorkCloudApplication;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.springframework.cloud.openfeign.FeignClientBuilder;
-import org.springframework.cloud.openfeign.FeignClientFactoryBean;
-import org.springframework.stereotype.Component;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-@Component
-public class DynamicFeignClient<T> {
-
- private FeignClientBuilder feignClientBuilder;
-
- private final ConcurrentHashMap<String, T> CACHE_BEAN = new ConcurrentHashMap();
-
- public DynamicFeignClient() {
- this.feignClientBuilder =
- new FeignClientBuilder(DataWorkCloudApplication.getApplicationContext());
- }
-
- public T getFeignClient(final Class<T> type, final String serviceName) {
- return getFeignClient(type, serviceName, null);
- }
-
- public T getFeignClient(
- final Class<T> type, final Class<?> fallbackFactory, final String serviceName) {
- return getFeignClient(type, fallbackFactory, serviceName, null);
- }
-
- public T getFeignClient(
- final Class<T> type,
- final FeignClientFactoryBean clientFactoryBean,
- final String serviceName) {
- return getFeignClient(type, clientFactoryBean, serviceName, null);
- }
-
- public T getFeignClient(final Class<T> type, String serviceName, final String serviceUrl) {
- String k = serviceName;
- if (StringUtils.isNotEmpty(serviceUrl)) {
- k = serviceUrl;
- }
- return CACHE_BEAN.computeIfAbsent(
- k,
- (t) -> {
- FeignClientBuilder.Builder<T> builder =
- this.feignClientBuilder.forType(type, serviceName);
- if (StringUtils.isNotEmpty(serviceUrl)) {
- builder.url(serviceUrl);
- }
- return builder.build();
- });
- }
-
- public T getFeignClient(
- final Class<T> type,
- final Class<?> fallbackFactory,
- final String serviceName,
- final String serviceUrl) {
- String k = serviceName;
- if (StringUtils.isNotEmpty(serviceUrl)) {
- k = serviceUrl;
- }
- return CACHE_BEAN.computeIfAbsent(
- k,
- (t) -> {
- FeignClientFactoryBean feignClientFactoryBean = new FeignClientFactoryBean();
- feignClientFactoryBean.setFallbackFactory(fallbackFactory);
- FeignClientBuilder.Builder<T> builder =
- this.feignClientBuilder.forType(type, feignClientFactoryBean, serviceName);
- if (StringUtils.isNotEmpty(serviceUrl)) {
- builder.url(serviceUrl);
- }
- return builder.build();
- });
- }
-
- public T getFeignClient(
- final Class<T> type,
- final FeignClientFactoryBean clientFactoryBean,
- final String serviceName,
- final String serviceUrl) {
- String k = serviceName;
- if (StringUtils.isNotEmpty(serviceUrl)) {
- k = serviceUrl;
- }
- return CACHE_BEAN.computeIfAbsent(
- k,
- (t) -> {
- FeignClientBuilder.Builder<T> builder =
- this.feignClientBuilder.forType(type, clientFactoryBean, serviceName);
- if (StringUtils.isNotEmpty(serviceUrl)) {
- builder.url(serviceUrl);
- }
- return builder.build();
- });
- }
-
- private T getFromCache(final String serviceName, final String serviceUrl) {
- if (StringUtils.isNotEmpty(serviceUrl)) {
- return CACHE_BEAN.get(serviceUrl);
- } else {
- return CACHE_BEAN.get(serviceName);
- }
- }
-}
diff --git
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/EurekaClientCacheManualRefresher.java
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/EurekaClientCacheManualRefresher.java
deleted file mode 100644
index 7394698672..0000000000
---
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/EurekaClientCacheManualRefresher.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.rpc.conf;
-
-import org.apache.commons.lang3.exception.ExceptionUtils;
-
-import org.springframework.beans.factory.BeanFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
-import org.springframework.stereotype.Component;
-import org.springframework.util.ReflectionUtils;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.time.Duration;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Component
-@ConditionalOnClass(name =
"org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
-public class EurekaClientCacheManualRefresher implements CacheManualRefresher {
- private static final Logger logger =
- LoggerFactory.getLogger(EurekaClientCacheManualRefresher.class);
- private final AtomicBoolean isRefreshing = new AtomicBoolean(false);
- private final ExecutorService refreshExecutor =
Executors.newSingleThreadExecutor();
- private final String cacheRefreshTaskField = "cacheRefreshTask";
- private Object cacheRefreshTask;
-
- private long lastRefreshMillis = 0;
- private final Duration refreshIntervalDuration = Duration.ofSeconds(3);
-
- @Autowired private BeanFactory beanFactory;
-
- public void refreshOnExceptions(Exception e, List<Class<? extends
Exception>> clazzs) {
- if (null == clazzs || clazzs.size() == 0) {
- throw new IllegalArgumentException();
- }
-
- if (clazzs.stream()
- .anyMatch(
- clazz -> clazz.isInstance(e) ||
clazz.isInstance(ExceptionUtils.getRootCause(e)))) {
- refresh();
- }
- }
-
- public void refresh() {
- if (isRefreshing.compareAndSet(false, true)) {
- refreshExecutor.execute(
- () -> {
- try {
- if (System.currentTimeMillis()
- <= lastRefreshMillis + refreshIntervalDuration.toMillis()) {
- logger.warn(
- "Not manually refresh eureka client cache as refresh
interval was not exceeded:{}",
- refreshIntervalDuration.getSeconds());
- return;
- }
-
- String discoveryClientClassName =
"com.netflix.discovery.DiscoveryClient";
- if (null == cacheRefreshTask) {
- Class<?> discoveryClientClass =
Class.forName(discoveryClientClassName);
- Field field =
- ReflectionUtils.findField(discoveryClientClass,
cacheRefreshTaskField);
- if (null != field) {
- ReflectionUtils.makeAccessible(field);
- Object discoveryClient =
beanFactory.getBean(discoveryClientClass);
- cacheRefreshTask = ReflectionUtils.getField(field,
discoveryClient);
- }
- }
-
- if (null == cacheRefreshTask) {
- logger.error(
- "Field ({}) not found in class '{}'",
- cacheRefreshTaskField,
- discoveryClientClassName);
- return;
- }
-
- lastRefreshMillis = System.currentTimeMillis();
- Class<?> timedSupervisorTaskClass =
- Class.forName("com.netflix.discovery.TimedSupervisorTask");
- Method method =
timedSupervisorTaskClass.getDeclaredMethod("run");
- method.setAccessible(true);
- method.invoke(cacheRefreshTask);
- logger.info(
- "Manually refresh eureka client cache
completed(DiscoveryClient.cacheRefreshTask#run())");
- } catch (Exception e) {
- logger.error("An exception occurred when manually refresh eureka
client cache", e);
- } finally {
- isRefreshing.set(false);
- }
- });
- } else {
- logger.warn(
- "Not manually refresh eureka client cache as another thread is
refreshing it already");
- }
- }
-}
diff --git
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/FeignRequestInterceptor.java
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/FeignRequestInterceptor.java
deleted file mode 100644
index ef1c2dd095..0000000000
---
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/FeignRequestInterceptor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.rpc.conf;
-
-import org.apache.linkis.rpc.BaseRPCSender;
-import org.apache.linkis.rpc.constant.RpcConstant;
-import org.apache.linkis.server.BDPJettyServerHelper;
-import org.apache.linkis.server.Message;
-import org.apache.linkis.server.security.SSOUtils$;
-import org.apache.linkis.server.security.SecurityFilter$;
-
-import org.springframework.stereotype.Component;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import scala.Tuple2;
-
-import feign.RequestInterceptor;
-import feign.RequestTemplate;
-
-@Component
-public class FeignRequestInterceptor implements RequestInterceptor {
-
- @Override
- public void apply(RequestTemplate requestTemplate) {
- Map<String, Collection<String>> headers = new
HashMap<>(requestTemplate.headers());
- headers.put(
- RpcConstant.LINKIS_LOAD_BALANCER_TYPE,
- Arrays.asList(RpcConstant.LINKIS_LOAD_BALANCER_TYPE_RPC));
- Tuple2<String, String> userTicketKV =
-
SSOUtils$.MODULE$.getUserTicketKV(SecurityFilter$.MODULE$.OTHER_SYSTEM_IGNORE_UM_USER());
- headers.put(userTicketKV._1, Arrays.asList(userTicketKV._2));
- try {
- String body =
- new String(
- requestTemplate.body(),
-
org.apache.linkis.common.conf.Configuration.BDP_ENCODING().getValue());
- Message message = BDPJettyServerHelper.gson().fromJson(body,
Message.class);
- headers.put(
- RpcConstant.FIXED_INSTANCE,
Arrays.asList(BaseRPCSender.getFixedInstanceInfo(message)));
- requestTemplate.headers(headers);
- } catch (UnsupportedEncodingException e) {
- }
- }
-}
diff --git
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/NacosClientCacheManualRefresher.java
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/NacosClientCacheManualRefresher.java
deleted file mode 100644
index db26cd0f2c..0000000000
---
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/NacosClientCacheManualRefresher.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.rpc.conf;
-
-import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
-import org.springframework.stereotype.Component;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Component
-@ConditionalOnClass(name =
"com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration")
-public class NacosClientCacheManualRefresher implements CacheManualRefresher {
- private static final Logger logger =
- LoggerFactory.getLogger(NacosClientCacheManualRefresher.class);
-
- public void refresh() {
- try {
- logger.warn("Failed to obtain nacos metadata. Wait 100 milliseconds");
- Thread.sleep(100L);
- } catch (InterruptedException e) {
-
- }
- }
-}
diff --git
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java
index 3d46661de2..9fd0b81104 100644
---
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java
+++
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java
@@ -19,9 +19,5 @@ package org.apache.linkis.rpc.constant;
public class RpcConstant {
- public static final String LINKIS_LOAD_BALANCER_TYPE =
"LinkisLoadBalancerType";
-
- public static final String LINKIS_LOAD_BALANCER_TYPE_RPC = "RPC";
-
public static final String FIXED_INSTANCE = "client-ip";
}
diff --git
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java
index b87e730994..a8daece891 100644
---
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java
+++
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java
@@ -32,6 +32,8 @@ public enum LinkisRpcErrorCodeSummary implements
LinkisErrorCode {
10051, "The instance:{0} of application {1} does not exist(应用程序:{0}
的实例:{1} 不存在)."),
INSTANCE_ERROR(10052, "The instance:{0} is error should ip:port."),
+
+ INSTANCE_NOT_FOUND_ERROR(10053, "The instance:{0} is not found."),
RPC_INIT_ERROR(10054, "Asyn RPC Consumer Thread has stopped!(Asyn RPC
Consumer 线程已停止!)");
/** 错误码 */
diff --git
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala
index 1c4e43b3cc..149179f8b1 100644
---
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala
+++
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala
@@ -22,28 +22,27 @@ import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.common.exception.WarnException
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.protocol.Protocol
-import org.apache.linkis.rpc.conf.DynamicFeignClient
import org.apache.linkis.rpc.conf.RPCConfiguration.{
BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_FREE_TIME_MAX,
BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_MAX,
BDP_RPC_SENDER_ASYN_QUEUE_CAPACITY
}
-import org.apache.linkis.rpc.constant.RpcConstant
import org.apache.linkis.rpc.interceptor._
import org.apache.linkis.rpc.transform.{RPCConsumer, RPCProduct}
import org.apache.linkis.server.Message
-
-import org.apache.commons.lang3.StringUtils
+import org.apache.linkis.server.conf.ServerConfiguration
import java.util
import scala.concurrent.duration.Duration
import scala.runtime.BoxedUnit
+import feign.{Feign, Retryer}
+import feign.slf4j.Slf4jLogger
+
private[rpc] class BaseRPCSender extends Sender with Logging {
private var name: String = _
private var rpc: RPCReceiveRemote = _
- private var dynamicFeignClient: DynamicFeignClient[RPCReceiveRemote] = _
protected def getRPCInterceptors: Array[RPCInterceptor] = Array.empty
@@ -68,21 +67,18 @@ private[rpc] class BaseRPCSender extends Sender with
Logging {
rpc
}
- private def getDynamicFeignClient: DynamicFeignClient[RPCReceiveRemote] = {
- if (dynamicFeignClient == null) this synchronized {
- if (dynamicFeignClient == null) dynamicFeignClient = new
DynamicFeignClient()
- }
- dynamicFeignClient
- }
-
private[rpc] def getApplicationName = name
- def getSenderInstance(): String = {
- null
- }
+ protected def doBuilder(builder: Feign.Builder): Unit =
+ builder.retryer(Retryer.NEVER_RETRY)
protected def newRPC: RPCReceiveRemote = {
- getDynamicFeignClient.getFeignClient(classOf[RPCReceiveRemote], name)
+ val builder = Feign.builder.logger(new
Slf4jLogger()).logLevel(feign.Logger.Level.FULL)
+ doBuilder(builder)
+ var url = if (name.startsWith("http://")) name else "http://" + name
+ if (url.endsWith("/")) url = url.substring(0, url.length - 1)
+ url += ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue
+ builder.target(classOf[RPCReceiveRemote], url)
}
private def execute(message: Any)(op: => Any): Any = message match {
@@ -94,9 +90,6 @@ private[rpc] class BaseRPCSender extends Sender with Logging {
override def ask(message: Any): Any = execute(message) {
val msg = RPCProduct.getRPCProduct.toMessage(message)
- if (StringUtils.isNotBlank(getSenderInstance())) {
- BaseRPCSender.addFixedInstanceInfo(msg.getData, getSenderInstance())
- }
BaseRPCSender.addInstanceInfo(msg.getData)
val response = getRPC.receiveAndReply(msg)
RPCConsumer.getRPCConsumer.toObject(response)
@@ -105,9 +98,6 @@ private[rpc] class BaseRPCSender extends Sender with Logging
{
override def ask(message: Any, timeout: Duration): Any = execute(message) {
val msg = RPCProduct.getRPCProduct.toMessage(message)
msg.data("duration", timeout.toMillis)
- if (StringUtils.isNotBlank(getSenderInstance())) {
- BaseRPCSender.addFixedInstanceInfo(msg.getData, getSenderInstance())
- }
BaseRPCSender.addInstanceInfo(msg.getData)
val response = getRPC.receiveAndReplyInMills(msg)
RPCConsumer.getRPCConsumer.toObject(response)
@@ -115,9 +105,6 @@ private[rpc] class BaseRPCSender extends Sender with
Logging {
private def sendIt(message: Any, op: Message => Message): Unit =
execute(message) {
val msg = RPCProduct.getRPCProduct.toMessage(message)
- if (StringUtils.isNotBlank(getSenderInstance())) {
- BaseRPCSender.addFixedInstanceInfo(msg.getData, getSenderInstance())
- }
BaseRPCSender.addInstanceInfo(msg.getData)
RPCConsumer.getRPCConsumer.toObject(op(msg)) match {
case w: WarnException => logger.warn("RPC requests an
alarm!(RPC请求出现告警!)", w)
@@ -188,16 +175,4 @@ private[rpc] object BaseRPCSender extends Logging {
ServiceInstance(name, instance)
}
- def addFixedInstanceInfo(map: util.Map[String, Object], fixedInstance:
String): Unit = {
- map.put(RpcConstant.FIXED_INSTANCE, fixedInstance)
- }
-
- def getFixedInstanceInfo(message: Message): String = {
- if (null != message && null != message.getData) {
- message.getData.getOrDefault(RpcConstant.FIXED_INSTANCE,
null).asInstanceOf[String]
- } else {
- null
- }
- }
-
}
diff --git
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRemote.scala
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRemote.scala
index 458ada9308..c539652d31 100644
---
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRemote.scala
+++
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRemote.scala
@@ -23,22 +23,13 @@ import
org.springframework.web.bind.annotation.{RequestBody, RequestMapping, Req
private[rpc] trait RPCReceiveRemote {
- @RequestMapping(
- value = Array("${spring.mvc.servlet.path}/rpc/receive"),
- method = Array(RequestMethod.POST)
- )
+ @RequestMapping(value = Array("/rpc/receive"), method =
Array(RequestMethod.POST))
def receive(@RequestBody message: Message): Message
- @RequestMapping(
- value = Array("${spring.mvc.servlet.path}/rpc/receiveAndReply"),
- method = Array(RequestMethod.POST)
- )
+ @RequestMapping(value = Array("/rpc/receiveAndReply"), method =
Array(RequestMethod.POST))
def receiveAndReply(@RequestBody message: Message): Message
- @RequestMapping(
- value = Array("${spring.mvc.servlet.path}/rpc/replyInMills"),
- method = Array(RequestMethod.POST)
- )
+ @RequestMapping(value = Array("/rpc/replyInMills"), method =
Array(RequestMethod.POST))
def receiveAndReplyInMills(@RequestBody message: Message): Message
}
diff --git
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala
index ff542aaad5..00fa019d99 100644
---
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala
+++
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala
@@ -33,7 +33,6 @@ private[rpc] object RPCSpringBeanCache extends Logging {
private var rpcServerLoader: RPCServerLoader = _
private var senderBuilders: Array[BroadcastSenderBuilder] = _
private var rpcReceiveRestful: RPCReceiveRestful = _
- private var rpcReceiveRemote: RPCReceiveRemote = _
def registerReceiver(receiverName: String, receiver: Receiver): Unit = {
if (beanNameToReceivers == null) {
@@ -64,13 +63,6 @@ private[rpc] object RPCSpringBeanCache extends Logging {
rpcReceiveRestful
}
- def getRPCReceiveRemote: RPCReceiveRemote = {
- if (rpcReceiveRemote == null) {
- rpcReceiveRemote =
getApplicationContext.getBean(classOf[RPCReceiveRemote])
- }
- rpcReceiveRemote
- }
-
private[rpc] def getReceivers: util.Map[String, Receiver] = {
if (beanNameToReceivers == null) {
beanNameToReceivers =
getApplicationContext.getBeansOfType(classOf[Receiver])
diff --git
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala
index 4faeaa180e..d835eef328 100644
---
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala
+++
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala
@@ -42,25 +42,15 @@ import feign.RetryableException
class RetryableRPCInterceptor extends RPCInterceptor {
override val order: Int = 20
-// private val commonRetryHandler = new RPCRetryHandler
-// commonRetryHandler.setRetryInfo(new RetryableProtocol{})
-//
-// private def isCommonRetryHandler(retry: RetryableProtocol): Boolean =
retry.maxPeriod == commonRetryHandler.getRetryMaxPeriod &&
-// retry.period == commonRetryHandler.getRetryPeriod && retry.retryNum ==
commonRetryHandler.getRetryNum &&
-// (retry.retryExceptions.isEmpty ||
commonRetryHandler.getRetryExceptions.containsSlice(retry.retryExceptions))
-
override def intercept(
interceptorExchange: RPCInterceptorExchange,
chain: RPCInterceptorChain
): Any = interceptorExchange.getProtocol match {
case retry: RetryableProtocol =>
val retryName = retry.getClass.getSimpleName
-// if(isCommonRetryHandler(retry))
commonRetryHandler.retry(chain.handle(interceptorExchange), retryName)
-// else {
val retryHandler = new RPCRetryHandler
retryHandler.setRetryInfo(retry, chain)
retryHandler.retry(chain.handle(interceptorExchange), retryName)
-// }
case _ => chain.handle(interceptorExchange)
}
diff --git
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala
index 1aae1f0cf3..ae1070865a 100644
---
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala
+++
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala
@@ -18,17 +18,21 @@
package org.apache.linkis.rpc.sender
import org.apache.linkis.common.ServiceInstance
+import org.apache.linkis.common.utils.Logging
import org.apache.linkis.rpc.{BaseRPCSender, RPCMessageEvent,
RPCSpringBeanCache}
import org.apache.linkis.rpc.interceptor.{RPCInterceptor,
ServiceInstanceRPCInterceptorChain}
+import org.apache.linkis.server.conf.ServerConfiguration
import org.apache.commons.lang3.StringUtils
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.core.env.Environment
+import feign._
private[rpc] class SpringMVCRPCSender private[rpc] (
private[rpc] val serviceInstance: ServiceInstance
-) extends BaseRPCSender(serviceInstance.getApplicationName) {
+) extends BaseRPCSender(serviceInstance.getApplicationName)
+ with Logging {
+
+ import SpringCloudFeignConfigurationCache._
override protected def getRPCInterceptors: Array[RPCInterceptor] =
RPCSpringBeanCache.getRPCInterceptors
@@ -36,8 +40,38 @@ private[rpc] class SpringMVCRPCSender private[rpc] (
override protected def createRPCInterceptorChain() =
new ServiceInstanceRPCInterceptorChain(0, getRPCInterceptors,
serviceInstance)
- @Autowired
- private var env: Environment = _
+ /**
+ * If it's a random call, you don't need to set target specify instance,need
to specify target and
+ * do not set client setting
+ * @param builder
+ */
+ override protected def doBuilder(builder: Feign.Builder): Unit = {
+ if (serviceInstance != null &&
StringUtils.isNotBlank(serviceInstance.getInstance)) {
+ builder.requestInterceptor(new RequestInterceptor() {
+ def apply(template: RequestTemplate): Unit = {
+ template.target(
+
s"http://${serviceInstance.getInstance}${ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue}"
+ )
+ }
+ })
+ }
+ super.doBuilder(builder)
+ if (StringUtils.isBlank(serviceInstance.getInstance)) {
+ builder
+ .contract(getContract)
+ .encoder(getEncoder)
+ .decoder(getDecoder)
+ .client(getClient)
+ .requestInterceptor(getRPCTicketIdRequestInterceptor)
+ } else {
+ builder
+ .contract(getContract)
+ .encoder(getEncoder)
+ .decoder(getDecoder)
+ .requestInterceptor(getRPCTicketIdRequestInterceptor)
+ }
+
+ }
/**
* Deliver is an asynchronous method that requests the target microservice
asynchronously,
@@ -66,12 +100,4 @@ private[rpc] class SpringMVCRPCSender private[rpc] (
s"RPCSender(${serviceInstance.getApplicationName})"
} else s"RPCSender($getApplicationName, ${serviceInstance.getInstance})"
- override def getSenderInstance(): String = {
- if (null != serviceInstance) {
- serviceInstance.getInstance
- } else {
- null
- }
- }
-
}
diff --git
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala
index df197ddb2c..829a967aab 100644
---
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala
+++
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.governance.common.protocol.job
import org.apache.linkis.governance.common.entity.job.JobRequest
+import org.apache.linkis.protocol.RetryableProtocol
import org.apache.linkis.protocol.message.RequestProtocol
import java.util
@@ -25,7 +26,7 @@ import java.util.Date
import scala.beans.BeanProperty
-trait JobReq extends RequestProtocol
+trait JobReq extends RequestProtocol with RetryableProtocol
case class JobReqInsert(jobReq: JobRequest) extends JobReq
diff --git
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/RequestTask.scala
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/RequestTask.scala
index 4d0b8952ca..17c01fcfc2 100644
---
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/RequestTask.scala
+++
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/RequestTask.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.governance.common.protocol.task
import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.protocol.RetryableProtocol
import org.apache.linkis.protocol.message.RequestProtocol
import java.util
@@ -91,7 +92,7 @@ trait TaskState extends RequestProtocol {}
case class RequestTaskPause(execId: String) extends TaskState
case class RequestTaskResume(execId: String) extends TaskState
-case class RequestTaskKill(execId: String) extends TaskState
+case class RequestTaskKill(execId: String) extends TaskState with
RetryableProtocol
/**
* The status of requesting job execution, mainly used for:<br>
diff --git
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseEngineConnPid.scala
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseEngineConnPid.scala
index 971bdf247b..ef1355d580 100644
---
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseEngineConnPid.scala
+++
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseEngineConnPid.scala
@@ -25,5 +25,4 @@ import org.apache.linkis.protocol.message.RequestProtocol
* @param pid
*/
case class ResponseEngineConnPid(serviceInstance: ServiceInstance, pid:
String, ticketId: String)
- extends RetryableProtocol
- with RequestProtocol
+ extends RequestProtocol
diff --git
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseTaskExecute.scala
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseTaskExecute.scala
index b136c61099..a4a7837da0 100644
---
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseTaskExecute.scala
+++
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseTaskExecute.scala
@@ -30,8 +30,7 @@ case class ResponseTaskProgress(
execId: String,
progress: Float,
progressInfo: Array[JobProgressInfo]
-) extends RetryableProtocol
- with RequestProtocol
+) extends RequestProtocol
case class ResponseEngineLock(lock: String)
@@ -67,9 +66,7 @@ case class ResponseEngineStatus(
engineInfo: ResponseEngineInfo
)
-case class ResponseTaskLog(execId: String, log: String)
- extends RetryableProtocol
- with RequestProtocol
+case class ResponseTaskLog(execId: String, log: String) extends RequestProtocol
case class ResponseTaskError(execId: String, errorMsg: String)
extends RetryableProtocol
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
index 6b4fc64fe6..9dba95ef66 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
@@ -154,8 +154,7 @@ class TaskExecutionServiceImpl
sender = Sender.getSender(task.getCallbackServiceInstance())
sender.send(msg)
} else {
- // todo
- logger.debug("SendtoEntrance error, cannot find entrance instance.")
+ logger.warn("SendtoEntrance error, cannot find entrance instance.")
}
} { t =>
val errorMsg = s"SendToEntrance error. $msg" + t.getCause
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/EngineResourceRequest.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/EngineResourceRequest.scala
index 3b3005fee6..8bcc79b410 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/EngineResourceRequest.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/EngineResourceRequest.scala
@@ -22,7 +22,7 @@ import org.apache.linkis.protocol.message.RequestProtocol
import java.util
-trait EngineResourceRequest extends RequestProtocol {
+trait EngineResourceRequest {
val user: String
val labels: util.List[Label[_]]
val properties: util.Map[String, String]
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/engineplugin/server/service/impl/EnginePluginAdminServiceImpl.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/engineplugin/server/service/impl/EnginePluginAdminServiceImpl.java
index 069afd4f1b..603641fa78 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/engineplugin/server/service/impl/EnginePluginAdminServiceImpl.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/engineplugin/server/service/impl/EnginePluginAdminServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.linkis.bml.client.BmlClient;
import org.apache.linkis.bml.client.BmlClientFactory;
import org.apache.linkis.bml.protocol.BmlResourceVersionsResponse;
import org.apache.linkis.bml.protocol.Version;
+import org.apache.linkis.common.utils.SecurityUtils;
import org.apache.linkis.common.utils.ZipUtils;
import org.apache.linkis.engineplugin.server.dao.EngineConnBmlResourceDao;
import org.apache.linkis.engineplugin.server.entity.EngineConnBmlResource;
@@ -28,6 +29,8 @@ import
org.apache.linkis.engineplugin.server.localize.DefaultEngineConnBmlResour
import org.apache.linkis.engineplugin.server.restful.EnginePluginRestful;
import org.apache.linkis.engineplugin.server.service.EnginePluginAdminService;
import org.apache.linkis.engineplugin.vo.EnginePluginBMLVo;
+import org.apache.linkis.manager.am.exception.AMErrorCode;
+import org.apache.linkis.manager.am.exception.AMErrorException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -37,6 +40,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
+import java.text.MessageFormat;
import java.util.List;
import com.github.pagehelper.PageHelper;
@@ -79,6 +83,11 @@ public class EnginePluginAdminServiceImpl implements
EnginePluginAdminService {
@Override
public void deleteEnginePluginBML(String ecType, String version, String
username) {
List<EngineConnBmlResource> allEngineConnBmlResource = null;
+ if (ecType != null && SecurityUtils.containsRelativePath(ecType)) {
+ throw new AMErrorException(
+ AMErrorCode.EC_PLUGIN_ERROR.getErrorCode(),
+ MessageFormat.format(AMErrorCode.EC_PLUGIN_ERROR.getErrorDesc(),
ecType));
+ }
try {
allEngineConnBmlResource =
engineConnBmlResourceDao.getAllEngineConnBmlResource(ecType,
version);
@@ -88,7 +97,9 @@ public class EnginePluginAdminServiceImpl implements
EnginePluginAdminService {
engineConnBmlResourceDao.delete(engineConnBmlResource);
});
String engineConnsHome =
defaultEngineConnBmlResourceGenerator.getEngineConnsHome();
+
File file = new File(engineConnsHome + "/" + ecType);
+
if (file.exists()) {
deleteDir(file);
log.info("file {} delete success", ecType);
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java
index c05768739c..cc8997d857 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java
@@ -32,7 +32,8 @@ public enum AMErrorCode implements LinkisErrorCode {
ASK_ENGINE_ERROR_RETRY(210005, "Ask engine error, retry(请求引擎失败,重试)"),
- EC_OPERATE_ERROR(210006, "Failed to execute operation(引擎操作失败)");
+ EC_OPERATE_ERROR(210006, "Failed to execute operation(引擎操作失败)"),
+ EC_PLUGIN_ERROR(210007, "ECType {0} contains RelativePath");
private final int errorCode;
diff --git
a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
index a49613f8d1..38299081d8 100644
---
a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
+++
b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
@@ -192,6 +192,7 @@ public class ConnectionManager {
datasource.setUrl(dbUrl);
datasource.setUsername(username);
datasource.setPassword(password);
+ datasource.setConnectProperties(SecurityUtils.getMysqlSecurityParams());
datasource.setDriverClassName(driverClassName);
datasource.setInitialSize(initialSize);
datasource.setMinIdle(minIdle);
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/mysql/SqlConnection.java
b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/mysql/SqlConnection.java
index d7559aa5cf..0844ce2cdf 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/mysql/SqlConnection.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-datasource-manager/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/mysql/SqlConnection.java
@@ -158,7 +158,10 @@ public class SqlConnection extends AbstractSqlConnection {
url += "?" + extraParamString;
}
LOG.info("jdbc connection url: {}", url);
- return DriverManager.getConnection(url, connectMessage.username,
connectMessage.password);
+ Properties properties = SecurityUtils.getMysqlSecurityParams();
+ properties.setProperty("user", connectMessage.username);
+ properties.setProperty("password", connectMessage.password);
+ return DriverManager.getConnection(url, properties);
}
public String getSqlConnectUrl() {
diff --git
a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/ConfigurationTemplateRestfulApi.java
b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/ConfigurationTemplateRestfulApi.java
index d6db4f7571..657d4531f3 100644
---
a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/ConfigurationTemplateRestfulApi.java
+++
b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/ConfigurationTemplateRestfulApi.java
@@ -21,6 +21,7 @@ import
org.apache.linkis.basedatamanager.server.domain.ConfigurationConfigKey;
import
org.apache.linkis.basedatamanager.server.request.ConfigurationTemplateSaveRequest;
import org.apache.linkis.basedatamanager.server.response.EngineLabelResponse;
import
org.apache.linkis.basedatamanager.server.service.ConfigurationTemplateService;
+import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.server.Message;
import org.apache.linkis.server.utils.ModuleUserUtils;
@@ -49,7 +50,11 @@ public class ConfigurationTemplateRestfulApi {
@RequestMapping(path = "/save", method = RequestMethod.POST)
public Message add(
HttpServletRequest httpRequest, @RequestBody
ConfigurationTemplateSaveRequest request) {
- ModuleUserUtils.getOperationUser(httpRequest, "save a configuration
template");
+ String username =
+ ModuleUserUtils.getOperationUser(httpRequest, "save a configuration
template");
+ if (!Configuration.isAdmin(username)) {
+ return Message.error("User '" + username + "' is not admin
user[非管理员用户]");
+ }
if (Objects.isNull(request)
|| StringUtils.isEmpty(request.getEngineLabelId())
|| StringUtils.isEmpty(request.getKey())
@@ -67,8 +72,12 @@ public class ConfigurationTemplateRestfulApi {
@ApiOperation(value = "delete", notes = "delete a configuration template",
httpMethod = "DELETE")
@RequestMapping(path = "/{keyId}", method = RequestMethod.DELETE)
public Message delete(HttpServletRequest httpRequest, @PathVariable("keyId")
Long keyId) {
- ModuleUserUtils.getOperationUser(
- httpRequest, "delete a configuration template, keyId: " + keyId);
+ String username =
+ ModuleUserUtils.getOperationUser(
+ httpRequest, "delete a configuration template, keyId: " + keyId);
+ if (!Configuration.isAdmin(username)) {
+ return Message.error("User '" + username + "' is not admin
user[非管理员用户]");
+ }
Boolean flag =
configurationTemplateService.deleteConfigurationTemplate(keyId);
return Message.ok("").data("success: ", flag);
}
diff --git
a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceAccessRestfulApi.java
b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceAccessRestfulApi.java
index 7d39d493bd..4b0df0051d 100644
---
a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceAccessRestfulApi.java
+++
b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceAccessRestfulApi.java
@@ -107,8 +107,14 @@ public class DatasourceAccessRestfulApi {
httpMethod = "DELETE")
@RequestMapping(path = "/{id}", method = RequestMethod.DELETE)
public Message remove(HttpServletRequest request, @PathVariable("id") Long
id) {
- ModuleUserUtils.getOperationUser(
- request, "Remove a Datasource Access Record,id:" + id.toString());
+ String username =
+ ModuleUserUtils.getOperationUser(
+ request, "Remove a Datasource Access Record,id:" + id.toString());
+
+ if (!Configuration.isAdmin(username)) {
+ return Message.error("User '" + username + "' is not admin
user[非管理员用户]");
+ }
+
boolean result = datasourceAccessService.removeById(id);
return Message.ok("").data("result", result);
}
diff --git
a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceEnvRestfulApi.java
b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceEnvRestfulApi.java
index cf2953b0e4..e6029c761c 100644
---
a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceEnvRestfulApi.java
+++
b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceEnvRestfulApi.java
@@ -97,7 +97,12 @@ public class DatasourceEnvRestfulApi {
httpMethod = "DELETE")
@RequestMapping(path = "/{id}", method = RequestMethod.DELETE)
public Message remove(HttpServletRequest request, @PathVariable("id") Long
id) {
- ModuleUserUtils.getOperationUser(request, "Remove a Datasource Env
Record,id:" + id.toString());
+ String username =
+ ModuleUserUtils.getOperationUser(
+ request, "Remove a Datasource Env Record,id:" + id.toString());
+ if (!Configuration.isAdmin(username)) {
+ return Message.error("User '" + username + "' is not admin
user[非管理员用户]");
+ }
boolean result = datasourceEnvService.removeById(id);
return Message.ok("").data("result", result);
}
diff --git
a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeKeyRestfulApi.java
b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeKeyRestfulApi.java
index 557c753227..b8ef65df5b 100644
---
a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeKeyRestfulApi.java
+++
b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeKeyRestfulApi.java
@@ -99,8 +99,12 @@ public class DatasourceTypeKeyRestfulApi {
httpMethod = "DELETE")
@RequestMapping(path = "/{id}", method = RequestMethod.DELETE)
public Message remove(HttpServletRequest request, @PathVariable("id") Long
id) {
- ModuleUserUtils.getOperationUser(
- request, "Remove a Datasource Type Key Record,id:" + id.toString());
+ String username =
+ ModuleUserUtils.getOperationUser(
+ request, "Remove a Datasource Type Key Record,id:" +
id.toString());
+ if (!Configuration.isAdmin(username)) {
+ return Message.error("User '" + username + "' is not admin
user[非管理员用户]");
+ }
boolean result = datasourceTypeKeyService.removeById(id);
return Message.ok("").data("result", result);
}
diff --git
a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeRestfulApi.java
b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeRestfulApi.java
index 9fc8ea9d73..3c47d9385f 100644
---
a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeRestfulApi.java
+++
b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/DatasourceTypeRestfulApi.java
@@ -95,8 +95,13 @@ public class DatasourceTypeRestfulApi {
httpMethod = "DELETE")
@RequestMapping(path = "/{id}", method = RequestMethod.DELETE)
public Message remove(HttpServletRequest request, @PathVariable("id") Long
id) {
- ModuleUserUtils.getOperationUser(
- request, "Remove a Datasource Type Record,id:" + id.toString());
+ String username =
+ ModuleUserUtils.getOperationUser(
+ request, "Remove a Datasource Type Record,id:" + id.toString());
+ if (!Configuration.isAdmin(username)) {
+ return Message.error("User '" + username + "' is not admin
user[非管理员用户]");
+ }
+
boolean result = datasourceTypeService.removeById(id);
return Message.ok("").data("result", result);
}
diff --git
a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/GatewayAuthTokenRestfulApi.java
b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/GatewayAuthTokenRestfulApi.java
index 7d5668c074..d55a8e258a 100644
---
a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/GatewayAuthTokenRestfulApi.java
+++
b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/basedatamanager/server/restful/GatewayAuthTokenRestfulApi.java
@@ -55,9 +55,17 @@ public class GatewayAuthTokenRestfulApi {
@RequestMapping(path = "", method = RequestMethod.GET)
public Message list(
HttpServletRequest request, String searchName, Integer currentPage,
Integer pageSize) {
- ModuleUserUtils.getOperationUser(
- request, "Query list data of Gateway Auth Token,search name:" +
searchName);
+
+ String username =
+ ModuleUserUtils.getOperationUser(
+ request, "Query list data of Gateway Auth Token,search name:" +
searchName);
+
+ if (!Configuration.isAdmin(username)) {
+ return Message.error("User '" + username + "' is not admin
user[非管理员用户]");
+ }
+
PageInfo pageList = gatewayAuthTokenService.getListByPage(searchName,
currentPage, pageSize);
+
return Message.ok("").data("list", pageList);
}
@@ -65,8 +73,14 @@ public class GatewayAuthTokenRestfulApi {
@ApiOperation(value = "get", notes = "Get a Gateway Auth Token Record by
id", httpMethod = "GET")
@RequestMapping(path = "/{id}", method = RequestMethod.GET)
public Message get(HttpServletRequest request, @PathVariable("id") Long id) {
- ModuleUserUtils.getOperationUser(
- request, "Get a Gateway Auth Token Record,id:" + id.toString());
+
+ String username =
+ ModuleUserUtils.getOperationUser(
+ request, "Get a Gateway Auth Token Record,id:" + id.toString());
+
+ if (!Configuration.isAdmin(username)) {
+ return Message.error("User '" + username + "' is not admin
user[非管理员用户]");
+ }
GatewayAuthTokenEntity gatewayAuthToken =
gatewayAuthTokenService.getById(id);
return Message.ok("").data("item", gatewayAuthToken);
}
diff --git
a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/authentication/StaticAuthenticationStrategy.scala
b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/authentication/StaticAuthenticationStrategy.scala
index 2bd50a1ac8..a684152ad9 100644
---
a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/authentication/StaticAuthenticationStrategy.scala
+++
b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/authentication/StaticAuthenticationStrategy.scala
@@ -101,12 +101,28 @@ class StaticAuthenticationStrategy(override protected val
sessionMaxAliveTime: L
override def isTimeout(authentication: Authentication): Boolean =
System.currentTimeMillis() - authentication.getLastAccessTime >=
serverSessionTimeout
+ /**
+ * Forced login must account for multiple callers invoking it at the
same time. Re-login is
+ * skipped when the cached authentication was created at or after this
request's start time, or
+ * when it was created within the last 1s (i.e. a concurrent call already refreshed it).
+ * @param requestAction
+ * @param serverUrl
+ * @return
+ */
override def enforceLogin(requestAction: Action, serverUrl: String):
Authentication = {
val key = getKey(requestAction, serverUrl)
if (key == null) return null
+ val requestTime = System.currentTimeMillis()
key.intern() synchronized {
- val authentication = tryLogin(requestAction, serverUrl)
- putSession(key, authentication)
+ var authentication = getAuthenticationActionByKey(key)
+ if (
+ authentication == null || (authentication.getCreateTime <
requestTime && (System
+ .currentTimeMillis() - authentication.getCreateTime) > 1000)
+ ) {
+ authentication = tryLogin(requestAction, serverUrl)
+ putSession(key, authentication)
+ logger.info(s"$key try enforceLogin")
+ }
authentication
}
}
diff --git
a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/response/DWSAuthenticationResult.scala
b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/response/DWSAuthenticationResult.scala
index 6ea3fc1673..6d714a3cdd 100644
---
a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/response/DWSAuthenticationResult.scala
+++
b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-httpclient-support/src/main/scala/org/apache/linkis/httpclient/dws/response/DWSAuthenticationResult.scala
@@ -52,6 +52,8 @@ class DWSAuthenticationResult(response: HttpResponse,
serverUrl: String)
override def getAuthentication: Authentication = if (getStatus == 0) new
HttpAuthentication {
private var lastAccessTime: Long = System.currentTimeMillis
+ private val createTime: Long = System.currentTimeMillis()
+
override def authToCookies: Array[Cookie] = Array.empty
override def authToHeaders: util.Map[String, String] = new
util.HashMap[String, String]()
@@ -61,6 +63,9 @@ class DWSAuthenticationResult(response: HttpResponse,
serverUrl: String)
override def getLastAccessTime: Long = lastAccessTime
override def updateLastAccessTime(): Unit = lastAccessTime =
System.currentTimeMillis
+
+ override def getCreateTime: Long = createTime
+
}
else {
throw new HttpMessageParseException(
diff --git
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/ServiceInstancePriorityLoadBalancer.java
b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/IpPriorityLoadBalancer.java
similarity index 51%
rename from
linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/ServiceInstancePriorityLoadBalancer.java
rename to
linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/IpPriorityLoadBalancer.java
index 2f81ba84d5..206b31ccf5 100644
---
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/ServiceInstancePriorityLoadBalancer.java
+++
b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/IpPriorityLoadBalancer.java
@@ -15,21 +15,14 @@
* limitations under the License.
*/
-package org.apache.linkis.rpc.loadbalancer;
+package org.apache.linkis.gateway.springcloud.http;
-import org.apache.linkis.rpc.conf.CacheManualRefresher;
import org.apache.linkis.rpc.constant.RpcConstant;
-import org.apache.linkis.rpc.errorcode.LinkisRpcErrorCodeSummary;
-import org.apache.linkis.rpc.exception.NoInstanceExistsException;
-import org.apache.linkis.rpc.sender.SpringCloudFeignConfigurationCache$;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.ObjectProvider;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.*;
import
org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
@@ -37,38 +30,26 @@ import
org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBal
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
-import java.text.MessageFormat;
import java.util.List;
import java.util.Objects;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadLocalRandom;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
-public class ServiceInstancePriorityLoadBalancer implements
ReactorServiceInstanceLoadBalancer {
+public class IpPriorityLoadBalancer implements
ReactorServiceInstanceLoadBalancer {
- private static final Log log =
LogFactory.getLog(ServiceInstancePriorityLoadBalancer.class);
-
- @Autowired private CacheManualRefresher cacheManualRefresher;
+ private static final Logger logger =
LoggerFactory.getLogger(IpPriorityLoadBalancer.class);
private final String serviceId;
-
- final AtomicInteger position;
private final ObjectProvider<ServiceInstanceListSupplier>
serviceInstanceListSupplierProvider;
- public ServiceInstancePriorityLoadBalancer(
- ObjectProvider<ServiceInstanceListSupplier>
serviceInstanceListSupplierProvider,
- String serviceId) {
- this(serviceInstanceListSupplierProvider, serviceId, (new
Random()).nextInt(1000));
- }
-
- public ServiceInstancePriorityLoadBalancer(
- ObjectProvider<ServiceInstanceListSupplier>
serviceInstanceListSupplierProvider,
+ public IpPriorityLoadBalancer(
String serviceId,
- int seedPosition) {
+ ObjectProvider<ServiceInstanceListSupplier>
serviceInstanceListSupplierProvider) {
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider =
serviceInstanceListSupplierProvider;
- this.position = new AtomicInteger(seedPosition);
}
@Override
@@ -84,47 +65,15 @@ public class ServiceInstancePriorityLoadBalancer implements
ReactorServiceInstan
return supplier
.get(request)
.next()
- .map(
- serviceInstances ->
- processInstanceResponse(request, supplier, serviceInstances,
clientIp));
+ .map(serviceInstances -> processInstanceResponse(supplier,
serviceInstances, clientIp));
}
private Response<ServiceInstance> processInstanceResponse(
- Request request,
ServiceInstanceListSupplier supplier,
List<ServiceInstance> serviceInstances,
String clientIp) {
Response<ServiceInstance> serviceInstanceResponse =
getInstanceResponse(serviceInstances, clientIp);
- Long endTtime = System.currentTimeMillis() + 2 * 60 * 1000;
-
- List<String> linkisLoadBalancerTypeList =
- ((RequestDataContext) request.getContext())
- .getClientRequest()
- .getHeaders()
- .get(RpcConstant.LINKIS_LOAD_BALANCER_TYPE);
- String linkisLoadBalancerType =
- CollectionUtils.isNotEmpty(linkisLoadBalancerTypeList)
- ? linkisLoadBalancerTypeList.get(0)
- : null;
-
- while (null == serviceInstanceResponse
- && StringUtils.isNoneBlank(clientIp)
- && isRPC(linkisLoadBalancerType)
- && System.currentTimeMillis() < endTtime) {
- cacheManualRefresher.refresh();
- List<ServiceInstance> instances =
-
SpringCloudFeignConfigurationCache$.MODULE$.discoveryClient().getInstances(serviceId);
- serviceInstanceResponse = getInstanceResponse(instances, clientIp);
- if (null == serviceInstanceResponse) {
- try {
- Thread.sleep(5000L);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
if (supplier instanceof SelectedInstanceCallback &&
serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier)
.selectedServiceInstance(serviceInstanceResponse.getServer());
@@ -132,40 +81,29 @@ public class ServiceInstancePriorityLoadBalancer
implements ReactorServiceInstan
return serviceInstanceResponse;
}
- private boolean isRPC(String linkisLoadBalancerType) {
- return StringUtils.isNotBlank(linkisLoadBalancerType)
- &&
linkisLoadBalancerType.equalsIgnoreCase(RpcConstant.LINKIS_LOAD_BALANCER_TYPE_RPC);
- }
-
private Response<ServiceInstance> getInstanceResponse(
List<ServiceInstance> instances, String clientIp) {
if (instances.isEmpty()) {
- log.warn("No servers available for service: " + serviceId);
- return null;
+ logger.warn("No servers available for service: " + serviceId);
+ return new EmptyResponse();
}
- int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
-
- if (StringUtils.isBlank(clientIp)) {
- return new DefaultResponse(instances.get(pos % instances.size()));
+ if (StringUtils.isEmpty(clientIp)) {
+ return new DefaultResponse(
+
instances.get(ThreadLocalRandom.current().nextInt(instances.size())));
}
String[] ipAndPort = clientIp.split(":");
if (ipAndPort.length != 2) {
- throw new NoInstanceExistsException(
- LinkisRpcErrorCodeSummary.INSTANCE_ERROR.getErrorCode(),
-
MessageFormat.format(LinkisRpcErrorCodeSummary.INSTANCE_ERROR.getErrorDesc(),
clientIp));
+ return new DefaultResponse(
+
instances.get(ThreadLocalRandom.current().nextInt(instances.size())));
}
ServiceInstance chooseInstance = null;
for (ServiceInstance instance : instances) {
if (Objects.equals(ipAndPort[0], instance.getHost())
&& Objects.equals(ipAndPort[1], String.valueOf(instance.getPort())))
{
- chooseInstance = instance;
- break;
+ return new DefaultResponse(instance);
}
}
- if (null == chooseInstance) {
- return null;
- } else {
- return new DefaultResponse(chooseInstance);
- }
+ return new DefaultResponse(
+ instances.get(ThreadLocalRandom.current().nextInt(instances.size())));
}
}
diff --git
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/LinkisLoadBalancerClientConfiguration.java
b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/LinkisLoadBalancerClientConfiguration.java
similarity index 77%
rename from
linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/LinkisLoadBalancerClientConfiguration.java
rename to
linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/LinkisLoadBalancerClientConfiguration.java
index f4d501dfe4..5c0f07a8d3 100644
---
a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/LinkisLoadBalancerClientConfiguration.java
+++
b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/LinkisLoadBalancerClientConfiguration.java
@@ -15,25 +15,21 @@
* limitations under the License.
*/
-package org.apache.linkis.rpc.loadbalancer;
+package org.apache.linkis.gateway.springcloud.http;
import org.springframework.cloud.client.ServiceInstance;
-import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import
org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
-@Configuration
-@LoadBalancerClients(defaultConfiguration =
{LinkisLoadBalancerClientConfiguration.class})
public class LinkisLoadBalancerClientConfiguration {
@Bean
public ReactorLoadBalancer<ServiceInstance> customLoadBalancer(
Environment environment, LoadBalancerClientFactory
loadBalancerClientFactory) {
String name =
environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
- return new ServiceInstancePriorityLoadBalancer(
- loadBalancerClientFactory.getLazyProvider(name,
ServiceInstanceListSupplier.class), name);
+ return new IpPriorityLoadBalancer(
+ name, loadBalancerClientFactory.getLazyProvider(name,
ServiceInstanceListSupplier.class));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]