This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch feat/runtime-manager
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/feat/runtime-manager by this
push:
new 781772d feat:support kubernates.kube.config
781772d is described below
commit 781772dffbd02a40dbff3be6ce8a3d3efc7bc3c2
Author: 2011shenlin <[email protected]>
AuthorDate: Sat Jun 8 16:05:48 2024 +0800
feat:support kubernates.kube.config
---
.../adapter/runtime/manager/watch/WatchWorker.java | 10 ++++--
.../runtime/manager/k8s/api/KubectlService.java | 40 +++++++++++++++-------
.../WorkerInstanceRepositoryOnK8STest.java | 4 +--
start/pom.xml | 11 ++++++
4 files changed, 47 insertions(+), 18 deletions(-)
diff --git
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
index b507128..69e2c33 100644
---
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
+++
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.eventbridge.adapter.runtime.manager.watch;
+import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.util.List;
@@ -40,8 +41,8 @@ public class WatchWorker {
@Autowired
WorkerService workerService;
-// @Autowired
-// WorkerInstanceRepository workerInstanceRepository;
+ @Autowired
+ WorkerInstanceRepository workerInstanceRepository;
private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl(WatchWorker.class.getSimpleName()));
@@ -57,8 +58,11 @@ public class WatchWorker {
if (!workerService.isFinalState(worker)) {
Map<String, Object> environments = new
Gson().fromJson(worker.getConfig(), new TypeToken<Map<String, Object>>() {
}.getType());
+ if(environments == null){
+ environments = Maps.newHashMap();
+ }
log.info("applyWorkerInstance, workerName: {},
workerImageTag: {}, workerResource: {}, environments: {}", worker.getName(),
worker.getImage(), worker.getResources(), new Gson().toJson(environments));
-//
workerInstanceRepository.applyWorkerInstance(worker.getName(),
worker.getImage(), new Gson().fromJson(worker.getResources(),
WorkerResource.class), environments);
+
workerInstanceRepository.applyWorkerInstance(worker.getName(),
worker.getImage(), new Gson().fromJson(worker.getResources(),
WorkerResource.class), environments);
workerService.refreshMD5(worker);
}
diff --git
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java
index a41c8cd..1caed04 100644
---
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java
+++
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java
@@ -25,16 +25,18 @@ import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.util.Strings;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
+import io.fabric8.kubernetes.client.Config;
@Slf4j
@Service
-public class KubectlService implements AutoCloseable{
+public class KubectlService implements AutoCloseable {
- private KubernetesClient client ;
+ private KubernetesClient client;
@Value("${eventbus.cs.accessKey:}")
private String accessKey;
@@ -45,7 +47,7 @@ public class KubectlService implements AutoCloseable{
@Value("${conductor.run.env:online}")
private String env;
- @Value("${kubernates.api.server}")
+ @Value("${kubernates.api.server:}")
private String apiServer;
@Value("${kubernates.api.version:apps/v1}")
@@ -54,16 +56,18 @@ public class KubectlService implements AutoCloseable{
@Value("${kubernates.auth.token:}")
private String oauthToken;
- private final String DEFAULT_KEY= "default";
+ @Value("${kubernates.kube.config:}")
+ private String kubeConfig;
+ private final String DEFAULT_KEY = "default";
private Map<String, KubernetesClient> kubernetesClientMap = new
ConcurrentHashMap<>();
@PostConstruct
- public void initClient(){
+ public void initClient() {
client = getKubernetesClient();
}
- public KubernetesClient getClient(){
+ public KubernetesClient getClient() {
return this.client;
}
@@ -82,19 +86,29 @@ public class KubectlService implements AutoCloseable{
private KubernetesClient getKubernetesClient() {
Config config = getKubeConfig();
log.info("connect to api server [{}]", apiServer);
- return new KubernetesClientBuilder().withConfig(config).build();
+ if (config == null && !Strings.isBlank(kubeConfig)) {
+ System.setProperty(Config.KUBERNETES_KUBECONFIG_FILE, kubeConfig);
+ return new DefaultKubernetesClient();
+ } else {
+ return new KubernetesClientBuilder().withConfig(config).build();
+ }
}
private Config getKubeConfig() {
- Config config = new ConfigBuilder()
+ Config config = null;
+ if (StringUtils.isNotBlank(accessKey) &&
StringUtils.isNotBlank(secretKey)) {
+ config = new ConfigBuilder()
.withMasterUrl(apiServer)
.withApiVersion(apiVersion)
.build();
- if (StringUtils.isNotBlank(accessKey) &&
StringUtils.isNotBlank(secretKey)) {
config.setUsername(accessKey);
config.setPassword(secretKey);
log.info("use ak and sk connect to api server.");
- } else if(StringUtils.isNotBlank(oauthToken)){
+ } else if (StringUtils.isNotBlank(oauthToken)) {
+ config = new ConfigBuilder()
+ .withMasterUrl(apiServer)
+ .withApiVersion(apiVersion)
+ .build();
config.setTrustCerts(true);
config.setOauthToken(oauthToken);
log.info("use auth token connect to api server.");
@@ -107,14 +121,14 @@ public class KubectlService implements AutoCloseable{
@Override
public void close() throws Exception {
- if(client != null) {
+ if (client != null) {
client.close();
}
if (!kubernetesClientMap.isEmpty()) {
- for(String clientId : kubernetesClientMap.keySet()) {
+ for (String clientId : kubernetesClientMap.keySet()) {
KubernetesClient kubernetesClient =
kubernetesClientMap.get(clientId);
- if(kubernetesClient != null){
+ if (kubernetesClient != null) {
kubernetesClient.close();
}
}
diff --git
a/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
b/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
index 2259c72..a0c18cb 100644
---
a/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
+++
b/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
@@ -41,7 +41,7 @@ import static org.junit.Assert.assertEquals;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = WorkerInstanceRepositoryOnK8STest.class)
@SpringBootApplication(scanBasePackages =
{"org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.*"})
-@Ignore
+//@Ignore
public class WorkerInstanceRepositoryOnK8STest {
@Autowired
@@ -52,7 +52,7 @@ public class WorkerInstanceRepositoryOnK8STest {
Map<String, Object> environments = Maps.newHashMap();
environments.put("key1", "value1");
environments.put("key2", "value2");
- workerInstanceRepositoryOnK8S.applyWorkerInstance("worker-4",
"apache/rocketmq-eventbridge:1.1.0", new
Gson().fromJson("{\"cpu\":100,\"memory\":100}", WorkerResource.class),
environments);
+ workerInstanceRepositoryOnK8S.applyWorkerInstance("worker-3",
"apache/rocketmq-eventbridge:1.1.0", new
Gson().fromJson("{\"cpu\":100,\"memory\":100}", WorkerResource.class),
environments);
}
@Test
diff --git a/start/pom.xml b/start/pom.xml
index bfed07f..063b238 100644
--- a/start/pom.xml
+++ b/start/pom.xml
@@ -45,6 +45,12 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-eventbridge-adapter-storage</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>okhttp</artifactId>
+ <groupId>com.squareup.okhttp3</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
@@ -94,6 +100,11 @@
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+
<artifactId>rocketmq-eventbridge-adapter-runtime-on-k8s</artifactId>
+ <version>1.0.0</version>
+ </dependency>
</dependencies>
<build>