This is an automated email from the ASF dual-hosted git repository.

rpardomeza pushed a commit to branch debugger-sidecar
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 03240679ebbc91af376eb3cee863463d82cbc470
Author: rodrigopardomeza <[email protected]>
AuthorDate: Tue May 18 18:17:06 2021 -0400

    [WAYANG-30] Adjustment to create/delete flink cluster
---
 .../hackit/sidecar/webservice/WebService.java      | 75 +++++++++++++++++++---
 1 file changed, 67 insertions(+), 8 deletions(-)

diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/src/main/java/org/apache/wayang/hackit/sidecar/webservice/WebService.java b/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/src/main/java/org/apache/wayang/hackit/sidecar/webservice/WebService.java
index 38a5003..93f4818 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/src/main/java/org/apache/wayang/hackit/sidecar/webservice/WebService.java
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/src/main/java/org/apache/wayang/hackit/sidecar/webservice/WebService.java
@@ -9,12 +9,11 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.wayang.core.util.ReflectionUtils;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.bind.annotation.*;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 
 @RestController
 @RequestMapping("/sidecar")
@@ -41,14 +40,18 @@ public class WebService {
     @GetMapping("/debug/start")
     public UUID serviceStart(){
 
-        ProcessBuilder builder_proc1 = new ProcessBuilder("kubectl", "create", "-f", "jobmanager.yaml");
-        ProcessBuilder builder_proc2 = new ProcessBuilder("kubectl", "create", "-f", "jobmanager-service.yaml");
-        ProcessBuilder builder_proc3 = new ProcessBuilder("kubectl", "create", "-f", "taskmanger.yaml");
+        ProcessBuilder builder_proc1 = new ProcessBuilder("kubectl", "create", "-f", "volume.yaml");
+        ProcessBuilder builder_proc2 = new ProcessBuilder("kubectl", "create", "-f", "claim.yaml");
+        ProcessBuilder builder_proc3 = new ProcessBuilder("kubectl", "create", "-f", "jobmanager_new.yaml");
+        ProcessBuilder builder_proc4 = new ProcessBuilder("kubectl", "create", "-f", "jobmanager-service.yaml");
+        ProcessBuilder builder_proc5 = new ProcessBuilder("kubectl", "create", "-f", "taskmanger_new.yaml");
 
         List<ProcessBuilder> processes = new ArrayList<>();
         processes.add(0, builder_proc1);
         processes.add(1, builder_proc2);
         processes.add(2, builder_proc3);
+        processes.add(3, builder_proc4);
+        processes.add(4, builder_proc5);
 
         UUID ProcessID = ExecutorManager.addThread(processes);
 
@@ -80,12 +83,68 @@ public class WebService {
         return HttpStatus.ACCEPTED;
     }
 
-    @GetMapping("/debug/stop")
-    public HttpStatus serviceStop(){
+    @GetMapping("/debug/plan/run")
+    public HttpStatus planRun(){
+
+        try {
+            // LocalEnvironment
+            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8081);
+            
+            //ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 57261, getJars());
+            //DataSet<String> data = env.readTextFile("file:///Users/rodrigopardomeza/flink/count");
+            DataSet<String> data = env.readTextFile("/mnt/example/count");
+
+            data
+                    /*.filter(new FilterFunction<String>() {
+                        public boolean filter(String value) {
+                            return value.startsWith("T");
+                        }
+                    })*/
+                    .writeAsText("/mnt/example/output_z");
+
+            JobExecutionResult res = env.execute();
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
 
         return HttpStatus.ACCEPTED;
     }
 
+    private String[] getJars(){
+        List<String> jars = new ArrayList<>(5);
+        List<Class> clazzs = Arrays.asList(new Class[]{this.getClass()});
+
+        clazzs.stream().map(
+                ReflectionUtils::getDeclaringJar
+        ).filter(
+                element -> element != null
+        ).forEach(jars::add);
+
+        return jars.toArray(new String[0]);
+    }
+
+    @GetMapping("/debug/stop")
+    public UUID serviceStop(){
+
+        ProcessBuilder builder_proc1 = new ProcessBuilder("kubectl", "delete", "-f", "volume.yaml");
+        ProcessBuilder builder_proc2 = new ProcessBuilder("kubectl", "delete", "-f", "claim.yaml");
+        ProcessBuilder builder_proc3 = new ProcessBuilder("kubectl", "delete", "-f", "jobmanager_new.yaml");
+        ProcessBuilder builder_proc4 = new ProcessBuilder("kubectl", "delete", "-f", "jobmanager-service.yaml");
+        ProcessBuilder builder_proc5 = new ProcessBuilder("kubectl", "delete", "-f", "taskmanger_new.yaml");
+
+        List<ProcessBuilder> processes = new ArrayList<>();
+        processes.add(0, builder_proc1);
+        processes.add(1, builder_proc2);
+        processes.add(2, builder_proc3);
+        processes.add(3, builder_proc4);
+        processes.add(4, builder_proc5);
+
+        UUID ProcessID = ExecutorManager.addThread(processes);
+
+        return ProcessID;
+    }
+
     @PostMapping(path = "/debug/check")
     public String checkStatus(
             @RequestParam("key") String key,

Reply via email to