This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 92a840ea0be70150a9adbec5be8c3af8ccfc407c
Author: Elliott Clark <[email protected]>
AuthorDate: Tue Apr 12 12:03:52 2016 -0700

    HBASE-15620 Add on Call serialization
    
    Summary: Add on delimited serialization so that request headers and request payloads can be serialized.
    
    Test Plan: Add a unit test.
    
    Differential Revision: https://reviews.facebook.net/D56757
---
 hbase-native-client/Dockerfile                     |  19 +--
 ...ocal_hbase_and_wait.sh => start-local-hbase.sh} |   0
 ...local_hbase_and_wait.sh => stop-local-hbase.sh} |   0
 hbase-native-client/connection/BUCK                |  43 +++++++
 .../{core => connection}/client-dispatcher.cc      |   4 +-
 .../{core => connection}/client-dispatcher.h       |  12 +-
 hbase-native-client/connection/client-handler.cc   |  80 ++++++++++++
 .../client-handler.h}                              |  27 ++--
 .../{core => connection}/connection-factory.cc     |  35 +++---
 .../{core => connection}/connection-factory.h      |  15 ++-
 .../{core => connection}/pipeline.cc               |  12 +-
 .../{core => connection}/pipeline.h                |  11 +-
 hbase-native-client/{core => connection}/request.h |  11 +-
 .../{core/request.h => connection/response.h}      |  19 ++-
 hbase-native-client/{core => connection}/service.h |   6 +-
 hbase-native-client/core/BUCK                      |  41 +-----
 hbase-native-client/core/admin.cc                  |  20 ---
 hbase-native-client/core/admin.h                   |  22 ----
 .../core/client-serialize-handler.cc               | 104 ---------------
 hbase-native-client/core/client.h                  |   2 +-
 hbase-native-client/core/connection_attr.h         |  24 ----
 hbase-native-client/core/delete.cc                 |  21 ----
 hbase-native-client/core/delete.h                  |  27 ----
 hbase-native-client/core/get-request.h             |   2 +-
 hbase-native-client/core/get-result.h              |   2 +-
 hbase-native-client/core/location-cache.cc         |   3 +-
 hbase-native-client/core/location-cache.h          |   2 +-
 hbase-native-client/core/mutation.cc               |  41 ------
 hbase-native-client/core/mutation.h                |  58 ---------
 hbase-native-client/core/scanner.cc                |  20 ---
 hbase-native-client/core/scanner.h                 |  22 ----
 hbase-native-client/core/simple-client.cc          |  45 ++++++-
 .../core/simple-native-client-test.cc              |  25 ----
 hbase-native-client/core/table-name.h              |   2 +-
 .../{native-client-test-env.cc => test-env.cc}     |   4 +-
 hbase-native-client/if/BUCK                        |  53 ++++----
 hbase-native-client/serde/BUCK                     |  54 ++++++++
 .../serde/client-deserializer-test.cc              |  67 ++++++++++
 hbase-native-client/serde/client-deserializer.cc   |  68 ++++++++++
 .../response.h => serde/client-deserializer.h}     |  22 ++--
 .../serde/client-serializer-test.cc                |  75 +++++++++++
 hbase-native-client/serde/client-serializer.cc     | 139 +++++++++++++++++++++
 .../client-serializer.h}                           |  43 ++++---
 43 files changed, 747 insertions(+), 555 deletions(-)

diff --git a/hbase-native-client/Dockerfile b/hbase-native-client/Dockerfile
index 36959a5..9ccc0d3 100644
--- a/hbase-native-client/Dockerfile
+++ b/hbase-native-client/Dockerfile
@@ -22,7 +22,14 @@ ARG CXX=/usr/bin/g++-5
 ARG CFLAGS="-D_GLIBCXX_USE_CXX11_ABI=0 -fPIC -g -fno-omit-frame-pointer -O3 -pthread"
 ARG CXXFLAGS="-D_GLIBCXX_USE_CXX11_ABI=0 -fPIC -g -fno-omit-frame-pointer -O3 -pthread"
 
-RUN apt-get install -y clang-format-3.7 vim maven inetutils-ping
+RUN apt-get install -y clang-format-3.7 vim maven inetutils-ping python-pip && \
+      pip install yapf && \
+      ln -sf /usr/bin/clang-format-3.7 /usr/bin/clang-format && \
+      apt-get -qq clean && \
+      apt-get -y -qq autoremove && \
+      rm -rf /var/lib/{apt,dpkg,cache,log}/ && \
+      rm -rf /tmp/*
+
 RUN git clone --depth 1 --branch v2.6.1 https://github.com/google/protobuf.git /usr/src/protobuf && \
   cd /usr/src/protobuf/ && \
   ldconfig && \
@@ -31,9 +38,8 @@ RUN git clone --depth 1 --branch v2.6.1 https://github.com/google/protobuf.git /
   make && \
   make install && \ 
   make clean && \
-  rm -rf .git
-
-RUN cd /usr/src && \
+  rm -rf .git && \
+  cd /usr/src && \
   wget http://www-us.apache.org/dist/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz && \ 
   tar zxf zookeeper-3.4.8.tar.gz && \ 
   rm -rf zookeeper-3.4.8.tar.gz && \
@@ -43,8 +49,7 @@ RUN cd /usr/src && \
   ./configure && \
   make && \
   make install && \
-  make clean
-
-RUN ldconfig
+  make clean && \
+  ldconfig
 
 WORKDIR /usr/src/hbase/hbase-native-client
diff --git a/hbase-native-client/bin/start_local_hbase_and_wait.sh b/hbase-native-client/bin/start-local-hbase.sh
similarity index 100%
rename from hbase-native-client/bin/start_local_hbase_and_wait.sh
rename to hbase-native-client/bin/start-local-hbase.sh
diff --git a/hbase-native-client/bin/stop_local_hbase_and_wait.sh b/hbase-native-client/bin/stop-local-hbase.sh
similarity index 100%
rename from hbase-native-client/bin/stop_local_hbase_and_wait.sh
rename to hbase-native-client/bin/stop-local-hbase.sh
diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK
new file mode 100644
index 0000000..5067708
--- /dev/null
+++ b/hbase-native-client/connection/BUCK
@@ -0,0 +1,43 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This is the library dealing with a single connection
+# to a single server.
+cxx_library(name="connection",
+            exported_headers=[
+                "client-dispatcher.h",
+                "client-handler.h",
+                "connection-factory.h",
+                "pipeline.h",
+                "request.h",
+                "response.h",
+                "service.h",
+            ],
+            srcs=[
+                "client-dispatcher.cc",
+                "client-handler.cc",
+                "connection-factory.cc",
+                "pipeline.cc",
+            ],
+            deps=[
+                "//if:if",
+                "//utils:utils",
+                "//serde:serde",
+                "//third-party:folly",
+                "//third-party:wangle",
+            ],
+            visibility=['//core/...', ], )
diff --git a/hbase-native-client/core/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc
similarity index 97%
rename from hbase-native-client/core/client-dispatcher.cc
rename to hbase-native-client/connection/client-dispatcher.cc
index d356759..25cff7d 100644
--- a/hbase-native-client/core/client-dispatcher.cc
+++ b/hbase-native-client/connection/client-dispatcher.cc
@@ -16,7 +16,7 @@
  * limitations under the License.
  *
  */
-#include "core/client-dispatcher.h"
+#include "connection/client-dispatcher.h"
 
 using namespace folly;
 using namespace hbase;
@@ -27,6 +27,7 @@ void ClientDispatcher::read(Context *ctx, Response in) {
   auto search = requests_.find(call_id);
   CHECK(search != requests_.end());
   auto p = std::move(search->second);
+
   requests_.erase(call_id);
 
   // TODO(eclark): check if the response
@@ -36,6 +37,7 @@ void ClientDispatcher::read(Context *ctx, Response in) {
 
 Future<Response> ClientDispatcher::operator()(Request arg) {
   auto call_id = ++current_call_id_;
+
   arg.set_call_id(call_id);
   auto &p = requests_[call_id];
   auto f = p.getFuture();
diff --git a/hbase-native-client/core/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h
similarity index 85%
rename from hbase-native-client/core/client-dispatcher.h
rename to hbase-native-client/connection/client-dispatcher.h
index 4b9d35a..c3987c9 100644
--- a/hbase-native-client/core/client-dispatcher.h
+++ b/hbase-native-client/connection/client-dispatcher.h
@@ -21,9 +21,9 @@
 
 #include <wangle/service/ClientDispatcher.h>
 
-#include "core/pipeline.h"
-#include "core/request.h"
-#include "core/response.h"
+#include "connection/pipeline.h"
+#include "connection/request.h"
+#include "connection/response.h"
 
 namespace hbase {
 class ClientDispatcher
@@ -36,7 +36,7 @@ public:
   folly::Future<folly::Unit> close() override;
 
 private:
-  std::unordered_map<int32_t, folly::Promise<Response>> requests_;
-  uint32_t current_call_id_ = 1;
+  std::unordered_map<uint32_t, folly::Promise<Response>> requests_;
+  uint32_t current_call_id_ = 10;
 };
-}  // namespace hbase
+} // namespace hbase
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
new file mode 100644
index 0000000..74b23ef
--- /dev/null
+++ b/hbase-native-client/connection/client-handler.cc
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "connection/client-handler.h"
+
+#include <glog/logging.h>
+#include <folly/Likely.h>
+
+#include <string>
+
+#include "connection/request.h"
+#include "connection/response.h"
+#include "if/RPC.pb.h"
+#include "if/Client.pb.h"
+
+using namespace hbase;
+using namespace folly;
+using namespace wangle;
+using hbase::pb::ResponseHeader;
+using hbase::pb::GetResponse;
+
+ClientHandler::ClientHandler(std::string user_name) : user_name_(user_name) {}
+
+void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) {
+  if (LIKELY(buf != nullptr)) {
+    buf->coalesce();
+    Response received;
+    ResponseHeader header;
+
+    int used_bytes = deser_.parse_delimited(buf.get(), &header);
+    LOG(INFO) << "Read ResponseHeader size=" << used_bytes
+              << " call_id=" << header.call_id()
+              << " has_exception=" << header.has_exception();
+    received.set_call_id(header.call_id());
+
+    if (header.has_exception() == false) {
+      buf->trimStart(used_bytes);
+      // For now assume that everything was a get.
+      // We'll need to set this up later.
+      received.set_response(std::make_shared<GetResponse>());
+      used_bytes = deser_.parse_delimited(buf.get(), received.response().get());
+    }
+    ctx->fireRead(std::move(received));
+  }
+}
+
+Future<Unit> ClientHandler::write(Context *ctx, Request r) {
+  // Keep track of if we have sent the header.
+  if (UNLIKELY(need_send_header_)) {
+    need_send_header_ = false;
+
+    // Should we be sending just one fireWrite?
+    // Right now we're sending one for the header
+    // and one for the request.
+    //
+    // That doesn't seem too bad, but who knows.
+    auto pre = ser_.preamble();
+    auto header = ser_.header(user_name_);
+    pre->appendChain(std::move(header));
+    ctx->fireWrite(std::move(pre));
+  }
+
+  return ctx->fireWrite(ser_.request(r.call_id(), r.method(), r.msg()));
+}
diff --git a/hbase-native-client/core/client-serialize-handler.h b/hbase-native-client/connection/client-handler.h
similarity index 75%
copy from hbase-native-client/core/client-serialize-handler.h
copy to hbase-native-client/connection/client-handler.h
index 961a03b..38c5725 100644
--- a/hbase-native-client/core/client-serialize-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -20,25 +20,30 @@
 
 #include <wangle/channel/Handler.h>
 
-#include "if/HBase.pb.h"
-#include "if/RPC.pb.h"
-#include "core/request.h"
-#include "core/response.h"
+#include <string>
+
+#include "serde/client-serializer.h"
+#include "serde/client-deserializer.h"
+
+// Forward declarations.
+namespace hbase {
+class Request;
+class Response;
+}
 
 namespace hbase {
-class ClientSerializeHandler
+class ClientHandler
     : public wangle::Handler<std::unique_ptr<folly::IOBuf>, Response, Request,
                              std::unique_ptr<folly::IOBuf>> {
 public:
+  ClientHandler(std::string user_name);
   void read(Context *ctx, std::unique_ptr<folly::IOBuf> msg) override;
   folly::Future<folly::Unit> write(Context *ctx, Request r) override;
 
 private:
-  folly::Future<folly::Unit> write_preamble(Context *ctx);
-  folly::Future<folly::Unit> write_header(Context *ctx);
-  // Our own simple version of LengthFieldPrepender
-  std::unique_ptr<folly::IOBuf>
-  prepend_length(std::unique_ptr<folly::IOBuf> msg);
   bool need_send_header_ = true;
+  std::string user_name_;
+  ClientSerializer ser_;
+  ClientDeserializer deser_;
 };
-}  // namespace hbase
+} // namespace hbase
diff --git a/hbase-native-client/core/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
similarity index 66%
rename from hbase-native-client/core/connection-factory.cc
rename to hbase-native-client/connection/connection-factory.cc
index 785b239..5d1b0da 100644
--- a/hbase-native-client/core/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -17,41 +17,42 @@
  *
  */
 
-#include "core/connection-factory.h"
+#include "connection/connection-factory.h"
 
+#include <folly/futures/Future.h>
+#include <wangle/bootstrap/ClientBootstrap.h>
 #include <wangle/channel/AsyncSocketHandler.h>
 #include <wangle/channel/EventBaseHandler.h>
 #include <wangle/channel/OutputBufferingHandler.h>
 #include <wangle/service/ClientDispatcher.h>
+#include <wangle/service/CloseOnReleaseFilter.h>
 #include <wangle/service/ExpiringFilter.h>
-#include <folly/futures/Future.h>
 
 #include <string>
 
-#include "core/client-dispatcher.h"
-#include "core/pipeline.h"
-#include "core/request.h"
-#include "core/response.h"
-#include "core/service.h"
+#include "connection/client-dispatcher.h"
+#include "connection/pipeline.h"
+#include "connection/request.h"
+#include "connection/response.h"
+#include "connection/service.h"
 
 using namespace folly;
 using namespace hbase;
 using namespace wangle;
 
 ConnectionFactory::ConnectionFactory() {
-  bootstrap_.group(std::make_shared<wangle::IOThreadPoolExecutor>(2));
+  bootstrap_.group(std::make_shared<wangle::IOThreadPoolExecutor>(1));
   bootstrap_.pipelineFactory(std::make_shared<RpcPipelineFactory>());
 }
 
-Future<ClientDispatcher> ConnectionFactory::make_connection(std::string host,
-                                                            int port) {
+std::shared_ptr<Service<Request, Response>>
+ConnectionFactory::make_connection(std::string host, int port) {
   // Connect to a given server
   // Then when connected create a ClientDispactcher.
-  auto srv = bootstrap_.connect(SocketAddress(host, port, true))
-                 .then([](SerializePipeline *pipeline) {
-                   ClientDispatcher dispatcher;
-                   dispatcher.setPipeline(pipeline);
-                   return dispatcher;
-                 });
-  return srv;
+  auto pipeline = bootstrap_.connect(SocketAddress(host, port, true)).get();
+  auto dispatcher = std::make_shared<ClientDispatcher>();
+  dispatcher->setPipeline(pipeline);
+  auto service =
+      std::make_shared<CloseOnReleaseFilter<Request, Response>>(dispatcher);
+  return service;
 }
diff --git a/hbase-native-client/core/connection-factory.h b/hbase-native-client/connection/connection-factory.h
similarity index 78%
rename from hbase-native-client/core/connection-factory.h
rename to hbase-native-client/connection/connection-factory.h
index 6f450c2..73ac032 100644
--- a/hbase-native-client/core/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -18,24 +18,23 @@
  */
 #pragma once
 
-#include <wangle/bootstrap/ClientBootstrap.h>
 #include <wangle/service/Service.h>
 
 #include <string>
 
-#include "core/service.h"
-#include "core/pipeline.h"
-#include "core/client-dispatcher.h"
-#include "core/request.h"
-#include "core/response.h"
+#include "connection/pipeline.h"
+#include "connection/request.h"
+#include "connection/response.h"
+#include "connection/service.h"
 
 namespace hbase {
 class ConnectionFactory {
 public:
   ConnectionFactory();
-  folly::Future<ClientDispatcher> make_connection(std::string host, int port);
+  std::shared_ptr<wangle::Service<Request, Response>>
+  make_connection(std::string host, int port);
 
 private:
   wangle::ClientBootstrap<SerializePipeline> bootstrap_;
 };
-}  // namespace hbase
+} // namespace hbase
diff --git a/hbase-native-client/core/pipeline.cc b/hbase-native-client/connection/pipeline.cc
similarity index 82%
rename from hbase-native-client/core/pipeline.cc
rename to hbase-native-client/connection/pipeline.cc
index 30d14ff..b9f5e0b 100644
--- a/hbase-native-client/core/pipeline.cc
+++ b/hbase-native-client/connection/pipeline.cc
@@ -16,7 +16,7 @@
  * limitations under the License.
  *
  */
-#include "core/pipeline.h"
+#include "connection/pipeline.h"
 
 #include <folly/Logging.h>
 #include <wangle/channel/AsyncSocketHandler.h>
@@ -24,7 +24,7 @@
 #include <wangle/channel/OutputBufferingHandler.h>
 #include <wangle/codec/LengthFieldBasedFrameDecoder.h>
 
-#include "core/client-serialize-handler.h"
+#include "connection/client-handler.h"
 
 using namespace folly;
 using namespace hbase;
@@ -33,10 +33,10 @@ using namespace wangle;
 SerializePipeline::Ptr
 RpcPipelineFactory::newPipeline(std::shared_ptr<AsyncTransportWrapper> sock) {
   auto pipeline = SerializePipeline::create();
-  pipeline->addBack(AsyncSocketHandler(sock));
-  pipeline->addBack(EventBaseHandler());
-  pipeline->addBack(LengthFieldBasedFrameDecoder());
-  pipeline->addBack(ClientSerializeHandler());
+  pipeline->addBack(AsyncSocketHandler{sock});
+  pipeline->addBack(EventBaseHandler{});
+  pipeline->addBack(LengthFieldBasedFrameDecoder{});
+  pipeline->addBack(ClientHandler{user_util_.user_name()});
   pipeline->finalize();
   return pipeline;
 }
diff --git a/hbase-native-client/core/pipeline.h b/hbase-native-client/connection/pipeline.h
similarity index 88%
rename from hbase-native-client/core/pipeline.h
rename to hbase-native-client/connection/pipeline.h
index d199d08..68ade48 100644
--- a/hbase-native-client/core/pipeline.h
+++ b/hbase-native-client/connection/pipeline.h
@@ -20,8 +20,10 @@
 
 #include <wangle/service/Service.h>
 #include <folly/io/IOBufQueue.h>
-#include "core/request.h"
-#include "core/response.h"
+
+#include "connection/request.h"
+#include "connection/response.h"
+#include "utils/user-util.h"
 
 namespace hbase {
 using SerializePipeline = wangle::Pipeline<folly::IOBufQueue &, Request>;
@@ -30,5 +32,8 @@ class RpcPipelineFactory : public wangle::PipelineFactory<SerializePipeline> {
 public:
   SerializePipeline::Ptr
   newPipeline(std::shared_ptr<folly::AsyncTransportWrapper> sock) override;
+
+private:
+  UserUtil user_util_;
 };
-}  // namespace hbase
+} // namespace hbase
diff --git a/hbase-native-client/core/request.h b/hbase-native-client/connection/request.h
similarity index 71%
copy from hbase-native-client/core/request.h
copy to hbase-native-client/connection/request.h
index 39083ed..e9e3e88 100644
--- a/hbase-native-client/core/request.h
+++ b/hbase-native-client/connection/request.h
@@ -18,7 +18,10 @@
  */
 #pragma once
 
+#include <google/protobuf/message.h>
+
 #include <cstdint>
+#include <string>
 
 namespace hbase {
 class Request {
@@ -26,8 +29,14 @@ public:
   Request() : call_id_(0) {}
   uint32_t call_id() { return call_id_; }
   void set_call_id(uint32_t call_id) { call_id_ = call_id; }
+  google::protobuf::Message *msg() { return msg_.get(); }
+  void set_msg(std::shared_ptr<google::protobuf::Message> msg) { msg_ = msg; }
+  std::string method() { return method_; }
+  void set_method(std::string method) { method_ = method; }
 
 private:
   uint32_t call_id_;
+  std::shared_ptr<google::protobuf::Message> msg_ = nullptr;
+  std::string method_ = "Get";
 };
-}  // namespace hbase
+} // namespace hbase
diff --git a/hbase-native-client/core/request.h b/hbase-native-client/connection/response.h
similarity index 72%
rename from hbase-native-client/core/request.h
rename to hbase-native-client/connection/response.h
index 39083ed..a7f7939 100644
--- a/hbase-native-client/core/request.h
+++ b/hbase-native-client/connection/response.h
@@ -20,14 +20,27 @@
 
 #include <cstdint>
 
+// Forward declarations.
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
+
 namespace hbase {
-class Request {
+
+class Response {
 public:
-  Request() : call_id_(0) {}
+  Response() : call_id_(0) {}
   uint32_t call_id() { return call_id_; }
   void set_call_id(uint32_t call_id) { call_id_ = call_id; }
+  std::shared_ptr<google::protobuf::Message> response() { return response_; }
+  void set_response(std::shared_ptr<google::protobuf::Message> response) {
+    response_ = std::move(response);
+  }
 
 private:
   uint32_t call_id_;
+  std::shared_ptr<google::protobuf::Message> response_;
 };
-}  // namespace hbase
+} // namespace hbase
diff --git a/hbase-native-client/core/service.h b/hbase-native-client/connection/service.h
similarity index 91%
rename from hbase-native-client/core/service.h
rename to hbase-native-client/connection/service.h
index 880e65f..feb14ec 100644
--- a/hbase-native-client/core/service.h
+++ b/hbase-native-client/connection/service.h
@@ -18,9 +18,9 @@
  */
 #pragma once
 
-#include "core/request.h"
-#include "core/response.h"
+#include "connection/request.h"
+#include "connection/response.h"
 
 namespace hbase {
 using HBaseService = wangle::Service<Request, Response>;
-}  // namespace hbase
+} // namespace hbase
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 2b00d66..195fc5c 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -15,50 +15,28 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# This is the main library.
 cxx_library(name="core",
             exported_headers=[
-                "admin.h",
-                "client-dispatcher.h",
-                "client-serialize-handler.h",
                 "client.h",
-                "connection-factory.h",
                 "connection.h",
-                "connection_attr.h",
-                "delete.h",
                 "get-request.h",
                 "get-result.h",
-                "get.h",
                 "hbase_macros.h",
                 "location-cache.h",
-                "mutation.h",
-                "pipeline.h",
-                "put.h",
-                "request.h",
-                "response.h",
-                "scanner.h",
-                "service.h",
                 "table-name.h",
             ],
             srcs=[
-                "admin.cc",
-                "client-dispatcher.cc",
-                "client-serialize-handler.cc",
                 "client.cc",
-                "connection-factory.cc",
-                "connection.cc",
-                "delete.cc",
                 "get-request.cc",
                 "get-result.cc",
-                "get.cc",
                 "location-cache.cc",
-                "mutation.cc",
-                "pipeline.cc",
-                "put.cc",
-                "scanner.cc",
                 "table-name.cc",
             ],
             deps=[
+                "//connection:connection",
                 "//if:if",
+                "//serde:serde",
                 "//third-party:folly",
                 "//third-party:wangle",
                 "//third-party:zookeeper_mt",
@@ -67,18 +45,9 @@ cxx_library(name="core",
                 'PUBLIC',
             ], )
 
-cxx_test(name="simple-test",
-         srcs=[
-             "native-client-test-env.cc",
-             "simple-native-client-test.cc",
-         ],
-         deps=[
-             ":core",
-         ],
-         run_test_separately=True, )
 cxx_test(name="location-cache-test",
          srcs=[
-             "native-client-test-env.cc",
+             "test-env.cc",
              "location-cache-test.cc",
          ],
          deps=[
@@ -87,4 +56,4 @@ cxx_test(name="location-cache-test",
          run_test_separately=True, )
 cxx_binary(name="simple-client",
            srcs=["simple-client.cc", ],
-           deps=[":core", ], )
+           deps=[":core", "//connection:connection"], )
diff --git a/hbase-native-client/core/admin.cc b/hbase-native-client/core/admin.cc
deleted file mode 100644
index 897e6bf..0000000
--- a/hbase-native-client/core/admin.cc
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/admin.h"
diff --git a/hbase-native-client/core/admin.h b/hbase-native-client/core/admin.h
deleted file mode 100644
index 775181c..0000000
--- a/hbase-native-client/core/admin.h
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-class Admin {};
diff --git a/hbase-native-client/core/client-serialize-handler.cc b/hbase-native-client/core/client-serialize-handler.cc
deleted file mode 100644
index cad1308..0000000
--- a/hbase-native-client/core/client-serialize-handler.cc
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/client-serialize-handler.h"
-
-#include <string>
-
-using namespace hbase;
-using namespace folly;
-using namespace wangle;
-
-static const std::string PREAMBLE = "HBas";
-static const std::string INTERFACE = "ClientService";
-static const uint8_t RPC_VERSION = 0;
-static const uint8_t AUTH_TYPE = 80;
-
-// TODO(eclark): Make this actually do ANYTHING.
-void ClientSerializeHandler::read(Context *ctx, std::unique_ptr<IOBuf> msg) {
-  Response received;
-  ctx->fireRead(received);
-}
-
-Future<Unit> ClientSerializeHandler::write(Context *ctx, Request r) {
-  // Keep track of if we have sent the header.
-  if (need_send_header_) {
-    need_send_header_ = false;
-
-    // Should this be replacing the IOBuf rather than
-    // sending several different calls?
-    write_preamble(ctx);
-    write_header(ctx);
-  }
-
-  // Send out the actual request and not just a test string.
-  std::string out{"test"};
-  return ctx->fireWrite(prepend_length(IOBuf::copyBuffer(out)));
-}
-
-Future<Unit> ClientSerializeHandler::write_preamble(Context *ctx) {
-  auto magic = IOBuf::copyBuffer(PREAMBLE);
-  auto buf = IOBuf::create(2);
-  buf->append(2);
-  folly::io::RWPrivateCursor c(buf.get());
-
-  // Version
-  c.write(RPC_VERSION);
-  // Standard security aka Please don't lie to me.
-  c.write(AUTH_TYPE);
-  magic->appendChain(std::move(buf));
-  return ctx->fireWrite(std::move(magic));
-}
-
-Future<Unit> ClientSerializeHandler::write_header(Context *ctx) {
-  pb::ConnectionHeader h;
-
-  // TODO(eclark): Make this not a total lie.
-  h.mutable_user_info()->set_effective_user("elliott");
-  // The service name that we want to talk to.
-  //
-  // Right now we're completely ignoring the service interface.
-  // That may or may not be the correct thing to do.
-  // It worked for a while with the java client; until it
-  // didn't.
-  h.set_service_name(INTERFACE);
-  // TODO(eclark): Make this 1 copy.
-  auto msg = IOBuf::copyBuffer(h.SerializeAsString());
-  return ctx->fireWrite(prepend_length(std::move(msg)));
-}
-
-// Our own simple version of LengthFieldPrepender
-std::unique_ptr<IOBuf>
-ClientSerializeHandler::prepend_length(std::unique_ptr<IOBuf> msg) {
-  // Java ints are 4 long. So create a buffer that large
-  auto len_buf = IOBuf::create(4);
-  // Then make those bytes visible.
-  len_buf->append(4);
-
-  io::RWPrivateCursor c(len_buf.get());
-  // Get the size of the data to be pushed out the network.
-  auto size = msg->computeChainDataLength();
-
-  // Write the length to this IOBuf.
-  c.writeBE(static_cast<uint32_t>(size));
-
-  // Then attach the origional to the back of len_buf
-  len_buf->appendChain(std::move(msg));
-  return len_buf;
-}
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index 818bc6b..c2dc226 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -39,4 +39,4 @@ private:
   LocationCache location_cache;
 };
 
-}  // namespace hbase
+} // namespace hbase
diff --git a/hbase-native-client/core/connection_attr.h b/hbase-native-client/core/connection_attr.h
deleted file mode 100644
index a312005..0000000
--- a/hbase-native-client/core/connection_attr.h
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include "core/hbase_macros.h"
-
-class ConnectionAttr {};
diff --git a/hbase-native-client/core/delete.cc b/hbase-native-client/core/delete.cc
deleted file mode 100644
index 57030be..0000000
--- a/hbase-native-client/core/delete.cc
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "core/delete.h"
-
-Delete::~Delete() {}
diff --git a/hbase-native-client/core/delete.h b/hbase-native-client/core/delete.h
deleted file mode 100644
index 34f6a6c..0000000
--- a/hbase-native-client/core/delete.h
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include "core/mutation.h"
-
-class Delete : public Mutation {
-public:
-  ~Delete();
-};
diff --git a/hbase-native-client/core/get-request.h b/hbase-native-client/core/get-request.h
index c9113ad..bb755c5 100644
--- a/hbase-native-client/core/get-request.h
+++ b/hbase-native-client/core/get-request.h
@@ -32,4 +32,4 @@ private:
   TableName table_name_;
   std::string key_;
 };
-}  // namespace hbase
+} // namespace hbase
diff --git a/hbase-native-client/core/get-result.h b/hbase-native-client/core/get-result.h
index e021316..a49ad98 100644
--- a/hbase-native-client/core/get-result.h
+++ b/hbase-native-client/core/get-result.h
@@ -29,4 +29,4 @@ public:
 private:
   std::string key_;
 };
-}  // namespace hbase
+} // namespace hbase
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index 34e3236..52e86e3 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -70,8 +70,7 @@ ServerName LocationCache::ReadMetaLocation() {
   // This needs to be int rather than size_t as that's what ZK expects.
   int len = sizeof(contents);
   // TODO(elliott): handle disconnects/reconntion as needed.
-  int zk_result =
-      zoo_get(this->zk_, META_LOCATION, 0, contents, &len, nullptr);
+  int zk_result = zoo_get(this->zk_, META_LOCATION, 0, contents, &len, nullptr);
   if (zk_result != ZOK || len < 9) {
     LOG(ERROR) << "Error getting meta location.";
     throw runtime_error("Error getting meta location");
diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h
index efcfde5..28a1ee1 100644
--- a/hbase-native-client/core/location-cache.h
+++ b/hbase-native-client/core/location-cache.h
@@ -51,4 +51,4 @@ private:
 
   zhandle_t *zk_;
 };
-}  // namespace hbase
+} // namespace hbase
diff --git a/hbase-native-client/core/mutation.cc b/hbase-native-client/core/mutation.cc
deleted file mode 100644
index 52910d5..0000000
--- a/hbase-native-client/core/mutation.cc
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/mutation.h"
-
-void Mutation::set_namespace(char *name_space, size_t name_space_length) {
-  this->name_space = name_space;
-  this->name_space_length = name_space_length;
-}
-
-void Mutation::set_table(char *table, size_t table_length) {
-  this->table = table;
-  this->table_length = table_length;
-}
-
-void Mutation::set_row(unsigned char *row, size_t row_length) {
-  this->row = row;
-  this->row_length = row_length;
-}
-
-void Mutation::set_durability(durability_type durability) {
-  this->durability = durability;
-}
-
-Mutation::~Mutation() {}
diff --git a/hbase-native-client/core/mutation.h b/hbase-native-client/core/mutation.h
deleted file mode 100644
index 1880571..0000000
--- a/hbase-native-client/core/mutation.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include <stdlib.h>
-
-typedef enum {
-  DELETE_ONE_VERSION,
-  DELETE_MULTIPLE_VERSIONS,
-  DELETE_FAMILY,
-  DELETE_FAMILY_VERSION
-} delete_type;
-
-typedef enum {
-  USE_DEFAULT,
-  SKIP_WAL,
-  ASYNC_WAL,
-  SYNC_WAL,
-  HSYNC_WAL
-} durability_type;
-
-class Mutation {
-  char *name_space;
-  size_t name_space_length;
-
-  char *table;
-  size_t table_length;
-
-  unsigned char *row;
-  size_t row_length;
-
-  durability_type durability;
-
-public:
-  void set_namespace(char *name_space, size_t name_space_length);
-  void set_table(char *table, size_t table_length);
-  void set_row(unsigned char *row, size_t row_length);
-  void set_durability(durability_type durability);
-
-  virtual ~Mutation();
-};
diff --git a/hbase-native-client/core/scanner.cc b/hbase-native-client/core/scanner.cc
deleted file mode 100644
index a10e444..0000000
--- a/hbase-native-client/core/scanner.cc
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/scanner.h"
diff --git a/hbase-native-client/core/scanner.h b/hbase-native-client/core/scanner.h
deleted file mode 100644
index 180865a..0000000
--- a/hbase-native-client/core/scanner.h
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-class Scanner {};
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 08e886a..065f70b 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -20,21 +20,30 @@
 #include <folly/Logging.h>
 #include <folly/Random.h>
 #include <gflags/gflags.h>
-#include <glog/logging.h>
 #include <wangle/concurrent/GlobalExecutor.h>
 
 #include <iostream>
+#include <chrono>
 
 #include "core/client.h"
-#include "core/connection-factory.h"
+#include "connection/connection-factory.h"
 #include "if/ZooKeeper.pb.h"
+#include "if/Client.pb.h"
 
 using namespace folly;
 using namespace std;
+using namespace std::chrono;
 using namespace hbase;
 using namespace hbase::pb;
+using namespace google::protobuf;
+
+// TODO(eclark): remove the need for this.
+DEFINE_string(region, "1588230740", "What region to send a get to");
+DEFINE_string(row, "test", "What row to get");
 
 int main(int argc, char *argv[]) {
+  google::SetUsageMessage(
+      "Simple client to get a single row from HBase on the comamnd line");
   google::ParseCommandLineFlags(&argc, &argv, true);
   google::InitGoogleLogging(argv[0]);
 
@@ -44,14 +53,40 @@ int main(int argc, char *argv[]) {
   LocationCache cache{"localhost:2181", wangle::getCPUExecutor()};
 
   auto result = cache.LocateMeta().get();
-  cout << "ServerName = " << result.host_name() << ":" << result.port() << endl;
 
   // Create a connection to the local host
-  auto conn = cf.make_connection(result.host_name(), result.port()).get();
+  auto conn = cf.make_connection(result.host_name(), result.port());
 
   // Send the request
   Request r;
-  conn(r).get();
+
+  // This is a get request so make that
+  auto msg = make_shared<hbase::pb::GetRequest>();
+
+  // Set what region
+  msg->mutable_region()->set_value(FLAGS_region);
+  // It's always this.
+  msg->mutable_region()->set_type(
+      RegionSpecifier_RegionSpecifierType::
+          RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME);
+  // What row.
+  msg->mutable_get()->set_row(FLAGS_row);
+  // Send it.
+  r.set_msg(std::move(msg));
+  auto resp = (*conn)(r).get(milliseconds(5000));
+
+  auto get_resp = std::static_pointer_cast<GetResponse>(resp.response());
+  cout << "GetResponse has_result = " << get_resp->has_result() << '\n';
+  if (get_resp->has_result()) {
+    auto &r = get_resp->result();
+    cout << "Result cell_size = " << r.cell_size() << endl;
+    for (auto &cell : r.cell()) {
+      cout << "\trow = " << cell.row() << " family = " << cell.family()
+           << " qualifier = " << cell.qualifier()
+           << " timestamp = " << cell.timestamp() << " value = " << cell.value()
+           << endl;
+    }
+  }
 
   return 0;
 }
diff --git a/hbase-native-client/core/simple-native-client-test.cc b/hbase-native-client/core/simple-native-client-test.cc
deleted file mode 100644
index ee39986..0000000
--- a/hbase-native-client/core/simple-native-client-test.cc
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "gtest/gtest.h"
-
-/**
- * Sample test.
- */
-TEST(SampleTest, sample) { EXPECT_TRUE(true); }
diff --git a/hbase-native-client/core/table-name.h b/hbase-native-client/core/table-name.h
index 796115b..37c3461 100644
--- a/hbase-native-client/core/table-name.h
+++ b/hbase-native-client/core/table-name.h
@@ -29,4 +29,4 @@ public:
   explicit TableName(std::string tableName);
   explicit TableName(std::string namespaceName, std::string tableName);
 };
-}  // namespace hbase
+} // namespace hbase
diff --git a/hbase-native-client/core/native-client-test-env.cc b/hbase-native-client/core/test-env.cc
similarity index 91%
rename from hbase-native-client/core/native-client-test-env.cc
rename to hbase-native-client/core/test-env.cc
index 0269a43..277abd9 100644
--- a/hbase-native-client/core/native-client-test-env.cc
+++ b/hbase-native-client/core/test-env.cc
@@ -25,13 +25,13 @@ class NativeClientTestEnv : public ::testing::Environment {
 public:
   void SetUp() override {
     // start local HBase cluster to be reused by all tests
-    auto result = system("bin/start_local_hbase_and_wait.sh");
+    auto result = system("bin/start-local-hbase.sh");
     ASSERT_EQ(0, result);
   }
 
   void TearDown() override {
     // shutdown local HBase cluster
-    auto result = system("bin/stop_local_hbase_and_wait.sh");
+    auto result = system("bin/stop-local-hbase.sh");
     ASSERT_EQ(0, result);
   }
 };
diff --git a/hbase-native-client/if/BUCK b/hbase-native-client/if/BUCK
index 5ff617d..1a9721d 100644
--- a/hbase-native-client/if/BUCK
+++ b/hbase-native-client/if/BUCK
@@ -15,39 +15,34 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 PROTO_SRCS = glob(['*.proto'])
-HEADER_FILENAMES = [ x.replace('.proto','.pb.h') for x in PROTO_SRCS]
-CC_FILENAMES = [ x.replace('.proto', '.pb.cc') for x in PROTO_SRCS]
+HEADER_FILENAMES = [x.replace('.proto', '.pb.h') for x in PROTO_SRCS]
+CC_FILENAMES = [x.replace('.proto', '.pb.cc') for x in PROTO_SRCS]
 
 genrule(
-  name = 'generate-proto-sources',
-  srcs = PROTO_SRCS,
-  cmd = 'mkdir -p $OUT && pwd && protoc --proto_path=. --cpp_out=$OUT *.proto',
-  out = 'output',
-)
+    name='generate-proto-sources',
+    srcs=PROTO_SRCS,
+    cmd='mkdir -p $OUT && pwd && protoc --proto_path=. --cpp_out=$OUT *.proto',
+    out='output', )
 
 for header_filename in HEADER_FILENAMES:
-  genrule(
-    name = header_filename,
-    cmd = 'mkdir -p `dirname $OUT` '
-          ' && cp $(location :generate-proto-sources)/{} $OUT'.format(header_filename),
-    out = header_filename,
-  )
+    genrule(name=header_filename,
+            cmd='mkdir -p `dirname $OUT` '
+            ' && cp $(location :generate-proto-sources)/{} $OUT'.format(
+                header_filename),
+            out=header_filename, )
 for cc_filename in CC_FILENAMES:
-  genrule(
-    name = cc_filename,
-    cmd = 'mkdir -p `dirname $OUT` '
-          ' && cp $(location :generate-proto-sources)/*.cc `dirname $OUT` '
-          ' && cp $(location :generate-proto-sources)/*.h `dirname $OUT`'.format(cc_filename),
-    out = cc_filename,
-  )
+    genrule(
+        name=cc_filename,
+        cmd='mkdir -p `dirname $OUT` '
+        ' && cp $(location :generate-proto-sources)/*.cc `dirname $OUT` '
+        ' && cp $(location :generate-proto-sources)/*.h `dirname $OUT`'.format(
+            cc_filename),
+        out=cc_filename, )
 
-cxx_library(
-  name = 'if',
-  exported_headers =  [':' + x for x in HEADER_FILENAMES],
-  srcs = [':' + x for x in CC_FILENAMES],
-  deps = [ '//third-party:protobuf'],
-  visibility = [ 'PUBLIC', ],
-  exported_deps = ['//third-party:protobuf']
-)
+cxx_library(name='if',
+            exported_headers=[':' + x for x in HEADER_FILENAMES],
+            srcs=[':' + x for x in CC_FILENAMES],
+            deps=['//third-party:protobuf'],
+            visibility=['PUBLIC', ],
+            exported_deps=['//third-party:protobuf'])
diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK
new file mode 100644
index 0000000..207607f
--- /dev/null
+++ b/hbase-native-client/serde/BUCK
@@ -0,0 +1,54 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cxx_library(name="serde",
+            exported_headers=[
+                "client-serializer.h",
+                "client-deserializer.h",
+            ],
+            srcs=[
+                "client-serializer.cc",
+                "client-deserializer.cc",
+            ],
+            deps=[
+                "//if:if",
+                "//third-party:folly",
+            ],
+            tests=[
+                ":client-serializer-test",
+                ":client-deserializer-test",
+            ],
+            visibility=[
+                'PUBLIC',
+            ], )
+
+cxx_test(name="client-serializer-test",
+         srcs=[
+             "client-serializer-test.cc",
+         ],
+         deps=[
+             ":serde",
+             "//if:if",
+         ], )
+cxx_test(name="client-deserializer-test",
+         srcs=[
+             "client-deserializer-test.cc",
+         ],
+         deps=[
+             ":serde",
+             "//if:if",
+         ], )
diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc
new file mode 100644
index 0000000..bb57e50
--- /dev/null
+++ b/hbase-native-client/serde/client-deserializer-test.cc
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <gtest/gtest.h>
+#include <folly/io/IOBuf.h>
+
+#include "serde/client-deserializer.h"
+#include "serde/client-serializer.h"
+#include "if/Client.pb.h"
+
+using namespace hbase;
+using folly::IOBuf;
+using hbase::pb::GetRequest;
+using hbase::pb::RegionSpecifier;
+using hbase::pb::RegionSpecifier_RegionSpecifierType;
+
+TEST(TestClientDeserializer, TestReturnFalseOnNullPtr) {
+  ClientDeserializer deser;
+  ASSERT_LT(deser.parse_delimited(nullptr, nullptr), 0);
+}
+
+TEST(TestClientDeserializer, TestReturnFalseOnBadInput) {
+  ClientDeserializer deser;
+  auto buf = IOBuf::copyBuffer("test");
+  GetRequest gr;
+
+  ASSERT_LT(deser.parse_delimited(buf.get(), &gr), 0);
+}
+
+TEST(TestClientDeserializer, TestGoodGetRequestFullRoundTrip) {
+  GetRequest in;
+  ClientSerializer ser;
+  ClientDeserializer deser;
+
+  // fill up the GetRequest.
+  in.mutable_region()->set_value("test_region_id");
+  in.mutable_region()->set_type(
+      RegionSpecifier_RegionSpecifierType::
+          RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME);
+  in.mutable_get()->set_row("test_row");
+
+  // Create the buffer
+  auto buf = ser.serialize_delimited(in);
+
+  GetRequest out;
+
+  int used_bytes = deser.parse_delimited(buf.get(), &out);
+
+  ASSERT_GT(used_bytes, 0);
+  ASSERT_EQ(used_bytes, buf->length());
+}
diff --git a/hbase-native-client/serde/client-deserializer.cc b/hbase-native-client/serde/client-deserializer.cc
new file mode 100644
index 0000000..118b0d1
--- /dev/null
+++ b/hbase-native-client/serde/client-deserializer.cc
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "serde/client-deserializer.h"
+
+#include <google/protobuf/message.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <folly/Logging.h>
+
+using namespace hbase;
+
+using folly::IOBuf;
+using google::protobuf::Message;
+using google::protobuf::io::ArrayInputStream;
+using google::protobuf::io::CodedInputStream;
+
+int ClientDeserializer::parse_delimited(const IOBuf *buf, Message *msg) {
+  if (buf == nullptr || msg == nullptr) {
+    return -2;
+  }
+
+  DCHECK(!buf->isChained());
+
+  ArrayInputStream ais{buf->data(), static_cast<int>(buf->length())};
+  CodedInputStream coded_stream{&ais};
+
+  uint32_t msg_size;
+
+  // Try and read the varint.
+  if (coded_stream.ReadVarint32(&msg_size) == false) {
+    FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t";
+    return -3;
+  }
+
+  coded_stream.PushLimit(msg_size);
+  // Parse the message.
+  if (msg->MergeFromCodedStream(&coded_stream) == false) {
+    FB_LOG_EVERY_MS(ERROR, 1000)
+        << "Unable to read a protobuf message from data.";
+    return -4;
+  }
+
+  // Make sure all the data was consumed.
+  if (coded_stream.ConsumedEntireMessage() == false) {
+    FB_LOG_EVERY_MS(ERROR, 1000)
+        << "Orphaned data left after reading protobuf message";
+    return -5;
+  }
+
+  return coded_stream.CurrentPosition();
+}
diff --git a/hbase-native-client/core/response.h b/hbase-native-client/serde/client-deserializer.h
similarity index 79%
rename from hbase-native-client/core/response.h
rename to hbase-native-client/serde/client-deserializer.h
index 34a284d..b9664b0 100644
--- a/hbase-native-client/core/response.h
+++ b/hbase-native-client/serde/client-deserializer.h
@@ -18,17 +18,19 @@
  */
 #pragma once
 
-#include <cstdint>
+#include <folly/io/IOBuf.h>
 
-namespace hbase {
+// Forward
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
 
-class Response {
+namespace hbase {
+class ClientDeserializer {
 public:
-  Response() : call_id_(0) {}
-  uint32_t call_id() { return call_id_; }
-  void set_call_id(uint32_t call_id) { call_id_ = call_id; }
-
-private:
-  uint32_t call_id_;
+  int parse_delimited(const folly::IOBuf *buf, google::protobuf::Message *msg);
 };
-}  // namespace hbase
+
+} // namespace hbase
diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc
new file mode 100644
index 0000000..b32b55d
--- /dev/null
+++ b/hbase-native-client/serde/client-serializer-test.cc
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <gtest/gtest.h>
+
+#include <folly/io/Cursor.h>
+
+#include <string>
+
+#include "serde/client-serializer.h"
+#include "if/HBase.pb.h"
+#include "if/RPC.pb.h"
+
+using namespace hbase;
+using namespace hbase::pb;
+using namespace folly;
+using namespace folly::io;
+
+TEST(ClientSerializerTest, PreambleIncludesHBas) {
+  ClientSerializer ser;
+  auto buf = ser.preamble();
+  const char *p = reinterpret_cast<const char *>(buf->data());
+  // Take the first four chars and make sure they are the
+  // magic string
+  EXPECT_EQ("HBas", std::string(p, 4));
+
+  EXPECT_EQ(6, buf->computeChainDataLength());
+}
+
+TEST(ClientSerializerTest, PreambleIncludesVersion) {
+  ClientSerializer ser;
+  auto buf = ser.preamble();
+  EXPECT_EQ(0, static_cast<const uint8_t *>(buf->data())[4]);
+  EXPECT_EQ(80, static_cast<const uint8_t *>(buf->data())[5]);
+}
+
+TEST(ClientSerializerTest, TestHeaderLengthPrefixed) {
+  ClientSerializer ser;
+  auto header = ser.header("elliott");
+
+  // The header should be prefixed by 4 bytes of length.
+  EXPECT_EQ(4, header->length());
+  EXPECT_TRUE(header->length() < header->computeChainDataLength());
+  EXPECT_TRUE(header->isChained());
+
+  // Now make sure the length is correct.
+  Cursor cursor(header.get());
+  auto prefixed_len = cursor.readBE<uint32_t>();
+  EXPECT_EQ(prefixed_len, header->next()->length());
+}
+
+TEST(ClientSerializerTest, TestHeaderDecode) {
+  ClientSerializer ser;
+  auto buf = ser.header("elliott");
+  auto header_buf = buf->next();
+  ConnectionHeader h;
+
+  EXPECT_TRUE(h.ParseFromArray(header_buf->data(), header_buf->length()));
+  EXPECT_EQ("elliott", h.user_info().effective_user());
+}
diff --git a/hbase-native-client/serde/client-serializer.cc b/hbase-native-client/serde/client-serializer.cc
new file mode 100644
index 0000000..881b6e4
--- /dev/null
+++ b/hbase-native-client/serde/client-serializer.cc
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "serde/client-serializer.h"
+
+#include <folly/io/Cursor.h>
+#include <folly/Logging.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+#include "if/HBase.pb.h"
+#include "if/RPC.pb.h"
+
+using namespace hbase;
+
+using folly::IOBuf;
+using folly::io::RWPrivateCursor;
+using google::protobuf::Message;
+using google::protobuf::io::ArrayOutputStream;
+using google::protobuf::io::CodedOutputStream;
+using google::protobuf::io::ZeroCopyOutputStream;
+using std::string;
+using std::unique_ptr;
+
+static const std::string PREAMBLE = "HBas";
+static const std::string INTERFACE = "ClientService";
+static const uint8_t RPC_VERSION = 0;
+static const uint8_t DEFAULT_AUTH_TYPE = 80;
+
+ClientSerializer::ClientSerializer() : auth_type_(DEFAULT_AUTH_TYPE) {}
+
+unique_ptr<IOBuf> ClientSerializer::preamble() {
+  auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2);
+  magic->append(2);
+  RWPrivateCursor c(magic.get());
+  c.skip(4);
+  // Version
+  c.write(RPC_VERSION);
+  // Standard security aka Please don't lie to me.
+  c.write(auth_type_);
+  return magic;
+}
+
+unique_ptr<IOBuf> ClientSerializer::header(const string &user) {
+  pb::ConnectionHeader h;
+
+  // TODO(eclark): Make this not a total lie.
+  h.mutable_user_info()->set_effective_user(user);
+  // The service name that we want to talk to.
+  //
+  // Right now we're completely ignoring the service interface.
+  // That may or may not be the correct thing to do.
+  // It worked for a while with the java client; until it
+  // didn't.
+  h.set_service_name(INTERFACE);
+  return prepend_length(serialize_message(h));
+}
+
+unique_ptr<IOBuf> ClientSerializer::request(const uint32_t call_id,
+                                            const string &method,
+                                            const Message *msg) {
+  pb::RequestHeader rq;
+  rq.set_method_name(method);
+  rq.set_call_id(call_id);
+  rq.set_request_param(msg != nullptr);
+  auto ser_header = serialize_delimited(rq);
+  if (msg != nullptr) {
+    auto ser_req = serialize_delimited(*msg);
+    ser_header->appendChain(std::move(ser_req));
+  }
+
+  return prepend_length(std::move(ser_header));
+}
+
+unique_ptr<IOBuf> ClientSerializer::prepend_length(unique_ptr<IOBuf> msg) {
+  // Java ints are 4 bytes long, so create a buffer that large.
+  auto len_buf = IOBuf::create(4);
+  // Then make those bytes visible.
+  len_buf->append(4);
+
+  RWPrivateCursor c(len_buf.get());
+  // Get the size of the data to be pushed out the network.
+  auto size = msg->computeChainDataLength();
+
+  // Write the length to this IOBuf.
+  c.writeBE(static_cast<uint32_t>(size));
+
+  // Then attach the original to the back of len_buf
+  len_buf->appendChain(std::move(msg));
+  return len_buf;
+}
+
+unique_ptr<IOBuf> ClientSerializer::serialize_delimited(const Message &msg) {
+  // Get the buffer size needed for just the message.
+  int msg_size = msg.ByteSize();
+  int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size;
+
+  // Create a buffer big enough to hold the varint and the object.
+  auto buf = IOBuf::create(buf_size);
+  buf->append(buf_size);
+
+  // Create the array output stream.
+  ArrayOutputStream aos{buf->writableData(), static_cast<int>(buf->length())};
+  // Wrap the ArrayOutputStream in the coded output stream to allow writing
+  // Varint32
+  CodedOutputStream cos{&aos};
+
+  // Write out the size.
+  cos.WriteVarint32(msg_size);
+
+  // Now write the rest out.
+  // We're using the protobuf output streams here to keep track
+  // of where in the output array we are rather than IOBuf.
+  msg.SerializeWithCachedSizesToArray(
+      cos.GetDirectBufferForNBytesAndAdvance(msg_size));
+
+  // Return the buffer.
+  return buf;
+}
+// TODO(eclark): Make this 1 copy.
+unique_ptr<IOBuf> ClientSerializer::serialize_message(const Message &msg) {
+  auto buf = IOBuf::copyBuffer(msg.SerializeAsString());
+  return buf;
+}
diff --git a/hbase-native-client/core/client-serialize-handler.h b/hbase-native-client/serde/client-serializer.h
similarity index 55%
rename from hbase-native-client/core/client-serialize-handler.h
rename to hbase-native-client/serde/client-serializer.h
index 961a03b..685095d 100644
--- a/hbase-native-client/core/client-serialize-handler.h
+++ b/hbase-native-client/serde/client-serializer.h
@@ -18,27 +18,38 @@
  */
 #pragma once
 
-#include <wangle/channel/Handler.h>
+#include <folly/io/IOBuf.h>
+#include <string>
+#include <cstdint>
 
-#include "if/HBase.pb.h"
-#include "if/RPC.pb.h"
-#include "core/request.h"
-#include "core/response.h"
+// Forward
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
+namespace hbase {
+class Request;
+}
 
 namespace hbase {
-class ClientSerializeHandler
-    : public wangle::Handler<std::unique_ptr<folly::IOBuf>, Response, Request,
-                             std::unique_ptr<folly::IOBuf>> {
+class ClientSerializer {
 public:
-  void read(Context *ctx, std::unique_ptr<folly::IOBuf> msg) override;
-  folly::Future<folly::Unit> write(Context *ctx, Request r) override;
+  ClientSerializer();
+  std::unique_ptr<folly::IOBuf> preamble();
+  std::unique_ptr<folly::IOBuf> header(const std::string &user);
+  std::unique_ptr<folly::IOBuf> request(const uint32_t call_id,
+                                        const std::string &method,
+                                        const google::protobuf::Message *msg);
+  std::unique_ptr<folly::IOBuf>
+  serialize_delimited(const google::protobuf::Message &msg);
+
+  std::unique_ptr<folly::IOBuf>
+  serialize_message(const google::protobuf::Message &msg);
 
-private:
-  folly::Future<folly::Unit> write_preamble(Context *ctx);
-  folly::Future<folly::Unit> write_header(Context *ctx);
-  // Our own simple version of LengthFieldPrepender
   std::unique_ptr<folly::IOBuf>
   prepend_length(std::unique_ptr<folly::IOBuf> msg);
-  bool need_send_header_ = true;
+
+  uint8_t auth_type_;
 };
-}  // namespace hbase
+} // namespace hbase

Reply via email to