This is an automated email from the ASF dual-hosted git repository. rpardomeza pushed a commit to branch debugger-sidecar in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 03240679ebbc91af376eb3cee863463d82cbc470 Author: rodrigopardomeza <[email protected]> AuthorDate: Tue May 18 18:17:06 2021 -0400 [WAYANG-30] Adjustment to create/delete flink cluster --- .../hackit/sidecar/webservice/WebService.java | 75 +++++++++++++++++++--- 1 file changed, 67 insertions(+), 8 deletions(-) diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/src/main/java/org/apache/wayang/hackit/sidecar/webservice/WebService.java b/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/src/main/java/org/apache/wayang/hackit/sidecar/webservice/WebService.java index 38a5003..93f4818 100644 --- a/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/src/main/java/org/apache/wayang/hackit/sidecar/webservice/WebService.java +++ b/wayang-plugins/wayang-hackit/wayang-hackit-sidecar/src/main/java/org/apache/wayang/hackit/sidecar/webservice/WebService.java @@ -9,12 +9,11 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.wayang.core.util.ReflectionUtils; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.*; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; +import java.util.*; @RestController @RequestMapping("/sidecar") @@ -41,14 +40,18 @@ public class WebService { @GetMapping("/debug/start") public UUID serviceStart(){ - ProcessBuilder builder_proc1 = new ProcessBuilder("kubectl", "create", "-f", "jobmanager.yaml"); - ProcessBuilder builder_proc2 = new ProcessBuilder("kubectl", "create", "-f", "jobmanager-service.yaml"); - ProcessBuilder builder_proc3 = new ProcessBuilder("kubectl", "create", "-f", "taskmanger.yaml"); + ProcessBuilder builder_proc1 = new ProcessBuilder("kubectl", "create", "-f", "volume.yaml"); + ProcessBuilder builder_proc2 = new ProcessBuilder("kubectl", "create", "-f", "claim.yaml"); + ProcessBuilder builder_proc3 = new ProcessBuilder("kubectl", "create", "-f", "jobmanager_new.yaml"); + ProcessBuilder builder_proc4 = new ProcessBuilder("kubectl", "create", "-f", "jobmanager-service.yaml"); + ProcessBuilder builder_proc5 = new ProcessBuilder("kubectl", "create", "-f", "taskmanger_new.yaml"); List<ProcessBuilder> processes = new ArrayList<>(); processes.add(0, builder_proc1); processes.add(1, builder_proc2); processes.add(2, builder_proc3); + processes.add(3, builder_proc4); + processes.add(4, builder_proc5); UUID ProcessID = ExecutorManager.addThread(processes); @@ -80,12 +83,68 @@ public class WebService { return HttpStatus.ACCEPTED; } - @GetMapping("/debug/stop") - public HttpStatus serviceStop(){ + @GetMapping("/debug/plan/run") + public HttpStatus planRun(){ + + try { + // LocalEnvironment + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8081); + + //ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 57261, getJars()); + //DataSet<String> data = env.readTextFile("file:///Users/rodrigopardomeza/flink/count"); + DataSet<String> data = env.readTextFile("/mnt/example/count"); + + data + /*.filter(new FilterFunction<String>() { + public boolean filter(String value) { + return value.startsWith("T"); + } + })*/ + .writeAsText("/mnt/example/output_z"); + + JobExecutionResult res = env.execute(); + + } catch (Exception e) { + e.printStackTrace(); + } return HttpStatus.ACCEPTED; } + private String[] getJars(){ + List<String> jars = new ArrayList<>(5); + List<Class> clazzs = Arrays.asList(new Class[]{this.getClass()}); + + clazzs.stream().map( + ReflectionUtils::getDeclaringJar + ).filter( + element -> element != null + ).forEach(jars::add); + + return jars.toArray(new String[0]); + } + + @GetMapping("/debug/stop") + public UUID serviceStop(){ + + ProcessBuilder builder_proc1 = new ProcessBuilder("kubectl", "delete", "-f", "volume.yaml"); + ProcessBuilder builder_proc2 = new ProcessBuilder("kubectl", "delete", "-f", "claim.yaml"); + ProcessBuilder builder_proc3 = new ProcessBuilder("kubectl", "delete", "-f", "jobmanager_new.yaml"); + ProcessBuilder builder_proc4 = new ProcessBuilder("kubectl", "delete", "-f", "jobmanager-service.yaml"); + ProcessBuilder builder_proc5 = new ProcessBuilder("kubectl", "delete", "-f", "taskmanger_new.yaml"); + + List<ProcessBuilder> processes = new ArrayList<>(); + processes.add(0, builder_proc1); + processes.add(1, builder_proc2); + processes.add(2, builder_proc3); + processes.add(3, builder_proc4); + processes.add(4, builder_proc5); + + UUID ProcessID = ExecutorManager.addThread(processes); + + return ProcessID; + } + @PostMapping(path = "/debug/check") public String checkStatus( @RequestParam("key") String key,
