http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/raw-async-table.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/raw-async-table.h b/hbase-native-client/include/hbase/client/raw-async-table.h new file mode 100644 index 0000000..9db291e --- /dev/null +++ b/hbase-native-client/include/hbase/client/raw-async-table.h @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <folly/Unit.h> +#include <folly/futures/Future.h> +#include <chrono> +#include <memory> +#include <string> +#include <vector> + +#include "hbase/client/async-client-scanner.h" +#include "hbase/client/async-connection.h" +#include "hbase/client/async-rpc-retrying-caller-factory.h" +#include "hbase/client/async-rpc-retrying-caller.h" +#include "hbase/client/connection-configuration.h" +#include "hbase/client/delete.h" +#include "hbase/client/get.h" +#include "hbase/client/increment.h" +#include "hbase/client/put.h" +#include "hbase/client/result.h" +#include "hbase/client/scan.h" + +namespace hbase { + +/** + * A low level asynchronous table that should not be used by user applications.The implementation + * is required to be thread safe. + */ +class RawAsyncTable { + public: + RawAsyncTable(std::shared_ptr<pb::TableName> table_name, + std::shared_ptr<AsyncConnection> connection) + : connection_(connection), + connection_conf_(connection->connection_conf()), + table_name_(table_name), + rpc_client_(connection->rpc_client()) { + default_scanner_caching_ = connection_conf_->scanner_caching(); + default_scanner_max_result_size_ = connection_conf_->scanner_max_result_size(); + } + virtual ~RawAsyncTable() = default; + + folly::Future<std::shared_ptr<Result>> Get(const hbase::Get& get); + + folly::Future<folly::Unit> Delete(const hbase::Delete& del); + + folly::Future<std::shared_ptr<hbase::Result>> Append(const hbase::Append& append); + + folly::Future<std::shared_ptr<hbase::Result>> Increment(const hbase::Increment& increment); + + folly::Future<folly::Unit> Put(const hbase::Put& put); + + folly::Future<bool> CheckAndPut(const std::string& row, const std::string& family, + const std::string& qualifier, const std::string& value, + const hbase::Put& put, + const pb::CompareType& compare_op = pb::CompareType::EQUAL); + + folly::Future<bool> CheckAndDelete(const std::string& row, const std::string& family, + const std::string& qualifier, const std::string& value, + const hbase::Delete& del, + const pb::CompareType& compare_op = pb::CompareType::EQUAL); + + void Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer); + + void Close() {} + + folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Get( + const std::vector<hbase::Get>& gets); + template <typename REQ, typename RESP> + folly::Future<std::vector<folly::Try<RESP>>> Batch(const std::vector<REQ>& rows, + std::chrono::nanoseconds timeout); + folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Put( + const std::vector<hbase::Put>& puts); + + private: + /* Data */ + std::shared_ptr<AsyncConnection> connection_; + std::shared_ptr<ConnectionConfiguration> connection_conf_; + std::shared_ptr<pb::TableName> table_name_; + std::shared_ptr<RpcClient> rpc_client_; + int32_t default_scanner_caching_; + int64_t default_scanner_max_result_size_; + + /* Methods */ + template <typename REQ, typename PREQ, typename PRESP, typename RESP> + folly::Future<RESP> Call( + std::shared_ptr<RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller, + std::shared_ptr<RegionLocation> loc, const REQ& req, + const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter, + const RespConverter<RESP, PRESP> resp_converter); + + template <typename RESP> + std::shared_ptr<SingleRequestCallerBuilder<RESP>> CreateCallerBuilder( + std::string row, std::chrono::nanoseconds rpc_timeout); + + std::shared_ptr<hbase::Scan> SetDefaultScanConfig(const hbase::Scan& scan); +}; +} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/raw-scan-result-consumer.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/raw-scan-result-consumer.h b/hbase-native-client/include/hbase/client/raw-scan-result-consumer.h new file mode 100644 index 0000000..26ff709 --- /dev/null +++ b/hbase-native-client/include/hbase/client/raw-scan-result-consumer.h @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <folly/ExceptionWrapper.h> +#include <folly/Logging.h> +#include <chrono> +#include <memory> +#include <string> +#include <thread> +#include <vector> + +#include "hbase/client/result.h" +#include "hbase/if/Client.pb.h" +#include "hbase/if/HBase.pb.h" + +namespace hbase { + +enum class ScanControllerState { kInitialized, kSuspended, kTerminated, kDestroyed }; + +enum class ScanResumerState { kInitialized, kSuspended, kResumed }; + +/** + * Used to resume a scan. + */ +class ScanResumer { + public: + virtual ~ScanResumer() = default; + + /** + * Resume the scan. You are free to call it multiple time but only the first call will take + * effect. + */ + virtual void Resume() = 0; +}; + +/** + * Used to suspend or stop a scan. + * <p> + * Notice that, you should only call the methods below inside onNext or onHeartbeat method. A + * IllegalStateException will be thrown if you call them at other places. + * <p> + * You can only call one of the methods below, i.e., call suspend or terminate(of course you are + * free to not call them both), and the methods are not reentrant. A IllegalStateException will be + * thrown if you have already called one of the methods. + */ +class ScanController { + public: + virtual ~ScanController() = default; + + /** + * Suspend the scan. + * <p> + * This means we will stop fetching data in background, i.e., will not call onNext any more + * before you resume the scan. + * @return A resumer used to resume the scan later. + */ + virtual std::shared_ptr<ScanResumer> Suspend() = 0; + + /** + * Terminate the scan. + * <p> + * This is useful when you have got enough results and want to stop the scan in onNext method, + * or you want to stop the scan in onHeartbeat method because it has spent too many time. + */ + virtual void Terminate() = 0; +}; + +/** + * Receives {@link Result} for an asynchronous scan. + * <p> + * Notice that, the {@link #onNext(Result[], ScanController)} method will be called in the thread + * which we send request to HBase service. So if you want the asynchronous scanner fetch data from + * HBase in background while you process the returned data, you need to move the processing work to + * another thread to make the {@code onNext} call return immediately. And please do NOT do any time + * consuming tasks in all methods below unless you know what you are doing. + */ +class RawScanResultConsumer { + public: + virtual ~RawScanResultConsumer() = default; + + /** + * Indicate that we have receive some data. + * @param results the data fetched from HBase service. + * @param controller used to suspend or terminate the scan. Notice that the {@code controller} + * instance is only valid within scope of onNext method. You can only call its method in + * onNext, do NOT store it and call it later outside onNext. + */ + virtual void OnNext(const std::vector<std::shared_ptr<Result>> &results, + std::shared_ptr<ScanController> controller) {} + + /** + * Indicate that there is an heartbeat message but we have not cumulated enough cells to call + * onNext. + * <p> + * This method give you a chance to terminate a slow scan operation. + * @param controller used to suspend or terminate the scan. Notice that the {@code controller} + * instance is only valid within the scope of onHeartbeat method. You can only call its + * method in onHeartbeat, do NOT store it and call it later outside onHeartbeat. + */ + virtual void OnHeartbeat(std::shared_ptr<ScanController> controller) {} + + /** + * Indicate that we hit an unrecoverable error and the scan operation is terminated. + * <p> + * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}. + */ + virtual void OnError(const folly::exception_wrapper &error) {} + + /** + * Indicate that the scan operation is completed normally. + */ + virtual void OnComplete() {} +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/region-location.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/region-location.h b/hbase-native-client/include/hbase/client/region-location.h new file mode 100644 index 0000000..3eded91 --- /dev/null +++ b/hbase-native-client/include/hbase/client/region-location.h @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <memory> +#include <string> + +#include "hbase/if/HBase.pb.h" + +namespace hbase { + +enum class RegionLocateType { kBefore, kCurrent, kAfter }; + +/** + * @brief class to hold where a region is located. + * + * This class holds where a region is located, the information about it, the + * region name. + */ +class RegionLocation { + public: + /** + * Constructor. + * @param region_name The region name of this region. + * @param ri The decoded RegionInfo of this region. + * @param sn The server name of the HBase regionserver thought to be hosting + * this region. + */ + RegionLocation(std::string region_name, hbase::pb::RegionInfo ri, hbase::pb::ServerName sn) + : region_name_(region_name), ri_(ri), sn_(sn) {} + + /** + * Get a reference to the regio info + */ + const hbase::pb::RegionInfo ®ion_info() const { return ri_; } + + /** + * Get a reference to the server name + */ + const hbase::pb::ServerName &server_name() const { return sn_; } + + /** + * Get a reference to the region name. + */ + const std::string ®ion_name() const { return region_name_; } + + /** + * Set the servername if the region has moved. + */ + void set_server_name(hbase::pb::ServerName sn) { sn_ = sn; } + + const std::string DebugString() const { + return "region_info:" + ri_.ShortDebugString() + ", server_name:" + sn_.ShortDebugString(); + } + + private: + std::string region_name_; + hbase::pb::RegionInfo ri_; + hbase::pb::ServerName sn_; +}; + +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/region-request.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/region-request.h b/hbase-native-client/include/hbase/client/region-request.h new file mode 100644 index 0000000..9e7f85e --- /dev/null +++ b/hbase-native-client/include/hbase/client/region-request.h @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <memory> +#include <queue> +#include <vector> +#include "hbase/client/action.h" +#include "hbase/client/region-location.h" + +namespace hbase { + +class RegionRequest { + public: + // Concurrent + using ActionList = std::vector<std::shared_ptr<Action>>; + explicit RegionRequest(const std::shared_ptr<hbase::RegionLocation> ®ion_loc) + : region_loc_(region_loc) {} + ~RegionRequest() {} + void AddAction(std::shared_ptr<Action> action) { actions_.push_back(action); } + std::shared_ptr<hbase::RegionLocation> region_location() const { return region_loc_; } + const ActionList &actions() const { return actions_; } + + private: + std::shared_ptr<hbase::RegionLocation> region_loc_; + ActionList actions_; +}; + +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/region-result.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/region-result.h b/hbase-native-client/include/hbase/client/region-result.h new file mode 100644 index 0000000..87398a2 --- /dev/null +++ b/hbase-native-client/include/hbase/client/region-result.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <folly/ExceptionWrapper.h> +#include <map> +#include <memory> +#include <string> +#include <tuple> +#include "hbase/client/result.h" +#include "hbase/if/Client.pb.h" + +namespace hbase { + +using ResultOrExceptionTuple = + std::tuple<std::shared_ptr<hbase::Result>, std::shared_ptr<folly::exception_wrapper>>; + +class RegionResult { + public: + RegionResult(); + void AddResultOrException(int32_t index, std::shared_ptr<hbase::Result> result, + std::shared_ptr<folly::exception_wrapper> exc); + + void set_stat(std::shared_ptr<pb::RegionLoadStats> stat); + + int ResultOrExceptionSize() const; + + std::shared_ptr<ResultOrExceptionTuple> ResultOrException(int32_t index) const; + + const std::shared_ptr<pb::RegionLoadStats>& stat() const; + + ~RegionResult(); + + private: + std::map<int, ResultOrExceptionTuple> result_or_excption_; + std::shared_ptr<pb::RegionLoadStats> stat_; +}; +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/request-converter.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/request-converter.h b/hbase-native-client/include/hbase/client/request-converter.h new file mode 100644 index 0000000..eef6b5d --- /dev/null +++ b/hbase-native-client/include/hbase/client/request-converter.h @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <memory> +#include <string> +#include <vector> +#include "hbase/connection/request.h" +#include "hbase/client/action.h" +#include "hbase/client/append.h" +#include "hbase/client/cell.h" +#include "hbase/client/delete.h" +#include "hbase/client/get.h" +#include "hbase/client/increment.h" +#include "hbase/client/mutation.h" +#include "hbase/client/put.h" +#include "hbase/client/region-request.h" +#include "hbase/client/scan.h" +#include "hbase/client/server-request.h" +#include "hbase/if/Client.pb.h" +#include "hbase/if/HBase.pb.h" + +using MutationType = hbase::pb::MutationProto_MutationType; +using DeleteType = hbase::pb::MutationProto_DeleteType; + +namespace hbase { + +using ActionsByRegion = ServerRequest::ActionsByRegion; +/** + * RequestConverter class + * This class converts a Client side Get, Scan, Mutate operation to corresponding PB message. + */ +class RequestConverter { + public: + ~RequestConverter(); + + /** + * @brief Returns a Request object comprising of PB GetRequest created using + * passed 'get' + * @param get - Get object used for creating GetRequest + * @param region_name - table region + */ + static std::unique_ptr<Request> ToGetRequest(const Get &get, const std::string ®ion_name); + + /** + * @brief Returns a Request object comprising of PB ScanRequest created using + * passed 'scan' + * @param scan - Scan object used for creating ScanRequest + * @param region_name - table region + */ + static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string ®ion_name); + + static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string ®ion_name, + int32_t num_rows, bool close_scanner); + + static std::unique_ptr<Request> ToScanRequest(int64_t scanner_id, int32_t num_rows, + bool close_scanner); + + static std::unique_ptr<Request> ToScanRequest(int64_t scanner_id, int32_t num_rows, + bool close_scanner, int64_t next_call_seq_id, + bool renew); + + static std::unique_ptr<Request> ToMultiRequest(const ActionsByRegion ®ion_requests); + + static std::unique_ptr<Request> DeleteToMutateRequest(const Delete &del, + const std::string ®ion_name); + + static std::unique_ptr<Request> ToMutateRequest(const Put &put, const std::string ®ion_name); + + static std::unique_ptr<Request> CheckAndPutToMutateRequest( + const std::string &row, const std::string &family, const std::string &qualifier, + const std::string &value, const pb::CompareType compare_op, const hbase::Put &put, + const std::string ®ion_name); + + static std::unique_ptr<Request> CheckAndDeleteToMutateRequest( + const std::string &row, const std::string &family, const std::string &qualifier, + const std::string &value, const pb::CompareType compare_op, const hbase::Delete &del, + const std::string ®ion_name); + + static std::unique_ptr<Request> IncrementToMutateRequest(const Increment &incr, + const std::string ®ion_name); + + static std::unique_ptr<pb::MutationProto> ToMutation(const MutationType type, + const Mutation &mutation, + const int64_t nonce); + + static std::unique_ptr<Request> AppendToMutateRequest(const Append &append, + const std::string ®ion_name); + + private: + // Constructor not required. We have all static methods to create PB requests. + RequestConverter(); + + /** + * @brief fills region_specifier with region values. + * @param region_name - table region + * @param region_specifier - RegionSpecifier to be filled and passed in PB + * Request. + */ + static void SetRegion(const std::string ®ion_name, pb::RegionSpecifier *region_specifier); + static std::unique_ptr<hbase::pb::Get> ToGet(const Get &get); + static std::unique_ptr<hbase::pb::Scan> ToScan(const Scan &scan); + static DeleteType ToDeleteType(const CellType type); + static bool IsDelete(const CellType type); + static void SetCommonScanRequestFields(std::shared_ptr<hbase::pb::ScanRequest>, bool renew); +}; + +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/response-converter.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/response-converter.h b/hbase-native-client/include/hbase/client/response-converter.h new file mode 100644 index 0000000..14757a5 --- /dev/null +++ b/hbase-native-client/include/hbase/client/response-converter.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <memory> +#include <vector> +#include "hbase/connection/request.h" +#include "hbase/connection/response.h" +#include "hbase/client/multi-response.h" +#include "hbase/client/result.h" +#include "hbase/client/server-request.h" +#include "hbase/if/Client.pb.h" +#include "hbase/serde/cell-scanner.h" + +namespace hbase { + +/** + * ResponseConverter class + * This class converts a PB Response to corresponding Result or other objects. + */ +class ResponseConverter { + public: + ~ResponseConverter(); + + static std::shared_ptr<Result> ToResult(const hbase::pb::Result& result, + const std::shared_ptr<CellScanner> cell_scanner); + + /** + * @brief Returns a Result object created by PB Message in passed Response object. + * @param resp - Response object having the PB message. + */ + static std::shared_ptr<hbase::Result> FromGetResponse(const Response& resp); + + static std::shared_ptr<hbase::Result> FromMutateResponse(const Response& resp); + + static bool BoolFromMutateResponse(const Response& resp); + + static std::vector<std::shared_ptr<Result>> FromScanResponse(const Response& resp); + + static std::vector<std::shared_ptr<Result>> FromScanResponse( + const std::shared_ptr<pb::ScanResponse> resp, std::shared_ptr<CellScanner> cell_scanner); + + static std::unique_ptr<hbase::MultiResponse> GetResults( + std::shared_ptr<Request> req, const Response& resp, + const ServerRequest::ActionsByRegion& actions_by_region); + + private: + // Constructor not required. We have all static methods to extract response from PB messages. + ResponseConverter(); + static std::shared_ptr<folly::exception_wrapper> GetRemoteException( + const hbase::pb::NameBytesPair& exc_resp); +}; + +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/result-scanner.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/result-scanner.h b/hbase-native-client/include/hbase/client/result-scanner.h new file mode 100644 index 0000000..0b22684 --- /dev/null +++ b/hbase-native-client/include/hbase/client/result-scanner.h @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <functional> +#include <iterator> +#include <map> +#include <memory> +#include <string> +#include <vector> + +#include "hbase/client/cell.h" +#include "hbase/client/result.h" + +namespace hbase { + +/** + * Interface for client-side scanning. Use Table to obtain instances. + */ +class ResultScanner { + // TODO: should we implement forward iterators? + + public: + virtual ~ResultScanner() {} + + virtual void Close() = 0; + + virtual std::shared_ptr<Result> Next() = 0; +}; +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/result.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/result.h b/hbase-native-client/include/hbase/client/result.h new file mode 100644 index 0000000..e5222b6 --- /dev/null +++ b/hbase-native-client/include/hbase/client/result.h @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <functional> +#include <map> +#include <memory> +#include <string> +#include <vector> + +#include "hbase/client/cell.h" +#include "hbase/utils/optional.h" + +namespace hbase { + +/** + * @brief Map of families to all versions of its qualifiers and values + * We need to have a reverse ordered map, when storing TS -> value, so that the + * most recent value is stored first + */ +using ResultMap = + std::map<std::string, + std::map<std::string, std::map<int64_t, std::string, std::greater<int64_t> > > >; + +class Result { + public: + /** + * Constructors + */ + Result(const std::vector<std::shared_ptr<Cell> > &cells, bool exists, bool stale, bool partial); + Result(const Result &result); + ~Result(); + + /** + * @brief Return the vector of Cells backing this Result instance. This vector + * will be ordered in the same manner + * as the one which was passed while creation of the Result instance. + */ + const std::vector<std::shared_ptr<Cell> > &Cells() const; + + /** + * @brief Return a vector of Cells for the family and qualifier or empty list + * if the column + * did not exist in the result. + * @param family - column family + * @param qualifier - column qualifier + */ + std::vector<std::shared_ptr<Cell> > ColumnCells(const std::string &family, + const std::string &qualifier) const; + + /** + * @brief Returns the Cell for the most recent timestamp for a given family + * and qualifier. + * Returns map of qualifiers to values, only includes latest values + * @param family - column family. + * @param qualifier - column qualifier + */ + const std::shared_ptr<Cell> ColumnLatestCell(const std::string &family, + const std::string &qualifier) const; + + /** + * @brief Get the latest version of the specified family and qualifier. + * @param family - column family + * @param qualifier - column qualifier + */ + optional<std::string> Value(const std::string &family, const std::string &qualifier) const; + + /** + * @brief Returns if the underlying Cell vector is empty or not + */ + bool IsEmpty() const; + + /** + * @brief Retrieves the row key that corresponds to the row from which this + * Result was created. + */ + const std::string &Row() const; + + /** + * @brief Returns the size of the underlying Cell vector + */ + int Size() const; + + /** + * @brief Map of families to all versions of its qualifiers and values. + * Returns a three level Map of the form: + * Map<family,Map<qualifier,Map<timestamp,value>>>> + * All other map returning methods make use of this map internally + * The Map is created when the Result instance is created + */ + ResultMap Map() const; + + /** + * @brief Map of qualifiers to values. + * Returns a Map of the form: Map<qualifier,value> + * @param family - column family to get + */ + std::map<std::string, std::string> FamilyMap(const std::string &family) const; + + std::string DebugString() const; + + bool Exists() const { return exists_; } + + bool Stale() const { return stale_; } + + bool Partial() const { return partial_; } + + /** Returns estimated size of the Result object including deep heap space usage + * of its Cells and data. Notice that this is a very rough estimate. */ + size_t EstimatedSize() const; + + private: + bool exists_ = false; + bool stale_ = false; + bool partial_ = false; + std::string row_ = ""; + std::vector<std::shared_ptr<Cell> > cells_; +}; +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/row.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/row.h b/hbase-native-client/include/hbase/client/row.h new file mode 100644 index 0000000..2c7bdd1 --- /dev/null +++ b/hbase-native-client/include/hbase/client/row.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <limits> +#include <stdexcept> +#include <string> + +#pragma once + +namespace hbase { + +class Row { + public: + Row() {} + explicit Row(const std::string &row) : row_(row) { CheckRow(row_); } + + /** + * @brief Returns the row for the Row interface. + */ + const std::string &row() const { return row_; } + virtual ~Row() {} + + private: + /** + * @brief Checks if the row for this Get operation is proper or not + * @param row Row to check + * @throws std::runtime_error if row is empty or greater than + * MAX_ROW_LENGTH(i.e. std::numeric_limits<short>::max()) + */ + void CheckRow(const std::string &row) { + const int16_t kMaxRowLength = std::numeric_limits<int16_t>::max(); + size_t row_length = row.size(); + if (0 == row_length) { + throw std::runtime_error("Row length can't be 0"); + } + if (row_length > kMaxRowLength) { + throw std::runtime_error("Length of " + row + " is greater than max row size: " + + std::to_string(kMaxRowLength)); + } + } + + protected: + std::string row_ = ""; +}; + +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/scan-result-cache.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/scan-result-cache.h b/hbase-native-client/include/hbase/client/scan-result-cache.h new file mode 100644 index 0000000..e423108 --- /dev/null +++ b/hbase-native-client/include/hbase/client/scan-result-cache.h @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <folly/Logging.h> +#include <algorithm> +#include <chrono> +#include <iterator> +#include <memory> +#include <string> +#include <vector> + +#include "hbase/client/result.h" +#include "hbase/if/Client.pb.h" +#include "hbase/if/HBase.pb.h" + +namespace hbase { + +class ScanResultCache { + // In Java, there are 3 different implementations for this. We are not doing partial results, + // or scan batching in native code for now, so this version is simpler and + // only deals with giving back complete rows as Result. It is more or less implementation + // of CompleteScanResultCache.java + + public: + /** + * Add the given results to cache and get valid results back. + * @param results the results of a scan next. Must not be null. + * @param is_hearthbeat indicate whether the results is gotten from a heartbeat response. + * @return valid results, never null. + */ + std::vector<std::shared_ptr<Result>> AddAndGet( + const std::vector<std::shared_ptr<Result>> &results, bool is_hearthbeat); + + void Clear(); + + int64_t num_complete_rows() const { return num_complete_rows_; } + + private: + /** + * Forms a single result from the partial results in the partialResults list. This method is + * useful for reconstructing partial results on the client side. + * @param partial_results list of partial results + * @return The complete result that is formed by combining all of the partial results together + */ + static std::shared_ptr<Result> CreateCompleteResult( + const std::vector<std::shared_ptr<Result>> &partial_results); + + std::shared_ptr<Result> Combine(); + + std::vector<std::shared_ptr<Result>> PrependCombined( + const std::vector<std::shared_ptr<Result>> &results, int length); + + std::vector<std::shared_ptr<Result>> UpdateNumberOfCompleteResultsAndReturn( + const std::shared_ptr<Result> &result); + + std::vector<std::shared_ptr<Result>> UpdateNumberOfCompleteResultsAndReturn( + const std::vector<std::shared_ptr<Result>> &results); + + private: + std::vector<std::shared_ptr<Result>> partial_results_; + int64_t num_complete_rows_ = 0; +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/scan.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/scan.h b/hbase-native-client/include/hbase/client/scan.h new file mode 100644 index 0000000..90daa31 --- /dev/null +++ b/hbase-native-client/include/hbase/client/scan.h @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <cstdint> +#include <map> +#include <memory> +#include <string> +#include <vector> +#include "hbase/client/get.h" +#include "hbase/client/time-range.h" +#include "hbase/if/Client.pb.h" + +namespace hbase { + +class Scan : public Query { + public: + /** + * @brief Constructors. Create a Scan operation across all rows. + */ + Scan(); + Scan(const Scan &scan); + Scan &operator=(const Scan &scan); + + ~Scan(); + + /** + * @brief Create a Scan operation starting at the specified row. If the + * specified row does not exist, + * the Scanner will start from the next closest row after the specified row. + * @param start_row - row to start scanner at or after + */ + explicit Scan(const std::string &start_row); + + /** + * @brief Create a Scan operation for the range of rows specified. + * @param start_row - row to start scanner at or after (inclusive). + * @param stop_row - row to stop scanner before (exclusive). + */ + Scan(const std::string &start_row, const std::string &stop_row); + + /** + * @brief Builds a scan object with the same specs as get. + * @param get - get to model scan after + */ + explicit Scan(const Get &get); + + /** + * @brief Get all columns from the specified family.Overrides previous calls + * to AddColumn for this family. + * @param family - family name + */ + Scan &AddFamily(const std::string &family); + + /** + * @brief Get the column from the specified family with the specified + * qualifier.Overrides previous calls to AddFamily for this family. + * @param family - family name. + * @param qualifier - column qualifier. + */ + Scan &AddColumn(const std::string &family, const std::string &qualifier); + + /** + * @brief Set whether this scan is a reversed one. This is false by default + * which means forward(normal) scan. + * @param reversed - if true, scan will be backward order + */ + void SetReversed(bool reversed); + + /** + * @brief Get whether this scan is a reversed one. Returns true if backward + * scan, false if forward(default) scan + */ + bool IsReversed() const; + + /** + * @brief Set the start row of the scan.If the specified row does not exist, + * the Scanner will start from the next closest row after the specified row. + * @param start_row - row to start scanner at or after + * @throws std::runtime_error if start_row length is 0 or greater than + * MAX_ROW_LENGTH + */ + void SetStartRow(const std::string &start_row); + + /** + * @brief returns start_row of the Scan. + */ + const std::string &StartRow() const; + + /** + * @brief Set the stop row of the scan. The scan will include rows that are + * lexicographically less than the provided stop_row. + * @param stop_row - row to end at (exclusive) + * @throws std::runtime_error if stop_row length is 0 or greater than + * MAX_ROW_LENGTH + */ + void SetStopRow(const std::string &stop_row); + + /** + * @brief returns stop_row of the Scan. + */ + const std::string &StopRow() const; + + /** + * @brief Set the number of rows for caching that will be passed to scanners. + * Higher caching values will enable faster scanners but will use more memory. + * @param caching - the number of rows for caching. + */ + void SetCaching(int caching); + + /** + * @brief caching the number of rows fetched when calling next on a scanner. + */ + int Caching() const; + + /** + * @brief Sets the consistency level for this operation. + * @param consistency - the consistency level + */ + Scan &SetConsistency(const hbase::pb::Consistency consistency); + + /** + * @brief Returns the consistency level for this operation. + */ + hbase::pb::Consistency Consistency() const; + + /** + * @brief Set whether blocks should be cached for this Scan.This is true by + * default. When true, default settings of the table and family are used (this + * will never override caching blocks if the block cache is disabled for that + * family or entirely). + * @param cache_blocks - if false, default settings are overridden and blocks + * will not be cached + */ + void SetCacheBlocks(bool cache_blocks); + + /** + * @brief Get whether blocks should be cached for this Scan. + */ + bool CacheBlocks() const; + + /** + * @brief Setting whether the caller wants to see the partial results that may + * be returned from the server. By default this value is false and the + * complete results will be assembled client side before being delivered to + * the caller. + * @param allow_partial_results - if true partial results will be returned. + */ + void SetAllowPartialResults(bool allow_partial_results); + + /** + * @brief true when the constructor of this scan understands that the results + * they will see may only represent a partial portion of a row. The entire row + * would be retrieved by subsequent calls to ResultScanner.next() + */ + bool AllowPartialResults() const; + + /** + * @brief Set the value indicating whether loading CFs on demand should be + * allowed (cluster default is false). On-demand CF loading doesn't load + * column families until necessary. + * @param load_column_families_on_demand + */ + void SetLoadColumnFamiliesOnDemand(bool load_column_families_on_demand); + + /** + * @brief Get the raw loadColumnFamiliesOnDemand setting. + */ + bool LoadColumnFamiliesOnDemand() const; + + /** + * @brief Get up to the specified number of versions of each column if + * specified else get default i.e. one. + * @param max_versions - maximum versions for each column. + */ + Scan &SetMaxVersions(uint32_t max_versions = 1); + + /** + * @brief the max number of versions to fetch + */ + int MaxVersions() const; + + /** + * @brief Set the maximum result size. The default is -1; this means that no + * specific maximum result size will be set for this scan, and the global + * configured value will be used instead. (Defaults to unlimited). + * @param The maximum result size in bytes. + */ + void SetMaxResultSize(int64_t max_result_size); + + /** + * @brief the maximum result size in bytes. + */ + int64_t MaxResultSize() const; + + /** + * @brief Get versions of columns only within the specified timestamp range, + * [min_stamp, max_stamp). Note, default maximum versions to return is 1. If + * your time range spans more than one version and you want all versions + * returned, up the number of versions beyond the default. + * @param min_stamp - minimum timestamp value, inclusive. + * @param max_stamp - maximum timestamp value, exclusive. + */ + Scan &SetTimeRange(int64_t min_stamp, int64_t max_stamp); + + /** + * @brief Get versions of columns with the specified timestamp. Note, default + * maximum versions to return is 1. If your time range spans more than one + * version and you want all versions returned, up the number of versions + * beyond the defaut. + * @param timestamp - version timestamp + */ + Scan &SetTimeStamp(int64_t timestamp); + + /** + * @brief Return Timerange + */ + const TimeRange &Timerange() const; + + /** + * @brief Returns true if family map is non empty false otherwise + */ + bool HasFamilies() const; + + /** + * @brief Returns the Scan family map for this Scan operation. + */ + const std::map<std::string, std::vector<std::string>> &FamilyMap() const; + + private: + std::string start_row_ = ""; + std::string stop_row_ = ""; + uint32_t max_versions_ = 1; + int32_t caching_ = -1; + int64_t max_result_size_ = -1; + bool cache_blocks_ = true; + bool load_column_families_on_demand_ = false; + bool reversed_ = false; + bool allow_partial_results_ = false; + hbase::pb::Consistency consistency_ = hbase::pb::Consistency::STRONG; + std::unique_ptr<TimeRange> tr_ = std::make_unique<TimeRange>(); + std::map<std::string, std::vector<std::string>> family_map_; + + /** + * @brief Checks for row length validity, throws if length check fails, + * returns null otherwise. + * @param row - row whose validity needs to be checked + * @throws std::runtime_error if row length equals 0 or greater than + * std::numeric_limits<short>::max(); + */ + void CheckRow(const std::string &row); +}; +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/server-request.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/server-request.h b/hbase-native-client/include/hbase/client/server-request.h new file mode 100644 index 0000000..6ad8c66 --- /dev/null +++ b/hbase-native-client/include/hbase/client/server-request.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <map> +#include <memory> +#include <stdexcept> +#include <string> +#include "hbase/client/action.h" +#include "hbase/client/region-location.h" +#include "hbase/client/region-request.h" + +namespace hbase { + +class ServerRequest { + public: + // Concurrent + using ActionsByRegion = std::map<std::string, std::shared_ptr<RegionRequest>>; + + explicit ServerRequest(std::shared_ptr<RegionLocation> region_location) { + auto region_name = region_location->region_name(); + auto region_request = std::make_shared<RegionRequest>(region_location); + actions_by_region_[region_name] = region_request; + } + ~ServerRequest() {} + + void AddActionsByRegion(std::shared_ptr<RegionLocation> region_location, + std::shared_ptr<Action> action) { + auto region_name = region_location->region_name(); + auto search = actions_by_region_.find(region_name); + if (search == actions_by_region_.end()) { + auto region_request = std::make_shared<RegionRequest>(region_location); + actions_by_region_[region_name] = region_request; + actions_by_region_[region_name]->AddAction(action); + } else { + search->second->AddAction(action); + } + } + + const ActionsByRegion &actions_by_region() const { return actions_by_region_; } + + private: + ActionsByRegion actions_by_region_; +}; +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/table.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/table.h b/hbase-native-client/include/hbase/client/table.h new file mode 100644 index 0000000..338d19f --- /dev/null +++ b/hbase-native-client/include/hbase/client/table.h @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <chrono> +#include <memory> +#include <string> +#include <vector> + +#include "hbase/connection/rpc-client.h" +#include "hbase/client/async-connection.h" +#include "hbase/client/client.h" +#include "hbase/client/configuration.h" +#include "hbase/client/get.h" +#include "hbase/client/location-cache.h" +#include "hbase/client/put.h" +#include "hbase/client/raw-async-table.h" +#include "hbase/client/result-scanner.h" +#include "hbase/client/result.h" +#include "hbase/serde/table-name.h" + +namespace hbase { + +class Client; + +class Table { + public: + /** + * Constructors + */ + Table(const pb::TableName &table_name, std::shared_ptr<AsyncConnection> async_connection); + ~Table(); + + /** + * @brief - Returns a Result object for the constructed Get. + * @param - get Get object to perform HBase Get operation. + */ + std::shared_ptr<hbase::Result> Get(const hbase::Get &get); + + std::vector<std::shared_ptr<hbase::Result>> Get(const std::vector<hbase::Get> &gets); + + /** + * @brief - Puts some data in the table. + * @param - put Put object to perform HBase Put operation. + */ + void Put(const hbase::Put &put); + + /** + * Atomically checks if a row/family/qualifier value matches the expected + * value. If it does, it adds the put. If the passed value is null, the check + * is for the lack of column (ie: non-existance) + * + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param value the expected value + * @param put data to put if check succeeds + * @param compare_op comparison operator to use + * @throws IOException e + * @return true if the new put was executed, false otherwise + */ + bool CheckAndPut(const std::string &row, const std::string &family, const std::string &qualifier, + const std::string &value, const hbase::Put &put, + const pb::CompareType &compare_op = pb::CompareType::EQUAL); + /** + * @brief - Deletes some data in the table. + * @param - del Delete object to perform HBase Delete operation. + */ + void Delete(const hbase::Delete &del); + + /** + * Atomically checks if a row/family/qualifier value matches the expected + * value. If it does, it adds the delete. If the passed value is null, the + * check is for the lack of column (ie: non-existence) + * + * The expected value argument of this call is on the left and the current + * value of the cell is on the right side of the comparison operator. + * + * Ie. eg. GREATER operator means expected value > existing <=> add the delete. + * + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compare_op comparison operator to use + * @param value the expected value + * @param del data to delete if check succeeds + * @return true if the new delete was executed, false otherwise + */ + bool CheckAndDelete(const std::string &row, const std::string &family, + const std::string &qualifier, const std::string &value, + const hbase::Delete &del, + const pb::CompareType &compare_op = pb::CompareType::EQUAL); + + /** + * @brief - Increments some data in the table. + * @param - increment Increment object to perform HBase Increment operation. + */ + std::shared_ptr<hbase::Result> Increment(const hbase::Increment &increment); + + /** + * @brief - Appends some data in the table. + * @param - append Append object to perform HBase Append operation. + */ + std::shared_ptr<hbase::Result> Append(const hbase::Append &append); + + std::shared_ptr<ResultScanner> Scan(const hbase::Scan &scan); + + /** + * @brief - Multi Puts. + * @param - puts vector of hbase::Put. + */ + void Put(const std::vector<hbase::Put> &puts); + /** + * @brief - Close the client connection. + */ + void Close(); + + /** + * @brief - Get region location for a row in current table. + */ + std::shared_ptr<RegionLocation> GetRegionLocation(const std::string &row); + + private: + std::shared_ptr<pb::TableName> table_name_; + std::shared_ptr<AsyncConnection> async_connection_; + std::shared_ptr<hbase::Configuration> conf_; + std::unique_ptr<RawAsyncTable> async_table_; + + private: + std::chrono::milliseconds operation_timeout() const; + + int64_t ResultSize2CacheSize(int64_t max_results_size) const; +}; +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/time-range.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/time-range.h b/hbase-native-client/include/hbase/client/time-range.h new file mode 100644 index 0000000..d645ecd --- /dev/null +++ b/hbase-native-client/include/hbase/client/time-range.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <cstdint> + +namespace hbase { +class TimeRange { + public: + /** + * @brief Default constructor. Represents interval [0, + * std::numeric_limits<int64_t>::max()) + * (allTime) + */ + TimeRange(); + TimeRange(const TimeRange &tr); + TimeRange &operator=(const TimeRange &tr); + /** + * @brief Represents interval [minStamp, std::numeric_limits<int64_t>::max()) + * @param minStamp the minimum timestamp value, inclusive + */ + explicit TimeRange(int64_t min_timestamp); + /** + * @brief Represents interval [minStamp, maxStamp) + * @param minStamp the minimum timestamp, inclusive + * @param maxStamp the maximum timestamp, exclusive + * @throws std::runtime_error if min_timestamp < 0 or max_timestamp < 0 or + * max_timestamp < min_timestamp + */ + TimeRange(int64_t min_timestamp, int64_t max_timestamp); + int64_t MinTimeStamp() const; + int64_t MaxTimeStamp() const; + bool IsAllTime() const; + ~TimeRange(); + + private: + int64_t min_timestamp_; + int64_t max_timestamp_; + bool all_time_; +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/zk-util.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/zk-util.h b/hbase-native-client/include/hbase/client/zk-util.h new file mode 100644 index 0000000..5d9a099 --- /dev/null +++ b/hbase-native-client/include/hbase/client/zk-util.h @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <cstdlib> +#include <string> +#include "hbase/client/configuration.h" + +namespace hbase { + +class ZKUtil { + public: + static constexpr const char* kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum"; + static constexpr const char* kDefHBaseZookeeperQuorum_ = "localhost:2181"; + static constexpr const char* kHBaseZookeeperClientPort_ = "hbase.zookeeper.property.clientPort"; + static constexpr const int32_t kDefHBaseZookeeperClientPort_ = 2181; + static constexpr const char* kHBaseZnodeParent_ = "zookeeper.znode.parent"; + static constexpr const char* kDefHBaseZnodeParent_ = "/hbase"; + static constexpr const char* kHBaseMetaRegionServer_ = "meta-region-server"; + + static constexpr const char* kHBaseZookeeperSessionTimeout_ = "zookeeper.session.timeout"; + static constexpr const int32_t kDefHBaseZookeeperSessionTimeout_ = 90000; + + static std::string ParseZooKeeperQuorum(const hbase::Configuration& conf); + + static std::string MetaZNode(const hbase::Configuration& conf); + + static int32_t SessionTimeout(const hbase::Configuration& conf); +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/connection/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/connection/BUCK b/hbase-native-client/include/hbase/connection/BUCK new file mode 100644 index 0000000..9a0b0cf --- /dev/null +++ b/hbase-native-client/include/hbase/connection/BUCK @@ -0,0 +1,57 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This is the library dealing with a single connection +# to a single server. +cxx_library( + name="connection", + header_namespace="hbase/connection", + exported_headers=[ + "client-dispatcher.h", + "client-handler.h", + "sasl-handler.h", + "connection-factory.h", + "connection-pool.h", + "connection-id.h", + "pipeline.h", + "request.h", + "rpc-connection.h", + "response.h", + "service.h", + "rpc-client.h", + "sasl-util.h", + "rpc-test-server.h", + "rpc-test-server-handler.h", + "rpc-fault-injector.h", + "rpc-fault-injector-inl.h", + ], + deps=[ + "//src/hbase/if:if", + "//include/hbase/utils:utils", + "//include/hbase/serde:serde", + "//include/hbase/security:security", + "//third-party:folly", + "//third-party:wangle", + "//include/hbase/exceptions:exceptions", + ], + compiler_flags=['-Weffc++'], + linker_flags=['-L/usr/local/lib', '-lsasl2', '-lkrb5'], + exported_linker_flags=['-L/usr/local/lib', '-lsasl2', '-lkrb5'], + visibility=[ + '//include/hbase/client/...', + '//src/hbase/connection/...', + ],) http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/connection/client-dispatcher.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/connection/client-dispatcher.h b/hbase-native-client/include/hbase/connection/client-dispatcher.h new file mode 100644 index 0000000..33384a7 --- /dev/null +++ b/hbase-native-client/include/hbase/connection/client-dispatcher.h @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <folly/Logging.h> +#include <wangle/service/ClientDispatcher.h> + +#include <atomic> +#include <map> +#include <memory> +#include <mutex> +#include <string> + +#include "hbase/connection/pipeline.h" +#include "hbase/connection/request.h" +#include "hbase/connection/response.h" +#include "hbase/utils/concurrent-map.h" + +namespace hbase { + +/** + * Dispatcher that assigns a call_id and then routes the response back to the + * future. + */ +class ClientDispatcher + : public wangle::ClientDispatcherBase<SerializePipeline, std::unique_ptr<Request>, + std::unique_ptr<Response>> { + public: + /** Create a new ClientDispatcher */ + explicit ClientDispatcher(const std::string &server); + /** Read a response off the pipeline. */ + void read(Context *ctx, std::unique_ptr<Response> in) override; + void readException(Context *ctx, folly::exception_wrapper e) override; + void readEOF(Context *ctx) override; + /** Take a request as a call and send it down the pipeline. */ + folly::Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> arg) override; + /** Close the dispatcher and the associated pipeline. */ + folly::Future<folly::Unit> close(Context *ctx) override; + /** Close the dispatcher and the associated pipeline. */ + folly::Future<folly::Unit> close() override; + + private: + void CloseAndCleanUpCalls(); + + private: + std::recursive_mutex mutex_; + concurrent_map<uint32_t, folly::Promise<std::unique_ptr<Response>>> requests_; + // Start at some number way above what could + // be there for un-initialized call id counters. + // + // This makes it easier to make sure that the're are + // no access to un-initialized variables. + // + // uint32_t has a max of 4Billion so 10 more or less is + // not a big deal. + std::atomic<uint32_t> current_call_id_; + std::string server_; + bool is_closed_; +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/connection/client-handler.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/connection/client-handler.h b/hbase-native-client/include/hbase/connection/client-handler.h new file mode 100644 index 0000000..b3bd2b6 --- /dev/null +++ b/hbase-native-client/include/hbase/connection/client-handler.h @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <wangle/channel/Handler.h> + +#include <atomic> +#include <memory> +#include <mutex> +#include <string> +#include <utility> + +#include "hbase/client/configuration.h" +#include "hbase/exceptions/exception.h" +#include "hbase/serde/codec.h" +#include "hbase/serde/rpc-serde.h" +#include "hbase/utils/concurrent-map.h" + +// Forward decs. +namespace hbase { +class Request; +class Response; +class HeaderInfo; +} +namespace google { +namespace protobuf { +class Message; +} +} + +namespace hbase { + +/** + * wangle::Handler implementation to convert hbase::Request to IOBuf and + * convert IOBuf to hbase::Response. + * + * This class deals with sending the connection header and preamble + * on first request. + */ +class ClientHandler + : public wangle::Handler<std::unique_ptr<folly::IOBuf>, std::unique_ptr<Response>, + std::unique_ptr<Request>, std::unique_ptr<folly::IOBuf>> { + public: + /** + * Create the handler + * @param user_name the user name of the user running this process. + */ + ClientHandler(std::string user_name, std::shared_ptr<Codec> codec, + std::shared_ptr<Configuration> conf, const std::string &server); + + /** + * Get bytes from the wire. + * This should be the full message as the length field decoder should be + * in the pipeline before this. + */ + void read(Context *ctx, std::unique_ptr<folly::IOBuf> msg) override; + + /** + * Write the data down the wire. + */ + folly::Future<folly::Unit> write(Context *ctx, std::unique_ptr<Request> r) override; + + private: + std::unique_ptr<std::once_flag> once_flag_; + std::string user_name_; + RpcSerde serde_; + std::string server_; // for logging + std::shared_ptr<Configuration> conf_; + + // in flight requests + std::unique_ptr<concurrent_map<uint32_t, std::shared_ptr<google::protobuf::Message>>> resp_msgs_; +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/connection/connection-factory.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/connection/connection-factory.h b/hbase-native-client/include/hbase/connection/connection-factory.h new file mode 100644 index 0000000..14b7fda --- /dev/null +++ b/hbase-native-client/include/hbase/connection/connection-factory.h @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <wangle/concurrent/CPUThreadPoolExecutor.h> +#include <wangle/concurrent/IOThreadPoolExecutor.h> +#include <wangle/service/Service.h> + +#include <chrono> +#include <memory> +#include <string> + +#include "hbase/connection/pipeline.h" +#include "hbase/connection/request.h" +#include "hbase/connection/response.h" +#include "hbase/connection/service.h" +#include "hbase/security/user.h" + +namespace hbase { + +class RpcConnection; + +/** + * Class to create a ClientBootstrap and turn it into a connected + * pipeline. + */ +class ConnectionFactory { + public: + /** + * Constructor. + * There should only be one ConnectionFactory per client. + */ + ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor, + std::shared_ptr<Codec> codec, std::shared_ptr<Configuration> conf, + std::chrono::nanoseconds connect_timeout = std::chrono::nanoseconds(0)); + + /** Default Destructor */ + virtual ~ConnectionFactory() = default; + + /** + * Create a BootStrap from which a connection can be made. + */ + virtual std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> MakeBootstrap(); + + /** + * Connect a ClientBootstrap to a server and return the pipeline. + * + * This is mostly visible so that mocks can override socket connections. + */ + virtual std::shared_ptr<HBaseService> Connect( + std::shared_ptr<RpcConnection> rpc_connection, + std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client_bootstrap, + const std::string &hostname, uint16_t port); + + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() { return io_executor_; } + + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() { return cpu_executor_; } + + private: + std::chrono::nanoseconds connect_timeout_; + std::shared_ptr<Configuration> conf_; + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_; + std::shared_ptr<RpcPipelineFactory> pipeline_factory_; +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/connection/connection-id.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/connection/connection-id.h b/hbase-native-client/include/hbase/connection/connection-id.h new file mode 100644 index 0000000..98f5b47 --- /dev/null +++ b/hbase-native-client/include/hbase/connection/connection-id.h @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <boost/functional/hash.hpp> + +#include <memory> +#include <string> +#include <utility> + +#include "hbase/if/HBase.pb.h" +#include "hbase/security/user.h" + +namespace hbase { + +class ConnectionId { + public: + ConnectionId(const std::string &host, uint16_t port) + : ConnectionId(host, port, security::User::defaultUser(), "") {} + + ConnectionId(const std::string &host, uint16_t port, std::shared_ptr<security::User> user) + : ConnectionId(host, port, user, "") {} + + ConnectionId(const std::string &host, uint16_t port, std::shared_ptr<security::User> user, + const std::string &service_name) + : user_(user), service_name_(service_name), host_(host), port_(port) {} + + ConnectionId(const std::string &host, uint16_t port, const std::string &service_name) + : user_(security::User::defaultUser()), + service_name_(service_name), + host_(host), + port_(port) {} + + virtual ~ConnectionId() = default; + + std::shared_ptr<security::User> user() const { return user_; } + std::string service_name() const { return service_name_; } + std::string host() { return host_; } + uint16_t port() { return port_; } + + private: + std::shared_ptr<security::User> user_; + std::string service_name_; + std::string host_; + uint16_t port_; +}; + +/* Equals function for ConnectionId */ +struct ConnectionIdEquals { + /** equals */ + bool operator()(const std::shared_ptr<ConnectionId> &lhs, + const std::shared_ptr<ConnectionId> &rhs) const { + return userEquals(lhs->user(), rhs->user()) && lhs->host() == rhs->host() && + lhs->port() == rhs->port() && lhs->service_name() == rhs->service_name(); + } + + private: + bool userEquals(const std::shared_ptr<security::User> &lhs, + const std::shared_ptr<security::User> &rhs) const { + return lhs == nullptr ? rhs == nullptr + : (rhs == nullptr ? false : lhs->user_name() == rhs->user_name()); + } +}; + +/** Hash for ConnectionId. */ +struct ConnectionIdHash { + /** hash */ + std::size_t operator()(const std::shared_ptr<ConnectionId> &ci) const { + std::size_t h = 0; + boost::hash_combine(h, ci->user() == nullptr ? 0 : ci->user()->user_name()); + boost::hash_combine(h, ci->host()); + boost::hash_combine(h, ci->port()); + boost::hash_combine(h, ci->service_name()); + return h; + } +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/connection/connection-pool.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/connection/connection-pool.h b/hbase-native-client/include/hbase/connection/connection-pool.h new file mode 100644 index 0000000..1198c33 --- /dev/null +++ b/hbase-native-client/include/hbase/connection/connection-pool.h @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <folly/SharedMutex.h> +#include <boost/functional/hash.hpp> +#include <chrono> +#include <memory> +#include <mutex> +#include <unordered_map> + +#include "hbase/connection/connection-factory.h" +#include "hbase/connection/connection-id.h" +#include "hbase/connection/rpc-connection.h" +#include "hbase/connection/service.h" +#include "hbase/if/HBase.pb.h" + +namespace hbase { + +/** + * @brief Connection pooling for HBase rpc connection. + * + * This is a thread safe connection pool. It allows getting + * a shared rpc connection to HBase servers by connection id. + */ +class ConnectionPool { + public: + /** Create connection pool wit default connection factory */ + ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor, + std::shared_ptr<Codec> codec, std::shared_ptr<Configuration> conf, + std::chrono::nanoseconds connect_timeout = std::chrono::nanoseconds(0)); + + /** + * Constructor that allows specifiying the connetion factory. + * This is useful for testing. + */ + explicit ConnectionPool(std::shared_ptr<ConnectionFactory> cf); + + /** + * Destructor. + * All connections will be close. + * All connections will be released + */ + ~ConnectionPool(); + + /** + * Get a connection to the server name. Start time is ignored. + * This can be a blocking operation for a short time. + */ + std::shared_ptr<RpcConnection> GetConnection(std::shared_ptr<ConnectionId> remote_id); + + /** + * Close/remove a connection. + */ + void Close(std::shared_ptr<ConnectionId> remote_id); + + /** + * Close the Connection Pool + */ + void Close(); + + private: + std::shared_ptr<RpcConnection> GetCachedConnection(std::shared_ptr<ConnectionId> remote_id); + std::shared_ptr<RpcConnection> GetNewConnection(std::shared_ptr<ConnectionId> remote_id); + std::unordered_map<std::shared_ptr<ConnectionId>, std::shared_ptr<RpcConnection>, + ConnectionIdHash, ConnectionIdEquals> + connections_; + folly::SharedMutexWritePriority map_mutex_; + std::shared_ptr<ConnectionFactory> cf_; + std::shared_ptr<Configuration> conf_; +}; + +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/connection/pipeline.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/connection/pipeline.h b/hbase-native-client/include/hbase/connection/pipeline.h new file mode 100644 index 0000000..63e9492 --- /dev/null +++ b/hbase-native-client/include/hbase/connection/pipeline.h @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <folly/io/IOBufQueue.h> +#include <wangle/service/Service.h> + +#include <memory> + +#include "hbase/connection/request.h" +#include "hbase/connection/response.h" +#include "hbase/client/configuration.h" +#include "hbase/serde/codec.h" +#include "hbase/utils/user-util.h" + +namespace hbase { + +/** Pipeline to turn IOBuf into requests */ +using SerializePipeline = wangle::Pipeline<folly::IOBufQueue &, std::unique_ptr<Request>>; + +/** + * Factory to create new pipelines for HBase RPC's. + */ +class RpcPipelineFactory : public wangle::PipelineFactory<SerializePipeline> { + public: + /** + * Constructor. This will create user util. + */ + explicit RpcPipelineFactory(std::shared_ptr<Codec> codec, std::shared_ptr<Configuration> conf); + + /** + * Create a new pipeline. + * The pipeline will be: + * + * - Async Socke Handler + * - Event Base Handler + * - Length Field Based Frame Decoder + * - Client Handler + */ + SerializePipeline::Ptr newPipeline(std::shared_ptr<folly::AsyncTransportWrapper> sock) override; + + private: + UserUtil user_util_; + std::shared_ptr<Codec> codec_; + std::shared_ptr<Configuration> conf_; +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/connection/request.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/connection/request.h b/hbase-native-client/include/hbase/connection/request.h new file mode 100644 index 0000000..4b652c0 --- /dev/null +++ b/hbase-native-client/include/hbase/connection/request.h @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <folly/Conv.h> +#include <google/protobuf/message.h> + +#include <cstdint> +#include <memory> +#include <string> + +namespace hbase { + +/** + * Main request class. + * This holds the request object and the un-filled in approriatley typed + * response object. + */ +class Request { + public: + /** Create a request object for a get */ + static std::unique_ptr<Request> get(); + /** Create a request object for a mutate */ + static std::unique_ptr<Request> mutate(); + /** Create a request object for a scan */ + static std::unique_ptr<Request> scan(); + /** Create a request object for a multi */ + static std::unique_ptr<Request> multi(); + + /** + * This should be private. Do not use this. + * + * + * Constructor that's public for make_unique. This sets all the messages and + * method name. + */ + Request(std::shared_ptr<google::protobuf::Message> req, + std::shared_ptr<google::protobuf::Message> resp, std::string method); + + /** Get the call id. */ + uint32_t call_id() { return call_id_; } + /** Set the call id. This should only be set once. */ + void set_call_id(uint32_t call_id) { call_id_ = call_id; } + /** Get the backing request protobuf message. */ + std::shared_ptr<google::protobuf::Message> req_msg() { return req_msg_; } + /** Get the backing response protobuf message. */ + std::shared_ptr<google::protobuf::Message> resp_msg() { return resp_msg_; } + /** Get the method name. This is used to the the receiving rpc server what + * method type to decode. */ + std::string method() { return method_; } + + std::string DebugString() { + return "call_id:" + folly::to<std::string>(call_id_) + ", req_msg:" + + req_msg_->ShortDebugString() + ", method:" + method_; + } + + private: + uint32_t call_id_; + std::shared_ptr<google::protobuf::Message> req_msg_ = nullptr; + std::shared_ptr<google::protobuf::Message> resp_msg_ = nullptr; + std::string method_ = "Get"; +}; +} // namespace hbase