This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 02ff9789 Add plan executor for KQIR numeric/tag scan operator (#2289)
02ff9789 is described below

commit 02ff978955758adfa689062b9abdd1d4e1c300b8
Author: Twice <[email protected]>
AuthorDate: Sat May 4 21:08:00 2024 +0900

    Add plan executor for KQIR numeric/tag scan operator (#2289)
---
 src/common/encoding.h                              |  15 +++
 src/search/executors/numeric_field_scan_executor.h | 113 +++++++++++++++++++
 src/search/executors/tag_field_scan_executor.h     | 100 +++++++++++++++++
 src/search/plan_executor.cc                        |  14 +++
 src/search/search_encoding.h                       |  15 +--
 tests/cppunit/plan_executor_test.cc                | 119 +++++++++++++++++++++
 6 files changed, 366 insertions(+), 10 deletions(-)

diff --git a/src/common/encoding.h b/src/common/encoding.h
index d4977930..5d05e456 100644
--- a/src/common/encoding.h
+++ b/src/common/encoding.h
@@ -102,6 +102,21 @@ inline bool GetFixed16(rocksdb::Slice *input, uint16_t *value) { return GetFixed
 inline bool GetFixed32(rocksdb::Slice *input, uint32_t *value) { return GetFixed(input, value); }
 inline bool GetFixed64(rocksdb::Slice *input, uint64_t *value) { return GetFixed(input, value); }
 
+inline void PutSizedString(std::string *dst, rocksdb::Slice value) {
+  PutFixed32(dst, value.size());
+  dst->append(value.ToStringView());
+}
+
+inline bool GetSizedString(rocksdb::Slice *input, rocksdb::Slice *value) {
+  uint32_t size = 0;
+  if (!GetFixed32(input, &size)) return false;
+
+  if (input->size() < size) return false;
+  *value = rocksdb::Slice(input->data(), size);
+  input->remove_prefix(size);
+  return true;
+}
+
 char *EncodeDouble(char *buf, double value);
 void PutDouble(std::string *dst, double value);
 double DecodeDouble(const char *ptr);
diff --git a/src/search/executors/numeric_field_scan_executor.h b/src/search/executors/numeric_field_scan_executor.h
new file mode 100644
index 00000000..1609f433
--- /dev/null
+++ b/src/search/executors/numeric_field_scan_executor.h
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <string>
+
+#include "db_util.h"
+#include "encoding.h"
+#include "search/plan_executor.h"
+#include "search/search_encoding.h"
+#include "storage/redis_db.h"
+#include "storage/redis_metadata.h"
+#include "storage/storage.h"
+
+namespace kqir {
+
+struct NumericFieldScanExecutor : ExecutorNode {
+  NumericFieldScan *scan;
+  redis::LatestSnapShot ss;
+  util::UniqueIterator iter{nullptr};
+
+  IndexInfo *index;
+  std::string ns_key;
+
+  NumericFieldScanExecutor(ExecutorContext *ctx, NumericFieldScan *scan)
+      : ExecutorNode(ctx), scan(scan), ss(ctx->storage), index(scan->field->info->index) {
+    ns_key = ComposeNamespaceKey(index->ns, index->name, ctx->storage->IsSlotIdEncoded());
+  }
+
+  std::string IndexKey(double num) {
+    return InternalKey(ns_key, redis::ConstructNumericFieldSubkey(scan->field->name, num, {}), index->metadata.version,
+                       ctx->storage->IsSlotIdEncoded())
+        .Encode();
+  }
+
+  bool InRangeDecode(Slice key, Slice field, double num, double *curr, Slice *user_key) {
+    auto ikey = InternalKey(key, ctx->storage->IsSlotIdEncoded());
+    if (ikey.GetVersion() != index->metadata.version) return false;
+    auto subkey = ikey.GetSubKey();
+
+    uint8_t flag = 0;
+    if (!GetFixed8(&subkey, &flag)) return false;
+    if (flag != (uint8_t)redis::SearchSubkeyType::NUMERIC_FIELD) return false;
+
+    Slice value;
+    if (!GetSizedString(&subkey, &value)) return false;
+    if (value != field) return false;
+
+    if (!GetDouble(&subkey, curr)) return false;
+
+    if (!GetSizedString(&subkey, user_key)) return false;
+
+    return true;
+  }
+
+  StatusOr<Result> Next() override {
+    if (!iter) {
+      rocksdb::ReadOptions read_options = ctx->storage->DefaultScanOptions();
+      read_options.snapshot = ss.GetSnapShot();
+
+      iter =
+          util::UniqueIterator(ctx->storage, read_options, ctx->storage->GetCFHandle(engine::kSearchColumnFamilyName));
+      if (scan->order == SortByClause::ASC) {
+        iter->Seek(IndexKey(scan->range.l));
+      } else {
+        iter->SeekForPrev(IndexKey(IntervalSet::PrevNum(scan->range.r)));
+      }
+    }
+
+    if (!iter->Valid()) {
+      return end;
+    }
+
+    double curr = 0;
+    Slice user_key;
+    if (!InRangeDecode(iter->key(), scan->field->name, scan->range.r, &curr, &user_key)) {
+      return end;
+    }
+
+    if (scan->order == SortByClause::ASC ? curr >= scan->range.r : curr < scan->range.l) {
+      return end;
+    }
+
+    auto key_str = user_key.ToString();
+
+    if (scan->order == SortByClause::ASC) {
+      iter->Next();
+    } else {
+      iter->Prev();
+    }
+    return RowType{key_str, {{scan->field->info, std::to_string(curr)}}, scan->field->info->index};
+  }
+};
+
+}  // namespace kqir
diff --git a/src/search/executors/tag_field_scan_executor.h b/src/search/executors/tag_field_scan_executor.h
new file mode 100644
index 00000000..a3781c11
--- /dev/null
+++ b/src/search/executors/tag_field_scan_executor.h
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <string>
+
+#include "db_util.h"
+#include "encoding.h"
+#include "search/plan_executor.h"
+#include "search/search_encoding.h"
+#include "storage/redis_db.h"
+#include "storage/redis_metadata.h"
+#include "storage/storage.h"
+
+namespace kqir {
+
+struct TagFieldScanExecutor : ExecutorNode {
+  TagFieldScan *scan;
+  redis::LatestSnapShot ss;
+  util::UniqueIterator iter{nullptr};
+
+  IndexInfo *index;
+  std::string ns_key;
+  std::string index_key;
+
+  TagFieldScanExecutor(ExecutorContext *ctx, TagFieldScan *scan)
+      : ExecutorNode(ctx), scan(scan), ss(ctx->storage), index(scan->field->info->index) {
+    ns_key = ComposeNamespaceKey(index->ns, index->name, ctx->storage->IsSlotIdEncoded());
+    index_key = InternalKey(ns_key, redis::ConstructTagFieldSubkey(scan->field->name, scan->tag, {}),
+                            index->metadata.version, ctx->storage->IsSlotIdEncoded())
+                    .Encode();
+  }
+
+  bool InRangeDecode(Slice key, Slice field, Slice *user_key) {
+    auto ikey = InternalKey(key, ctx->storage->IsSlotIdEncoded());
+    if (ikey.GetVersion() != index->metadata.version) return false;
+    auto subkey = ikey.GetSubKey();
+
+    uint8_t flag = 0;
+    if (!GetFixed8(&subkey, &flag)) return false;
+    if (flag != (uint8_t)redis::SearchSubkeyType::TAG_FIELD) return false;
+
+    Slice value;
+    if (!GetSizedString(&subkey, &value)) return false;
+    if (value != field) return false;
+
+    Slice tag;
+    if (!GetSizedString(&subkey, &tag)) return false;
+    if (tag != scan->tag) return false;
+
+    if (!GetSizedString(&subkey, user_key)) return false;
+
+    return true;
+  }
+
+  StatusOr<Result> Next() override {
+    if (!iter) {
+      rocksdb::ReadOptions read_options = ctx->storage->DefaultScanOptions();
+      read_options.snapshot = ss.GetSnapShot();
+
+      iter =
+          util::UniqueIterator(ctx->storage, read_options, ctx->storage->GetCFHandle(engine::kSearchColumnFamilyName));
+      iter->Seek(index_key);
+    }
+
+    if (!iter->Valid()) {
+      return end;
+    }
+
+    Slice user_key;
+    if (!InRangeDecode(iter->key(), scan->field->name, &user_key)) {
+      return end;
+    }
+
+    auto key_str = user_key.ToString();
+
+    iter->Next();
+    return RowType{key_str, {}, scan->field->info->index};
+  }
+};
+
+}  // namespace kqir
diff --git a/src/search/plan_executor.cc b/src/search/plan_executor.cc
index 75033f1a..7de9f518 100644
--- a/src/search/plan_executor.cc
+++ b/src/search/plan_executor.cc
@@ -28,8 +28,10 @@
 #include "search/executors/merge_executor.h"
 #include "search/executors/mock_executor.h"
 #include "search/executors/noop_executor.h"
+#include "search/executors/numeric_field_scan_executor.h"
 #include "search/executors/projection_executor.h"
 #include "search/executors/sort_executor.h"
+#include "search/executors/tag_field_scan_executor.h"
 #include "search/executors/topn_sort_executor.h"
 #include "search/indexer.h"
 #include "search/ir_plan.h"
@@ -74,6 +76,14 @@ struct ExecutorContextVisitor {
       return Visit(v);
     }
 
+    if (auto v = dynamic_cast<NumericFieldScan *>(op)) {
+      return Visit(v);
+    }
+
+    if (auto v = dynamic_cast<TagFieldScan *>(op)) {
+      return Visit(v);
+    }
+
     if (auto v = dynamic_cast<Mock *>(op)) {
       return Visit(v);
     }
@@ -115,6 +125,10 @@ struct ExecutorContextVisitor {
 
   void Visit(FullIndexScan *op) { ctx->nodes[op] = std::make_unique<FullIndexScanExecutor>(ctx, op); }
 
+  void Visit(NumericFieldScan *op) { ctx->nodes[op] = std::make_unique<NumericFieldScanExecutor>(ctx, op); }
+
+  void Visit(TagFieldScan *op) { ctx->nodes[op] = std::make_unique<TagFieldScanExecutor>(ctx, op); }
+
   void Visit(Mock *op) { ctx->nodes[op] = std::make_unique<MockExecutor>(ctx, op); }
 };
 
diff --git a/src/search/search_encoding.h b/src/search/search_encoding.h
index 24731d32..32f244ca 100644
--- a/src/search/search_encoding.h
+++ b/src/search/search_encoding.h
@@ -141,22 +141,17 @@ struct SearchNumericFieldMetadata : SearchSortableFieldMetadata {};
 
 inline std::string ConstructTagFieldSubkey(std::string_view field_name, std::string_view tag, std::string_view key) {
   std::string res = {(char)SearchSubkeyType::TAG_FIELD};
-  PutFixed32(&res, field_name.size());
-  res.append(field_name);
-  PutFixed32(&res, tag.size());
-  res.append(tag);
-  PutFixed32(&res, key.size());
-  res.append(key);
+  PutSizedString(&res, field_name);
+  PutSizedString(&res, tag);
+  PutSizedString(&res, key);
   return res;
 }
 
 inline std::string ConstructNumericFieldSubkey(std::string_view field_name, double number, std::string_view key) {
   std::string res = {(char)SearchSubkeyType::NUMERIC_FIELD};
-  PutFixed32(&res, field_name.size());
-  res.append(field_name);
+  PutSizedString(&res, field_name);
   PutDouble(&res, number);
-  PutFixed32(&res, key.size());
-  res.append(key);
+  PutSizedString(&res, key);
   return res;
 }
 
diff --git a/tests/cppunit/plan_executor_test.cc b/tests/cppunit/plan_executor_test.cc
index 59943d19..0b225fc7 100644
--- a/tests/cppunit/plan_executor_test.cc
+++ b/tests/cppunit/plan_executor_test.cc
@@ -25,6 +25,8 @@
 
 #include "config/config.h"
 #include "search/executors/mock_executor.h"
+#include "search/indexer.h"
+#include "search/interval.h"
 #include "search/ir.h"
 #include "search/ir_plan.h"
 #include "test_base.h"
@@ -295,3 +297,120 @@ TEST_F(PlanExecutorTestC, FullIndexScan) {
     ASSERT_EQ(ctx.Next().GetValue(), exe_end);
   }
 }
+
+struct ScopedUpdate {
+  redis::GlobalIndexer::RecordResult rr;
+  std::string_view key;
+  std::string ns;
+
+  static auto Create(redis::GlobalIndexer& indexer, std::string_view key, const std::string& ns) {
+    auto s = indexer.Record(key, ns);
+    EXPECT_EQ(s.Msg(), Status::ok_msg);
+    return *s;
+  }
+
+  ScopedUpdate(redis::GlobalIndexer& indexer, std::string_view key, const std::string& ns)
+      : rr(Create(indexer, key, ns)), key(key), ns(ns) {}
+
+  ScopedUpdate(const ScopedUpdate&) = delete;
+  ScopedUpdate(ScopedUpdate&&) = delete;
+  ScopedUpdate& operator=(const ScopedUpdate&) = delete;
+  ScopedUpdate& operator=(ScopedUpdate&&) = delete;
+
+  ~ScopedUpdate() {
+    auto s = redis::GlobalIndexer::Update(rr, key, ns);
+    EXPECT_EQ(s.Msg(), Status::ok_msg);
+  }
+};
+
+std::vector<std::unique_ptr<ScopedUpdate>> ScopedUpdates(redis::GlobalIndexer& indexer,
+                                                         const std::vector<std::string_view>& keys,
+                                                         const std::string& ns) {
+  std::vector<std::unique_ptr<ScopedUpdate>> sus;
+
+  sus.reserve(keys.size());
+  for (auto key : keys) {
+    sus.emplace_back(std::make_unique<ScopedUpdate>(indexer, key, ns));
+  }
+
+  return sus;
+}
+
+TEST_F(PlanExecutorTestC, NumericFieldScan) {
+  redis::GlobalIndexer indexer(storage_.get());
+  indexer.Add(redis::IndexUpdater(IndexI()));
+
+  {
+    auto updates = ScopedUpdates(indexer, {"test2:a", "test2:b", "test2:c", "test2:d", "test2:e", "test2:f", "test2:g"},
+                                 "search_ns");
+    json_->Set("test2:a", "$", "{\"f2\": 6}");
+    json_->Set("test2:b", "$", "{\"f2\": 3}");
+    json_->Set("test2:c", "$", "{\"f2\": 8}");
+    json_->Set("test2:d", "$", "{\"f2\": 14}");
+    json_->Set("test2:e", "$", "{\"f2\": 1}");
+    json_->Set("test2:f", "$", "{\"f2\": 3}");
+    json_->Set("test2:g", "$", "{\"f2\": 9}");
+  }
+
+  {
+    auto op = std::make_unique<NumericFieldScan>(std::make_unique<FieldRef>("f2", FieldI("f2")), Interval(3, 9),
+                                                 SortByClause::ASC);
+
+    auto ctx = ExecutorContext(op.get(), storage_.get());
+    ASSERT_EQ(NextRow(ctx).key, "test2:b");
+    ASSERT_EQ(NextRow(ctx).key, "test2:f");
+    ASSERT_EQ(NextRow(ctx).key, "test2:a");
+    ASSERT_EQ(NextRow(ctx).key, "test2:c");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+
+  {
+    auto op = std::make_unique<NumericFieldScan>(std::make_unique<FieldRef>("f2", FieldI("f2")), Interval(3, 9),
+                                                 SortByClause::DESC);
+
+    auto ctx = ExecutorContext(op.get(), storage_.get());
+    ASSERT_EQ(NextRow(ctx).key, "test2:c");
+    ASSERT_EQ(NextRow(ctx).key, "test2:a");
+    ASSERT_EQ(NextRow(ctx).key, "test2:f");
+    ASSERT_EQ(NextRow(ctx).key, "test2:b");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+}
+
+TEST_F(PlanExecutorTestC, TagFieldScan) {
+  redis::GlobalIndexer indexer(storage_.get());
+  indexer.Add(redis::IndexUpdater(IndexI()));
+
+  {
+    auto updates = ScopedUpdates(indexer, {"test2:a", "test2:b", "test2:c", "test2:d", "test2:e", "test2:f", "test2:g"},
+                                 "search_ns");
+    json_->Set("test2:a", "$", "{\"f1\": \"c,cpp,java\"}");
+    json_->Set("test2:b", "$", "{\"f1\": \"python,c\"}");
+    json_->Set("test2:c", "$", "{\"f1\": \"java,scala\"}");
+    json_->Set("test2:d", "$", "{\"f1\": \"rust,python,perl\"}");
+    json_->Set("test2:e", "$", "{\"f1\": \"python,cpp\"}");
+    json_->Set("test2:f", "$", "{\"f1\": \"c,cpp\"}");
+    json_->Set("test2:g", "$", "{\"f1\": \"cpp,rust\"}");
+  }
+
+  {
+    auto op = std::make_unique<TagFieldScan>(std::make_unique<FieldRef>("f1", FieldI("f1")), "cpp");
+
+    auto ctx = ExecutorContext(op.get(), storage_.get());
+    ASSERT_EQ(NextRow(ctx).key, "test2:a");
+    ASSERT_EQ(NextRow(ctx).key, "test2:e");
+    ASSERT_EQ(NextRow(ctx).key, "test2:f");
+    ASSERT_EQ(NextRow(ctx).key, "test2:g");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+
+  {
+    auto op = std::make_unique<TagFieldScan>(std::make_unique<FieldRef>("f1", FieldI("f1")), "python");
+
+    auto ctx = ExecutorContext(op.get(), storage_.get());
+    ASSERT_EQ(NextRow(ctx).key, "test2:b");
+    ASSERT_EQ(NextRow(ctx).key, "test2:d");
+    ASSERT_EQ(NextRow(ctx).key, "test2:e");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+}
\ No newline at end of file

Reply via email to