This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 02ff9789 Add plan executor for KQIR numeric/tag scan operator (#2289)
02ff9789 is described below
commit 02ff978955758adfa689062b9abdd1d4e1c300b8
Author: Twice <[email protected]>
AuthorDate: Sat May 4 21:08:00 2024 +0900
Add plan executor for KQIR numeric/tag scan operator (#2289)
---
src/common/encoding.h | 15 +++
src/search/executors/numeric_field_scan_executor.h | 113 +++++++++++++++++++
src/search/executors/tag_field_scan_executor.h | 100 +++++++++++++++++
src/search/plan_executor.cc | 14 +++
src/search/search_encoding.h | 15 +--
tests/cppunit/plan_executor_test.cc | 119 +++++++++++++++++++++
6 files changed, 366 insertions(+), 10 deletions(-)
diff --git a/src/common/encoding.h b/src/common/encoding.h
index d4977930..5d05e456 100644
--- a/src/common/encoding.h
+++ b/src/common/encoding.h
@@ -102,6 +102,21 @@ inline bool GetFixed16(rocksdb::Slice *input, uint16_t *value) { return GetFixed
inline bool GetFixed32(rocksdb::Slice *input, uint32_t *value) { return
GetFixed(input, value); }
inline bool GetFixed64(rocksdb::Slice *input, uint64_t *value) { return
GetFixed(input, value); }
+inline void PutSizedString(std::string *dst, rocksdb::Slice value) {
+ PutFixed32(dst, value.size());
+ dst->append(value.ToStringView());
+}
+
+inline bool GetSizedString(rocksdb::Slice *input, rocksdb::Slice *value) {
+ uint32_t size = 0;
+ if (!GetFixed32(input, &size)) return false;
+
+ if (input->size() < size) return false;
+ *value = rocksdb::Slice(input->data(), size);
+ input->remove_prefix(size);
+ return true;
+}
+
char *EncodeDouble(char *buf, double value);
void PutDouble(std::string *dst, double value);
double DecodeDouble(const char *ptr);
diff --git a/src/search/executors/numeric_field_scan_executor.h b/src/search/executors/numeric_field_scan_executor.h
new file mode 100644
index 00000000..1609f433
--- /dev/null
+++ b/src/search/executors/numeric_field_scan_executor.h
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <string>
+
+#include "db_util.h"
+#include "encoding.h"
+#include "search/plan_executor.h"
+#include "search/search_encoding.h"
+#include "storage/redis_db.h"
+#include "storage/redis_metadata.h"
+#include "storage/storage.h"
+
+namespace kqir {
+
+struct NumericFieldScanExecutor : ExecutorNode {
+ NumericFieldScan *scan;
+ redis::LatestSnapShot ss;
+ util::UniqueIterator iter{nullptr};
+
+ IndexInfo *index;
+ std::string ns_key;
+
+ NumericFieldScanExecutor(ExecutorContext *ctx, NumericFieldScan *scan)
+ : ExecutorNode(ctx), scan(scan), ss(ctx->storage),
index(scan->field->info->index) {
+ ns_key = ComposeNamespaceKey(index->ns, index->name,
ctx->storage->IsSlotIdEncoded());
+ }
+
+ std::string IndexKey(double num) {
+ return InternalKey(ns_key,
redis::ConstructNumericFieldSubkey(scan->field->name, num, {}),
index->metadata.version,
+ ctx->storage->IsSlotIdEncoded())
+ .Encode();
+ }
+
+ bool InRangeDecode(Slice key, Slice field, double num, double *curr, Slice
*user_key) {
+ auto ikey = InternalKey(key, ctx->storage->IsSlotIdEncoded());
+ if (ikey.GetVersion() != index->metadata.version) return false;
+ auto subkey = ikey.GetSubKey();
+
+ uint8_t flag = 0;
+ if (!GetFixed8(&subkey, &flag)) return false;
+ if (flag != (uint8_t)redis::SearchSubkeyType::NUMERIC_FIELD) return false;
+
+ Slice value;
+ if (!GetSizedString(&subkey, &value)) return false;
+ if (value != field) return false;
+
+ if (!GetDouble(&subkey, curr)) return false;
+
+ if (!GetSizedString(&subkey, user_key)) return false;
+
+ return true;
+ }
+
+ StatusOr<Result> Next() override {
+ if (!iter) {
+ rocksdb::ReadOptions read_options = ctx->storage->DefaultScanOptions();
+ read_options.snapshot = ss.GetSnapShot();
+
+ iter =
+ util::UniqueIterator(ctx->storage, read_options,
ctx->storage->GetCFHandle(engine::kSearchColumnFamilyName));
+ if (scan->order == SortByClause::ASC) {
+ iter->Seek(IndexKey(scan->range.l));
+ } else {
+ iter->SeekForPrev(IndexKey(IntervalSet::PrevNum(scan->range.r)));
+ }
+ }
+
+ if (!iter->Valid()) {
+ return end;
+ }
+
+ double curr = 0;
+ Slice user_key;
+ if (!InRangeDecode(iter->key(), scan->field->name, scan->range.r, &curr,
&user_key)) {
+ return end;
+ }
+
+ if (scan->order == SortByClause::ASC ? curr >= scan->range.r : curr <
scan->range.l) {
+ return end;
+ }
+
+ auto key_str = user_key.ToString();
+
+ if (scan->order == SortByClause::ASC) {
+ iter->Next();
+ } else {
+ iter->Prev();
+ }
+ return RowType{key_str, {{scan->field->info, std::to_string(curr)}},
scan->field->info->index};
+ }
+};
+
+} // namespace kqir
diff --git a/src/search/executors/tag_field_scan_executor.h b/src/search/executors/tag_field_scan_executor.h
new file mode 100644
index 00000000..a3781c11
--- /dev/null
+++ b/src/search/executors/tag_field_scan_executor.h
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <string>
+
+#include "db_util.h"
+#include "encoding.h"
+#include "search/plan_executor.h"
+#include "search/search_encoding.h"
+#include "storage/redis_db.h"
+#include "storage/redis_metadata.h"
+#include "storage/storage.h"
+
+namespace kqir {
+
+struct TagFieldScanExecutor : ExecutorNode {
+ TagFieldScan *scan;
+ redis::LatestSnapShot ss;
+ util::UniqueIterator iter{nullptr};
+
+ IndexInfo *index;
+ std::string ns_key;
+ std::string index_key;
+
+ TagFieldScanExecutor(ExecutorContext *ctx, TagFieldScan *scan)
+ : ExecutorNode(ctx), scan(scan), ss(ctx->storage),
index(scan->field->info->index) {
+ ns_key = ComposeNamespaceKey(index->ns, index->name,
ctx->storage->IsSlotIdEncoded());
+ index_key = InternalKey(ns_key,
redis::ConstructTagFieldSubkey(scan->field->name, scan->tag, {}),
+ index->metadata.version,
ctx->storage->IsSlotIdEncoded())
+ .Encode();
+ }
+
+ bool InRangeDecode(Slice key, Slice field, Slice *user_key) {
+ auto ikey = InternalKey(key, ctx->storage->IsSlotIdEncoded());
+ if (ikey.GetVersion() != index->metadata.version) return false;
+ auto subkey = ikey.GetSubKey();
+
+ uint8_t flag = 0;
+ if (!GetFixed8(&subkey, &flag)) return false;
+ if (flag != (uint8_t)redis::SearchSubkeyType::TAG_FIELD) return false;
+
+ Slice value;
+ if (!GetSizedString(&subkey, &value)) return false;
+ if (value != field) return false;
+
+ Slice tag;
+ if (!GetSizedString(&subkey, &tag)) return false;
+ if (tag != scan->tag) return false;
+
+ if (!GetSizedString(&subkey, user_key)) return false;
+
+ return true;
+ }
+
+ StatusOr<Result> Next() override {
+ if (!iter) {
+ rocksdb::ReadOptions read_options = ctx->storage->DefaultScanOptions();
+ read_options.snapshot = ss.GetSnapShot();
+
+ iter =
+ util::UniqueIterator(ctx->storage, read_options,
ctx->storage->GetCFHandle(engine::kSearchColumnFamilyName));
+ iter->Seek(index_key);
+ }
+
+ if (!iter->Valid()) {
+ return end;
+ }
+
+ Slice user_key;
+ if (!InRangeDecode(iter->key(), scan->field->name, &user_key)) {
+ return end;
+ }
+
+ auto key_str = user_key.ToString();
+
+ iter->Next();
+ return RowType{key_str, {}, scan->field->info->index};
+ }
+};
+
+} // namespace kqir
diff --git a/src/search/plan_executor.cc b/src/search/plan_executor.cc
index 75033f1a..7de9f518 100644
--- a/src/search/plan_executor.cc
+++ b/src/search/plan_executor.cc
@@ -28,8 +28,10 @@
#include "search/executors/merge_executor.h"
#include "search/executors/mock_executor.h"
#include "search/executors/noop_executor.h"
+#include "search/executors/numeric_field_scan_executor.h"
#include "search/executors/projection_executor.h"
#include "search/executors/sort_executor.h"
+#include "search/executors/tag_field_scan_executor.h"
#include "search/executors/topn_sort_executor.h"
#include "search/indexer.h"
#include "search/ir_plan.h"
@@ -74,6 +76,14 @@ struct ExecutorContextVisitor {
return Visit(v);
}
+ if (auto v = dynamic_cast<NumericFieldScan *>(op)) {
+ return Visit(v);
+ }
+
+ if (auto v = dynamic_cast<TagFieldScan *>(op)) {
+ return Visit(v);
+ }
+
if (auto v = dynamic_cast<Mock *>(op)) {
return Visit(v);
}
@@ -115,6 +125,10 @@ struct ExecutorContextVisitor {
void Visit(FullIndexScan *op) { ctx->nodes[op] =
std::make_unique<FullIndexScanExecutor>(ctx, op); }
+ void Visit(NumericFieldScan *op) { ctx->nodes[op] =
std::make_unique<NumericFieldScanExecutor>(ctx, op); }
+
+ void Visit(TagFieldScan *op) { ctx->nodes[op] =
std::make_unique<TagFieldScanExecutor>(ctx, op); }
+
void Visit(Mock *op) { ctx->nodes[op] = std::make_unique<MockExecutor>(ctx,
op); }
};
diff --git a/src/search/search_encoding.h b/src/search/search_encoding.h
index 24731d32..32f244ca 100644
--- a/src/search/search_encoding.h
+++ b/src/search/search_encoding.h
@@ -141,22 +141,17 @@ struct SearchNumericFieldMetadata : SearchSortableFieldMetadata {};
inline std::string ConstructTagFieldSubkey(std::string_view field_name,
std::string_view tag, std::string_view key) {
std::string res = {(char)SearchSubkeyType::TAG_FIELD};
- PutFixed32(&res, field_name.size());
- res.append(field_name);
- PutFixed32(&res, tag.size());
- res.append(tag);
- PutFixed32(&res, key.size());
- res.append(key);
+ PutSizedString(&res, field_name);
+ PutSizedString(&res, tag);
+ PutSizedString(&res, key);
return res;
}
inline std::string ConstructNumericFieldSubkey(std::string_view field_name,
double number, std::string_view key) {
std::string res = {(char)SearchSubkeyType::NUMERIC_FIELD};
- PutFixed32(&res, field_name.size());
- res.append(field_name);
+ PutSizedString(&res, field_name);
PutDouble(&res, number);
- PutFixed32(&res, key.size());
- res.append(key);
+ PutSizedString(&res, key);
return res;
}
diff --git a/tests/cppunit/plan_executor_test.cc b/tests/cppunit/plan_executor_test.cc
index 59943d19..0b225fc7 100644
--- a/tests/cppunit/plan_executor_test.cc
+++ b/tests/cppunit/plan_executor_test.cc
@@ -25,6 +25,8 @@
#include "config/config.h"
#include "search/executors/mock_executor.h"
+#include "search/indexer.h"
+#include "search/interval.h"
#include "search/ir.h"
#include "search/ir_plan.h"
#include "test_base.h"
@@ -295,3 +297,120 @@ TEST_F(PlanExecutorTestC, FullIndexScan) {
ASSERT_EQ(ctx.Next().GetValue(), exe_end);
}
}
+
+struct ScopedUpdate {
+ redis::GlobalIndexer::RecordResult rr;
+ std::string_view key;
+ std::string ns;
+
+ static auto Create(redis::GlobalIndexer& indexer, std::string_view key,
const std::string& ns) {
+ auto s = indexer.Record(key, ns);
+ EXPECT_EQ(s.Msg(), Status::ok_msg);
+ return *s;
+ }
+
+ ScopedUpdate(redis::GlobalIndexer& indexer, std::string_view key, const
std::string& ns)
+ : rr(Create(indexer, key, ns)), key(key), ns(ns) {}
+
+ ScopedUpdate(const ScopedUpdate&) = delete;
+ ScopedUpdate(ScopedUpdate&&) = delete;
+ ScopedUpdate& operator=(const ScopedUpdate&) = delete;
+ ScopedUpdate& operator=(ScopedUpdate&&) = delete;
+
+ ~ScopedUpdate() {
+ auto s = redis::GlobalIndexer::Update(rr, key, ns);
+ EXPECT_EQ(s.Msg(), Status::ok_msg);
+ }
+};
+
+std::vector<std::unique_ptr<ScopedUpdate>> ScopedUpdates(redis::GlobalIndexer&
indexer,
+ const
std::vector<std::string_view>& keys,
+ const std::string&
ns) {
+ std::vector<std::unique_ptr<ScopedUpdate>> sus;
+
+ sus.reserve(keys.size());
+ for (auto key : keys) {
+ sus.emplace_back(std::make_unique<ScopedUpdate>(indexer, key, ns));
+ }
+
+ return sus;
+}
+
+TEST_F(PlanExecutorTestC, NumericFieldScan) {
+ redis::GlobalIndexer indexer(storage_.get());
+ indexer.Add(redis::IndexUpdater(IndexI()));
+
+ {
+ auto updates = ScopedUpdates(indexer, {"test2:a", "test2:b", "test2:c",
"test2:d", "test2:e", "test2:f", "test2:g"},
+ "search_ns");
+ json_->Set("test2:a", "$", "{\"f2\": 6}");
+ json_->Set("test2:b", "$", "{\"f2\": 3}");
+ json_->Set("test2:c", "$", "{\"f2\": 8}");
+ json_->Set("test2:d", "$", "{\"f2\": 14}");
+ json_->Set("test2:e", "$", "{\"f2\": 1}");
+ json_->Set("test2:f", "$", "{\"f2\": 3}");
+ json_->Set("test2:g", "$", "{\"f2\": 9}");
+ }
+
+ {
+ auto op =
std::make_unique<NumericFieldScan>(std::make_unique<FieldRef>("f2",
FieldI("f2")), Interval(3, 9),
+ SortByClause::ASC);
+
+ auto ctx = ExecutorContext(op.get(), storage_.get());
+ ASSERT_EQ(NextRow(ctx).key, "test2:b");
+ ASSERT_EQ(NextRow(ctx).key, "test2:f");
+ ASSERT_EQ(NextRow(ctx).key, "test2:a");
+ ASSERT_EQ(NextRow(ctx).key, "test2:c");
+ ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+ }
+
+ {
+ auto op =
std::make_unique<NumericFieldScan>(std::make_unique<FieldRef>("f2",
FieldI("f2")), Interval(3, 9),
+ SortByClause::DESC);
+
+ auto ctx = ExecutorContext(op.get(), storage_.get());
+ ASSERT_EQ(NextRow(ctx).key, "test2:c");
+ ASSERT_EQ(NextRow(ctx).key, "test2:a");
+ ASSERT_EQ(NextRow(ctx).key, "test2:f");
+ ASSERT_EQ(NextRow(ctx).key, "test2:b");
+ ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+ }
+}
+
+TEST_F(PlanExecutorTestC, TagFieldScan) {
+ redis::GlobalIndexer indexer(storage_.get());
+ indexer.Add(redis::IndexUpdater(IndexI()));
+
+ {
+ auto updates = ScopedUpdates(indexer, {"test2:a", "test2:b", "test2:c",
"test2:d", "test2:e", "test2:f", "test2:g"},
+ "search_ns");
+ json_->Set("test2:a", "$", "{\"f1\": \"c,cpp,java\"}");
+ json_->Set("test2:b", "$", "{\"f1\": \"python,c\"}");
+ json_->Set("test2:c", "$", "{\"f1\": \"java,scala\"}");
+ json_->Set("test2:d", "$", "{\"f1\": \"rust,python,perl\"}");
+ json_->Set("test2:e", "$", "{\"f1\": \"python,cpp\"}");
+ json_->Set("test2:f", "$", "{\"f1\": \"c,cpp\"}");
+ json_->Set("test2:g", "$", "{\"f1\": \"cpp,rust\"}");
+ }
+
+ {
+ auto op = std::make_unique<TagFieldScan>(std::make_unique<FieldRef>("f1",
FieldI("f1")), "cpp");
+
+ auto ctx = ExecutorContext(op.get(), storage_.get());
+ ASSERT_EQ(NextRow(ctx).key, "test2:a");
+ ASSERT_EQ(NextRow(ctx).key, "test2:e");
+ ASSERT_EQ(NextRow(ctx).key, "test2:f");
+ ASSERT_EQ(NextRow(ctx).key, "test2:g");
+ ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+ }
+
+ {
+ auto op = std::make_unique<TagFieldScan>(std::make_unique<FieldRef>("f1",
FieldI("f1")), "python");
+
+ auto ctx = ExecutorContext(op.get(), storage_.get());
+ ASSERT_EQ(NextRow(ctx).key, "test2:b");
+ ASSERT_EQ(NextRow(ctx).key, "test2:d");
+ ASSERT_EQ(NextRow(ctx).key, "test2:e");
+ ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+ }
+}
\ No newline at end of file