This is an automated email from the ASF dual-hosted git repository.

nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 50a8484  add tmaster physical plan endpoint (#2941)
50a8484 is described below

commit 50a8484f555a8459205d05c98a940898507e8927
Author: Yao Li <clshl...@gmail.com>
AuthorDate: Fri Jun 29 10:48:11 2018 -0700

    add tmaster physical plan endpoint (#2941)
    
    * add tmaster pplan endpoint
    
    * fix http_server->InstallCallBack
    
    * fix request deletion
---
 heron/tmaster/src/cpp/BUILD                   |  1 +
 heron/tmaster/src/cpp/manager/tcontroller.cpp | 38 +++++++++++++++++++++++++++
 heron/tmaster/src/cpp/manager/tcontroller.h   |  1 +
 3 files changed, 40 insertions(+)

diff --git a/heron/tmaster/src/cpp/BUILD b/heron/tmaster/src/cpp/BUILD
index 91cf0b5..e337034 100644
--- a/heron/tmaster/src/cpp/BUILD
+++ b/heron/tmaster/src/cpp/BUILD
@@ -51,6 +51,7 @@ cc_library(
         "//heron/statemgrs/src/cpp:statemgrs-cxx",
         "//heron/proto:proto-cxx",
         "@com_github_jbeder_yaml_cpp//:yaml-cxx",
+        "@com_github_cereal//:cereal-cxx",
     ],
     linkstatic = 1,
 )
diff --git a/heron/tmaster/src/cpp/manager/tcontroller.cpp 
b/heron/tmaster/src/cpp/manager/tcontroller.cpp
index 1afabc0..b7a4ee6 100644
--- a/heron/tmaster/src/cpp/manager/tcontroller.cpp
+++ b/heron/tmaster/src/cpp/manager/tcontroller.cpp
@@ -26,6 +26,7 @@
 #include <vector>
 #include "basics/basics.h"
 #include "basics/strutils.h"
+#include "cereal/external/base64.hpp"
 #include "config/topology-config-helper.h"
 #include "errors/errors.h"
 #include "manager/tmaster.h"
@@ -65,6 +66,12 @@ TController::TController(EventLoop* eventLoop, const 
NetworkOptions& options, TM
     this->HandleUpdateRuntimeConfigRequest(request);
   };
   http_server_->InstallCallBack("/runtime_config/update", 
std::move(cbUpdateRuntimeConfg));
+
+  // Get current physical plan
+  auto cbGetCurPPlan = [this](IncomingHTTPRequest* request) {
+    this->HandleGetCurPPlanRequest(request);
+  };
+  http_server_->InstallCallBack("/get_current_physical_plan", 
std::move(cbGetCurPPlan));
 }
 
 TController::~TController() { delete http_server_; }
@@ -262,6 +269,37 @@ void 
TController::HandleUpdateRuntimeConfigRequestDone(IncomingHTTPRequest* requ
   delete request;
 }
 
+void TController::HandleGetCurPPlanRequest(IncomingHTTPRequest* request) {
+  LOG(INFO) << "Got a GetCurPPlan request from " << request->GetRemoteHost() 
<< ":"
+              << request->GetRemotePort();
+
+  // make sure all the stream managers are alive, in case that when container 
is fail,
+  // physical plan is still available at TMaster but not a valid one.
+  if (tmaster_->GetStmgrsRegSummary()->absent_stmgrs_size() != 0) {
+      http_server_->SendErrorReply(request, 400);
+      delete request;
+      return;
+  }
+
+  if (tmaster_->getPhysicalPlan() == NULL) {
+    http_server_->SendErrorReply(request, 400);
+  } else {
+    std::string pplanString;
+    tmaster_->getPhysicalPlan()->SerializeToString(&pplanString);
+
+    // SerializeToString() returns object in binary format which needs to be 
encoded
+    const unsigned char * encodeString = (unsigned char *)pplanString.c_str();
+    std::string pplanStringFixed = cereal::base64::encode(encodeString, 
pplanString.size());
+
+    const std::string message("Get current physical plan");
+    LOG(INFO) << message;
+    OutgoingHTTPResponse* response = new OutgoingHTTPResponse(request);
+    response->AddResponse(pplanStringFixed);
+    http_server_->SendReply(request, 200, response);
+  }
+  delete request;
+}
+
 /*
  * Validate topology.
  * - topology id matches
diff --git a/heron/tmaster/src/cpp/manager/tcontroller.h 
b/heron/tmaster/src/cpp/manager/tcontroller.h
index beb8fb7..f7e7a48 100644
--- a/heron/tmaster/src/cpp/manager/tcontroller.h
+++ b/heron/tmaster/src/cpp/manager/tcontroller.h
@@ -66,6 +66,7 @@ class TController {
   void HandleUpdateRuntimeConfigRequest(IncomingHTTPRequest* request);
   void HandleUpdateRuntimeConfigRequestDone(IncomingHTTPRequest* request,
                                             proto::system::StatusCode);
+  void HandleGetCurPPlanRequest(IncomingHTTPRequest* request);
 
   // We are a http server
   HTTPServer* http_server_;

Reply via email to