This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new dfffffef Add plan executors for KQIR via iterator model (#2281)
dfffffef is described below

commit dfffffefffd1bd693eb5ae9078a46debabddb6de
Author: Twice <[email protected]>
AuthorDate: Sat May 4 11:52:31 2024 +0900

    Add plan executors for KQIR via iterator model (#2281)
---
 src/search/executors/filter_executor.h          | 129 ++++++++++
 src/search/executors/full_index_scan_executor.h |  76 ++++++
 src/search/executors/limit_executor.h           |  57 +++++
 src/search/executors/merge_executor.h           |  54 +++++
 src/search/executors/mock_executor.h            |  57 +++++
 src/search/executors/noop_executor.h            |  35 +++
 src/search/executors/projection_executor.h      |  59 +++++
 src/search/executors/sort_executor.h            |  39 ++++
 src/search/executors/topn_sort_executor.h       | 104 +++++++++
 src/search/index_info.h                         |   3 +-
 src/search/indexer.cc                           |   4 +-
 src/search/ir.h                                 |   9 +
 src/search/ir_sema_checker.h                    |   2 +-
 src/search/plan_executor.cc                     | 149 ++++++++++++
 src/search/plan_executor.h                      | 101 ++++++++
 src/search/search_encoding.h                    |  10 +
 tests/cppunit/indexer_test.cc                   |  20 +-
 tests/cppunit/ir_dot_dumper_test.cc             |  16 +-
 tests/cppunit/ir_pass_test.cc                   |  14 +-
 tests/cppunit/ir_sema_checker_test.cc           |  10 +-
 tests/cppunit/plan_executor_test.cc             | 297 ++++++++++++++++++++++++
 21 files changed, 1211 insertions(+), 34 deletions(-)

diff --git a/src/search/executors/filter_executor.h 
b/src/search/executors/filter_executor.h
new file mode 100644
index 00000000..83d45377
--- /dev/null
+++ b/src/search/executors/filter_executor.h
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <variant>
+
+#include "parse_util.h"
+#include "search/ir.h"
+#include "search/plan_executor.h"
+#include "search/search_encoding.h"
+#include "string_util.h"
+
+namespace kqir {
+
+struct QueryExprEvaluator {
+  ExecutorContext *ctx;
+  ExecutorNode::RowType &row;
+
+  StatusOr<bool> Transform(QueryExpr *e) const {
+    if (auto v = dynamic_cast<AndExpr *>(e)) {
+      return Visit(v);
+    }
+    if (auto v = dynamic_cast<OrExpr *>(e)) {
+      return Visit(v);
+    }
+    if (auto v = dynamic_cast<NotExpr *>(e)) {
+      return Visit(v);
+    }
+    if (auto v = dynamic_cast<NumericCompareExpr *>(e)) {
+      return Visit(v);
+    }
+    if (auto v = dynamic_cast<TagContainExpr *>(e)) {
+      return Visit(v);
+    }
+
+    CHECK(false) << "unreachable";
+  }
+
+  StatusOr<bool> Visit(AndExpr *v) const {
+    for (const auto &n : v->inners) {
+      if (!GET_OR_RET(Transform(n.get()))) return false;
+    }
+
+    return true;
+  }
+
+  StatusOr<bool> Visit(OrExpr *v) const {
+    for (const auto &n : v->inners) {
+      if (GET_OR_RET(Transform(n.get()))) return true;
+    }
+
+    return false;
+  }
+
+  StatusOr<bool> Visit(NotExpr *v) const { return 
!GET_OR_RET(Transform(v->inner.get())); }
+
+  StatusOr<bool> Visit(TagContainExpr *v) const {
+    auto val = GET_OR_RET(ctx->Retrieve(row, v->field->info));
+    auto meta = v->field->info->MetadataAs<redis::SearchTagFieldMetadata>();
+
+    auto split = util::Split(val, std::string(1, meta->separator));
+    return std::find(split.begin(), split.end(), v->tag->val) != split.end();
+  }
+
+  StatusOr<bool> Visit(NumericCompareExpr *v) const {
+    auto l_str = GET_OR_RET(ctx->Retrieve(row, v->field->info));
+
+    // TODO: reconsider how to handle failure case here
+    auto l = GET_OR_RET(ParseFloat(l_str));
+    auto r = v->num->val;
+
+    switch (v->op) {
+      case NumericCompareExpr::EQ:
+        return l == r;
+      case NumericCompareExpr::NE:
+        return l != r;
+      case NumericCompareExpr::LT:
+        return l < r;
+      case NumericCompareExpr::LET:
+        return l <= r;
+      case NumericCompareExpr::GT:
+        return l > r;
+      case NumericCompareExpr::GET:
+        return l >= r;
+    }
+  }
+};
+
+struct FilterExecutor : ExecutorNode {
+  Filter *filter;
+
+  FilterExecutor(ExecutorContext *ctx, Filter *filter) : ExecutorNode(ctx), 
filter(filter) {}
+
+  StatusOr<Result> Next() override {
+    while (true) {
+      auto v = GET_OR_RET(ctx->Get(filter->source)->Next());
+
+      if (std::holds_alternative<End>(v)) return end;
+
+      QueryExprEvaluator eval{ctx, std::get<RowType>(v)};
+
+      bool res = GET_OR_RET(eval.Transform(filter->filter_expr.get()));
+
+      if (res) {
+        return v;
+      }
+    }
+  }
+};
+
+}  // namespace kqir
diff --git a/src/search/executors/full_index_scan_executor.h 
b/src/search/executors/full_index_scan_executor.h
new file mode 100644
index 00000000..0afeae04
--- /dev/null
+++ b/src/search/executors/full_index_scan_executor.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "db_util.h"
+#include "search/plan_executor.h"
+#include "storage/redis_db.h"
+#include "storage/redis_metadata.h"
+#include "storage/storage.h"
+
+namespace kqir {
+
+struct FullIndexScanExecutor : ExecutorNode {
+  FullIndexScan *scan;
+  redis::LatestSnapShot ss;
+  util::UniqueIterator iter{nullptr};
+  const std::string *prefix_iter;
+
+  FullIndexScanExecutor(ExecutorContext *ctx, FullIndexScan *scan)
+      : ExecutorNode(ctx), scan(scan), ss(ctx->storage), 
prefix_iter(scan->index->info->prefixes.begin()) {}
+
+  std::string NSKey(const std::string &user_key) {
+    return ComposeNamespaceKey(scan->index->info->ns, user_key, 
ctx->storage->IsSlotIdEncoded());
+  }
+
+  StatusOr<Result> Next() override {
+    if (prefix_iter == scan->index->info->prefixes.end()) {
+      return end;
+    }
+
+    auto ns_key = NSKey(*prefix_iter);
+    if (!iter) {
+      rocksdb::ReadOptions read_options = ctx->storage->DefaultScanOptions();
+      read_options.snapshot = ss.GetSnapShot();
+      iter = util::UniqueIterator(ctx->storage, read_options,
+                                  
ctx->storage->GetCFHandle(engine::kMetadataColumnFamilyName));
+      iter->Seek(ns_key);
+    }
+
+    while (!iter->Valid() || !iter->key().starts_with(ns_key)) {
+      prefix_iter++;
+      if (prefix_iter == scan->index->info->prefixes.end()) {
+        return end;
+      }
+
+      ns_key = NSKey(*prefix_iter);
+      iter->Seek(ns_key);
+    }
+
+    auto [_, key] = ExtractNamespaceKey(iter->key(), 
ctx->storage->IsSlotIdEncoded());
+    auto key_str = key.ToString();
+
+    iter->Next();
+    return RowType{key_str, {}, scan->index->info};
+  }
+};
+
+}  // namespace kqir
diff --git a/src/search/executors/limit_executor.h 
b/src/search/executors/limit_executor.h
new file mode 100644
index 00000000..8b1d4916
--- /dev/null
+++ b/src/search/executors/limit_executor.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "search/plan_executor.h"
+
+namespace kqir {
+
+struct LimitExecutor : ExecutorNode {
+  Limit *limit;
+  size_t step = 0;
+
+  LimitExecutor(ExecutorContext *ctx, Limit *limit) : ExecutorNode(ctx), 
limit(limit) {}
+
+  StatusOr<Result> Next() override {
+    auto offset = limit->limit->offset;
+    auto count = limit->limit->count;
+
+    if (step == count) {
+      return end;
+    }
+
+    if (step == 0) {
+      while (offset--) {
+        auto res = GET_OR_RET(ctx->Get(limit->op)->Next());
+
+        if (std::holds_alternative<End>(res)) {
+          return end;
+        }
+      }
+    }
+
+    auto res = GET_OR_RET(ctx->Get(limit->op)->Next());
+    step++;
+    return res;
+  }
+};
+
+}  // namespace kqir
diff --git a/src/search/executors/merge_executor.h 
b/src/search/executors/merge_executor.h
new file mode 100644
index 00000000..66b7bb85
--- /dev/null
+++ b/src/search/executors/merge_executor.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <variant>
+
+#include "search/plan_executor.h"
+
+namespace kqir {
+
+struct MergeExecutor : ExecutorNode {
+  Merge *merge;
+  decltype(merge->ops)::iterator iter;
+
+  MergeExecutor(ExecutorContext *ctx, Merge *merge) : ExecutorNode(ctx), 
merge(merge), iter(merge->ops.begin()) {}
+
+  StatusOr<Result> Next() override {
+    if (iter == merge->ops.end()) {
+      return end;
+    }
+
+    auto v = GET_OR_RET(ctx->Get(*iter)->Next());
+    while (std::holds_alternative<End>(v)) {
+      iter++;
+      if (iter == merge->ops.end()) {
+        return end;
+      }
+
+      v = GET_OR_RET(ctx->Get(*iter)->Next());
+    }
+
+    return v;
+  }
+};
+
+}  // namespace kqir
diff --git a/src/search/executors/mock_executor.h 
b/src/search/executors/mock_executor.h
new file mode 100644
index 00000000..f9cdf57d
--- /dev/null
+++ b/src/search/executors/mock_executor.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "search/ir_plan.h"
+#include "search/plan_executor.h"
+
+namespace kqir {
+
+// this operator is only for executor-testing/debugging purpose
+struct Mock : PlanOperator {
+  std::vector<ExecutorNode::RowType> rows;
+
+  explicit Mock(std::vector<ExecutorNode::RowType> rows) : 
rows(std::move(rows)) {}
+
+  std::string Dump() const override { return "mock"; }
+  std::string_view Name() const override { return "Mock"; }
+
+  std::unique_ptr<Node> Clone() const override { return 
std::make_unique<Mock>(rows); }
+};
+
+struct MockExecutor : ExecutorNode {
+  Mock *mock;
+  decltype(mock->rows)::iterator iter;
+
+  MockExecutor(ExecutorContext *ctx, Mock *mock) : ExecutorNode(ctx), 
mock(mock), iter(mock->rows.begin()) {}
+
+  StatusOr<Result> Next() override {
+    if (iter == mock->rows.end()) {
+      return end;
+    }
+
+    return *(iter++);
+  }
+};
+
+}  // namespace kqir
diff --git a/src/search/executors/noop_executor.h 
b/src/search/executors/noop_executor.h
new file mode 100644
index 00000000..1e3685ca
--- /dev/null
+++ b/src/search/executors/noop_executor.h
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "search/plan_executor.h"
+
+namespace kqir {
+
+struct NoopExecutor : ExecutorNode {
+  Noop *noop;
+
+  NoopExecutor(ExecutorContext *ctx, Noop *noop) : ExecutorNode(ctx), 
noop(noop) {}
+
+  StatusOr<Result> Next() override { return end; }
+};
+
+}  // namespace kqir
diff --git a/src/search/executors/projection_executor.h 
b/src/search/executors/projection_executor.h
new file mode 100644
index 00000000..fe167334
--- /dev/null
+++ b/src/search/executors/projection_executor.h
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <variant>
+
+#include "search/plan_executor.h"
+
+namespace kqir {
+
+struct ProjectionExecutor : ExecutorNode {
+  Projection *proj;
+
+  ProjectionExecutor(ExecutorContext *ctx, Projection *proj) : 
ExecutorNode(ctx), proj(proj) {}
+
+  StatusOr<Result> Next() override {
+    auto v = GET_OR_RET(ctx->Get(proj->source)->Next());
+
+    if (std::holds_alternative<End>(v)) return end;
+
+    auto &row = std::get<RowType>(v);
+    if (proj->select->fields.empty()) {
+      for (const auto &field : row.index->fields) {
+        GET_OR_RET(ctx->Retrieve(row, &field.second));
+      }
+    } else {
+      std::map<const FieldInfo *, ValueType> res;
+
+      for (const auto &field : proj->select->fields) {
+        auto r = GET_OR_RET(ctx->Retrieve(row, field->info));
+        res.emplace(field->info, std::move(r));
+      }
+
+      return RowType{row.key, res, row.index};
+    }
+
+    return v;
+  }
+};
+
+}  // namespace kqir
diff --git a/src/search/executors/sort_executor.h 
b/src/search/executors/sort_executor.h
new file mode 100644
index 00000000..ed4b205d
--- /dev/null
+++ b/src/search/executors/sort_executor.h
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "search/plan_executor.h"
+
+namespace kqir {
+
+struct SortExecutor : ExecutorNode {
+  Sort *sort;
+
+  SortExecutor(ExecutorContext *ctx, Sort *sort) : ExecutorNode(ctx), 
sort(sort) {}
+
+  StatusOr<Result> Next() override {
+    // most of the sort operator will be eliminated via the optimizer passes,
+    // so currently we don't support this operator since external sort is a 
little complicated
+    return {Status::NotSupported, "sort operator is currently not supported"};
+  }
+};
+
+}  // namespace kqir
diff --git a/src/search/executors/topn_sort_executor.h 
b/src/search/executors/topn_sort_executor.h
new file mode 100644
index 00000000..741a9689
--- /dev/null
+++ b/src/search/executors/topn_sort_executor.h
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <variant>
+
+#include "parse_util.h"
+#include "search/plan_executor.h"
+
+namespace kqir {
+
+struct TopNSortExecutor : ExecutorNode {
+  TopNSort *sort;
+
+  struct ComparedRow {
+    RowType row;
+    double val;
+
+    ComparedRow(RowType row, double val) : row(std::move(row)), val(val) {}
+
+    friend bool operator<(const ComparedRow &l, const ComparedRow &r) { return 
l.val < r.val; }
+  };
+
+  std::vector<ComparedRow> rows;
+  decltype(rows)::iterator rows_iter;
+  bool initialized = false;
+
+  TopNSortExecutor(ExecutorContext *ctx, TopNSort *sort) : ExecutorNode(ctx), 
sort(sort) {}
+
+  StatusOr<Result> Next() override {
+    if (!initialized) {
+      auto total = sort->limit->offset + sort->limit->count;
+      if (total == 0) return end;
+
+      auto v = GET_OR_RET(ctx->Get(sort->op)->Next());
+
+      while (!std::holds_alternative<End>(v)) {
+        auto &row = std::get<RowType>(v);
+
+        auto get_order = [this](RowType &row) -> StatusOr<double> {
+          auto order_str = GET_OR_RET(ctx->Retrieve(row, 
sort->order->field->info));
+          auto order = GET_OR_RET(ParseFloat(order_str));
+          return order;
+        };
+
+        if (rows.size() == total) {
+          std::make_heap(rows.begin(), rows.end());
+        }
+
+        if (rows.size() < total) {
+          auto order = GET_OR_RET(get_order(row));
+          rows.emplace_back(row, order);
+        } else {
+          auto order = GET_OR_RET(get_order(row));
+
+          if (order < rows[0].val) {
+            std::pop_heap(rows.begin(), rows.end());
+            rows.back() = ComparedRow{row, order};
+            std::push_heap(rows.begin(), rows.end());
+          }
+        }
+
+        v = GET_OR_RET(ctx->Get(sort->op)->Next());
+      }
+
+      if (rows.size() <= sort->limit->offset) {
+        return end;
+      }
+
+      std::sort(rows.begin(), rows.end());
+      rows_iter = rows.begin() + 
static_cast<std::ptrdiff_t>(sort->limit->offset);
+      initialized = true;
+    }
+
+    if (rows_iter == rows.end()) {
+      return end;
+    }
+
+    auto res = rows_iter->row;
+    rows_iter++;
+    return res;
+  }
+};
+
+}  // namespace kqir
diff --git a/src/search/index_info.h b/src/search/index_info.h
index 5b0cb707..1751549d 100644
--- a/src/search/index_info.h
+++ b/src/search/index_info.h
@@ -54,6 +54,7 @@ struct IndexInfo {
   SearchMetadata metadata;
   FieldMap fields;
   redis::SearchPrefixesMetadata prefixes;
+  std::string ns;
 
   IndexInfo(std::string name, SearchMetadata metadata) : 
name(std::move(name)), metadata(std::move(metadata)) {}
 
@@ -64,6 +65,6 @@ struct IndexInfo {
   }
 };
 
-using IndexMap = std::map<std::string, IndexInfo>;
+using IndexMap = std::map<std::string, std::unique_ptr<IndexInfo>>;
 
 }  // namespace kqir
diff --git a/src/search/indexer.cc b/src/search/indexer.cc
index 3e7bbf1f..4a4a949c 100644
--- a/src/search/indexer.cc
+++ b/src/search/indexer.cc
@@ -66,7 +66,7 @@ rocksdb::Status 
FieldValueRetriever::Retrieve(std::string_view field, std::strin
     return hash.storage_->Get(read_options, sub_key, output);
   } else if (std::holds_alternative<JsonData>(db)) {
     auto &value = std::get<JsonData>(db);
-    auto s = value.Get(field);
+    auto s = value.Get(field.front() == '$' ? field : fmt::format("$.{}", 
field));
     if (!s.IsOK()) return rocksdb::Status::Corruption(s.Msg());
     if (s->value.size() != 1)
       return rocksdb::Status::NotFound("json value specified by the field 
(json path) should exist and be unique");
@@ -231,7 +231,7 @@ Status IndexUpdater::Update(const FieldValues &original, 
std::string_view key, c
 
 void GlobalIndexer::Add(IndexUpdater updater) {
   updater.indexer = this;
-  for (const auto &prefix : updater.info->prefixes.prefixes) {
+  for (const auto &prefix : updater.info->prefixes) {
     prefix_map.insert(prefix, updater);
   }
 }
diff --git a/src/search/ir.h b/src/search/ir.h
index b841c0fa..be235e0d 100644
--- a/src/search/ir.h
+++ b/src/search/ir.h
@@ -76,6 +76,14 @@ struct Node {
     if (casted) original.release();
     return std::unique_ptr<T>(casted);
   }
+
+  template <typename T = Node, typename... Args>
+  static std::vector<std::unique_ptr<T>> List(std::unique_ptr<Args>... args) {
+    std::vector<std::unique_ptr<T>> result;
+    result.reserve(sizeof...(Args));
+    (result.push_back(std::move(args)), ...);
+    return result;
+  }
 };
 
 struct Ref : Node {};
@@ -379,6 +387,7 @@ struct IndexRef : Ref {
   const IndexInfo *info = nullptr;
 
   explicit IndexRef(std::string name) : name(std::move(name)) {}
+  explicit IndexRef(std::string name, const IndexInfo *info) : 
name(std::move(name)), info(info) {}
 
   std::string_view Name() const override { return "IndexRef"; }
   std::string Dump() const override { return name; }
diff --git a/src/search/ir_sema_checker.h b/src/search/ir_sema_checker.h
index d8982e5c..170e646f 100644
--- a/src/search/ir_sema_checker.h
+++ b/src/search/ir_sema_checker.h
@@ -42,7 +42,7 @@ struct SemaChecker {
     if (auto v = dynamic_cast<SearchExpr *>(node)) {
       auto index_name = v->index->name;
       if (auto iter = index_map.find(index_name); iter != index_map.end()) {
-        current_index = &iter->second;
+        current_index = iter->second.get();
         v->index->info = current_index;
 
         GET_OR_RET(Check(v->select.get()));
diff --git a/src/search/plan_executor.cc b/src/search/plan_executor.cc
new file mode 100644
index 00000000..75033f1a
--- /dev/null
+++ b/src/search/plan_executor.cc
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "plan_executor.h"
+
+#include <memory>
+
+#include "search/executors/filter_executor.h"
+#include "search/executors/full_index_scan_executor.h"
+#include "search/executors/limit_executor.h"
+#include "search/executors/merge_executor.h"
+#include "search/executors/mock_executor.h"
+#include "search/executors/noop_executor.h"
+#include "search/executors/projection_executor.h"
+#include "search/executors/sort_executor.h"
+#include "search/executors/topn_sort_executor.h"
+#include "search/indexer.h"
+#include "search/ir_plan.h"
+
+namespace kqir {
+
+namespace details {
+
+struct ExecutorContextVisitor {
+  ExecutorContext *ctx;
+
+  void Transform(PlanOperator *op) {
+    if (auto v = dynamic_cast<Limit *>(op)) {
+      return Visit(v);
+    }
+
+    if (auto v = dynamic_cast<Noop *>(op)) {
+      return Visit(v);
+    }
+
+    if (auto v = dynamic_cast<Merge *>(op)) {
+      return Visit(v);
+    }
+
+    if (auto v = dynamic_cast<Sort *>(op)) {
+      return Visit(v);
+    }
+
+    if (auto v = dynamic_cast<Filter *>(op)) {
+      return Visit(v);
+    }
+
+    if (auto v = dynamic_cast<Projection *>(op)) {
+      return Visit(v);
+    }
+
+    if (auto v = dynamic_cast<TopNSort *>(op)) {
+      return Visit(v);
+    }
+
+    if (auto v = dynamic_cast<FullIndexScan *>(op)) {
+      return Visit(v);
+    }
+
+    if (auto v = dynamic_cast<Mock *>(op)) {
+      return Visit(v);
+    }
+
+    CHECK(false) << "unreachable";
+  }
+
+  void Visit(Limit *op) {
+    ctx->nodes[op] = std::make_unique<LimitExecutor>(ctx, op);
+    Transform(op->op.get());
+  }
+
+  void Visit(Sort *op) {
+    ctx->nodes[op] = std::make_unique<SortExecutor>(ctx, op);
+    Transform(op->op.get());
+  }
+
+  void Visit(Noop *op) { ctx->nodes[op] = std::make_unique<NoopExecutor>(ctx, 
op); }
+
+  void Visit(Merge *op) {
+    ctx->nodes[op] = std::make_unique<MergeExecutor>(ctx, op);
+    for (const auto &child : op->ops) Transform(child.get());
+  }
+
+  void Visit(Filter *op) {
+    ctx->nodes[op] = std::make_unique<FilterExecutor>(ctx, op);
+    Transform(op->source.get());
+  }
+
+  void Visit(Projection *op) {
+    ctx->nodes[op] = std::make_unique<ProjectionExecutor>(ctx, op);
+    Transform(op->source.get());
+  }
+
+  void Visit(TopNSort *op) {
+    ctx->nodes[op] = std::make_unique<TopNSortExecutor>(ctx, op);
+    Transform(op->op.get());
+  }
+
+  void Visit(FullIndexScan *op) { ctx->nodes[op] = 
std::make_unique<FullIndexScanExecutor>(ctx, op); }
+
+  void Visit(Mock *op) { ctx->nodes[op] = std::make_unique<MockExecutor>(ctx, 
op); }
+};
+
+}  // namespace details
+
+ExecutorContext::ExecutorContext(PlanOperator *op) : root(op) {
+  details::ExecutorContextVisitor visitor{this};
+  visitor.Transform(root);
+}
+
+ExecutorContext::ExecutorContext(PlanOperator *op, engine::Storage *storage) : 
root(op), storage(storage) {
+  details::ExecutorContextVisitor visitor{this};
+  visitor.Transform(root);
+}
+
+auto ExecutorContext::Retrieve(RowType &row, const FieldInfo *field) -> 
StatusOr<ValueType> {  // NOLINT
+  if (auto iter = row.fields.find(field); iter != row.fields.end()) {
+    return iter->second;
+  }
+
+  auto retriever = GET_OR_RET(
+      redis::FieldValueRetriever::Create(field->index->metadata.on_data_type, 
row.key, storage, field->index->ns));
+
+  std::string result;
+  auto s = retriever.Retrieve(field->name, &result);
+  if (!s.ok()) return {Status::NotOK, s.ToString()};
+
+  row.fields.emplace(field, result);
+  return result;
+}
+
+}  // namespace kqir
diff --git a/src/search/plan_executor.h b/src/search/plan_executor.h
new file mode 100644
index 00000000..82d8e73e
--- /dev/null
+++ b/src/search/plan_executor.h
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <variant>
+
+#include "ir_plan.h"
+#include "search/index_info.h"
+#include "storage/storage.h"
+#include "string_util.h"
+
+namespace kqir {
+
+struct ExecutorContext;
+
+struct ExecutorNode {
+  using KeyType = std::string;
+  using ValueType = std::string;
+  struct RowType {
+    KeyType key;
+    std::map<const FieldInfo *, ValueType> fields;
+    const IndexInfo *index;
+
+    bool operator==(const RowType &another) const {
+      return key == another.key && fields == another.fields && index == 
another.index;
+    }
+
+    bool operator!=(const RowType &another) const { return !(*this == 
another); }
+
+    // for debug purpose
+    friend std::ostream &operator<<(std::ostream &os, const RowType &row) {
+      if (row.index) {
+        os << row.key << "@" << row.index->name;
+      } else {
+        os << row.key;
+      }
+      return os << " {" << util::StringJoin(row.fields, [](const auto &v) { 
return v.first->name + ": " + v.second; })
+                << "}";
+    }
+  };
+
+  static constexpr inline const struct End {
+  } end{};
+  friend constexpr bool operator==(End, End) noexcept { return true; }
+  friend constexpr bool operator!=(End, End) noexcept { return false; }
+
+  using Result = std::variant<End, RowType>;
+
+  ExecutorContext *ctx;
+  explicit ExecutorNode(ExecutorContext *ctx) : ctx(ctx) {}
+
+  virtual StatusOr<Result> Next() = 0;
+  virtual ~ExecutorNode() = default;
+};
+
+struct ExecutorContext {
+  std::map<PlanOperator *, std::unique_ptr<ExecutorNode>> nodes;
+  PlanOperator *root;
+  engine::Storage *storage;
+
+  using Result = ExecutorNode::Result;
+  using RowType = ExecutorNode::RowType;
+  using KeyType = ExecutorNode::KeyType;
+  using ValueType = ExecutorNode::ValueType;
+
+  explicit ExecutorContext(PlanOperator *op);
+  explicit ExecutorContext(PlanOperator *op, engine::Storage *storage);
+
+  ExecutorNode *Get(PlanOperator *op) {
+    if (auto iter = nodes.find(op); iter != nodes.end()) {
+      return iter->second.get();
+    }
+
+    return nullptr;
+  }
+
+  ExecutorNode *Get(const std::unique_ptr<PlanOperator> &op) { return 
Get(op.get()); }
+
+  StatusOr<Result> Next() { return Get(root)->Next(); }
+  StatusOr<ValueType> Retrieve(RowType &row, const FieldInfo *field);
+};
+
+}  // namespace kqir
diff --git a/src/search/search_encoding.h b/src/search/search_encoding.h
index 14bf2923..24731d32 100644
--- a/src/search/search_encoding.h
+++ b/src/search/search_encoding.h
@@ -45,6 +45,16 @@ inline std::string ConstructSearchPrefixesSubkey() { return 
{(char)SearchSubkeyT
 struct SearchPrefixesMetadata {
   std::vector<std::string> prefixes;
 
+  static inline const std::string all[] = {""};
+
+  auto begin() const {  // NOLINT
+    return prefixes.empty() ? std::begin(all) : prefixes.data();
+  }
+
+  auto end() const {  // NOLINT
+    return prefixes.empty() ? std::end(all) : prefixes.data() + 
prefixes.size();
+  }
+
   void Encode(std::string *dst) const {
     for (const auto &prefix : prefixes) {
       PutFixed32(dst, prefix.size());
diff --git a/tests/cppunit/indexer_test.cc b/tests/cppunit/indexer_test.cc
index ae5a045e..ced039e0 100644
--- a/tests/cppunit/indexer_test.cc
+++ b/tests/cppunit/indexer_test.cc
@@ -39,26 +39,26 @@ struct IndexerTest : TestBase {
     SearchMetadata hash_field_meta(false);
     hash_field_meta.on_data_type = SearchOnDataType::HASH;
 
-    kqir::IndexInfo hash_info("hashtest", hash_field_meta);
-    hash_info.Add(kqir::FieldInfo("x", 
std::make_unique<redis::SearchTagFieldMetadata>()));
-    hash_info.Add(kqir::FieldInfo("y", 
std::make_unique<redis::SearchNumericFieldMetadata>()));
-    hash_info.prefixes.prefixes.emplace_back("idxtesthash");
+    auto hash_info = std::make_unique<kqir::IndexInfo>("hashtest", 
hash_field_meta);
+    hash_info->Add(kqir::FieldInfo("x", 
std::make_unique<redis::SearchTagFieldMetadata>()));
+    hash_info->Add(kqir::FieldInfo("y", 
std::make_unique<redis::SearchNumericFieldMetadata>()));
+    hash_info->prefixes.prefixes.emplace_back("idxtesthash");
 
     map.emplace("hashtest", std::move(hash_info));
 
-    redis::IndexUpdater hash_updater{&map.at("hashtest")};
+    redis::IndexUpdater hash_updater{map.at("hashtest").get()};
 
     SearchMetadata json_field_meta(false);
     json_field_meta.on_data_type = SearchOnDataType::JSON;
 
-    kqir::IndexInfo json_info("jsontest", json_field_meta);
-    json_info.Add(kqir::FieldInfo("$.x", 
std::make_unique<redis::SearchTagFieldMetadata>()));
-    json_info.Add(kqir::FieldInfo("$.y", 
std::make_unique<redis::SearchNumericFieldMetadata>()));
-    json_info.prefixes.prefixes.emplace_back("idxtestjson");
+    auto json_info = std::make_unique<kqir::IndexInfo>("jsontest", 
json_field_meta);
+    json_info->Add(kqir::FieldInfo("$.x", 
std::make_unique<redis::SearchTagFieldMetadata>()));
+    json_info->Add(kqir::FieldInfo("$.y", 
std::make_unique<redis::SearchNumericFieldMetadata>()));
+    json_info->prefixes.prefixes.emplace_back("idxtestjson");
 
     map.emplace("jsontest", std::move(json_info));
 
-    redis::IndexUpdater json_updater{&map.at("jsontest")};
+    redis::IndexUpdater json_updater{map.at("jsontest").get()};
 
     indexer.Add(std::move(hash_updater));
     indexer.Add(std::move(json_updater));
diff --git a/tests/cppunit/ir_dot_dumper_test.cc 
b/tests/cppunit/ir_dot_dumper_test.cc
index 66a588d9..1615b3ca 100644
--- a/tests/cppunit/ir_dot_dumper_test.cc
+++ b/tests/cppunit/ir_dot_dumper_test.cc
@@ -71,14 +71,14 @@ static IndexMap MakeIndexMap() {
   auto f4 = FieldInfo("n2", 
std::make_unique<redis::SearchNumericFieldMetadata>());
   auto f5 = FieldInfo("n3", 
std::make_unique<redis::SearchNumericFieldMetadata>());
   f5.metadata->noindex = true;
-  auto ia = IndexInfo("ia", SearchMetadata());
-  ia.Add(std::move(f1));
-  ia.Add(std::move(f2));
-  ia.Add(std::move(f3));
-  ia.Add(std::move(f4));
-  ia.Add(std::move(f5));
-
-  auto& name = ia.name;
+  auto ia = std::make_unique<IndexInfo>("ia", SearchMetadata());
+  ia->Add(std::move(f1));
+  ia->Add(std::move(f2));
+  ia->Add(std::move(f3));
+  ia->Add(std::move(f4));
+  ia->Add(std::move(f5));
+
+  auto& name = ia->name;
   IndexMap res;
   res.emplace(name, std::move(ia));
   return res;
diff --git a/tests/cppunit/ir_pass_test.cc b/tests/cppunit/ir_pass_test.cc
index a02dc907..6188bb49 100644
--- a/tests/cppunit/ir_pass_test.cc
+++ b/tests/cppunit/ir_pass_test.cc
@@ -176,14 +176,14 @@ static IndexMap MakeIndexMap() {
   auto f4 = FieldInfo("n2", 
std::make_unique<redis::SearchNumericFieldMetadata>());
   auto f5 = FieldInfo("n3", 
std::make_unique<redis::SearchNumericFieldMetadata>());
   f5.metadata->noindex = true;
-  auto ia = IndexInfo("ia", SearchMetadata());
-  ia.Add(std::move(f1));
-  ia.Add(std::move(f2));
-  ia.Add(std::move(f3));
-  ia.Add(std::move(f4));
-  ia.Add(std::move(f5));
+  auto ia = std::make_unique<IndexInfo>("ia", SearchMetadata());
+  ia->Add(std::move(f1));
+  ia->Add(std::move(f2));
+  ia->Add(std::move(f3));
+  ia->Add(std::move(f4));
+  ia->Add(std::move(f5));
 
-  auto& name = ia.name;
+  auto& name = ia->name;
   IndexMap res;
   res.emplace(name, std::move(ia));
   return res;
diff --git a/tests/cppunit/ir_sema_checker_test.cc 
b/tests/cppunit/ir_sema_checker_test.cc
index 9068a38b..678a0a0f 100644
--- a/tests/cppunit/ir_sema_checker_test.cc
+++ b/tests/cppunit/ir_sema_checker_test.cc
@@ -38,12 +38,12 @@ static IndexMap MakeIndexMap() {
   auto f1 = FieldInfo("f1", std::make_unique<redis::SearchTagFieldMetadata>());
   auto f2 = FieldInfo("f2", 
std::make_unique<redis::SearchNumericFieldMetadata>());
   auto f3 = FieldInfo("f3", 
std::make_unique<redis::SearchNumericFieldMetadata>());
-  auto ia = IndexInfo("ia", SearchMetadata());
-  ia.Add(std::move(f1));
-  ia.Add(std::move(f2));
-  ia.Add(std::move(f3));
+  auto ia = std::make_unique<IndexInfo>("ia", SearchMetadata());
+  ia->Add(std::move(f1));
+  ia->Add(std::move(f2));
+  ia->Add(std::move(f3));
 
-  auto& name = ia.name;
+  auto& name = ia->name;
   IndexMap res;
   res.emplace(name, std::move(ia));
   return res;
diff --git a/tests/cppunit/plan_executor_test.cc 
b/tests/cppunit/plan_executor_test.cc
new file mode 100644
index 00000000..59943d19
--- /dev/null
+++ b/tests/cppunit/plan_executor_test.cc
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "search/plan_executor.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "config/config.h"
+#include "search/executors/mock_executor.h"
+#include "search/ir.h"
+#include "search/ir_plan.h"
+#include "test_base.h"
+#include "types/redis_json.h"
+
+using namespace kqir;
+
+static auto exe_end = ExecutorNode::Result(ExecutorNode::end);
+
+static IndexMap MakeIndexMap() {
+  auto f1 = FieldInfo("f1", std::make_unique<redis::SearchTagFieldMetadata>());
+  auto f2 = FieldInfo("f2", 
std::make_unique<redis::SearchNumericFieldMetadata>());
+  auto f3 = FieldInfo("f3", 
std::make_unique<redis::SearchNumericFieldMetadata>());
+  auto ia = std::make_unique<IndexInfo>("ia", SearchMetadata());
+  ia->ns = "search_ns";
+  ia->metadata.on_data_type = SearchOnDataType::JSON;
+  ia->prefixes.prefixes.emplace_back("test2:");
+  ia->prefixes.prefixes.emplace_back("test4:");
+  ia->Add(std::move(f1));
+  ia->Add(std::move(f2));
+  ia->Add(std::move(f3));
+
+  auto& name = ia->name;
+  IndexMap res;
+  res.emplace(name, std::move(ia));
+  return res;
+}
+
+static auto index_map = MakeIndexMap();
+
+static auto NextRow(ExecutorContext& ctx) {
+  auto n = ctx.Next();
+  EXPECT_EQ(n.Msg(), Status::ok_msg);
+  auto v = std::move(n).GetValue();
+  EXPECT_EQ(v.index(), 1);
+  return std::get<ExecutorNode::RowType>(std::move(v));
+}
+
+TEST(PlanExecutorTest, Mock) {
+  auto op = std::make_unique<Mock>(std::vector<ExecutorNode::RowType>{});
+
+  auto ctx = ExecutorContext(op.get());
+  ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+
+  op = std::make_unique<Mock>(std::vector<ExecutorNode::RowType>{{"a"}, {"b"}, 
{"c"}});
+
+  ctx = ExecutorContext(op.get());
+  ASSERT_EQ(NextRow(ctx).key, "a");
+  ASSERT_EQ(NextRow(ctx).key, "b");
+  ASSERT_EQ(NextRow(ctx).key, "c");
+  ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+}
+
+static auto IndexI() -> const IndexInfo* { return index_map.at("ia").get(); }
+static auto FieldI(const std::string& f) -> const FieldInfo* { return 
&index_map.at("ia")->fields.at(f); }
+
+TEST(PlanExecutorTest, TopNSort) {
+  std::vector<ExecutorNode::RowType> data{
+      {"a", {{FieldI("f3"), "4"}}, IndexI()}, {"b", {{FieldI("f3"), "2"}}, 
IndexI()},
+      {"c", {{FieldI("f3"), "7"}}, IndexI()}, {"d", {{FieldI("f3"), "3"}}, 
IndexI()},
+      {"e", {{FieldI("f3"), "1"}}, IndexI()}, {"f", {{FieldI("f3"), "6"}}, 
IndexI()},
+      {"g", {{FieldI("f3"), "8"}}, IndexI()},
+  };
+  {
+    auto op = std::make_unique<TopNSort>(
+        std::make_unique<Mock>(data),
+        std::make_unique<SortByClause>(SortByClause::ASC, 
std::make_unique<FieldRef>("f3", FieldI("f3"))),
+        std::make_unique<LimitClause>(0, 4));
+
+    auto ctx = ExecutorContext(op.get());
+    ASSERT_EQ(NextRow(ctx).key, "e");
+    ASSERT_EQ(NextRow(ctx).key, "b");
+    ASSERT_EQ(NextRow(ctx).key, "d");
+    ASSERT_EQ(NextRow(ctx).key, "a");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+  {
+    auto op = std::make_unique<TopNSort>(
+        std::make_unique<Mock>(data),
+        std::make_unique<SortByClause>(SortByClause::ASC, 
std::make_unique<FieldRef>("f3", FieldI("f3"))),
+        std::make_unique<LimitClause>(1, 4));
+
+    auto ctx = ExecutorContext(op.get());
+    ASSERT_EQ(NextRow(ctx).key, "b");
+    ASSERT_EQ(NextRow(ctx).key, "d");
+    ASSERT_EQ(NextRow(ctx).key, "a");
+    ASSERT_EQ(NextRow(ctx).key, "f");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+}
+
+TEST(PlanExecutorTest, Filter) {
+  std::vector<ExecutorNode::RowType> data{
+      {"a", {{FieldI("f3"), "4"}}, IndexI()}, {"b", {{FieldI("f3"), "2"}}, 
IndexI()},
+      {"c", {{FieldI("f3"), "7"}}, IndexI()}, {"d", {{FieldI("f3"), "3"}}, 
IndexI()},
+      {"e", {{FieldI("f3"), "1"}}, IndexI()}, {"f", {{FieldI("f3"), "6"}}, 
IndexI()},
+      {"g", {{FieldI("f3"), "8"}}, IndexI()},
+  };
+  {
+    auto field = std::make_unique<FieldRef>("f3", FieldI("f3"));
+    auto op = std::make_unique<Filter>(
+        std::make_unique<Mock>(data),
+        AndExpr::Create(Node::List<QueryExpr>(
+            std::make_unique<NumericCompareExpr>(NumericCompareExpr::GT, 
field->CloneAs<FieldRef>(),
+                                                 
std::make_unique<NumericLiteral>(2)),
+            std::make_unique<NumericCompareExpr>(NumericCompareExpr::LET, 
field->CloneAs<FieldRef>(),
+                                                 
std::make_unique<NumericLiteral>(6)))));
+
+    auto ctx = ExecutorContext(op.get());
+    ASSERT_EQ(NextRow(ctx).key, "a");
+    ASSERT_EQ(NextRow(ctx).key, "d");
+    ASSERT_EQ(NextRow(ctx).key, "f");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+  {
+    auto field = std::make_unique<FieldRef>("f3", FieldI("f3"));
+    auto op = std::make_unique<Filter>(
+        std::make_unique<Mock>(data),
+        OrExpr::Create(Node::List<QueryExpr>(
+            std::make_unique<NumericCompareExpr>(NumericCompareExpr::GET, 
field->CloneAs<FieldRef>(),
+                                                 
std::make_unique<NumericLiteral>(6)),
+            std::make_unique<NumericCompareExpr>(NumericCompareExpr::LT, 
field->CloneAs<FieldRef>(),
+                                                 
std::make_unique<NumericLiteral>(2)))));
+
+    auto ctx = ExecutorContext(op.get());
+    ASSERT_EQ(NextRow(ctx).key, "c");
+    ASSERT_EQ(NextRow(ctx).key, "e");
+    ASSERT_EQ(NextRow(ctx).key, "f");
+    ASSERT_EQ(NextRow(ctx).key, "g");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+
+  data = {{"a", {{FieldI("f1"), "cpp,java"}}, IndexI()},    {"b", 
{{FieldI("f1"), "python,cpp,c"}}, IndexI()},
+          {"c", {{FieldI("f1"), "c,perl"}}, IndexI()},      {"d", 
{{FieldI("f1"), "rust,python"}}, IndexI()},
+          {"e", {{FieldI("f1"), "java,kotlin"}}, IndexI()}, {"f", 
{{FieldI("f1"), "c,rust"}}, IndexI()},
+          {"g", {{FieldI("f1"), "c,cpp,java"}}, IndexI()}};
+  {
+    auto field = std::make_unique<FieldRef>("f1", FieldI("f1"));
+    auto op = std::make_unique<Filter>(
+        std::make_unique<Mock>(data),
+        AndExpr::Create(Node::List<QueryExpr>(
+            std::make_unique<TagContainExpr>(field->CloneAs<FieldRef>(), 
std::make_unique<StringLiteral>("c")),
+            std::make_unique<TagContainExpr>(field->CloneAs<FieldRef>(), 
std::make_unique<StringLiteral>("cpp")))));
+
+    auto ctx = ExecutorContext(op.get());
+    ASSERT_EQ(NextRow(ctx).key, "b");
+    ASSERT_EQ(NextRow(ctx).key, "g");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+  {
+    auto field = std::make_unique<FieldRef>("f1", FieldI("f1"));
+    auto op = std::make_unique<Filter>(
+        std::make_unique<Mock>(data),
+        OrExpr::Create(Node::List<QueryExpr>(
+            std::make_unique<TagContainExpr>(field->CloneAs<FieldRef>(), 
std::make_unique<StringLiteral>("rust")),
+            std::make_unique<TagContainExpr>(field->CloneAs<FieldRef>(), 
std::make_unique<StringLiteral>("perl")))));
+
+    auto ctx = ExecutorContext(op.get());
+    ASSERT_EQ(NextRow(ctx).key, "c");
+    ASSERT_EQ(NextRow(ctx).key, "d");
+    ASSERT_EQ(NextRow(ctx).key, "f");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+}
+
+TEST(PlanExecutorTest, Limit) {
+  std::vector<ExecutorNode::RowType> data{
+      {"a", {{FieldI("f3"), "4"}}, IndexI()}, {"b", {{FieldI("f3"), "2"}}, 
IndexI()},
+      {"c", {{FieldI("f3"), "7"}}, IndexI()}, {"d", {{FieldI("f3"), "3"}}, 
IndexI()},
+      {"e", {{FieldI("f3"), "1"}}, IndexI()}, {"f", {{FieldI("f3"), "6"}}, 
IndexI()},
+      {"g", {{FieldI("f3"), "8"}}, IndexI()},
+  };
+  {
+    auto op = std::make_unique<Limit>(std::make_unique<Mock>(data), 
std::make_unique<LimitClause>(1, 2));
+
+    auto ctx = ExecutorContext(op.get());
+    ASSERT_EQ(NextRow(ctx).key, "b");
+    ASSERT_EQ(NextRow(ctx).key, "c");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+  {
+    auto field = std::make_unique<FieldRef>("f3", FieldI("f3"));
+    auto op = std::make_unique<Limit>(std::make_unique<Mock>(data), 
std::make_unique<LimitClause>(0, 3));
+
+    auto ctx = ExecutorContext(op.get());
+    ASSERT_EQ(NextRow(ctx).key, "a");
+    ASSERT_EQ(NextRow(ctx).key, "b");
+    ASSERT_EQ(NextRow(ctx).key, "c");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+}
+
+TEST(PlanExecutorTest, Merge) {
+  std::vector<ExecutorNode::RowType> data1{
+      {"a", {{FieldI("f3"), "4"}}, IndexI()},
+      {"b", {{FieldI("f3"), "2"}}, IndexI()},
+  };
+  std::vector<ExecutorNode::RowType> data2{{"c", {{FieldI("f3"), "7"}}, 
IndexI()},
+                                           {"d", {{FieldI("f3"), "3"}}, 
IndexI()},
+                                           {"e", {{FieldI("f3"), "1"}}, 
IndexI()}};
+  {
+    auto op =
+        
std::make_unique<Merge>(Node::List<PlanOperator>(std::make_unique<Mock>(data1), 
std::make_unique<Mock>(data2)));
+
+    auto ctx = ExecutorContext(op.get());
+    ASSERT_EQ(NextRow(ctx).key, "a");
+    ASSERT_EQ(NextRow(ctx).key, "b");
+    ASSERT_EQ(NextRow(ctx).key, "c");
+    ASSERT_EQ(NextRow(ctx).key, "d");
+    ASSERT_EQ(NextRow(ctx).key, "e");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+  {
+    auto op = std::make_unique<Merge>(
+        Node::List<PlanOperator>(std::make_unique<Mock>(decltype(data1){}), 
std::make_unique<Mock>(data1)));
+
+    auto ctx = ExecutorContext(op.get());
+    ASSERT_EQ(NextRow(ctx).key, "a");
+    ASSERT_EQ(NextRow(ctx).key, "b");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+}
+
+class PlanExecutorTestC : public TestBase {
+ protected:
+  explicit PlanExecutorTestC() : 
json_(std::make_unique<redis::Json>(storage_.get(), "search_ns")) {}
+  ~PlanExecutorTestC() override = default;
+
+  void SetUp() override {}
+  void TearDown() override {}
+
+  std::unique_ptr<redis::Json> json_;
+};
+
+TEST_F(PlanExecutorTestC, FullIndexScan) {
+  json_->Set("test1:a", "$", "{}");
+  json_->Set("test1:b", "$", "{}");
+  json_->Set("test2:c", "$", "{\"f3\": 6}");
+  json_->Set("test3:d", "$", "{}");
+  json_->Set("test4:e", "$", "{\"f3\": 7}");
+  json_->Set("test4:f", "$", "{\"f3\": 2}");
+  json_->Set("test4:g", "$", "{\"f3\": 8}");
+  json_->Set("test5:h", "$", "{}");
+  json_->Set("test5:i", "$", "{}");
+  json_->Set("test5:g", "$", "{}");
+
+  {
+    auto op = std::make_unique<FullIndexScan>(std::make_unique<IndexRef>("ia", 
IndexI()));
+
+    auto ctx = ExecutorContext(op.get(), storage_.get());
+    ASSERT_EQ(NextRow(ctx).key, "test2:c");
+    ASSERT_EQ(NextRow(ctx).key, "test4:e");
+    ASSERT_EQ(NextRow(ctx).key, "test4:f");
+    ASSERT_EQ(NextRow(ctx).key, "test4:g");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+
+  {
+    auto op = std::make_unique<Filter>(
+        std::make_unique<FullIndexScan>(std::make_unique<IndexRef>("ia", 
IndexI())),
+        std::make_unique<NumericCompareExpr>(NumericCompareExpr::GT, 
std::make_unique<FieldRef>("f3", FieldI("f3")),
+                                             
std::make_unique<NumericLiteral>(3)));
+
+    auto ctx = ExecutorContext(op.get(), storage_.get());
+    ASSERT_EQ(NextRow(ctx).key, "test2:c");
+    ASSERT_EQ(NextRow(ctx).key, "test4:e");
+    ASSERT_EQ(NextRow(ctx).key, "test4:g");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+}

Reply via email to