This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch feat/runtime-manager
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


The following commit(s) were added to refs/heads/feat/runtime-manager by this push:
     new 781772d  feat:support kubernates.kube.config
781772d is described below

commit 781772dffbd02a40dbff3be6ce8a3d3efc7bc3c2
Author: 2011shenlin <[email protected]>
AuthorDate: Sat Jun 8 16:05:48 2024 +0800

    feat:support kubernates.kube.config
---
 .../adapter/runtime/manager/watch/WatchWorker.java | 10 ++++--
 .../runtime/manager/k8s/api/KubectlService.java    | 40 +++++++++++++++-------
 .../WorkerInstanceRepositoryOnK8STest.java         |  4 +--
 start/pom.xml                                      | 11 ++++++
 4 files changed, 47 insertions(+), 18 deletions(-)

diff --git a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
index b507128..69e2c33 100644
--- a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
+++ b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/watch/WatchWorker.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtime.manager.watch;
 
+import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import java.util.List;
@@ -40,8 +41,8 @@ public class WatchWorker {
 
     @Autowired
     WorkerService workerService;
-//    @Autowired
-//    WorkerInstanceRepository workerInstanceRepository;
+    @Autowired
+    WorkerInstanceRepository workerInstanceRepository;
 
     private final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(
         new ThreadFactoryImpl(WatchWorker.class.getSimpleName()));
@@ -57,8 +58,11 @@ public class WatchWorker {
                         if (!workerService.isFinalState(worker)) {
                             Map<String, Object> environments = new 
Gson().fromJson(worker.getConfig(), new TypeToken<Map<String, Object>>() {
                             }.getType());
+                            if(environments == null){
+                                 environments = Maps.newHashMap();
+                            }
                             log.info("applyWorkerInstance, workerName: {}, 
workerImageTag: {}, workerResource: {}, environments: {}", worker.getName(), 
worker.getImage(), worker.getResources(), new Gson().toJson(environments));
-//                            
workerInstanceRepository.applyWorkerInstance(worker.getName(), 
worker.getImage(), new Gson().fromJson(worker.getResources(), 
WorkerResource.class), environments);
+                            
workerInstanceRepository.applyWorkerInstance(worker.getName(), 
worker.getImage(), new Gson().fromJson(worker.getResources(), 
WorkerResource.class), environments);
                             workerService.refreshMD5(worker);
                         }
 
diff --git a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java
index a41c8cd..1caed04 100644
--- a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java
+++ b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java
@@ -25,16 +25,18 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.util.Strings;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
+import io.fabric8.kubernetes.client.Config;
 
 @Slf4j
 @Service
-public class KubectlService implements AutoCloseable{
+public class KubectlService implements AutoCloseable {
 
-    private KubernetesClient client ;
+    private KubernetesClient client;
 
     @Value("${eventbus.cs.accessKey:}")
     private String accessKey;
@@ -45,7 +47,7 @@ public class KubectlService implements AutoCloseable{
     @Value("${conductor.run.env:online}")
     private String env;
 
-    @Value("${kubernates.api.server}")
+    @Value("${kubernates.api.server:}")
     private String apiServer;
 
     @Value("${kubernates.api.version:apps/v1}")
@@ -54,16 +56,18 @@ public class KubectlService implements AutoCloseable{
     @Value("${kubernates.auth.token:}")
     private String oauthToken;
 
-    private final String DEFAULT_KEY= "default";
+    @Value("${kubernates.kube.config:}")
+    private String kubeConfig;
+    private final String DEFAULT_KEY = "default";
 
     private Map<String, KubernetesClient> kubernetesClientMap = new 
ConcurrentHashMap<>();
 
     @PostConstruct
-    public void initClient(){
+    public void initClient() {
         client = getKubernetesClient();
     }
 
-    public KubernetesClient getClient(){
+    public KubernetesClient getClient() {
         return this.client;
     }
 
@@ -82,19 +86,29 @@ public class KubectlService implements AutoCloseable{
     private KubernetesClient getKubernetesClient() {
         Config config = getKubeConfig();
         log.info("connect to api server [{}]", apiServer);
-        return new KubernetesClientBuilder().withConfig(config).build();
+        if (config == null && !Strings.isBlank(kubeConfig)) {
+            System.setProperty(Config.KUBERNETES_KUBECONFIG_FILE, kubeConfig);
+            return new DefaultKubernetesClient();
+        } else {
+            return new KubernetesClientBuilder().withConfig(config).build();
+        }
     }
 
     private Config getKubeConfig() {
-        Config config = new ConfigBuilder()
+        Config config = null;
+        if (StringUtils.isNotBlank(accessKey) && 
StringUtils.isNotBlank(secretKey)) {
+            config = new ConfigBuilder()
                 .withMasterUrl(apiServer)
                 .withApiVersion(apiVersion)
                 .build();
-        if (StringUtils.isNotBlank(accessKey) && 
StringUtils.isNotBlank(secretKey)) {
             config.setUsername(accessKey);
             config.setPassword(secretKey);
             log.info("use ak and sk connect to api server.");
-        } else if(StringUtils.isNotBlank(oauthToken)){
+        } else if (StringUtils.isNotBlank(oauthToken)) {
+            config = new ConfigBuilder()
+                .withMasterUrl(apiServer)
+                .withApiVersion(apiVersion)
+                .build();
             config.setTrustCerts(true);
             config.setOauthToken(oauthToken);
             log.info("use auth token connect to api server.");
@@ -107,14 +121,14 @@ public class KubectlService implements AutoCloseable{
     @Override
     public void close() throws Exception {
 
-        if(client != null) {
+        if (client != null) {
             client.close();
         }
 
         if (!kubernetesClientMap.isEmpty()) {
-            for(String clientId : kubernetesClientMap.keySet()) {
+            for (String clientId : kubernetesClientMap.keySet()) {
                 KubernetesClient kubernetesClient = 
kubernetesClientMap.get(clientId);
-                if(kubernetesClient != null){
+                if (kubernetesClient != null) {
                     kubernetesClient.close();
                 }
             }
diff --git a/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java b/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
index 2259c72..a0c18cb 100644
--- a/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
+++ b/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
@@ -41,7 +41,7 @@ import static org.junit.Assert.assertEquals;
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = WorkerInstanceRepositoryOnK8STest.class)
 @SpringBootApplication(scanBasePackages = 
{"org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.*"})
-@Ignore
+//@Ignore
 public class WorkerInstanceRepositoryOnK8STest {
 
     @Autowired
@@ -52,7 +52,7 @@ public class WorkerInstanceRepositoryOnK8STest {
         Map<String, Object> environments = Maps.newHashMap();
         environments.put("key1", "value1");
         environments.put("key2", "value2");
-        workerInstanceRepositoryOnK8S.applyWorkerInstance("worker-4", 
"apache/rocketmq-eventbridge:1.1.0", new 
Gson().fromJson("{\"cpu\":100,\"memory\":100}", WorkerResource.class), 
environments);
+        workerInstanceRepositoryOnK8S.applyWorkerInstance("worker-3", 
"apache/rocketmq-eventbridge:1.1.0", new 
Gson().fromJson("{\"cpu\":100,\"memory\":100}", WorkerResource.class), 
environments);
     }
 
     @Test
diff --git a/start/pom.xml b/start/pom.xml
index bfed07f..063b238 100644
--- a/start/pom.xml
+++ b/start/pom.xml
@@ -45,6 +45,12 @@
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-eventbridge-adapter-storage</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>okhttp</artifactId>
+                    <groupId>com.squareup.okhttp3</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
@@ -94,6 +100,11 @@
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            
<artifactId>rocketmq-eventbridge-adapter-runtime-on-k8s</artifactId>
+            <version>1.0.0</version>
+        </dependency>
     </dependencies>
 
     <build>

Reply via email to