This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 28472f041 [tools] KUDU-1945 Print auto-incrementing counter in kudu wal dump
28472f041 is described below
commit 28472f041c8eb63f58d1f2075007ef864f8d3f68
Author: Abhishek Chennaka <[email protected]>
AuthorDate: Wed Apr 5 18:06:45 2023 -0700
[tools] KUDU-1945 Print auto-incrementing counter in kudu wal dump
This patch allows the "kudu wal dump" tool to display the auto-incrementing
counter value if it is present in the WAL segment. It displays the counter
value present in each INSERT/INSERT_IGNORE replicate op and also populates
the corresponding auto-incrementing column values for each row when
printing rows.
Change-Id: I4e807aaef48683ec7c5317eecdedf8e6e15950e2
Reviewed-on: http://gerrit.cloudera.org:8080/19698
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/common/wire_protocol-test-util.h | 11 +++
src/kudu/tools/kudu-tool-test.cc | 147 ++++++++++++++++++++++++++++++
src/kudu/tools/tool_action_common.cc | 15 ++-
3 files changed, 171 insertions(+), 2 deletions(-)
diff --git a/src/kudu/common/wire_protocol-test-util.h b/src/kudu/common/wire_protocol-test-util.h
index 2fcf03bf6..f3daf8638 100644
--- a/src/kudu/common/wire_protocol-test-util.h
+++ b/src/kudu/common/wire_protocol-test-util.h
@@ -23,6 +23,7 @@
#include <map>
#include <string>
+#include "kudu/client/schema.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/row.h"
#include "kudu/common/row_operations.h"
@@ -36,6 +37,16 @@ inline Schema GetSimpleTestSchema() {
1);
}
+inline client::KuduSchema GetAutoIncrementingTestSchema() {
+ client::KuduSchema kudu_schema;
+ client::KuduSchemaBuilder b;
+
b.AddColumn("key")->Type(client::KuduColumnSchema::INT32)->NotNull()->NonUniquePrimaryKey();
+ b.AddColumn("int_val")->Type(client::KuduColumnSchema::INT32);
+
b.AddColumn("string_val")->Type(client::KuduColumnSchema::STRING)->Nullable();
+ CHECK_OK(b.Build(&kudu_schema));
+ return kudu_schema;
+}
+
inline void RowAppendColumn(KuduPartialRow* row,
const std::map<std::string, std::string>& columns)
{
for (const auto& column : columns) {
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 8cee955c6..c27057007 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2428,6 +2428,8 @@ TEST_F(ToolTest, TestWalDump) {
ASSERT_STR_MATCHES(stdout, "Header:");
ASSERT_STR_MATCHES(stdout, "1\\.1@1");
ASSERT_STR_MATCHES(stdout, "this is a test insert");
+ ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+ ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id");
ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
ASSERT_STR_MATCHES(stdout, "Footer:");
@@ -2439,6 +2441,8 @@ TEST_F(ToolTest, TestWalDump) {
SCOPED_TRACE(stdout);
ASSERT_STR_MATCHES(stdout, "Header:");
ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+ ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id");
ASSERT_STR_NOT_MATCHES(stdout, "this is a test insert");
ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
@@ -2450,6 +2454,8 @@ TEST_F(ToolTest, TestWalDump) {
SCOPED_TRACE(stdout);
ASSERT_STR_MATCHES(stdout, "Header:");
ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+ ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id");
ASSERT_STR_MATCHES(stdout, "this is a test insert");
ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
ASSERT_STR_MATCHES(stdout, "row_operations \\{");
@@ -2461,6 +2467,8 @@ TEST_F(ToolTest, TestWalDump) {
SCOPED_TRACE(stdout);
ASSERT_STR_MATCHES(stdout, "Header:");
ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+ ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id");
ASSERT_STR_NOT_MATCHES(stdout, "this is a test insert");
ASSERT_STR_MATCHES(stdout, "t<truncated>");
ASSERT_STR_MATCHES(stdout, "row_operations \\{");
@@ -2472,6 +2480,8 @@ TEST_F(ToolTest, TestWalDump) {
SCOPED_TRACE(stdout);
ASSERT_STR_MATCHES(stdout, "Header:");
ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+ ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id");
ASSERT_STR_NOT_MATCHES(stdout, "this is a test insert");
ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
@@ -2483,6 +2493,8 @@ TEST_F(ToolTest, TestWalDump) {
SCOPED_TRACE(stdout);
ASSERT_STR_NOT_MATCHES(stdout, "Header:");
ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+ ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id");
ASSERT_STR_MATCHES(stdout, "this is a test insert");
ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
ASSERT_STR_NOT_MATCHES(stdout, "Footer:");
@@ -2713,6 +2725,141 @@ TEST_F(ToolTest, TestWalDumpWithAlterSchema) {
}
}
+TEST_F(ToolTest, TestWalDumpWithAutoIncrementingColumn) {
+ const string kTestDir = GetTestPath("test");
+ const string kTestTablet = "ffffffffffffffffffffffffffffffff";
+ const client::KuduSchema kSchema(GetAutoIncrementingTestSchema());
+ const Schema schema = client::KuduSchema::ToSchema(kSchema);
+ const Schema schema_with_id =
SchemaBuilder(client::KuduSchema::ToSchema(kSchema)).Build();
+
+ FsManager fs(env_, FsManagerOpts(kTestDir));
+ ASSERT_OK(fs.CreateInitialFileSystemLayout());
+ ASSERT_OK(fs.Open());
+
+ {
+ scoped_refptr<Log> log;
+ ASSERT_OK(Log::Open(LogOptions(),
+ &fs,
+ /*file_cache*/nullptr,
+ kTestTablet,
+ schema_with_id,
+ /*schema_version*/0,
+ /*metric_entity*/nullptr,
+ &log));
+
+ OpId opid = consensus::MakeOpId(1, 1);
+ ReplicateRefPtr replicate =
+ consensus::make_scoped_refptr_replicate(new ReplicateMsg());
+ replicate->get()->set_op_type(consensus::WRITE_OP);
+ replicate->get()->mutable_id()->CopyFrom(opid);
+ replicate->get()->set_timestamp(1);
+ WriteRequestPB* write = replicate->get()->mutable_write_request();
+
write->mutable_auto_incrementing_column()->set_auto_incrementing_counter(0x5a);
+ ASSERT_OK(SchemaToPB(schema, write->mutable_schema()));
+ AddTestRowToPB(RowOperationsPB::INSERT, schema,
+ opid.index(),
+ 0,
+ "this is a test insert",
+ write->mutable_row_operations());
+ AddTestRowToPB(RowOperationsPB::INSERT, schema,
+ opid.index(),
+ 0,
+ "this is a test insert",
+ write->mutable_row_operations());
+ write->set_tablet_id(kTestTablet);
+ Synchronizer s;
+ ASSERT_OK(log->AsyncAppendReplicates({ replicate }, s.AsStatusCallback()));
+ ASSERT_OK(s.Wait());
+ }
+
+ string wal_path = fs.GetWalSegmentFileName(kTestTablet, 1);
+ string encryption_args;
+ if (env_->IsEncryptionEnabled()) {
+ encryption_args = GetEncryptionArgs() + " --instance_file=" +
+ fs.GetInstanceMetadataPath(kTestDir);
+ }
+ string stdout;
+ for (const auto& args : { Substitute("wal dump $0 $1", wal_path,
encryption_args),
+ Substitute("local_replica dump wals
--fs_wal_dir=$0 $1 $2",
+ kTestDir, kTestTablet, encryption_args)
+ }) {
+ SCOPED_TRACE(args);
+ {
+ NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=true",
+ args), &stdout));
+ SCOPED_TRACE(stdout);
+ ASSERT_STR_MATCHES(stdout, "Header:");
+ ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_MATCHES(stdout, "Auto Incrementing Counter: 90");
+ ASSERT_STR_MATCHES(stdout, "auto_incrementing_id=91");
+ ASSERT_STR_MATCHES(stdout, "auto_incrementing_id=92");
+ ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+ ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+ ASSERT_STR_MATCHES(stdout, "Footer:");
+ }
+ {
+ NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=false",
+ args), &stdout));
+ SCOPED_TRACE(stdout);
+ ASSERT_STR_MATCHES(stdout, "Header:");
+ ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+ ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id=");
+ ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+ ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+ ASSERT_STR_MATCHES(stdout, "Footer:");
+ }
+ {
+ NO_FATALS(RunActionStdoutString(Substitute("$0 --print_entries=pb",
+ args), &stdout));
+ SCOPED_TRACE(stdout);
+ ASSERT_STR_MATCHES(stdout, "Header:");
+ ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+ ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id=");
+ ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+ ASSERT_STR_MATCHES(stdout, "row_operations \\{");
+ ASSERT_STR_MATCHES(stdout, "Footer:");
+ }
+ {
+ NO_FATALS(RunActionStdoutString(Substitute(
+ "$0 --print_entries=pb --truncate_data=1", args), &stdout));
+ SCOPED_TRACE(stdout);
+ ASSERT_STR_MATCHES(stdout, "Header:");
+ ASSERT_STR_NOT_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+ ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id=");
+ ASSERT_STR_MATCHES(stdout, "t<truncated>");
+ ASSERT_STR_MATCHES(stdout, "row_operations \\{");
+ ASSERT_STR_MATCHES(stdout, "Footer:");
+ }
+ {
+ NO_FATALS(RunActionStdoutString(Substitute(
+ "$0 --print_entries=id", args), &stdout));
+ SCOPED_TRACE(stdout);
+ ASSERT_STR_MATCHES(stdout, "Header:");
+ ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_NOT_MATCHES(stdout, "Auto Incrementing Counter");
+ ASSERT_STR_NOT_MATCHES(stdout, "auto_incrementing_id=");
+ ASSERT_STR_NOT_MATCHES(stdout, "t<truncated>");
+ ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+ ASSERT_STR_MATCHES(stdout, "Footer:");
+ }
+ {
+ NO_FATALS(RunActionStdoutString(Substitute(
+ "$0 --print_meta=false", args), &stdout));
+ SCOPED_TRACE(stdout);
+ ASSERT_STR_NOT_MATCHES(stdout, "Header:");
+ ASSERT_STR_MATCHES(stdout, "1\\.1@1");
+ ASSERT_STR_MATCHES(stdout, "Auto Incrementing Counter: 90");
+ ASSERT_STR_MATCHES(stdout, "auto_incrementing_id=91");
+ ASSERT_STR_MATCHES(stdout, "auto_incrementing_id=92");
+ ASSERT_STR_NOT_MATCHES(stdout, "row_operations \\{");
+ ASSERT_STR_NOT_MATCHES(stdout, "Footer:");
+ }
+ }
+}
+
TEST_F(ToolTest, TestLocalReplicaDumpDataDirs) {
constexpr const char* const kTestTablet = "ffffffffffffffffffffffffffffffff";
constexpr const char* const kTestTableId = "test-table";
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index f54ac1aee..f8413ee7f 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -348,8 +348,15 @@ Status PrintDecodedWriteRequestPB(const string& indent,
Arena arena(32 * 1024);
RowOperationsPBDecoder dec(&write.row_operations(), &request_schema,
&tablet_schema, &arena);
vector<DecodedRowOperation> ops;
- RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops));
-
+ if (write.has_auto_incrementing_column()) {
+ // Define auto_incrementing_counter and use it as in-out parameter during
decoding of the ops
+ // in DecodeOperations().
+ int64_t auto_incrementing_counter =
+ write.auto_incrementing_column().auto_incrementing_counter();
+ RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops,
&auto_incrementing_counter));
+ } else {
+ RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops));
+ }
cout << indent << "Tablet: " << write.tablet_id() << endl;
cout << indent << "RequestId: "
<< (request_id ? SecureShortDebugString(*request_id) : "None") << endl;
@@ -358,6 +365,10 @@ Status PrintDecodedWriteRequestPB(const string& indent,
if (write.has_propagated_timestamp()) {
cout << indent << "Propagated TS: " << write.propagated_timestamp() <<
endl;
}
+ if (write.has_auto_incrementing_column()) {
+ cout << indent << "Auto Incrementing Counter: "
+ << write.auto_incrementing_column().auto_incrementing_counter() <<
endl;
+ }
int i = 0;
for (const DecodedRowOperation& op : ops) {