This is an automated email from the ASF dual-hosted git repository. zghao pushed a commit to branch HBASE-14850 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit d31153afbafedf8e573d497e6be2c31003ed2780 Author: tedyu <[email protected]> AuthorDate: Tue Jun 6 16:42:45 2017 -0700 HBASE-18173 Append class --- hbase-native-client/core/BUCK | 11 +++ hbase-native-client/core/append-test.cc | 106 ++++++++++++++++++++++++++ hbase-native-client/core/append.cc | 54 +++++++++++++ hbase-native-client/core/append.h | 56 ++++++++++++++ hbase-native-client/core/client-test.cc | 30 ++++++++ hbase-native-client/core/raw-async-table.cc | 14 ++++ hbase-native-client/core/raw-async-table.h | 2 + hbase-native-client/core/request-converter.cc | 12 +++ hbase-native-client/core/request-converter.h | 3 + hbase-native-client/core/table.cc | 5 ++ hbase-native-client/core/table.h | 6 ++ 11 files changed, 299 insertions(+) diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index 81fd4a7..47e97f5 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -43,6 +43,7 @@ cxx_library( "put.h", "delete.h", "scan.h", + "append.h", "result.h", "result-scanner.h", "request-converter.h", @@ -82,6 +83,7 @@ cxx_library( "put.cc", "delete.cc", "scan.cc", + "append.cc", "scan-result-cache.cc", "raw-async-table.cc", "result.cc", @@ -192,6 +194,15 @@ cxx_test( ], run_test_separately=True,) cxx_test( + name="append-test", + srcs=[ + "append-test.cc", + ], + deps=[ + ":core", + ], + run_test_separately=True,) +cxx_test( name="retry-test", srcs=[ "async-rpc-retrying-test.cc", diff --git a/hbase-native-client/core/append-test.cc b/hbase-native-client/core/append-test.cc new file mode 100644 index 0000000..619826c --- /dev/null +++ b/hbase-native-client/core/append-test.cc @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "core/mutation.h" +#include "core/append.h" +#include "utils/time-util.h" + +using hbase::Append; +using hbase::Cell; +using hbase::CellType; +using hbase::Mutation; +using hbase::TimeUtil; + +const constexpr int64_t Mutation::kLatestTimestamp; + +TEST(Append, Row) { + Append append{"foo"}; + EXPECT_EQ("foo", append.row()); +} + +TEST(Append, Durability) { + Append append{"row"}; + EXPECT_EQ(hbase::pb::MutationProto_Durability_USE_DEFAULT, append.Durability()); + + auto skipWal = hbase::pb::MutationProto_Durability_SKIP_WAL; + append.SetDurability(skipWal); + EXPECT_EQ(skipWal, append.Durability()); +} + +TEST(Append, Timestamp) { + Append append{"row"}; + + // test default timestamp + EXPECT_EQ(Mutation::kLatestTimestamp, append.TimeStamp()); + + // set custom timestamp + auto ts = TimeUtil::ToMillis(TimeUtil::GetNowNanos()); + append.SetTimeStamp(ts); + EXPECT_EQ(ts, append.TimeStamp()); + + // Add a column with custom timestamp + append.Add("f", "q", "v"); + auto &cell = append.FamilyMap().at("f")[0]; + EXPECT_EQ(ts, cell->Timestamp()); +} + +TEST(Append, HasFamilies) { + Append append{"row"}; + + EXPECT_EQ(false, append.HasFamilies()); + + append.Add("f", "q", "v"); + EXPECT_EQ(true, append.HasFamilies()); +} + +TEST(Append, Add) { + CellType cell_type = CellType::PUT; + std::string row = "row"; + std::string family = "family"; + std::string column = "column"; + std::string value = "value"; + int64_t timestamp = std::numeric_limits<int64_t>::max(); + auto cell = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type); + + // add first cell + Append append{"row"}; + append.Add(std::move(cell)); + EXPECT_EQ(1, append.FamilyMap().size()); + EXPECT_EQ(1, append.FamilyMap().at(family).size()); + + // add a non-matching row + auto cell2 = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type); + Append append2{"foo"}; + ASSERT_THROW(append2.Add(std::move(cell2)), std::runtime_error); // rows don't match + + // add a second cell with same family + auto cell3 = std::make_unique<Cell>(row, family, "column-2", timestamp, value, cell_type); + append.Add(std::move(cell3)); + EXPECT_EQ(1, append.FamilyMap().size()); + EXPECT_EQ(2, append.FamilyMap().at(family).size()); + + // add a cell to a different family + auto cell4 = std::make_unique<Cell>(row, "family-2", "column-2", timestamp, value, cell_type); + append.Add(std::move(cell4)); + EXPECT_EQ(2, append.FamilyMap().size()); + EXPECT_EQ(1, append.FamilyMap().at("family-2").size()); +} + diff --git a/hbase-native-client/core/append.cc b/hbase-native-client/core/append.cc new file mode 100644 index 0000000..18ee45a --- /dev/null +++ b/hbase-native-client/core/append.cc @@ -0,0 +1,54 @@ + + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/append.h" +#include <folly/Conv.h> +#include <algorithm> +#include <limits> +#include <stdexcept> +#include <utility> + +namespace hbase { + +/** + * @brief Append to the column from the specific family with the specified qualifier + * @param family family name + * @param qualifier column qualifier + * @param value value to append + */ +Append& Append::Add(const std::string& family, const std::string& qualifier, + const std::string& value) { + family_map_[family].push_back(std::move( + std::make_unique<Cell>(row_, family, qualifier, timestamp_, value, + hbase::CellType::PUT))); + return *this; +} +Append& Append::Add(std::unique_ptr<Cell> cell) { + if (cell->Row() != row_) { + throw std::runtime_error("The row in " + cell->DebugString() + + " doesn't match the original one " + row_); + } + + family_map_[cell->Family()].push_back(std::move(cell)); + return *this; +} + +} // namespace hbase diff --git a/hbase-native-client/core/append.h b/hbase-native-client/core/append.h new file mode 100644 index 0000000..cf9ac24 --- /dev/null +++ b/hbase-native-client/core/append.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <cstdint> +#include <map> +#include <memory> +#include <string> +#include <vector> +#include "core/cell.h" +#include "core/mutation.h" + +namespace hbase { + +class Append : public Mutation { + public: + /** + * Constructors + */ + explicit Append(const std::string& row) : Mutation(row) {} + Append(const Append& cappend) : Mutation(cappend) {} + Append& operator=(const Append& cappend) { + Mutation::operator=(cappend); + return *this; + } + + ~Append() = default; + + /** + * @brief Add the specified column and value to this Append operation. + * @param family family name + * @param qualifier column qualifier + * @param value value to append + */ + Append& Add(const std::string& family, const std::string& qualifier, const std::string& value); + Append& Add(std::unique_ptr<Cell> cell); +}; + +} // namespace hbase diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc index d166f1c..1ee0a83 100644 --- a/hbase-native-client/core/client-test.cc +++ b/hbase-native-client/core/client-test.cc @@ -19,6 +19,7 @@ #include <gtest/gtest.h> +#include "core/append.h" #include "core/cell.h" #include "core/client.h" #include "core/configuration.h" @@ -118,6 +119,35 @@ TEST_F(ClientTest, DefaultConfiguration) { client.Close(); } +TEST_F(ClientTest, Append) { + // Using TestUtil to populate test data + ClientTest::test_util->CreateTable("t", "d"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to<hbase::pb::TableName>("t"); + auto row = "test1"; + + // Create a client + hbase::Client client(*ClientTest::test_util->conf()); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + std::string val1 = "a"; + auto result = table->Append(hbase::Append{row}.Add("d", "1", val1)); + + ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ(row, result->Row()); + EXPECT_EQ(val1, *(result->Value("d", "1"))); + + std::string val2 = "b"; + result = table->Append(hbase::Append{row}.Add("d", "1", val2)); + + ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ(row, result->Row()); + EXPECT_EQ("ab", *(result->Value("d", "1"))); +} + TEST_F(ClientTest, PutGetDelete) { // Using TestUtil to populate test data std::string tableName = "t1"; diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc index 998e2f1..2a98d54 100644 --- a/hbase-native-client/core/raw-async-table.cc +++ b/hbase-native-client/core/raw-async-table.cc @@ -124,6 +124,20 @@ folly::Future<folly::Unit> RawAsyncTable::Delete(const hbase::Delete& del) { return caller->Call().then([caller](const auto r) { return r; }); } +folly::Future<std::shared_ptr<Result>> RawAsyncTable::Append(const hbase::Append& append) { + auto caller = + CreateCallerBuilder<std::shared_ptr<Result>>(append.row(), connection_conf_->write_rpc_timeout()) + ->action([=, &append](std::shared_ptr<hbase::HBaseRpcController> controller, + std::shared_ptr<hbase::RegionLocation> loc, + std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<std::shared_ptr<Result>> { + return Call<hbase::Append, hbase::Request, hbase::Response, std::shared_ptr<Result>>( + rpc_client, controller, loc, append, &hbase::RequestConverter::AppendToMutateRequest, + &hbase::ResponseConverter::FromMutateResponse); + }) + ->Build(); + + return caller->Call().then([caller](const auto r) { return r; }); +} folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Get( const std::vector<hbase::Get>& gets) { return this->Batch(gets); diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h index 8c40dae..6088d1b 100644 --- a/hbase-native-client/core/raw-async-table.h +++ b/hbase-native-client/core/raw-async-table.h @@ -61,6 +61,8 @@ class RawAsyncTable { folly::Future<folly::Unit> Delete(const hbase::Delete& del); + folly::Future<std::shared_ptr<hbase::Result>> Append(const hbase::Append& append); + folly::Future<std::shared_ptr<hbase::Result>> Increment(const hbase::Increment& increment); folly::Future<folly::Unit> Put(const hbase::Put& put); diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc index 6eb2f04..54fdfc5 100644 --- a/hbase-native-client/core/request-converter.cc +++ b/hbase-native-client/core/request-converter.cc @@ -301,4 +301,16 @@ std::unique_ptr<Request> RequestConverter::IncrementToMutateRequest(const Increm return pb_req; } +std::unique_ptr<Request> RequestConverter::AppendToMutateRequest(const Append &append, + const std::string ®ion_name) { + auto pb_req = Request::mutate(); + auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg()); + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + + pb_msg->set_allocated_mutation( + ToMutation(MutationType::MutationProto_MutationType_APPEND, append, -1).release()); + + VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString(); + return pb_req; +} } /* namespace hbase */ diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h index c807f45..a9d65d6 100644 --- a/hbase-native-client/core/request-converter.h +++ b/hbase-native-client/core/request-converter.h @@ -25,6 +25,7 @@ #include "connection/request.h" #include "core/action.h" #include "core/cell.h" +#include "core/append.h" #include "core/delete.h" #include "core/get.h" #include "core/increment.h" @@ -90,6 +91,8 @@ class RequestConverter { const Mutation &mutation, const int64_t nonce); + static std::unique_ptr<Request> AppendToMutateRequest(const Append &append, + const std::string ®ion_name); private: // Constructor not required. We have all static methods to create PB requests. RequestConverter(); diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc index 9cdd8b0..bf22169 100644 --- a/hbase-native-client/core/table.cc +++ b/hbase-native-client/core/table.cc @@ -83,6 +83,11 @@ std::shared_ptr<hbase::Result> Table::Increment(const hbase::Increment &incremen return context.get(operation_timeout()); } +std::shared_ptr<hbase::Result> Table::Append(const hbase::Append &append) { + auto context = async_table_->Append(append); + return context.get(operation_timeout()); +} + milliseconds Table::operation_timeout() const { return TimeUtil::ToMillis(async_connection_->connection_conf()->operation_timeout()); } diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h index 1f6d9b7..781c6f1 100644 --- a/hbase-native-client/core/table.h +++ b/hbase-native-client/core/table.h @@ -73,6 +73,12 @@ class Table { * @param - increment Increment object to perform HBase Increment operation. */ std::shared_ptr<hbase::Result> Increment(const hbase::Increment &increment); + + /** + * @brief - Appends some data in the table. + * @param - append Append object to perform HBase Append operation. + */ + std::shared_ptr<hbase::Result> Append(const hbase::Append &append); // TODO: Batch Puts std::shared_ptr<ResultScanner> Scan(const hbase::Scan &scan);
