This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new dfffffef Add plan executors for KQIR via iterator model (#2281)
dfffffef is described below
commit dfffffefffd1bd693eb5ae9078a46debabddb6de
Author: Twice <[email protected]>
AuthorDate: Sat May 4 11:52:31 2024 +0900
Add plan executors for KQIR via iterator model (#2281)
---
src/search/executors/filter_executor.h | 129 ++++++++++
src/search/executors/full_index_scan_executor.h | 76 ++++++
src/search/executors/limit_executor.h | 57 +++++
src/search/executors/merge_executor.h | 54 +++++
src/search/executors/mock_executor.h | 57 +++++
src/search/executors/noop_executor.h | 35 +++
src/search/executors/projection_executor.h | 59 +++++
src/search/executors/sort_executor.h | 39 ++++
src/search/executors/topn_sort_executor.h | 104 +++++++++
src/search/index_info.h | 3 +-
src/search/indexer.cc | 4 +-
src/search/ir.h | 9 +
src/search/ir_sema_checker.h | 2 +-
src/search/plan_executor.cc | 149 ++++++++++++
src/search/plan_executor.h | 101 ++++++++
src/search/search_encoding.h | 10 +
tests/cppunit/indexer_test.cc | 20 +-
tests/cppunit/ir_dot_dumper_test.cc | 16 +-
tests/cppunit/ir_pass_test.cc | 14 +-
tests/cppunit/ir_sema_checker_test.cc | 10 +-
tests/cppunit/plan_executor_test.cc | 297 ++++++++++++++++++++++++
21 files changed, 1211 insertions(+), 34 deletions(-)
diff --git a/src/search/executors/filter_executor.h
b/src/search/executors/filter_executor.h
new file mode 100644
index 00000000..83d45377
--- /dev/null
+++ b/src/search/executors/filter_executor.h
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <variant>
+
+#include "parse_util.h"
+#include "search/ir.h"
+#include "search/plan_executor.h"
+#include "search/search_encoding.h"
+#include "string_util.h"
+
+namespace kqir {
+
+// Evaluates a query (filter) expression tree against a single row.
+//
+// Field values are obtained through `ctx->Retrieve(row, ...)`; `row` is held by
+// non-const reference because Retrieve takes the row by non-const reference.
+struct QueryExprEvaluator {
+  ExecutorContext *ctx;
+  ExecutorNode::RowType &row;
+
+  // Dispatches on the dynamic type of `e` to the matching Visit overload.
+  // Reaching the trailing CHECK means `e` is none of the handled expression kinds.
+  StatusOr<bool> Transform(QueryExpr *e) const {
+    if (auto v = dynamic_cast<AndExpr *>(e)) {
+      return Visit(v);
+    }
+    if (auto v = dynamic_cast<OrExpr *>(e)) {
+      return Visit(v);
+    }
+    if (auto v = dynamic_cast<NotExpr *>(e)) {
+      return Visit(v);
+    }
+    if (auto v = dynamic_cast<NumericCompareExpr *>(e)) {
+      return Visit(v);
+    }
+    if (auto v = dynamic_cast<TagContainExpr *>(e)) {
+      return Visit(v);
+    }
+
+    CHECK(false) << "unreachable";
+  }
+
+  // Conjunction: false as soon as one inner expression is false (later ones are not evaluated).
+  StatusOr<bool> Visit(AndExpr *v) const {
+    for (const auto &n : v->inners) {
+      if (!GET_OR_RET(Transform(n.get()))) return false;
+    }
+
+    return true;
+  }
+
+  // Disjunction: true as soon as one inner expression is true (later ones are not evaluated).
+  StatusOr<bool> Visit(OrExpr *v) const {
+    for (const auto &n : v->inners) {
+      if (GET_OR_RET(Transform(n.get()))) return true;
+    }
+
+    return false;
+  }
+
+  // Negation of the inner expression.
+  StatusOr<bool> Visit(NotExpr *v) const { return !GET_OR_RET(Transform(v->inner.get())); }
+
+  // True when the field value, split by the tag field's separator character,
+  // contains an element equal to the queried tag.
+  StatusOr<bool> Visit(TagContainExpr *v) const {
+    auto val = GET_OR_RET(ctx->Retrieve(row, v->field->info));
+    auto meta = v->field->info->MetadataAs<redis::SearchTagFieldMetadata>();
+
+    auto split = util::Split(val, std::string(1, meta->separator));
+    return std::find(split.begin(), split.end(), v->tag->val) != split.end();
+  }
+
+  // Parses the field value with ParseFloat and compares it with the literal
+  // on the right-hand side using the operator stored in `v->op`.
+  StatusOr<bool> Visit(NumericCompareExpr *v) const {
+    auto l_str = GET_OR_RET(ctx->Retrieve(row, v->field->info));
+
+    // TODO: reconsider how to handle failure case here
+    auto l = GET_OR_RET(ParseFloat(l_str));
+    auto r = v->num->val;
+
+    switch (v->op) {
+      case NumericCompareExpr::EQ:
+        return l == r;
+      case NumericCompareExpr::NE:
+        return l != r;
+      case NumericCompareExpr::LT:
+        return l < r;
+      case NumericCompareExpr::LET:
+        return l <= r;
+      case NumericCompareExpr::GT:
+        return l > r;
+      case NumericCompareExpr::GET:
+        return l >= r;
+    }
+
+    // Every case above returns. Getting here means `op` holds a value outside the
+    // listed cases; abort like Transform() does instead of flowing off the end of
+    // a non-void function, which would be undefined behavior.
+    CHECK(false) << "unreachable";
+  }
+};
+
+// Executor for the Filter plan operator: passes through only those rows of the
+// source operator for which the filter expression evaluates to true.
+struct FilterExecutor : ExecutorNode {
+  Filter *filter;
+
+  FilterExecutor(ExecutorContext *ctx, Filter *filter) : ExecutorNode(ctx), filter(filter) {}
+
+  // Keeps pulling rows from the source until one satisfies the filter
+  // expression or the source reports End.
+  StatusOr<Result> Next() override {
+    for (;;) {
+      auto candidate = GET_OR_RET(ctx->Get(filter->source)->Next());
+      if (std::holds_alternative<End>(candidate)) {
+        return end;
+      }
+
+      QueryExprEvaluator evaluator{ctx, std::get<RowType>(candidate)};
+      bool matched = GET_OR_RET(evaluator.Transform(filter->filter_expr.get()));
+      if (!matched) {
+        continue;
+      }
+
+      return candidate;
+    }
+  }
+};
+
+} // namespace kqir
diff --git a/src/search/executors/full_index_scan_executor.h
b/src/search/executors/full_index_scan_executor.h
new file mode 100644
index 00000000..0afeae04
--- /dev/null
+++ b/src/search/executors/full_index_scan_executor.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "db_util.h"
+#include "search/plan_executor.h"
+#include "storage/redis_db.h"
+#include "storage/redis_metadata.h"
+#include "storage/storage.h"
+
+namespace kqir {
+
+// Executor for the FullIndexScan plan operator.
+//
+// Walks the metadata column family under a snapshot and emits one row (key only,
+// no fields) for every key that starts with one of the index's prefixes, visiting
+// the prefixes one after another.
+struct FullIndexScanExecutor : ExecutorNode {
+  FullIndexScan *scan;
+  redis::LatestSnapShot ss;
+  util::UniqueIterator iter{nullptr};
+  const std::string *prefix_iter;  // prefix currently being scanned
+
+  FullIndexScanExecutor(ExecutorContext *ctx, FullIndexScan *scan)
+      : ExecutorNode(ctx), scan(scan), ss(ctx->storage), prefix_iter(scan->index->info->prefixes.begin()) {}
+
+  // Prepends the index namespace to a user key (here: a key prefix).
+  std::string NSKey(const std::string &user_key) {
+    return ComposeNamespaceKey(scan->index->info->ns, user_key, ctx->storage->IsSlotIdEncoded());
+  }
+
+  StatusOr<Result> Next() override {
+    const auto prefixes_end = scan->index->info->prefixes.end();
+    if (prefix_iter == prefixes_end) {
+      return end;
+    }
+
+    auto ns_key = NSKey(*prefix_iter);
+
+    // the rocksdb iterator is created lazily on the first call
+    if (!iter) {
+      rocksdb::ReadOptions read_options = ctx->storage->DefaultScanOptions();
+      read_options.snapshot = ss.GetSnapShot();
+      iter = util::UniqueIterator(ctx->storage, read_options,
+                                  ctx->storage->GetCFHandle(engine::kMetadataColumnFamilyName));
+      iter->Seek(ns_key);
+    }
+
+    // current prefix exhausted: move on to the next prefix until a matching key is found
+    while (!(iter->Valid() && iter->key().starts_with(ns_key))) {
+      ++prefix_iter;
+      if (prefix_iter == prefixes_end) {
+        return end;
+      }
+
+      ns_key = NSKey(*prefix_iter);
+      iter->Seek(ns_key);
+    }
+
+    auto [_, key] = ExtractNamespaceKey(iter->key(), ctx->storage->IsSlotIdEncoded());
+    // copy the key out before the iterator is advanced
+    auto key_str = key.ToString();
+
+    iter->Next();
+    return RowType{key_str, {}, scan->index->info};
+  }
+};
+
+} // namespace kqir
diff --git a/src/search/executors/limit_executor.h
b/src/search/executors/limit_executor.h
new file mode 100644
index 00000000..8b1d4916
--- /dev/null
+++ b/src/search/executors/limit_executor.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "search/plan_executor.h"
+
+namespace kqir {
+
+// Executor for the Limit plan operator: skips the first `offset` rows of the
+// child operator and then emits at most `count` rows.
+struct LimitExecutor : ExecutorNode {
+  Limit *limit;
+  // Number of rows emitted so far; `step == count` also marks the executor as finished.
+  size_t step = 0;
+
+  LimitExecutor(ExecutorContext *ctx, Limit *limit) : ExecutorNode(ctx), limit(limit) {}
+
+  StatusOr<Result> Next() override {
+    const auto count = limit->limit->count;
+
+    if (step == count) {
+      return end;
+    }
+
+    // the offset is consumed only once, right before the first row is emitted
+    if (step == 0) {
+      for (auto to_skip = limit->limit->offset; to_skip > 0; --to_skip) {
+        auto skipped = GET_OR_RET(ctx->Get(limit->op)->Next());
+
+        if (std::holds_alternative<End>(skipped)) {
+          // The child ended inside the skipped range. Mark this executor as finished so
+          // later calls return End right away instead of re-running the skip loop
+          // against an already exhausted child.
+          step = count;
+          return end;
+        }
+      }
+    }
+
+    auto res = GET_OR_RET(ctx->Get(limit->op)->Next());
+    if (std::holds_alternative<End>(res)) {
+      // End is not an emitted row: do not count it, just become finished.
+      step = count;
+      return end;
+    }
+
+    step++;
+    return res;
+  }
+};
+
+} // namespace kqir
diff --git a/src/search/executors/merge_executor.h
b/src/search/executors/merge_executor.h
new file mode 100644
index 00000000..66b7bb85
--- /dev/null
+++ b/src/search/executors/merge_executor.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <variant>
+
+#include "search/plan_executor.h"
+
+namespace kqir {
+
+// Executor for the Merge plan operator: concatenates the outputs of its child
+// operators, draining them one after another in the order stored in `merge->ops`.
+struct MergeExecutor : ExecutorNode {
+  Merge *merge;
+  decltype(merge->ops)::iterator iter;  // child currently being drained
+
+  MergeExecutor(ExecutorContext *ctx, Merge *merge) : ExecutorNode(ctx), merge(merge), iter(merge->ops.begin()) {}
+
+  // Returns the next row of the current child; once a child reports End the
+  // following child is tried, and End is returned after the last child.
+  StatusOr<Result> Next() override {
+    while (iter != merge->ops.end()) {
+      auto produced = GET_OR_RET(ctx->Get(*iter)->Next());
+      if (!std::holds_alternative<End>(produced)) {
+        return produced;
+      }
+
+      iter++;
+    }
+
+    return end;
+  }
+};
+
+} // namespace kqir
diff --git a/src/search/executors/mock_executor.h
b/src/search/executors/mock_executor.h
new file mode 100644
index 00000000..f9cdf57d
--- /dev/null
+++ b/src/search/executors/mock_executor.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "search/ir_plan.h"
+#include "search/plan_executor.h"
+
+namespace kqir {
+
+// this operator is only for executor-testing/debugging purpose
+// Plan operator that simply carries a fixed list of rows.
+// It exists only for testing/debugging the executors.
+struct Mock : PlanOperator {
+  std::vector<ExecutorNode::RowType> rows;
+
+  explicit Mock(std::vector<ExecutorNode::RowType> rows) : rows(std::move(rows)) {}
+
+  std::string Dump() const override { return "mock"; }
+
+  std::string_view Name() const override { return "Mock"; }
+
+  // The clone owns its own copy of the rows.
+  std::unique_ptr<Node> Clone() const override {
+    return std::make_unique<Mock>(rows);
+  }
+};
+
+// Executor for the Mock operator: yields the rows stored in the operator one
+// by one, then End.
+struct MockExecutor : ExecutorNode {
+  Mock *mock;
+  decltype(mock->rows)::iterator iter;  // next row to hand out
+
+  MockExecutor(ExecutorContext *ctx, Mock *mock) : ExecutorNode(ctx), mock(mock), iter(mock->rows.begin()) {}
+
+  StatusOr<Result> Next() override {
+    if (iter != mock->rows.end()) {
+      return *(iter++);
+    }
+
+    return end;
+  }
+};
+
+} // namespace kqir
diff --git a/src/search/executors/noop_executor.h
b/src/search/executors/noop_executor.h
new file mode 100644
index 00000000..1e3685ca
--- /dev/null
+++ b/src/search/executors/noop_executor.h
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "search/plan_executor.h"
+
+namespace kqir {
+
+// Executor for the Noop plan operator: it never produces a row.
+struct NoopExecutor : ExecutorNode {
+  Noop *noop;
+
+  NoopExecutor(ExecutorContext *ctx, Noop *noop) : ExecutorNode(ctx), noop(noop) {}
+
+  // Always reports End.
+  StatusOr<Result> Next() override {
+    return end;
+  }
+};
+
+} // namespace kqir
diff --git a/src/search/executors/projection_executor.h
b/src/search/executors/projection_executor.h
new file mode 100644
index 00000000..fe167334
--- /dev/null
+++ b/src/search/executors/projection_executor.h
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <variant>
+
+#include "search/plan_executor.h"
+
+namespace kqir {
+
+// Executor for the Projection plan operator.
+//
+// With an empty select list every field of the row's index is retrieved into the
+// row and the row is forwarded; otherwise a new row holding exactly the selected
+// fields is produced.
+struct ProjectionExecutor : ExecutorNode {
+  Projection *proj;
+
+  ProjectionExecutor(ExecutorContext *ctx, Projection *proj) : ExecutorNode(ctx), proj(proj) {}
+
+  StatusOr<Result> Next() override {
+    auto v = GET_OR_RET(ctx->Get(proj->source)->Next());
+    if (std::holds_alternative<End>(v)) {
+      return end;
+    }
+
+    auto &row = std::get<RowType>(v);
+    const auto &selected = proj->select->fields;
+
+    if (!selected.empty()) {
+      std::map<const FieldInfo *, ValueType> projected;
+
+      for (const auto &field : selected) {
+        auto value = GET_OR_RET(ctx->Retrieve(row, field->info));
+        projected.emplace(field->info, std::move(value));
+      }
+
+      return RowType{row.key, projected, row.index};
+    }
+
+    // no explicit select list: load every field of the index into the row itself
+    for (const auto &entry : row.index->fields) {
+      GET_OR_RET(ctx->Retrieve(row, &entry.second));
+    }
+
+    return v;
+  }
+};
+
+} // namespace kqir
diff --git a/src/search/executors/sort_executor.h
b/src/search/executors/sort_executor.h
new file mode 100644
index 00000000..ed4b205d
--- /dev/null
+++ b/src/search/executors/sort_executor.h
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "search/plan_executor.h"
+
+namespace kqir {
+
+// Executor for the Sort plan operator. Sorting is not implemented here:
+// Next() always returns a NotSupported status.
+struct SortExecutor : ExecutorNode {
+  Sort *sort;
+
+  SortExecutor(ExecutorContext *ctx, Sort *sort) : ExecutorNode(ctx), sort(sort) {}
+
+  StatusOr<Result> Next() override {
+    // Most sort operators are expected to be eliminated by the optimizer passes,
+    // and an external sort is a little complicated, so the operator is
+    // currently rejected instead of being executed.
+    return {Status::NotSupported, "sort operator is currently not supported"};
+  }
+};
+
+} // namespace kqir
diff --git a/src/search/executors/topn_sort_executor.h
b/src/search/executors/topn_sort_executor.h
new file mode 100644
index 00000000..741a9689
--- /dev/null
+++ b/src/search/executors/topn_sort_executor.h
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <variant>
+
+#include "parse_util.h"
+#include "search/plan_executor.h"
+
+namespace kqir {
+
+// Executor for the TopNSort plan operator.
+//
+// On the first call to Next() the whole child output is consumed while only the
+// `offset + count` rows with the smallest sort-key values are retained (bounded
+// max-heap). The retained rows are then sorted in ascending order and handed out
+// one per call, starting after `offset` rows.
+//
+// NOTE(review): only ascending order is implemented in this block; if the sort
+// clause can request descending order, that is not honored here - confirm.
+struct TopNSortExecutor : ExecutorNode {
+  TopNSort *sort;
+
+  // A row together with its sort key parsed as a number; ordered by that key.
+  struct ComparedRow {
+    RowType row;
+    double val;
+
+    ComparedRow(RowType row, double val) : row(std::move(row)), val(val) {}
+
+    friend bool operator<(const ComparedRow &l, const ComparedRow &r) { return l.val < r.val; }
+  };
+
+  std::vector<ComparedRow> rows;
+  decltype(rows)::iterator rows_iter;
+  bool initialized = false;
+
+  TopNSortExecutor(ExecutorContext *ctx, TopNSort *sort) : ExecutorNode(ctx), sort(sort) {}
+
+  StatusOr<Result> Next() override {
+    if (!initialized) {
+      const auto offset = sort->limit->offset;
+      const auto total = offset + sort->limit->count;
+
+      // with total == 0 nothing can be emitted, so the child is not consumed at all
+      if (total != 0) {
+        // retrieves the sort field of a row and parses it as a floating point number
+        auto get_order = [this](RowType &row) -> StatusOr<double> {
+          auto order_str = GET_OR_RET(ctx->Retrieve(row, sort->order->field->info));
+          auto order = GET_OR_RET(ParseFloat(order_str));
+          return order;
+        };
+
+        auto v = GET_OR_RET(ctx->Get(sort->op)->Next());
+
+        while (!std::holds_alternative<End>(v)) {
+          auto &row = std::get<RowType>(v);
+
+          // evaluated before the row is moved: Retrieve takes the row by non-const reference
+          auto order = GET_OR_RET(get_order(row));
+
+          if (rows.size() < total) {
+            rows.emplace_back(std::move(row), order);
+
+            // Build the max-heap exactly once, at the moment the buffer becomes full.
+            // pop_heap/push_heap below keep the heap property from then on.
+            if (rows.size() == total) {
+              std::make_heap(rows.begin(), rows.end());
+            }
+          } else if (order < rows.front().val) {
+            // the new row beats the largest retained one: replace it
+            std::pop_heap(rows.begin(), rows.end());
+            rows.back() = ComparedRow{std::move(row), order};
+            std::push_heap(rows.begin(), rows.end());
+          }
+
+          v = GET_OR_RET(ctx->Get(sort->op)->Next());
+        }
+      }
+
+      if (rows.size() <= offset) {
+        // every retained row falls into the skipped range: nothing to emit
+        rows_iter = rows.end();
+      } else {
+        std::sort(rows.begin(), rows.end());
+        rows_iter = rows.begin() + static_cast<std::ptrdiff_t>(offset);
+      }
+
+      // Set on every successful path so that calling Next() again after End does not
+      // consume the child a second time.
+      initialized = true;
+    }
+
+    if (rows_iter == rows.end()) {
+      return end;
+    }
+
+    auto res = rows_iter->row;
+    rows_iter++;
+    return res;
+  }
+};
+
+} // namespace kqir
diff --git a/src/search/index_info.h b/src/search/index_info.h
index 5b0cb707..1751549d 100644
--- a/src/search/index_info.h
+++ b/src/search/index_info.h
@@ -54,6 +54,7 @@ struct IndexInfo {
SearchMetadata metadata;
FieldMap fields;
redis::SearchPrefixesMetadata prefixes;
+ std::string ns;
IndexInfo(std::string name, SearchMetadata metadata) :
name(std::move(name)), metadata(std::move(metadata)) {}
@@ -64,6 +65,6 @@ struct IndexInfo {
}
};
-using IndexMap = std::map<std::string, IndexInfo>;
+using IndexMap = std::map<std::string, std::unique_ptr<IndexInfo>>;
} // namespace kqir
diff --git a/src/search/indexer.cc b/src/search/indexer.cc
index 3e7bbf1f..4a4a949c 100644
--- a/src/search/indexer.cc
+++ b/src/search/indexer.cc
@@ -66,7 +66,7 @@ rocksdb::Status
FieldValueRetriever::Retrieve(std::string_view field, std::strin
return hash.storage_->Get(read_options, sub_key, output);
} else if (std::holds_alternative<JsonData>(db)) {
auto &value = std::get<JsonData>(db);
- auto s = value.Get(field);
+ auto s = value.Get(field.front() == '$' ? field : fmt::format("$.{}",
field));
if (!s.IsOK()) return rocksdb::Status::Corruption(s.Msg());
if (s->value.size() != 1)
return rocksdb::Status::NotFound("json value specified by the field
(json path) should exist and be unique");
@@ -231,7 +231,7 @@ Status IndexUpdater::Update(const FieldValues &original,
std::string_view key, c
void GlobalIndexer::Add(IndexUpdater updater) {
updater.indexer = this;
- for (const auto &prefix : updater.info->prefixes.prefixes) {
+ for (const auto &prefix : updater.info->prefixes) {
prefix_map.insert(prefix, updater);
}
}
diff --git a/src/search/ir.h b/src/search/ir.h
index b841c0fa..be235e0d 100644
--- a/src/search/ir.h
+++ b/src/search/ir.h
@@ -76,6 +76,14 @@ struct Node {
if (casted) original.release();
return std::unique_ptr<T>(casted);
}
+
+  // Collects the given owning pointers, in argument order, into a vector of
+  // std::unique_ptr<T>.
+  template <typename T = Node, typename... Args>
+  static std::vector<std::unique_ptr<T>> List(std::unique_ptr<Args>... args) {
+    std::vector<std::unique_ptr<T>> list;
+    list.reserve(sizeof...(args));
+    (list.emplace_back(std::move(args)), ...);
+    return list;
+  }
};
struct Ref : Node {};
@@ -379,6 +387,7 @@ struct IndexRef : Ref {
const IndexInfo *info = nullptr;
explicit IndexRef(std::string name) : name(std::move(name)) {}
+ explicit IndexRef(std::string name, const IndexInfo *info) :
name(std::move(name)), info(info) {}
std::string_view Name() const override { return "IndexRef"; }
std::string Dump() const override { return name; }
diff --git a/src/search/ir_sema_checker.h b/src/search/ir_sema_checker.h
index d8982e5c..170e646f 100644
--- a/src/search/ir_sema_checker.h
+++ b/src/search/ir_sema_checker.h
@@ -42,7 +42,7 @@ struct SemaChecker {
if (auto v = dynamic_cast<SearchExpr *>(node)) {
auto index_name = v->index->name;
if (auto iter = index_map.find(index_name); iter != index_map.end()) {
- current_index = &iter->second;
+ current_index = iter->second.get();
v->index->info = current_index;
GET_OR_RET(Check(v->select.get()));
diff --git a/src/search/plan_executor.cc b/src/search/plan_executor.cc
new file mode 100644
index 00000000..75033f1a
--- /dev/null
+++ b/src/search/plan_executor.cc
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "plan_executor.h"
+
+#include <memory>
+
+#include "search/executors/filter_executor.h"
+#include "search/executors/full_index_scan_executor.h"
+#include "search/executors/limit_executor.h"
+#include "search/executors/merge_executor.h"
+#include "search/executors/mock_executor.h"
+#include "search/executors/noop_executor.h"
+#include "search/executors/projection_executor.h"
+#include "search/executors/sort_executor.h"
+#include "search/executors/topn_sort_executor.h"
+#include "search/indexer.h"
+#include "search/ir_plan.h"
+
+namespace kqir {
+
+namespace details {
+
+// Walks a plan operator tree and registers, for every operator it visits, the
+// matching executor node in `ctx->nodes`.
+struct ExecutorContextVisitor {
+  ExecutorContext *ctx;
+
+  // Visits `op` as `Op` if that is (one of) its dynamic type(s); returns whether it did.
+  template <typename Op>
+  bool TryVisit(PlanOperator *op) {
+    auto casted = dynamic_cast<Op *>(op);
+    if (!casted) return false;
+
+    Visit(casted);
+    return true;
+  }
+
+  // Creates an `Executor` for `op` and stores it in the context, keyed by the operator.
+  template <typename Executor, typename Op>
+  void Register(Op *op) {
+    ctx->nodes[op] = std::make_unique<Executor>(ctx, op);
+  }
+
+  // Dispatches on the dynamic type of `op`; the operator kinds are tried in a fixed order.
+  void Transform(PlanOperator *op) {
+    const bool handled = TryVisit<Limit>(op) || TryVisit<Noop>(op) || TryVisit<Merge>(op) || TryVisit<Sort>(op) ||
+                         TryVisit<Filter>(op) || TryVisit<Projection>(op) || TryVisit<TopNSort>(op) ||
+                         TryVisit<FullIndexScan>(op) || TryVisit<Mock>(op);
+
+    if (!handled) {
+      CHECK(false) << "unreachable";
+    }
+  }
+
+  // operators with a single child stored in `op`
+  void Visit(Limit *op) {
+    Register<LimitExecutor>(op);
+    Transform(op->op.get());
+  }
+
+  void Visit(Sort *op) {
+    Register<SortExecutor>(op);
+    Transform(op->op.get());
+  }
+
+  void Visit(TopNSort *op) {
+    Register<TopNSortExecutor>(op);
+    Transform(op->op.get());
+  }
+
+  // operators with a single child stored in `source`
+  void Visit(Filter *op) {
+    Register<FilterExecutor>(op);
+    Transform(op->source.get());
+  }
+
+  void Visit(Projection *op) {
+    Register<ProjectionExecutor>(op);
+    Transform(op->source.get());
+  }
+
+  // operator with several children
+  void Visit(Merge *op) {
+    Register<MergeExecutor>(op);
+    for (const auto &child : op->ops) {
+      Transform(child.get());
+    }
+  }
+
+  // leaf operators
+  void Visit(Noop *op) { Register<NoopExecutor>(op); }
+
+  void Visit(FullIndexScan *op) { Register<FullIndexScanExecutor>(op); }
+
+  void Visit(Mock *op) { Register<MockExecutor>(op); }
+};
+
+} // namespace details
+
+// Builds the executor nodes for the plan rooted at `op` without a storage backend.
+// Delegates to the two-argument constructor so that `storage` is a well-defined
+// nullptr instead of being left uninitialized.
+ExecutorContext::ExecutorContext(PlanOperator *op) : ExecutorContext(op, nullptr) {}
+
+// Builds the executor nodes for the plan rooted at `op`; `storage` is kept in the
+// context for the executors and for Retrieve().
+ExecutorContext::ExecutorContext(PlanOperator *op, engine::Storage *storage) : root(op), storage(storage) {
+  details::ExecutorContextVisitor visitor{this};
+  visitor.Transform(root);
+}
+
+// Returns the value of `field` for `row`.
+// A value already present in `row.fields` is returned as is; otherwise it is read
+// through a FieldValueRetriever, stored into `row.fields` and returned.
+auto ExecutorContext::Retrieve(RowType &row, const FieldInfo *field) -> StatusOr<ValueType> {  // NOLINT
+  const auto cached = row.fields.find(field);
+  if (cached != row.fields.end()) {
+    return cached->second;
+  }
+
+  auto retriever = GET_OR_RET(redis::FieldValueRetriever::Create(field->index->metadata.on_data_type, row.key,
+                                                                 storage, field->index->ns));
+
+  std::string value;
+  if (auto s = retriever.Retrieve(field->name, &value); !s.ok()) {
+    return {Status::NotOK, s.ToString()};
+  }
+
+  row.fields.emplace(field, value);
+  return value;
+}
+
+} // namespace kqir
diff --git a/src/search/plan_executor.h b/src/search/plan_executor.h
new file mode 100644
index 00000000..82d8e73e
--- /dev/null
+++ b/src/search/plan_executor.h
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <variant>
+
+#include "ir_plan.h"
+#include "search/index_info.h"
+#include "storage/storage.h"
+#include "string_util.h"
+
+namespace kqir {
+
+struct ExecutorContext;
+
+// Base class of all plan executors (iterator model): every call to Next()
+// yields either one row or the End marker.
+struct ExecutorNode {
+  using KeyType = std::string;
+  using ValueType = std::string;
+
+  // A row: its key, the field values known so far, and the index it belongs to.
+  struct RowType {
+    KeyType key;
+    std::map<const FieldInfo *, ValueType> fields;
+    const IndexInfo *index;
+
+    bool operator==(const RowType &another) const {
+      if (index != another.index) return false;
+      return key == another.key && fields == another.fields;
+    }
+
+    bool operator!=(const RowType &another) const { return !(*this == another); }
+
+    // for debug purpose: prints `key[@index-name] {field: value, ...}`
+    friend std::ostream &operator<<(std::ostream &os, const RowType &row) {
+      os << row.key;
+      if (row.index) {
+        os << "@" << row.index->name;
+      }
+
+      auto format_field = [](const auto &v) { return v.first->name + ": " + v.second; };
+      return os << " {" << util::StringJoin(row.fields, format_field) << "}";
+    }
+  };
+
+  // marker type/value signalling that an executor has no more rows
+  static constexpr inline const struct End {
+  } end{};
+  friend constexpr bool operator==(End, End) noexcept { return true; }
+  friend constexpr bool operator!=(End, End) noexcept { return false; }
+
+  using Result = std::variant<End, RowType>;
+
+  ExecutorContext *ctx;
+
+  explicit ExecutorNode(ExecutorContext *ctx) : ctx(ctx) {}
+
+  // Produces the next row, End, or an error status.
+  virtual StatusOr<Result> Next() = 0;
+  virtual ~ExecutorNode() = default;
+};
+
+// Holds the executor node of every plan operator of one plan, together with the
+// plan root and the storage handle used by the executors.
+struct ExecutorContext {
+  std::map<PlanOperator *, std::unique_ptr<ExecutorNode>> nodes;
+  PlanOperator *root;
+  engine::Storage *storage;
+
+  using Result = ExecutorNode::Result;
+  using RowType = ExecutorNode::RowType;
+  using KeyType = ExecutorNode::KeyType;
+  using ValueType = ExecutorNode::ValueType;
+
+  explicit ExecutorContext(PlanOperator *op);
+  explicit ExecutorContext(PlanOperator *op, engine::Storage *storage);
+
+  // Looks up the executor registered for `op`; nullptr if there is none.
+  ExecutorNode *Get(PlanOperator *op) {
+    auto found = nodes.find(op);
+    return found == nodes.end() ? nullptr : found->second.get();
+  }
+
+  ExecutorNode *Get(const std::unique_ptr<PlanOperator> &op) {
+    return Get(op.get());
+  }
+
+  // Pulls the next result from the executor of the root operator.
+  StatusOr<Result> Next() {
+    return Get(root)->Next();
+  }
+
+  StatusOr<ValueType> Retrieve(RowType &row, const FieldInfo *field);
+};
+
+} // namespace kqir
diff --git a/src/search/search_encoding.h b/src/search/search_encoding.h
index 14bf2923..24731d32 100644
--- a/src/search/search_encoding.h
+++ b/src/search/search_encoding.h
@@ -45,6 +45,16 @@ inline std::string ConstructSearchPrefixesSubkey() { return
{(char)SearchSubkeyT
struct SearchPrefixesMetadata {
std::vector<std::string> prefixes;
+ static inline const std::string all[] = {""};
+
+  // Start of the prefix range; an empty prefix list is represented by `all`.
+  auto begin() const {  // NOLINT
+    if (prefixes.empty()) {
+      return std::begin(all);
+    }
+    return prefixes.data();
+  }
+
+  // One past the last prefix; an empty prefix list is represented by `all`.
+  auto end() const {  // NOLINT
+    if (prefixes.empty()) {
+      return std::end(all);
+    }
+    return prefixes.data() + prefixes.size();
+  }
+
void Encode(std::string *dst) const {
for (const auto &prefix : prefixes) {
PutFixed32(dst, prefix.size());
diff --git a/tests/cppunit/indexer_test.cc b/tests/cppunit/indexer_test.cc
index ae5a045e..ced039e0 100644
--- a/tests/cppunit/indexer_test.cc
+++ b/tests/cppunit/indexer_test.cc
@@ -39,26 +39,26 @@ struct IndexerTest : TestBase {
SearchMetadata hash_field_meta(false);
hash_field_meta.on_data_type = SearchOnDataType::HASH;
- kqir::IndexInfo hash_info("hashtest", hash_field_meta);
- hash_info.Add(kqir::FieldInfo("x",
std::make_unique<redis::SearchTagFieldMetadata>()));
- hash_info.Add(kqir::FieldInfo("y",
std::make_unique<redis::SearchNumericFieldMetadata>()));
- hash_info.prefixes.prefixes.emplace_back("idxtesthash");
+ auto hash_info = std::make_unique<kqir::IndexInfo>("hashtest",
hash_field_meta);
+ hash_info->Add(kqir::FieldInfo("x",
std::make_unique<redis::SearchTagFieldMetadata>()));
+ hash_info->Add(kqir::FieldInfo("y",
std::make_unique<redis::SearchNumericFieldMetadata>()));
+ hash_info->prefixes.prefixes.emplace_back("idxtesthash");
map.emplace("hashtest", std::move(hash_info));
- redis::IndexUpdater hash_updater{&map.at("hashtest")};
+ redis::IndexUpdater hash_updater{map.at("hashtest").get()};
SearchMetadata json_field_meta(false);
json_field_meta.on_data_type = SearchOnDataType::JSON;
- kqir::IndexInfo json_info("jsontest", json_field_meta);
- json_info.Add(kqir::FieldInfo("$.x",
std::make_unique<redis::SearchTagFieldMetadata>()));
- json_info.Add(kqir::FieldInfo("$.y",
std::make_unique<redis::SearchNumericFieldMetadata>()));
- json_info.prefixes.prefixes.emplace_back("idxtestjson");
+ auto json_info = std::make_unique<kqir::IndexInfo>("jsontest",
json_field_meta);
+ json_info->Add(kqir::FieldInfo("$.x",
std::make_unique<redis::SearchTagFieldMetadata>()));
+ json_info->Add(kqir::FieldInfo("$.y",
std::make_unique<redis::SearchNumericFieldMetadata>()));
+ json_info->prefixes.prefixes.emplace_back("idxtestjson");
map.emplace("jsontest", std::move(json_info));
- redis::IndexUpdater json_updater{&map.at("jsontest")};
+ redis::IndexUpdater json_updater{map.at("jsontest").get()};
indexer.Add(std::move(hash_updater));
indexer.Add(std::move(json_updater));
diff --git a/tests/cppunit/ir_dot_dumper_test.cc
b/tests/cppunit/ir_dot_dumper_test.cc
index 66a588d9..1615b3ca 100644
--- a/tests/cppunit/ir_dot_dumper_test.cc
+++ b/tests/cppunit/ir_dot_dumper_test.cc
@@ -71,14 +71,14 @@ static IndexMap MakeIndexMap() {
auto f4 = FieldInfo("n2",
std::make_unique<redis::SearchNumericFieldMetadata>());
auto f5 = FieldInfo("n3",
std::make_unique<redis::SearchNumericFieldMetadata>());
f5.metadata->noindex = true;
- auto ia = IndexInfo("ia", SearchMetadata());
- ia.Add(std::move(f1));
- ia.Add(std::move(f2));
- ia.Add(std::move(f3));
- ia.Add(std::move(f4));
- ia.Add(std::move(f5));
-
- auto& name = ia.name;
+ auto ia = std::make_unique<IndexInfo>("ia", SearchMetadata());
+ ia->Add(std::move(f1));
+ ia->Add(std::move(f2));
+ ia->Add(std::move(f3));
+ ia->Add(std::move(f4));
+ ia->Add(std::move(f5));
+
+ auto& name = ia->name;
IndexMap res;
res.emplace(name, std::move(ia));
return res;
diff --git a/tests/cppunit/ir_pass_test.cc b/tests/cppunit/ir_pass_test.cc
index a02dc907..6188bb49 100644
--- a/tests/cppunit/ir_pass_test.cc
+++ b/tests/cppunit/ir_pass_test.cc
@@ -176,14 +176,14 @@ static IndexMap MakeIndexMap() {
auto f4 = FieldInfo("n2",
std::make_unique<redis::SearchNumericFieldMetadata>());
auto f5 = FieldInfo("n3",
std::make_unique<redis::SearchNumericFieldMetadata>());
f5.metadata->noindex = true;
- auto ia = IndexInfo("ia", SearchMetadata());
- ia.Add(std::move(f1));
- ia.Add(std::move(f2));
- ia.Add(std::move(f3));
- ia.Add(std::move(f4));
- ia.Add(std::move(f5));
+ auto ia = std::make_unique<IndexInfo>("ia", SearchMetadata());
+ ia->Add(std::move(f1));
+ ia->Add(std::move(f2));
+ ia->Add(std::move(f3));
+ ia->Add(std::move(f4));
+ ia->Add(std::move(f5));
- auto& name = ia.name;
+ auto& name = ia->name;
IndexMap res;
res.emplace(name, std::move(ia));
return res;
diff --git a/tests/cppunit/ir_sema_checker_test.cc
b/tests/cppunit/ir_sema_checker_test.cc
index 9068a38b..678a0a0f 100644
--- a/tests/cppunit/ir_sema_checker_test.cc
+++ b/tests/cppunit/ir_sema_checker_test.cc
@@ -38,12 +38,12 @@ static IndexMap MakeIndexMap() {
auto f1 = FieldInfo("f1", std::make_unique<redis::SearchTagFieldMetadata>());
auto f2 = FieldInfo("f2",
std::make_unique<redis::SearchNumericFieldMetadata>());
auto f3 = FieldInfo("f3",
std::make_unique<redis::SearchNumericFieldMetadata>());
- auto ia = IndexInfo("ia", SearchMetadata());
- ia.Add(std::move(f1));
- ia.Add(std::move(f2));
- ia.Add(std::move(f3));
+ auto ia = std::make_unique<IndexInfo>("ia", SearchMetadata());
+ ia->Add(std::move(f1));
+ ia->Add(std::move(f2));
+ ia->Add(std::move(f3));
- auto& name = ia.name;
+ auto& name = ia->name;
IndexMap res;
res.emplace(name, std::move(ia));
return res;
diff --git a/tests/cppunit/plan_executor_test.cc
b/tests/cppunit/plan_executor_test.cc
new file mode 100644
index 00000000..59943d19
--- /dev/null
+++ b/tests/cppunit/plan_executor_test.cc
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "search/plan_executor.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "config/config.h"
+#include "search/executors/mock_executor.h"
+#include "search/ir.h"
+#include "search/ir_plan.h"
+#include "test_base.h"
+#include "types/redis_json.h"
+
+using namespace kqir;
+
+static auto exe_end = ExecutorNode::Result(ExecutorNode::end);  // the `End` result; tests compare Next() against it once all rows are consumed
+
+// Builds the index catalog shared by these tests: a single JSON index "ia" in
+// namespace "search_ns" that covers the key prefixes "test2:" and "test4:" and
+// declares a tag field f1 plus the numeric fields f2 and f3.
+static IndexMap MakeIndexMap() {
+  auto index = std::make_unique<IndexInfo>("ia", SearchMetadata());
+  index->ns = "search_ns";
+  index->metadata.on_data_type = SearchOnDataType::JSON;
+  for (const char* prefix : {"test2:", "test4:"}) {
+    index->prefixes.prefixes.emplace_back(prefix);
+  }
+
+  index->Add(FieldInfo("f1", std::make_unique<redis::SearchTagFieldMetadata>()));
+  index->Add(FieldInfo("f2", std::make_unique<redis::SearchNumericFieldMetadata>()));
+  index->Add(FieldInfo("f3", std::make_unique<redis::SearchNumericFieldMetadata>()));
+
+  // Copy the name out first: the map key must not depend on `index` once the
+  // owning pointer has been handed over.
+  const std::string index_name = index->name;
+  IndexMap catalog;
+  catalog.emplace(index_name, std::move(index));
+  return catalog;
+}
+
+static auto index_map = MakeIndexMap();  // file-wide index catalog; IndexI()/FieldI() return raw pointers into it
+
+static auto NextRow(ExecutorContext& ctx) {  // pulls one result from ctx and returns it as a row
+ auto n = ctx.Next();
+ EXPECT_EQ(n.Msg(), Status::ok_msg);  // non-fatal check that the pull reported the OK message
+ auto v = std::move(n).GetValue();
+ EXPECT_EQ(v.index(), 1);  // Result is std::variant<End, RowType>, so index 1 means a row
+ return std::get<ExecutorNode::RowType>(std::move(v));  // std::get throws std::bad_variant_access if the result was End
+}
+
+// A Mock operator must replay exactly the rows it was given and then report end.
+TEST(PlanExecutorTest, Mock) {
+  // Each case lives in its own scope, like the other tests in this file, so
+  // that every plan operator outlives the context built from its raw pointer
+  // and no context is ever move-assigned. Executor nodes keep a raw
+  // ExecutorContext*; presumably it points at the context that created them,
+  // which a move would invalidate -- TODO confirm against the constructors.
+  {
+    // No rows: the very first pull already reports end.
+    auto op = std::make_unique<Mock>(std::vector<ExecutorNode::RowType>{});
+
+    auto ctx = ExecutorContext(op.get());
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+  {
+    // Three rows: they come back in insertion order, followed by end.
+    auto op = std::make_unique<Mock>(std::vector<ExecutorNode::RowType>{{"a"}, {"b"}, {"c"}});
+
+    auto ctx = ExecutorContext(op.get());
+    ASSERT_EQ(NextRow(ctx).key, "a");
+    ASSERT_EQ(NextRow(ctx).key, "b");
+    ASSERT_EQ(NextRow(ctx).key, "c");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+}
+
+// Raw pointer to the "ia" index held by the shared catalog.
+static const IndexInfo* IndexI() {
+  auto& index = index_map.at("ia");
+  return index.get();
+}
+// Raw pointer to field `f` of the "ia" index; `at` rejects unknown names.
+static const FieldInfo* FieldI(const std::string& f) {
+  auto& fields = index_map.at("ia")->fields;
+  return &fields.at(f);
+}
+
+// TopNSort over mocked rows: ascending by f3, limited by LimitClause(first, 4).
+TEST(PlanExecutorTest, TopNSort) {
+  // Keys a..g carry f3 = 4, 2, 7, 3, 1, 6, 8 respectively.
+  std::vector<ExecutorNode::RowType> rows{
+      {"a", {{FieldI("f3"), "4"}}, IndexI()}, {"b", {{FieldI("f3"), "2"}}, IndexI()},
+      {"c", {{FieldI("f3"), "7"}}, IndexI()}, {"d", {{FieldI("f3"), "3"}}, IndexI()},
+      {"e", {{FieldI("f3"), "1"}}, IndexI()}, {"f", {{FieldI("f3"), "6"}}, IndexI()},
+      {"g", {{FieldI("f3"), "8"}}, IndexI()},
+  };
+
+  // Plan under test; only the first LimitClause argument differs between cases.
+  auto sorted_by_f3 = [&rows](int first) {
+    return std::make_unique<TopNSort>(
+        std::make_unique<Mock>(rows),
+        std::make_unique<SortByClause>(SortByClause::ASC, std::make_unique<FieldRef>("f3", FieldI("f3"))),
+        std::make_unique<LimitClause>(first, 4));
+  };
+
+  {
+    // LimitClause(0, 4): the four smallest f3 values, in ascending order.
+    auto plan = sorted_by_f3(0);
+
+    auto ctx = ExecutorContext(plan.get());
+    ASSERT_EQ(NextRow(ctx).key, "e");
+    ASSERT_EQ(NextRow(ctx).key, "b");
+    ASSERT_EQ(NextRow(ctx).key, "d");
+    ASSERT_EQ(NextRow(ctx).key, "a");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+  {
+    // LimitClause(1, 4): the smallest row is skipped, the next four follow.
+    auto plan = sorted_by_f3(1);
+
+    auto ctx = ExecutorContext(plan.get());
+    ASSERT_EQ(NextRow(ctx).key, "b");
+    ASSERT_EQ(NextRow(ctx).key, "d");
+    ASSERT_EQ(NextRow(ctx).key, "a");
+    ASSERT_EQ(NextRow(ctx).key, "f");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+}
+
+// Filter over mocked rows, with AND/OR combinations of numeric comparisons on
+// f3 and of tag-containment checks on f1. Rows must come out in input order.
+TEST(PlanExecutorTest, Filter) {
+  // Keys a..g carry f3 = 4, 2, 7, 3, 1, 6, 8 respectively.
+  std::vector<ExecutorNode::RowType> numeric_rows{
+      {"a", {{FieldI("f3"), "4"}}, IndexI()}, {"b", {{FieldI("f3"), "2"}}, IndexI()},
+      {"c", {{FieldI("f3"), "7"}}, IndexI()}, {"d", {{FieldI("f3"), "3"}}, IndexI()},
+      {"e", {{FieldI("f3"), "1"}}, IndexI()}, {"f", {{FieldI("f3"), "6"}}, IndexI()},
+      {"g", {{FieldI("f3"), "8"}}, IndexI()},
+  };
+  {
+    // f3 > 2 AND f3 <= 6
+    auto f3 = std::make_unique<FieldRef>("f3", FieldI("f3"));
+    auto above_two = std::make_unique<NumericCompareExpr>(NumericCompareExpr::GT, f3->CloneAs<FieldRef>(),
+                                                          std::make_unique<NumericLiteral>(2));
+    auto at_most_six = std::make_unique<NumericCompareExpr>(NumericCompareExpr::LET, f3->CloneAs<FieldRef>(),
+                                                            std::make_unique<NumericLiteral>(6));
+    auto plan = std::make_unique<Filter>(
+        std::make_unique<Mock>(numeric_rows),
+        AndExpr::Create(Node::List<QueryExpr>(std::move(above_two), std::move(at_most_six))));
+
+    auto ctx = ExecutorContext(plan.get());
+    ASSERT_EQ(NextRow(ctx).key, "a");
+    ASSERT_EQ(NextRow(ctx).key, "d");
+    ASSERT_EQ(NextRow(ctx).key, "f");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+  {
+    // f3 >= 6 OR f3 < 2
+    auto f3 = std::make_unique<FieldRef>("f3", FieldI("f3"));
+    auto at_least_six = std::make_unique<NumericCompareExpr>(NumericCompareExpr::GET, f3->CloneAs<FieldRef>(),
+                                                             std::make_unique<NumericLiteral>(6));
+    auto below_two = std::make_unique<NumericCompareExpr>(NumericCompareExpr::LT, f3->CloneAs<FieldRef>(),
+                                                          std::make_unique<NumericLiteral>(2));
+    auto plan = std::make_unique<Filter>(
+        std::make_unique<Mock>(numeric_rows),
+        OrExpr::Create(Node::List<QueryExpr>(std::move(at_least_six), std::move(below_two))));
+
+    auto ctx = ExecutorContext(plan.get());
+    ASSERT_EQ(NextRow(ctx).key, "c");
+    ASSERT_EQ(NextRow(ctx).key, "e");
+    ASSERT_EQ(NextRow(ctx).key, "f");
+    ASSERT_EQ(NextRow(ctx).key, "g");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+
+  // Tag cases: f1 holds a comma-separated tag list per row.
+  std::vector<ExecutorNode::RowType> tag_rows{
+      {"a", {{FieldI("f1"), "cpp,java"}}, IndexI()},    {"b", {{FieldI("f1"), "python,cpp,c"}}, IndexI()},
+      {"c", {{FieldI("f1"), "c,perl"}}, IndexI()},      {"d", {{FieldI("f1"), "rust,python"}}, IndexI()},
+      {"e", {{FieldI("f1"), "java,kotlin"}}, IndexI()}, {"f", {{FieldI("f1"), "c,rust"}}, IndexI()},
+      {"g", {{FieldI("f1"), "c,cpp,java"}}, IndexI()},
+  };
+  {
+    // f1 contains "c" AND f1 contains "cpp"
+    auto f1 = std::make_unique<FieldRef>("f1", FieldI("f1"));
+    auto has_c = std::make_unique<TagContainExpr>(f1->CloneAs<FieldRef>(), std::make_unique<StringLiteral>("c"));
+    auto has_cpp = std::make_unique<TagContainExpr>(f1->CloneAs<FieldRef>(), std::make_unique<StringLiteral>("cpp"));
+    auto plan = std::make_unique<Filter>(std::make_unique<Mock>(tag_rows),
+                                         AndExpr::Create(Node::List<QueryExpr>(std::move(has_c), std::move(has_cpp))));
+
+    auto ctx = ExecutorContext(plan.get());
+    ASSERT_EQ(NextRow(ctx).key, "b");
+    ASSERT_EQ(NextRow(ctx).key, "g");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+  {
+    // f1 contains "rust" OR f1 contains "perl"
+    auto f1 = std::make_unique<FieldRef>("f1", FieldI("f1"));
+    auto has_rust = std::make_unique<TagContainExpr>(f1->CloneAs<FieldRef>(), std::make_unique<StringLiteral>("rust"));
+    auto has_perl = std::make_unique<TagContainExpr>(f1->CloneAs<FieldRef>(), std::make_unique<StringLiteral>("perl"));
+    auto plan = std::make_unique<Filter>(std::make_unique<Mock>(tag_rows),
+                                         OrExpr::Create(Node::List<QueryExpr>(std::move(has_rust), std::move(has_perl))));
+
+    auto ctx = ExecutorContext(plan.get());
+    ASSERT_EQ(NextRow(ctx).key, "c");
+    ASSERT_EQ(NextRow(ctx).key, "d");
+    ASSERT_EQ(NextRow(ctx).key, "f");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+}
+
+// Limit over mocked rows: the expectations below show LimitClause(1, 2)
+// skipping one row and emitting two, and LimitClause(0, 3) emitting the first
+// three rows, both in input order.
+TEST(PlanExecutorTest, Limit) {
+  std::vector<ExecutorNode::RowType> data{
+      {"a", {{FieldI("f3"), "4"}}, IndexI()}, {"b", {{FieldI("f3"), "2"}}, IndexI()},
+      {"c", {{FieldI("f3"), "7"}}, IndexI()}, {"d", {{FieldI("f3"), "3"}}, IndexI()},
+      {"e", {{FieldI("f3"), "1"}}, IndexI()}, {"f", {{FieldI("f3"), "6"}}, IndexI()},
+      {"g", {{FieldI("f3"), "8"}}, IndexI()},
+  };
+  {
+    auto op = std::make_unique<Limit>(std::make_unique<Mock>(data), std::make_unique<LimitClause>(1, 2));
+
+    auto ctx = ExecutorContext(op.get());
+    ASSERT_EQ(NextRow(ctx).key, "b");
+    ASSERT_EQ(NextRow(ctx).key, "c");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+  {
+    // No FieldRef is needed here: Limit takes no expression.
+    auto op = std::make_unique<Limit>(std::make_unique<Mock>(data), std::make_unique<LimitClause>(0, 3));
+
+    auto ctx = ExecutorContext(op.get());
+    ASSERT_EQ(NextRow(ctx).key, "a");
+    ASSERT_EQ(NextRow(ctx).key, "b");
+    ASSERT_EQ(NextRow(ctx).key, "c");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+}
+
+// Merge over mocked children: rows of the first child come out before rows of
+// the second, and a child with no rows contributes nothing.
+TEST(PlanExecutorTest, Merge) {
+  std::vector<ExecutorNode::RowType> first{
+      {"a", {{FieldI("f3"), "4"}}, IndexI()},
+      {"b", {{FieldI("f3"), "2"}}, IndexI()},
+  };
+  std::vector<ExecutorNode::RowType> second{
+      {"c", {{FieldI("f3"), "7"}}, IndexI()},
+      {"d", {{FieldI("f3"), "3"}}, IndexI()},
+      {"e", {{FieldI("f3"), "1"}}, IndexI()},
+  };
+  {
+    // Two non-empty children.
+    auto left = std::make_unique<Mock>(first);
+    auto right = std::make_unique<Mock>(second);
+    auto plan = std::make_unique<Merge>(Node::List<PlanOperator>(std::move(left), std::move(right)));
+
+    auto ctx = ExecutorContext(plan.get());
+    ASSERT_EQ(NextRow(ctx).key, "a");
+    ASSERT_EQ(NextRow(ctx).key, "b");
+    ASSERT_EQ(NextRow(ctx).key, "c");
+    ASSERT_EQ(NextRow(ctx).key, "d");
+    ASSERT_EQ(NextRow(ctx).key, "e");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+  {
+    // The first child has no rows at all.
+    auto left = std::make_unique<Mock>(std::vector<ExecutorNode::RowType>{});
+    auto right = std::make_unique<Mock>(first);
+    auto plan = std::make_unique<Merge>(Node::List<PlanOperator>(std::move(left), std::move(right)));
+
+    auto ctx = ExecutorContext(plan.get());
+    ASSERT_EQ(NextRow(ctx).key, "a");
+    ASSERT_EQ(NextRow(ctx).key, "b");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+}
+
+class PlanExecutorTestC : public TestBase {  // fixture for executor tests that read real JSON documents
+ protected:
+ explicit PlanExecutorTestC() :  // "search_ns" is the same namespace MakeIndexMap() assigns to index "ia"
json_(std::make_unique<redis::Json>(storage_.get(), "search_ns")) {}
+ ~PlanExecutorTestC() override = default;
+
+ void SetUp() override {}  // intentionally empty
+ void TearDown() override {}  // intentionally empty
+
+ std::unique_ptr<redis::Json> json_;  // JSON handle built on storage_ (presumably a TestBase member -- TODO confirm)
+};
+
+// FullIndexScan against real storage: only keys under the prefixes of index
+// "ia" ("test2:" and "test4:") are expected back, and a Filter stacked on the
+// scan sees the stored f3 values.
+TEST_F(PlanExecutorTestC, FullIndexScan) {
+  // Documents under five different key prefixes; only some carry f3.
+  json_->Set("test1:a", "$", "{}");
+  json_->Set("test1:b", "$", "{}");
+  json_->Set("test2:c", "$", "{\"f3\": 6}");
+  json_->Set("test3:d", "$", "{}");
+  json_->Set("test4:e", "$", "{\"f3\": 7}");
+  json_->Set("test4:f", "$", "{\"f3\": 2}");
+  json_->Set("test4:g", "$", "{\"f3\": 8}");
+  json_->Set("test5:h", "$", "{}");
+  json_->Set("test5:i", "$", "{}");
+  json_->Set("test5:g", "$", "{}");
+
+  // A fresh scan operator over index "ia" for each case.
+  auto scan_ia = [] { return std::make_unique<FullIndexScan>(std::make_unique<IndexRef>("ia", IndexI())); };
+
+  {
+    // Bare scan: every key under the index prefixes.
+    auto plan = scan_ia();
+
+    auto ctx = ExecutorContext(plan.get(), storage_.get());
+    ASSERT_EQ(NextRow(ctx).key, "test2:c");
+    ASSERT_EQ(NextRow(ctx).key, "test4:e");
+    ASSERT_EQ(NextRow(ctx).key, "test4:f");
+    ASSERT_EQ(NextRow(ctx).key, "test4:g");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+
+  {
+    // Scan filtered by f3 > 3: "test4:f" (f3 = 2) drops out.
+    auto above_three = std::make_unique<NumericCompareExpr>(
+        NumericCompareExpr::GT, std::make_unique<FieldRef>("f3", FieldI("f3")), std::make_unique<NumericLiteral>(3));
+    auto plan = std::make_unique<Filter>(scan_ia(), std::move(above_three));
+
+    auto ctx = ExecutorContext(plan.get(), storage_.get());
+    ASSERT_EQ(NextRow(ctx).key, "test2:c");
+    ASSERT_EQ(NextRow(ctx).key, "test4:e");
+    ASSERT_EQ(NextRow(ctx).key, "test4:g");
+    ASSERT_EQ(ctx.Next().GetValue(), exe_end);
+  }
+}