This is an automated email from the ASF dual-hosted git repository.
yaozhq pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git
The following commit(s) were added to refs/heads/master by this push:
new e8382931 [ISSUE-630]optimize: extract vertex/edge projector rules
e8382931 is described below
commit e83829319fc76483fe397b2a96d56ab4a1a2a3e4
Author: hey-money <[email protected]>
AuthorDate: Thu Oct 30 17:01:38 2025 +0800
[ISSUE-630]optimize: extract vertex/edge projector rules
* Changes
* version 1.0 with `List` and passed `UT#001`~`UT#004`
* version 1.1: `Edge` encoding is implemented
* version 1.2: `RexField List` is replaced with Set()
Bug fix: handle the `isEmpty()` reachability condition in `SelectFieldPruneRule.onMatch()`
* version 2.0: Handle `condition` issue but failed in #006 & #012
* version 3.0: Pass all tests
* version 3.1: Delete `笔记` folder
* version 3.2: Project fields of edges
* version 3.3: Bug fix for Null `edge` reference on multiple test suites
* test commit for email
* v5.0: Pass all tests in geaflow.dsl.runtime
* v5.1: style: format code
* Resolve conversations:
* Comments in English
* Remove Rule states
* Revert to original project files
* Use constants as field names
* Improve code readability and standards compliance:
- Create `VertexProjectorUtil` and `EdgeProjectorUtil` classes to encapsulate
projection logic
- Separate optimization rules regarding field extraction into individual
files
* Add unit tests for prune-field plans and switch to constant values to
declare special fields.
---
build.sh | 2 +-
.../apache/geaflow/dsl/optimize/OptimizeRules.java | 42 ++--
.../optimize/rule/GraphMatchFieldPruneRule.java | 236 +++++++++++++++++++
.../dsl/optimize/rule/ProjectFieldPruneRule.java | 262 +++++++++++++++++++++
.../org/apache/geaflow/dsl/rel/GraphMatch.java | 55 ++++-
.../apache/geaflow/dsl/rel/match/EdgeMatch.java | 31 ++-
.../geaflow/dsl/rel/match/OptionalEdgeMatch.java | 5 +-
.../apache/geaflow/dsl/rel/match/VertexMatch.java | 35 ++-
.../apache/geaflow/dsl/GQLFieldExtractorTest.java | 73 ++++++
.../java/org/apache/geaflow/dsl/PlanTester.java | 19 ++
geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml | 120 +++++-----
.../geaflow/dsl/runtime/command/QueryCommand.java | 1 -
.../dsl/runtime/traversal/StepLogicalPlan.java | 231 +++++++++---------
.../traversal/StepLogicalPlanTranslator.java | 6 +-
.../traversal/operator/AbstractStepOperator.java | 1 -
.../operator/FilteredFieldsOperator.java} | 24 +-
.../traversal/operator/MatchEdgeOperator.java | 30 ++-
.../traversal/operator/MatchVertexOperator.java | 35 ++-
.../runtime/traversal/operator/StepOperator.java | 10 +-
.../runtime/traversal/path/AbstractTreePath.java | 16 +-
.../dsl/runtime/traversal/path/EmptyTreePath.java | 2 +-
.../dsl/runtime/traversal/path/UnionTreePath.java | 10 +-
.../dsl/runtime/util/EdgeProjectorUtil.java | 176 ++++++++++++++
.../dsl/runtime/util/VertexProjectorUtil.java | 183 ++++++++++++++
.../src/test/resources/query/gql_match_001.sql | 2 +-
25 files changed, 1335 insertions(+), 272 deletions(-)
diff --git a/build.sh b/build.sh
index cc5410ca..f2a8f00c 100755
--- a/build.sh
+++ b/build.sh
@@ -241,4 +241,4 @@ if [[ -n "$BUILD_IMAGE" ]]; then
fi
fi
-echo -e "\033[32mbuild success !\033[0m"
+echo -e "\033[32mbuild success !\033[0m"
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/optimize/OptimizeRules.java
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/optimize/OptimizeRules.java
index 780c3514..77183e14 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/optimize/OptimizeRules.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/optimize/OptimizeRules.java
@@ -19,29 +19,27 @@
package org.apache.geaflow.dsl.optimize;
+
import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.rel.rules.*;
-import org.apache.geaflow.dsl.optimize.rule.AddVertexResetRule;
-import org.apache.geaflow.dsl.optimize.rule.FilterMatchNodeTransposeRule;
-import org.apache.geaflow.dsl.optimize.rule.FilterToMatchRule;
-import org.apache.geaflow.dsl.optimize.rule.GQLAggregateProjectMergeRule;
-import org.apache.geaflow.dsl.optimize.rule.GQLMatchUnionMergeRule;
-import org.apache.geaflow.dsl.optimize.rule.GQLProjectRemoveRule;
-import org.apache.geaflow.dsl.optimize.rule.MatchEdgeLabelFilterRemoveRule;
-import org.apache.geaflow.dsl.optimize.rule.MatchFilterMergeRule;
-import org.apache.geaflow.dsl.optimize.rule.MatchIdFilterSimplifyRule;
-import org.apache.geaflow.dsl.optimize.rule.MatchJoinMatchMergeRule;
-import org.apache.geaflow.dsl.optimize.rule.MatchJoinTableToGraphMatchRule;
-import org.apache.geaflow.dsl.optimize.rule.MatchSortToLogicalSortRule;
-import org.apache.geaflow.dsl.optimize.rule.PathInputReplaceRule;
-import org.apache.geaflow.dsl.optimize.rule.PathModifyMergeRule;
-import org.apache.geaflow.dsl.optimize.rule.PushConsecutiveJoinConditionRule;
-import org.apache.geaflow.dsl.optimize.rule.PushJoinFilterConditionRule;
-import org.apache.geaflow.dsl.optimize.rule.TableJoinMatchToGraphMatchRule;
-import org.apache.geaflow.dsl.optimize.rule.TableJoinTableToGraphRule;
-import org.apache.geaflow.dsl.optimize.rule.TableScanToGraphRule;
+import org.apache.calcite.rel.rules.AggregateProjectPullUpConstantsRule;
+import org.apache.calcite.rel.rules.AggregateRemoveRule;
+import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
+import org.apache.calcite.rel.rules.FilterCorrelateRule;
+import org.apache.calcite.rel.rules.FilterMergeRule;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.rel.rules.JoinPushExpressionsRule;
+import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
+import org.apache.calcite.rel.rules.ProjectMergeRule;
+import org.apache.calcite.rel.rules.ProjectSortTransposeRule;
+import org.apache.calcite.rel.rules.ProjectToWindowRule;
+import org.apache.calcite.rel.rules.PruneEmptyRules;
+import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rel.rules.SortRemoveRule;
+import org.apache.calcite.rel.rules.UnionEliminatorRule;
+import org.apache.calcite.rel.rules.UnionToDistinctRule;
+import org.apache.geaflow.dsl.optimize.rule.*;
public class OptimizeRules {
@@ -91,7 +89,9 @@ public class OptimizeRules {
MatchFilterMergeRule.INSTANCE,
TableScanToGraphRule.INSTANCE,
MatchIdFilterSimplifyRule.INSTANCE,
- MatchEdgeLabelFilterRemoveRule.INSTANCE
+ MatchEdgeLabelFilterRemoveRule.INSTANCE,
+ GraphMatchFieldPruneRule.INSTANCE,
+ ProjectFieldPruneRule.INSTANCE
);
private static final List<RelOptRule> POST_OPTIMIZE_RULES =
ImmutableList.of(
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/optimize/rule/GraphMatchFieldPruneRule.java
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/optimize/rule/GraphMatchFieldPruneRule.java
new file mode 100644
index 00000000..005d91be
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/optimize/rule/GraphMatchFieldPruneRule.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.optimize.rule;
+
+import java.util.*;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.*;
+import org.apache.geaflow.dsl.rel.GraphMatch;
+import org.apache.geaflow.dsl.rel.PathModify.PathModifyExpression;
+import org.apache.geaflow.dsl.rel.logical.LogicalGraphMatch;
+import org.apache.geaflow.dsl.rel.match.*;
+import org.apache.geaflow.dsl.rex.PathInputRef;
+import org.apache.geaflow.dsl.rex.RexObjectConstruct;
+
+/**
+ * Rule to prune unnecessary fields within GraphMatch operations by analyzing
+ * filter conditions, path modifications, joins, and extends.
+ */
+public class GraphMatchFieldPruneRule extends RelOptRule {
+
+ public static final GraphMatchFieldPruneRule INSTANCE = new
GraphMatchFieldPruneRule();
+
+ private GraphMatchFieldPruneRule() {
+ // Match only a single LogicalGraphMatch node
+ super(operand(LogicalGraphMatch.class, any()),
"GraphMatchFieldPruneRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LogicalGraphMatch graphMatch = call.rel(0);
+
+ // 1. Extract field access information from LogicalGraphMatch
+ Set<RexFieldAccess> filteredElements = getFilteredElements(graphMatch);
+
+ // 2. Pass the filtered field information to the path pattern
+ if (!filteredElements.isEmpty()) {
+ traverseAndPruneFields(filteredElements,
graphMatch.getPathPattern());
+ }
+ }
+
+ /**
+ * Extract filtered field elements from GraphMatch.
+ */
+ public Set<RexFieldAccess> getFilteredElements(GraphMatch graphMatch) {
+ // Recursively extract field usage from conditions in the match node
+ return extractFromMatchNode(graphMatch.getPathPattern());
+ }
+
+ /**
+ * Recursively traverse the MatchNode to extract RexFieldAccess.
+ */
+ private Set<RexFieldAccess> extractFromMatchNode(IMatchNode matchNode) {
+ Set<RexFieldAccess> allFilteredFields = new HashSet<>();
+
+ if (matchNode == null) {
+ return allFilteredFields;
+ }
+
+ // Process expressions in the current node
+ if (matchNode instanceof MatchFilter) {
+ MatchFilter filterNode = (MatchFilter) matchNode;
+ Set<RexFieldAccess> rawFields =
extractFromRexNode(filterNode.getCondition());
+ allFilteredFields.addAll(convertToPathRefs(rawFields, filterNode));
+
+ } else if (matchNode instanceof MatchPathModify) {
+ MatchPathModify pathModifyNode = (MatchPathModify) matchNode;
+ RexObjectConstruct expression =
pathModifyNode.getExpressions().get(0).getObjectConstruct();
+ Set<RexFieldAccess> rawFields = extractFromRexNode(expression);
+ allFilteredFields.addAll(convertToPathRefs(rawFields, matchNode));
+
+ } else if (matchNode instanceof MatchJoin) {
+ MatchJoin joinNode = (MatchJoin) matchNode;
+ if (joinNode.getCondition() != null) {
+ Set<RexFieldAccess> rawFields =
extractFromRexNode(joinNode.getCondition());
+ allFilteredFields.addAll(convertToPathRefs(rawFields,
joinNode));
+ }
+
+ } else if (matchNode instanceof MatchExtend) {
+ // For MatchExtend, check CAST attributes
+ MatchExtend extendNode = (MatchExtend) matchNode;
+ for (PathModifyExpression expression :
extendNode.getExpressions()) {
+ for (RexNode extendOperands :
expression.getObjectConstruct().getOperands()) {
+ if (extendOperands instanceof RexCall) {
+ // Only consider non-primitive property projections
(CAST)
+ Set<RexFieldAccess> rawFields =
extractFromRexNode(extendOperands);
+ allFilteredFields.addAll(convertToPathRefs(rawFields,
extendNode));
+ }
+ }
+ }
+ }
+
+ // Recursively process all child nodes
+ if (matchNode.getInputs() != null && !matchNode.getInputs().isEmpty())
{
+ for (RelNode input : matchNode.getInputs()) {
+ if (input instanceof IMatchNode) {
+ // Conversion is handled at leaf nodes, so no need for
convertToPathRefs here
+ allFilteredFields.addAll(extractFromMatchNode((IMatchNode)
input));
+ }
+ }
+ }
+
+ return allFilteredFields;
+ }
+
+ /**
+ * Extract RexFieldAccess from the target RexNode.
+ */
+ private Set<RexFieldAccess> extractFromRexNode(RexNode rexNode) {
+ Set<RexFieldAccess> fields = new HashSet<>();
+
+ if (rexNode instanceof RexLiteral || rexNode instanceof RexInputRef) {
+ return fields;
+ } else {
+ RexCall rexCall = (RexCall) rexNode;
+ for (RexNode operand : rexCall.getOperands()) {
+ if (operand instanceof RexFieldAccess) {
+ fields.add((RexFieldAccess) operand);
+ } else if (operand instanceof RexCall) {
+ // Recursively process nested RexCall
+ fields.addAll(extractFromRexNode(operand));
+ }
+ }
+ }
+ return fields;
+ }
+
+ /**
+ * Convert index-only field accesses to complete fields with labels.
+ */
+ private static Set<RexFieldAccess> convertToPathRefs(Set<RexFieldAccess>
fieldAccesses, RelNode node) {
+ Set<RexFieldAccess> convertedFieldAccesses = new HashSet<>();
+ RelDataType pathRecordType = node.getRowType(); // Get the record type
at current level
+ RexBuilder rexBuilder = node.getCluster().getRexBuilder(); // Builder
for creating new fields
+
+ for (RexFieldAccess fieldAccess : fieldAccesses) {
+ RexNode referenceExpr = fieldAccess.getReferenceExpr();
+
+ // Only process field accesses of input reference type
+ if (referenceExpr instanceof RexInputRef) {
+ RexInputRef inputRef = (RexInputRef) referenceExpr;
+
+ // If index exceeds field list size, it comes from a subquery,
skip it
+ if (pathRecordType.getFieldList().size() <=
inputRef.getIndex()) {
+ continue;
+ }
+
+ // Get the corresponding path field information from
PathRecordType
+ RelDataTypeField pathField =
pathRecordType.getFieldList().get(inputRef.getIndex());
+
+ // Create the actual PathInputRef
+ PathInputRef pathInputRef = new PathInputRef(
+ pathField.getName(), // Path variable name (e.g.,
"a", "b", "c")
+ pathField.getIndex(), // Field index
+ pathField.getType() // Field type
+ );
+
+ // Recreate RexFieldAccess with the new path reference
+ RexFieldAccess newFieldAccess = (RexFieldAccess)
rexBuilder.makeFieldAccess(
+ pathInputRef,
+ fieldAccess.getField().getIndex()
+ );
+ convertedFieldAccesses.add(newFieldAccess);
+ }
+ }
+
+ return convertedFieldAccesses;
+ }
+
+ /**
+ * Traverse the path pattern and add filtered fields to matching nodes.
+ */
+ private static void traverseAndPruneFields(Set<RexFieldAccess> fields,
IMatchNode pathPattern) {
+ Queue<IMatchNode> queue = new LinkedList<>(); // Queue for nodes to
visit
+ Set<IMatchNode> visited = new HashSet<>(); // Mark visited nodes
+
+ queue.offer(pathPattern);
+ visited.add(pathPattern);
+
+ // Visit all nodes in the path, and for each field: if label matches,
add the field to .fields
+ while (!queue.isEmpty()) {
+ IMatchNode currentPathPattern = queue.poll();
+
+ if (currentPathPattern instanceof VertexMatch) {
+ VertexMatch vertexMatch = (VertexMatch) currentPathPattern;
+ String vertexLabel = vertexMatch.getLabel();
+ for (RexFieldAccess fieldElement : fields) {
+ PathInputRef inputRef = (PathInputRef)
fieldElement.getReferenceExpr();
+ if (inputRef.getLabel().equals(vertexLabel)) {
+ vertexMatch.addField(fieldElement);
+ }
+ }
+ }
+
+ if (currentPathPattern instanceof EdgeMatch) {
+ EdgeMatch edgeMatch = (EdgeMatch) currentPathPattern;
+ String edgeLabel = edgeMatch.getLabel();
+ for (RexFieldAccess fieldElement : fields) {
+ PathInputRef inputRef = (PathInputRef)
fieldElement.getReferenceExpr();
+ if (inputRef.getLabel().equals(edgeLabel)) {
+ edgeMatch.addField(fieldElement);
+ }
+ }
+ }
+
+ // Iterate through possible child nodes
+ List<RelNode> inputs = currentPathPattern.getInputs();
+ for (RelNode candidateInput : inputs) {
+ if (candidateInput != null && !visited.contains((IMatchNode)
candidateInput)) {
+ queue.offer((IMatchNode) candidateInput);
+ visited.add((IMatchNode) candidateInput);
+ }
+ }
+ }
+ }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/optimize/rule/ProjectFieldPruneRule.java
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/optimize/rule/ProjectFieldPruneRule.java
new file mode 100644
index 00000000..2281827a
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/optimize/rule/ProjectFieldPruneRule.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.optimize.rule;
+
+import static org.apache.geaflow.dsl.common.types.EdgeType.*;
+import static
org.apache.geaflow.dsl.common.types.VertexType.DEFAULT_ID_FIELD_NAME;
+
+import java.util.*;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.*;
+import org.apache.geaflow.dsl.rel.logical.LogicalGraphMatch;
+import org.apache.geaflow.dsl.rel.match.*;
+import org.apache.geaflow.dsl.rex.PathInputRef;
+import org.apache.geaflow.dsl.rex.RexParameterRef;
+
+/**
+ * Rule to prune unnecessary fields from LogicalProject and push down field
requirements
+ * to LogicalGraphMatch.
+ */
+public class ProjectFieldPruneRule extends RelOptRule {
+
+ public static final ProjectFieldPruneRule INSTANCE = new
ProjectFieldPruneRule();
+
+ // Mapping for special field names
+ private static final Map<String, String> SPECIAL_FIELD_MAP;
+
+ static {
+ SPECIAL_FIELD_MAP = new HashMap<>();
+ SPECIAL_FIELD_MAP.put("id", DEFAULT_ID_FIELD_NAME);
+ SPECIAL_FIELD_MAP.put("label", DEFAULT_LABEL_NAME);
+ SPECIAL_FIELD_MAP.put("srcId", DEFAULT_SRC_ID_NAME );
+ SPECIAL_FIELD_MAP.put("targetId", DEFAULT_TARGET_ID_NAME);
+ }
+
+ private ProjectFieldPruneRule() {
+ super(operand(LogicalProject.class, operand(LogicalGraphMatch.class,
any())),
+ "ProjectFieldPruneRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LogicalProject project = call.rel(0); // Get LogicalProject
+ LogicalGraphMatch graphMatch = call.rel(1); // Get
LogicalGraphMatch (direct child)
+
+ // 1. Extract field access information from LogicalProject
+ Set<RexFieldAccess> filteredElements = extractFields(project);
+
+ // 2. Pass the filtered field information to LogicalGraphMatch
+ if (!filteredElements.isEmpty()) {
+ traverseAndPruneFields(filteredElements,
graphMatch.getPathPattern());
+ }
+ }
+
+ /**
+ * Extract fields from LogicalProject and convert to semantic information
(e.g., $0.id -> a.id).
+ */
+ private Set<RexFieldAccess> extractFields(LogicalProject project) {
+ List<RexNode> fields = project.getChildExps();
+ Set<RexFieldAccess> fieldAccesses = new HashSet<>();
+
+ for (RexNode node : fields) {
+ // Recursively collect all field accesses
+ fieldAccesses.addAll(collectAllFieldAccesses(
+ project.getCluster().getRexBuilder(), node));
+ }
+
+ // Convert index-based references to label-based path references
+ return convertToPathRefs(fieldAccesses, project.getInput(0));
+ }
+
+ /**
+ * Recursively collect all RexFieldAccess nodes from a RexNode tree.
+ */
+ private static Set<RexFieldAccess> collectAllFieldAccesses(RexBuilder
rexBuilder, RexNode rootNode) {
+ Set<RexFieldAccess> fieldAccesses = new HashSet<>();
+ Queue<RexNode> queue = new LinkedList<>();
+ queue.offer(rootNode);
+
+ while (!queue.isEmpty()) {
+ RexNode node = queue.poll();
+
+ if (node instanceof RexFieldAccess) {
+ // Direct field access
+ fieldAccesses.add((RexFieldAccess) node);
+
+ } else if (node instanceof RexCall) {
+ // Custom function call, need to extract and convert elements
+ RexCall rexCall = (RexCall) node;
+
+ // Check if it's a field access type call (operand[0] is ref,
operator is field name)
+ if (rexCall.getOperands().size() > 0) {
+ RexNode ref = rexCall.getOperands().get(0);
+ String fieldName = rexCall.getOperator().getName();
+
+ // Handle special fields with mapping
+ if (SPECIAL_FIELD_MAP.containsKey(fieldName)) {
+ String mappedFieldName =
SPECIAL_FIELD_MAP.get(fieldName);
+ fieldAccesses.add((RexFieldAccess)
rexBuilder.makeFieldAccess(ref, mappedFieldName, false));
+
+ } else if (ref instanceof RexInputRef) {
+ // Other non-nested custom functions: enumerate all
fields of ref and add them all
+ RelDataType refType = ref.getType();
+ List<RelDataTypeField> refFields =
refType.getFieldList();
+
+ for (RelDataTypeField field : refFields) {
+ RexFieldAccess fieldAccess = (RexFieldAccess)
rexBuilder.makeFieldAccess(
+ ref,
+ field.getName(),
+ false
+ );
+ fieldAccesses.add(fieldAccess);
+ }
+
+ } else {
+ // ref itself might be a complex expression, continue
recursive processing
+ queue.add(ref);
+ }
+
+ // Add other operands to the queue for continued processing
+ for (int i = 1; i < rexCall.getOperands().size(); i++) {
+ queue.add(rexCall.getOperands().get(i));
+ }
+ }
+
+ } else if (node instanceof RexInputRef) {
+ // RexInputRef directly references input, enumerate all its
fields
+ RelDataType refType = node.getType();
+ List<RelDataTypeField> refFields = refType.getFieldList();
+
+ for (RelDataTypeField field : refFields) {
+ RexFieldAccess fieldAccess = (RexFieldAccess)
rexBuilder.makeFieldAccess(
+ node,
+ field.getName(),
+ false
+ );
+ fieldAccesses.add(fieldAccess);
+ }
+
+ } else if (node instanceof RexLiteral || node instanceof
RexParameterRef) {
+ // Literals, skip
+ continue;
+
+ } else {
+ // Other unknown types, can choose to throw exception or log
+ throw new IllegalArgumentException("Unsupported type: " +
node.getClass());
+ }
+ }
+
+ return fieldAccesses;
+ }
+
+ /**
+ * Convert index-only field accesses to complete fields with labels.
+ */
+ private static Set<RexFieldAccess> convertToPathRefs(Set<RexFieldAccess>
fieldAccesses, RelNode node) {
+ Set<RexFieldAccess> convertedFieldAccesses = new HashSet<>();
+ RelDataType pathRecordType = node.getRowType(); // Get the record type
at current level
+ RexBuilder rexBuilder = node.getCluster().getRexBuilder(); // Builder
for creating new fields
+
+ for (RexFieldAccess fieldAccess : fieldAccesses) {
+ RexNode referenceExpr = fieldAccess.getReferenceExpr();
+
+ // Only process field accesses of input reference type
+ if (referenceExpr instanceof RexInputRef) {
+ RexInputRef inputRef = (RexInputRef) referenceExpr;
+
+ // If index exceeds field list size, it comes from a subquery,
skip it
+ if (pathRecordType.getFieldList().size() <=
inputRef.getIndex()) {
+ continue;
+ }
+
+ // Get the corresponding path field information from
PathRecordType
+ RelDataTypeField pathField =
pathRecordType.getFieldList().get(inputRef.getIndex());
+
+ // Create the actual PathInputRef
+ PathInputRef pathInputRef = new PathInputRef(
+ pathField.getName(), // Path variable name (e.g.,
"a", "b", "c")
+ pathField.getIndex(), // Field index
+ pathField.getType() // Field type
+ );
+
+ // Recreate RexFieldAccess with the new path reference
+ RexFieldAccess newFieldAccess = (RexFieldAccess)
rexBuilder.makeFieldAccess(
+ pathInputRef,
+ fieldAccess.getField().getIndex()
+ );
+ convertedFieldAccesses.add(newFieldAccess);
+ }
+ }
+
+ return convertedFieldAccesses;
+ }
+
+ /**
+ * Traverse the path pattern and add filtered fields to matching nodes.
+ */
+ private static void traverseAndPruneFields(Set<RexFieldAccess> fields,
IMatchNode pathPattern) {
+ Queue<IMatchNode> queue = new LinkedList<>(); // Queue for nodes to
visit
+ Set<IMatchNode> visited = new HashSet<>(); // Mark visited nodes
+
+ queue.offer(pathPattern);
+ visited.add(pathPattern);
+
+ // Visit all nodes in the path, and for each field: if label matches,
add the field to .fields
+ while (!queue.isEmpty()) {
+ IMatchNode currentPathPattern = queue.poll();
+
+ if (currentPathPattern instanceof VertexMatch) {
+ VertexMatch vertexMatch = (VertexMatch) currentPathPattern;
+ String vertexLabel = vertexMatch.getLabel();
+ for (RexFieldAccess fieldElement : fields) {
+ PathInputRef inputRef = (PathInputRef)
fieldElement.getReferenceExpr();
+ if (inputRef.getLabel().equals(vertexLabel)) {
+ vertexMatch.addField(fieldElement);
+ }
+ }
+ }
+
+ if (currentPathPattern instanceof EdgeMatch) {
+ EdgeMatch edgeMatch = (EdgeMatch) currentPathPattern;
+ String edgeLabel = edgeMatch.getLabel();
+ for (RexFieldAccess fieldElement : fields) {
+ PathInputRef inputRef = (PathInputRef)
fieldElement.getReferenceExpr();
+ if (inputRef.getLabel().equals(edgeLabel)) {
+ edgeMatch.addField(fieldElement);
+ }
+ }
+ }
+
+ // Iterate through possible child nodes
+ List<RelNode> inputs = currentPathPattern.getInputs();
+ for (RelNode candidateInput : inputs) {
+ if (candidateInput != null && !visited.contains((IMatchNode)
candidateInput)) {
+ queue.offer((IMatchNode) candidateInput);
+ visited.add((IMatchNode) candidateInput);
+ }
+ }
+ }
+ }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/GraphMatch.java
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/GraphMatch.java
index 125f2175..dd7b524a 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/GraphMatch.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/GraphMatch.java
@@ -19,8 +19,7 @@
package org.apache.geaflow.dsl.rel;
-import java.util.List;
-import java.util.Objects;
+import java.util.*;
import java.util.stream.Collectors;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
@@ -30,6 +29,7 @@ import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -238,4 +238,55 @@ public abstract class GraphMatch extends SingleRel {
public RelNode accept(RelShuttle shuttle) {
return copy(traitSet, input, (IMatchNode) pathPattern.accept(shuttle),
rowType);
}
+
+ /**
+ * Returns a string representation of filtered fields in dictionary order
+ * for both graph nodes and fields within each node.
+ */
+ public String getFilteredFields() {
+ Map<String, Set<String>> nodeFieldsMap = new TreeMap<>();
+
+ Queue<IMatchNode> nodeQueue = new LinkedList<>();
+ Set<IMatchNode> visitedNodes = new HashSet<>();
+
+ nodeQueue.offer(this.pathPattern);
+ visitedNodes.add(this.pathPattern);
+
+ while (!nodeQueue.isEmpty()) {
+ IMatchNode currentNode = nodeQueue.poll();
+ String nodeLabel = null;
+ Set<RexFieldAccess> nodeFields = null;
+
+ if (currentNode instanceof VertexMatch) {
+ VertexMatch vertexMatch = (VertexMatch) currentNode;
+ nodeLabel = vertexMatch.getLabel();
+ nodeFields = vertexMatch.getFields();
+ } else if (currentNode instanceof EdgeMatch) {
+ EdgeMatch edgeMatch = (EdgeMatch) currentNode;
+ nodeLabel = edgeMatch.getLabel();
+ nodeFields = edgeMatch.getFields();
+ }
+
+ if (nodeLabel != null) {
+ Set<String> fields = nodeFieldsMap.computeIfAbsent(nodeLabel,
k -> new TreeSet<>());
+
+ if (nodeFields != null && !nodeFields.isEmpty()) {
+ for (RexFieldAccess field : nodeFields) {
+ fields.add(nodeLabel + "." +
field.getField().getName());
+ }
+ } else {
+ fields.add("null");
+ }
+ }
+
+ for (RelNode inputNode : currentNode.getInputs()) {
+ if (inputNode != null && !visitedNodes.contains((IMatchNode)
inputNode)) {
+ nodeQueue.offer((IMatchNode) inputNode);
+ visitedNodes.add((IMatchNode) inputNode);
+ }
+ }
+ }
+ return nodeFieldsMap.toString();
+ }
+
}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/match/EdgeMatch.java
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/match/EdgeMatch.java
index ed2ca2b7..b4f6ff47 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/match/EdgeMatch.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/match/EdgeMatch.java
@@ -23,17 +23,14 @@ import static org.apache.geaflow.dsl.util.GQLRelUtil.match;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
+import java.util.*;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.geaflow.dsl.calcite.PathRecordType;
import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
@@ -46,6 +43,8 @@ public class EdgeMatch extends AbstractRelNode implements
SingleMatchNode, IMatc
private final String label;
+ private Set<RexFieldAccess> pushDownFields;
+
private final ImmutableSet<String> edgeTypes;
private final EdgeDirection direction;
@@ -56,7 +55,7 @@ public class EdgeMatch extends AbstractRelNode implements
SingleMatchNode, IMatc
protected EdgeMatch(RelOptCluster cluster, RelTraitSet traitSet, RelNode
input, String label,
Collection<String> edgeTypes, EdgeDirection direction,
RelDataType nodeType,
- PathRecordType pathType) {
+ PathRecordType pathType, Set<RexFieldAccess>
pushDownFields) {
super(cluster, traitSet);
this.input = input;
this.label = label;
@@ -69,6 +68,14 @@ public class EdgeMatch extends AbstractRelNode implements
SingleMatchNode, IMatc
this.rowType = Objects.requireNonNull(pathType);
this.pathType = Objects.requireNonNull(pathType);
this.nodeType = Objects.requireNonNull(nodeType);
+ this.pushDownFields = pushDownFields;
+ }
+
+ public void addField(RexFieldAccess field) {
+ if (pushDownFields == null) {
+ pushDownFields = new HashSet<>();
+ }
+ pushDownFields.add(field);
}
@Override
@@ -97,7 +104,7 @@ public class EdgeMatch extends AbstractRelNode implements
SingleMatchNode, IMatc
@Override
public IMatchNode copy(List<RelNode> inputs, PathRecordType pathSchema) {
return new EdgeMatch(getCluster(), traitSet, sole(inputs), label,
edgeTypes,
- direction, nodeType, pathSchema);
+ direction, nodeType, pathSchema, pushDownFields);
}
public EdgeDirection getDirection() {
@@ -108,7 +115,7 @@ public class EdgeMatch extends AbstractRelNode implements
SingleMatchNode, IMatc
public EdgeMatch copy(RelTraitSet traitSet, List<RelNode> inputs) {
assert inputs.size() == 1;
return new EdgeMatch(getCluster(), getTraitSet(), sole(inputs),
- label, edgeTypes, direction, nodeType, pathType);
+ label, edgeTypes, direction, nodeType, pathType, pushDownFields);
}
@Override
@@ -136,9 +143,15 @@ public class EdgeMatch extends AbstractRelNode implements
SingleMatchNode, IMatc
EdgeDirection direction, RelDataType
nodeType,
PathRecordType pathType) {
return new EdgeMatch(cluster, cluster.traitSet(), input, label,
edgeTypes,
- direction, nodeType, pathType);
+ direction, nodeType, pathType, null);
}
+
+ public Set<RexFieldAccess> getFields() {
+ return pushDownFields;
+ }
+
+
@Override
public PathRecordType getPathSchema() {
return pathType;
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/match/OptionalEdgeMatch.java
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/match/OptionalEdgeMatch.java
index 0603879a..8de656e6 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/match/OptionalEdgeMatch.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/match/OptionalEdgeMatch.java
@@ -19,7 +19,9 @@
package org.apache.geaflow.dsl.rel.match;
+
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
@@ -31,13 +33,14 @@ import org.apache.geaflow.dsl.calcite.PathRecordType;
import org.apache.geaflow.dsl.rel.MatchNodeVisitor;
import org.apache.geaflow.dsl.sqlnode.SqlMatchEdge.EdgeDirection;
+
public class OptionalEdgeMatch extends EdgeMatch {
private OptionalEdgeMatch(RelOptCluster cluster, RelTraitSet traitSet,
RelNode input, String label,
Collection<String> edgeTypes, EdgeDirection
direction,
RelDataType nodeType, PathRecordType pathType) {
- super(cluster, traitSet, input, label, edgeTypes, direction, nodeType,
pathType);
+ super(cluster, traitSet, input, label, edgeTypes, direction, nodeType,
pathType, new HashSet<>());
}
@Override
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/match/VertexMatch.java
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/match/VertexMatch.java
index dd8049fc..b6f42b99 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/match/VertexMatch.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/rel/match/VertexMatch.java
@@ -23,18 +23,14 @@ import static org.apache.geaflow.dsl.util.GQLRelUtil.match;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
+import java.util.*;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -49,6 +45,8 @@ public class VertexMatch extends AbstractRelNode implements
SingleMatchNode, IMa
private final String label;
+ private Set<RexFieldAccess> pushDownFields;
+
private final ImmutableSet<String> vertexTypes;
private final PathRecordType pathType;
@@ -72,12 +70,13 @@ public class VertexMatch extends AbstractRelNode implements
SingleMatchNode, IMa
String label, Collection<String> vertexTypes,
RelDataType nodeType,
PathRecordType pathType, RexNode pushDownFilter) {
this(cluster, traitSet, input, label, vertexTypes, nodeType, pathType,
pushDownFilter,
- new HashSet<>());
+ new HashSet<>(), null);
}
public VertexMatch(RelOptCluster cluster, RelTraitSet traitSet, RelNode
input,
String label, Collection<String> vertexTypes,
RelDataType nodeType,
- PathRecordType pathType, RexNode pushDownFilter,
Set<Object> idSet) {
+ PathRecordType pathType, RexNode pushDownFilter,
Set<Object> idSet,
+ Set<RexFieldAccess> pushDownFields) {
super(cluster, traitSet);
this.input = input;
this.label = label;
@@ -93,6 +92,14 @@ public class VertexMatch extends AbstractRelNode implements
SingleMatchNode, IMa
this.nodeType = Objects.requireNonNull(nodeType);
this.pushDownFilter = pushDownFilter;
this.idSet = idSet;
+ this.pushDownFields = pushDownFields;
+ }
+
+ public void addField(RexFieldAccess field) {
+ if (pushDownFields == null) {
+ pushDownFields = new HashSet<>();
+ }
+ pushDownFields.add(field);
}
@Override
@@ -126,29 +133,33 @@ public class VertexMatch extends AbstractRelNode
implements SingleMatchNode, IMa
return idSet;
}
+ public Set<RexFieldAccess> getFields() {
+ return pushDownFields;
+ }
+
@Override
public SingleMatchNode copy(List<RelNode> inputs, PathRecordType
pathSchema) {
assert inputs.size() <= 1;
RelNode input = inputs.isEmpty() ? null : inputs.get(0);
return new VertexMatch(getCluster(), traitSet, input, label,
- vertexTypes, nodeType, pathSchema, pushDownFilter, idSet);
+ vertexTypes, nodeType, pathSchema, pushDownFilter, idSet,
pushDownFields);
}
@Override
public VertexMatch copy(RelTraitSet traitSet, List<RelNode> inputs) {
RelNode input = GQLRelUtil.oneInput(inputs);
return new VertexMatch(getCluster(), getTraitSet(), input,
- label, vertexTypes, nodeType, pathType, pushDownFilter, idSet);
+ label, vertexTypes, nodeType, pathType, pushDownFilter, idSet,
pushDownFields);
}
public VertexMatch copy(RexNode pushDownFilter) {
return new VertexMatch(getCluster(), getTraitSet(), input,
- label, vertexTypes, nodeType, pathType, pushDownFilter, idSet);
+ label, vertexTypes, nodeType, pathType, pushDownFilter, idSet,
pushDownFields);
}
public VertexMatch copy(Set<Object> idSet) {
return new VertexMatch(getCluster(), getTraitSet(), input,
- label, vertexTypes, nodeType, pathType, pushDownFilter, idSet);
+ label, vertexTypes, nodeType, pathType, pushDownFilter, idSet,
pushDownFields);
}
@Override
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/test/java/org/apache/geaflow/dsl/GQLFieldExtractorTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/test/java/org/apache/geaflow/dsl/GQLFieldExtractorTest.java
new file mode 100644
index 00000000..3a2ddaa2
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/test/java/org/apache/geaflow/dsl/GQLFieldExtractorTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl;
+
+import org.apache.geaflow.dsl.optimize.rule.GraphMatchFieldPruneRule;
+import org.apache.geaflow.dsl.optimize.rule.ProjectFieldPruneRule;
+import org.testng.annotations.Test;
+
+public class GQLFieldExtractorTest {
+
+ private static final String GRAPH_G1 = "create graph g1("
+ + "vertex user("
+ + " id bigint ID,"
+ + "name varchar"
+ + "),"
+ + "vertex person("
+ + " id bigint ID,"
+ + "name varchar,"
+ + "gender int,"
+ + "age integer"
+ + "),"
+ + "edge knows("
+ + " src_id bigint SOURCE ID,"
+ + " target_id bigint DESTINATION ID,"
+ + " time bigint TIMESTAMP,"
+ + " weight double"
+ + ")"
+ + ")";
+
+ @Test
+ public void testGraphMatchFieldPrune() {
+ PlanTester.build()
+ .registerGraph(GRAPH_G1)
+ .gql("MATCH (a:person WHERE a.age > 18)" +
+ "-[e:knows WHERE e.weight > 0.5]" +
+ "->(b:user WHERE b.id != 0 AND name like 'MARKO')\n")
+ .toRel()
+ .checkFilteredFields("{a=[null], b=[null], e=[null]}")
+ .opt(GraphMatchFieldPruneRule.INSTANCE)
+ .checkFilteredFields("{a=[a.age], b=[b.id, b.name],
e=[e.weight]}");
+ }
+
+ @Test
+ public void testProjectFieldPrune() {
+ PlanTester.build()
+ .registerGraph(GRAPH_G1)
+ .gql("MATCH (a:person)-[e:knows]->(b:user)\n" +
+ " RETURN e.src_id as src_id, e.target_id as target_id," +
+ " a.gender as a_gender, b.id as b_id")
+ .toRel()
+ .checkFilteredFields("{a=[null], b=[null], e=[null]}")
+ .opt(ProjectFieldPruneRule.INSTANCE)
+ .checkFilteredFields("{a=[a.gender], b=[b.id], e=[e.src_id,
e.target_id]}");
+ }
+
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/test/java/org/apache/geaflow/dsl/PlanTester.java
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/test/java/org/apache/geaflow/dsl/PlanTester.java
index 7a203431..f1f1e93b 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/test/java/org/apache/geaflow/dsl/PlanTester.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/test/java/org/apache/geaflow/dsl/PlanTester.java
@@ -34,6 +34,7 @@ import org.apache.geaflow.dsl.optimize.GQLOptimizer;
import org.apache.geaflow.dsl.optimize.RuleGroup;
import org.apache.geaflow.dsl.parser.GeaFlowDSLParser;
import org.apache.geaflow.dsl.planner.GQLContext;
+import org.apache.geaflow.dsl.rel.logical.LogicalGraphMatch;
import org.apache.geaflow.dsl.schema.GeaFlowGraph;
import org.apache.geaflow.dsl.schema.GeaFlowTable;
import org.apache.geaflow.dsl.sqlnode.SqlCreateGraph;
@@ -180,4 +181,22 @@ public class PlanTester {
public String getDefaultGraphDDL() {
return defaultGraphDDL;
}
+
+ public PlanTester checkFilteredFields(String expectFields) {
+        // Traverse until relNode is a LogicalGraphMatch node
+ // Only apply for simple case with unique LogicalGraphMatch node and
linear hierarchy
+ RelNode currentNode = relNode;
+
+ while (currentNode != null && !(currentNode instanceof
LogicalGraphMatch)) {
+ currentNode = currentNode.getInputs().get(0);
+ }
+ if (currentNode == null) {
+ throw new GeaFlowDSLException("No matching fields found.");
+ }
+
+ LogicalGraphMatch matchNode = (LogicalGraphMatch) currentNode;
+ String actualFields = matchNode.getFilteredFields();
+ Assert.assertEquals(actualFields, expectFields);
+ return this;
+ }
}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml
index 042dfcfc..c3c34b7f 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml
@@ -155,66 +155,66 @@
</dependency>
</dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire.version}</version>
- <configuration>
- <!-- Increase memory allocation for GeaFlow framework -->
- <argLine>-Xmx1024m -XX:MaxMetaspaceSize=512m -XX:+UseG1GC
-XX:MaxGCPauseMillis=200 -Djava.awt.headless=true
-Dfile.encoding=UTF-8</argLine>
-
- <!-- Disable parallel execution to avoid resource
conflicts -->
- <forkCount>1</forkCount>
- <reuseForks>false</reuseForks>
- <parallel>none</parallel>
- <threadCount>1</threadCount>
- <perCoreThreadCount>false</perCoreThreadCount>
-
- <!-- Increase timeout for distributed operations -->
-
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
- <testFailureIgnore>false</testFailureIgnore>
- <skipAfterFailureCount>1</skipAfterFailureCount>
-
- <!-- System properties for stability -->
- <systemPropertyVariables>
- <java.awt.headless>true</java.awt.headless>
- <file.encoding>UTF-8</file.encoding>
- <user.timezone>UTC</user.timezone>
- <geaflow.cluster.type>local</geaflow.cluster.type>
- <geaflow.debug.mode>false</geaflow.debug.mode>
- </systemPropertyVariables>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <profiles>
- <profile>
- <id>integration-tests</id>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <!-- Increase memory allocation for GeaFlow framework -->
+ <argLine>-Xmx1024m -XX:MaxMetaspaceSize=512m -XX:+UseG1GC
-XX:MaxGCPauseMillis=200 -Djava.awt.headless=true
-Dfile.encoding=UTF-8</argLine>
+
+ <!-- Disable parallel execution to avoid resource
conflicts -->
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ <parallel>none</parallel>
+ <threadCount>1</threadCount>
+ <perCoreThreadCount>false</perCoreThreadCount>
+
+ <!-- Increase timeout for distributed operations -->
+
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
+ <testFailureIgnore>false</testFailureIgnore>
+ <skipAfterFailureCount>1</skipAfterFailureCount>
+
+ <!-- System properties for stability -->
+ <systemPropertyVariables>
+ <java.awt.headless>true</java.awt.headless>
+ <file.encoding>UTF-8</file.encoding>
+ <user.timezone>UTC</user.timezone>
+ <geaflow.cluster.type>local</geaflow.cluster.type>
+ <geaflow.debug.mode>false</geaflow.debug.mode>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>integration-tests</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire.version}</version>
- <configuration>
- <argLine>-Xmx1024m -XX:MaxMetaspaceSize=512m
-XX:+UseG1GC</argLine>
- <forkCount>1</forkCount>
- <reuseForks>false</reuseForks>
- <parallel>none</parallel>
- <includes>
- <include>**/GQLInsertTest.java</include>
- </includes>
- <excludes>
- <exclude>none</exclude>
- </excludes>
-
<forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
+ <configuration>
+ <argLine>-Xmx1024m -XX:MaxMetaspaceSize=512m
-XX:+UseG1GC</argLine>
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ <parallel>none</parallel>
+ <includes>
+ <include>**/GQLInsertTest.java</include>
+ </includes>
+ <excludes>
+ <exclude>none</exclude>
+ </excludes>
+
<forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
-</project>
+</project>
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/command/QueryCommand.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/command/QueryCommand.java
index 5b1e9029..7b4101fb 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/command/QueryCommand.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/command/QueryCommand.java
@@ -71,7 +71,6 @@ public class QueryCommand implements IQueryCommand {
physicNode = (PhysicRelNode<?>)
context.getPathAnalyzer().analyze(physicNode);
LOGGER.info("After path analyzer:\n{}",
RelOptUtil.toString(physicNode));
-
RDataView dataView = physicNode.translate(context);
context.setCurrentResultType(physicNode.getRowType());
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/StepLogicalPlan.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/StepLogicalPlan.java
index 0bb408e2..8510ed22 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/StepLogicalPlan.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/StepLogicalPlan.java
@@ -34,6 +34,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import org.apache.calcite.rex.RexFieldAccess;
import org.apache.geaflow.common.type.IType;
import org.apache.geaflow.dsl.common.types.EdgeType;
import org.apache.geaflow.dsl.common.types.GraphSchema;
@@ -56,6 +57,7 @@ import
org.apache.geaflow.dsl.runtime.function.graph.StepPathModifyFunction;
import org.apache.geaflow.dsl.runtime.function.graph.StepSortFunction;
import org.apache.geaflow.dsl.runtime.function.graph.StepSortFunctionImpl;
import
org.apache.geaflow.dsl.runtime.function.graph.TraversalFromVertexFunction;
+import
org.apache.geaflow.dsl.runtime.traversal.operator.FilteredFieldsOperator;
import org.apache.geaflow.dsl.runtime.traversal.operator.MatchEdgeOperator;
import org.apache.geaflow.dsl.runtime.traversal.operator.MatchVertexOperator;
import
org.apache.geaflow.dsl.runtime.traversal.operator.MatchVirtualEdgeOperator;
@@ -159,38 +161,38 @@ public class StepLogicalPlan implements Serializable {
public StepLogicalPlan end() {
StepEndOperator operator = new StepEndOperator(nextPlanId());
return new StepLogicalPlan(this, operator)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(this.getOutputPathSchema())
- .withOutputPathSchema(this.getOutputPathSchema())
- .withOutputType(VoidType.INSTANCE)
- ;
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(this.getOutputPathSchema())
+ .withOutputPathSchema(this.getOutputPathSchema())
+ .withOutputType(VoidType.INSTANCE)
+ ;
}
public static StepLogicalPlan subQueryStart(String queryName) {
StepSubQueryStartOperator operator = new
StepSubQueryStartOperator(nextPlanId(),
- queryName);
+ queryName);
return new StepLogicalPlan(Collections.emptyList(), operator);
}
public StepLogicalPlan vertexMatch(MatchVertexFunction function) {
MatchVertexOperator operator = new MatchVertexOperator(nextPlanId(),
function);
return new StepLogicalPlan(this, operator)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(this.getOutputPathSchema());
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(this.getOutputPathSchema());
}
public StepLogicalPlan edgeMatch(MatchEdgeFunction function) {
MatchEdgeOperator operator = new MatchEdgeOperator(nextPlanId(),
function);
return new StepLogicalPlan(this, operator)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(this.getOutputPathSchema());
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(this.getOutputPathSchema());
}
public StepLogicalPlan virtualEdgeMatch(MatchVirtualEdgeFunction function)
{
MatchVirtualEdgeOperator operator = new
MatchVirtualEdgeOperator(nextPlanId(), function);
return new StepLogicalPlan(this, operator)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(this.getOutputPathSchema());
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(this.getOutputPathSchema());
}
public StepLogicalPlan startFrom(String label) {
@@ -199,14 +201,14 @@ public class StepLogicalPlan implements Serializable {
IType<?> fieldType = getOutputPathSchema().getType(fieldIndex);
if (!(fieldType instanceof VertexType)) {
throw new IllegalArgumentException(
- "Only can start traversal from vertex, current type is: "
+ fieldType);
+ "Only can start traversal from vertex, current type
is: " + fieldType);
}
return this.virtualEdgeMatch(new
TraversalFromVertexFunction(fieldIndex, fieldType))
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(this.getOutputPathSchema())
- .withOutputPathSchema(PathType.EMPTY)
-
.withOutputType(EdgeType.emptyEdge(getGraphSchema().getIdType()))
- ;
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(this.getOutputPathSchema())
+ .withOutputPathSchema(PathType.EMPTY)
+
.withOutputType(EdgeType.emptyEdge(getGraphSchema().getIdType()))
+ ;
} else { // start from a new label.
return this.getHeadPlan();
}
@@ -215,39 +217,39 @@ public class StepLogicalPlan implements Serializable {
public StepLogicalPlan filter(StepBoolFunction function) {
StepFilterOperator operator = new StepFilterOperator(nextPlanId(),
function);
return new StepLogicalPlan(this, operator)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(this.getOutputPathSchema())
- .withOutputType(this.getOutputType())
- ;
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(this.getOutputPathSchema())
+ .withOutputType(this.getOutputType())
+ ;
}
public StepLogicalPlan filterNode(StepNodeFilterFunction function) {
StepNodeFilterOperator operator = new
StepNodeFilterOperator(nextPlanId(), function);
return new StepLogicalPlan(this, operator)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(this.getOutputPathSchema())
- .withOutputPathSchema(this.getOutputPathSchema())
- .withOutputType(this.getOutputType());
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(this.getOutputPathSchema())
+ .withOutputPathSchema(this.getOutputPathSchema())
+ .withOutputType(this.getOutputType());
}
public StepLogicalPlan distinct(StepKeyFunction keyFunction) {
StepDistinctOperator localOperator = new
StepDistinctOperator(nextPlanId(), keyFunction);
StepLogicalPlan localDistinct = new StepLogicalPlan(this,
localOperator)
- .withName("StepLocalDistinct-" + localOperator.getId())
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(this.getOutputPathSchema())
- .withOutputPathSchema(this.getOutputPathSchema())
- .withOutputType(this.getOutputType());
+ .withName("StepLocalDistinct-" + localOperator.getId())
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(this.getOutputPathSchema())
+ .withOutputPathSchema(this.getOutputPathSchema())
+ .withOutputType(this.getOutputType());
StepLogicalPlan exchange = localDistinct.exchange(keyFunction);
StepDistinctOperator globalOperator = new
StepDistinctOperator(nextPlanId(), keyFunction);
return new StepLogicalPlan(exchange, globalOperator)
- .withName("StepGlobalDistinct-" + globalOperator.getId())
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(this.getOutputPathSchema())
- .withOutputPathSchema(this.getOutputPathSchema())
- .withOutputType(this.getOutputType());
+ .withName("StepGlobalDistinct-" + globalOperator.getId())
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(this.getOutputPathSchema())
+ .withOutputPathSchema(this.getOutputPathSchema())
+ .withOutputType(this.getOutputType());
}
public StepLogicalPlan loopUtil(StepLogicalPlan loopBody, StepBoolFunction
utilCondition,
@@ -263,38 +265,38 @@ public class StepLogicalPlan implements Serializable {
// append loop body the current plan.
bodyStart.setInputs(Collections.singletonList(this));
StepLoopUntilOperator operator = new StepLoopUntilOperator(
- nextPlanId(),
- bodyStart.getId(),
- loopBody.getId(),
- utilCondition,
- minLoopCount,
- maxLoopCount,
- loopStartPathFieldCount,
- loopBodyPathFieldCount);
+ nextPlanId(),
+ bodyStart.getId(),
+ loopBody.getId(),
+ utilCondition,
+ minLoopCount,
+ maxLoopCount,
+ loopStartPathFieldCount,
+ loopBodyPathFieldCount);
List<StepLogicalPlan> inputs = new ArrayList<>();
inputs.add(loopBody);
if (minLoopCount == 0) {
inputs.add(this);
}
return new StepLogicalPlan(inputs, operator)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(loopBody.getOutputPathSchema());
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(loopBody.getOutputPathSchema());
}
public StepLogicalPlan map(StepPathModifyFunction function, boolean
isGlobal) {
StepMapOperator operator = new StepMapOperator(nextPlanId(), function,
isGlobal);
return new StepLogicalPlan(this, operator)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(this.getOutputPathSchema())
- ;
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(this.getOutputPathSchema())
+ ;
}
public StepLogicalPlan mapRow(StepMapRowFunction function) {
StepMapRowOperator operator = new StepMapRowOperator(nextPlanId(),
function);
return new StepLogicalPlan(this, operator)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(this.getOutputPathSchema())
- ;
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(this.getOutputPathSchema())
+ ;
}
public StepLogicalPlan union(List<StepLogicalPlan> inputs) {
@@ -304,123 +306,123 @@ public class StepLogicalPlan implements Serializable {
totalInputs.add(this);
totalInputs.addAll(inputs);
List<PathType> inputPathTypes = totalInputs.stream()
- .map(StepLogicalPlan::getOutputPathSchema)
- .collect(Collectors.toList());
+ .map(StepLogicalPlan::getOutputPathSchema)
+ .collect(Collectors.toList());
return new StepLogicalPlan(totalInputs, operator)
- .withGraphSchema(getGraphSchema())
- .withInputPathSchema(inputPathTypes)
- ;
+ .withGraphSchema(getGraphSchema())
+ .withInputPathSchema(inputPathTypes)
+ ;
}
public StepLogicalPlan exchange(StepKeyFunction keyFunction) {
StepExchangeOperator exchange = new StepExchangeOperator(nextPlanId(),
keyFunction);
return new StepLogicalPlan(this, exchange)
- .withGraphSchema(getGraphSchema())
- .withInputPathSchema(getOutputPathSchema())
- .withOutputPathSchema(getOutputPathSchema())
- .withOutputType(getOutputType())
- ;
+ .withGraphSchema(getGraphSchema())
+ .withInputPathSchema(getOutputPathSchema())
+ .withOutputPathSchema(getOutputPathSchema())
+ .withOutputType(getOutputType())
+ ;
}
public StepLogicalPlan localExchange(StepKeyFunction keyFunction) {
StepLocalExchangeOperator exchange = new
StepLocalExchangeOperator(nextPlanId(), keyFunction);
return new StepLogicalPlan(this, exchange)
- .withGraphSchema(getGraphSchema())
- .withInputPathSchema(getOutputPathSchema())
- .withOutputPathSchema(getOutputPathSchema())
- .withOutputType(getOutputType())
- ;
+ .withGraphSchema(getGraphSchema())
+ .withInputPathSchema(getOutputPathSchema())
+ .withOutputPathSchema(getOutputPathSchema())
+ .withOutputType(getOutputType())
+ ;
}
public StepLogicalPlan join(StepLogicalPlan right, StepKeyFunction leftKey,
StepKeyFunction rightKey, StepJoinFunction
joinFunction,
PathType inputJoinPathSchema, boolean
isLocalJoin) {
StepLogicalPlan leftExchange = isLocalJoin
- ? this.localExchange(leftKey) : this.exchange(leftKey);
+ ? this.localExchange(leftKey) : this.exchange(leftKey);
StepLogicalPlan rightExchange = isLocalJoin
- ? right.localExchange(rightKey) : right.exchange(rightKey);
+ ? right.localExchange(rightKey) : right.exchange(rightKey);
List<PathType> joinInputPaths =
Lists.newArrayList(leftExchange.getOutputPathSchema(),
- rightExchange.getOutputPathSchema());
+ rightExchange.getOutputPathSchema());
StepJoinOperator joinOperator = new StepJoinOperator(nextPlanId(),
joinFunction,
- inputJoinPathSchema, joinInputPaths, isLocalJoin);
+ inputJoinPathSchema, joinInputPaths, isLocalJoin);
return new StepLogicalPlan(Lists.newArrayList(leftExchange,
rightExchange), joinOperator)
- .withGraphSchema(getGraphSchema())
- .withInputPathSchema(joinInputPaths)
-
.withOutputType(VertexType.emptyVertex(getGraphSchema().getIdType()))
- ;
+ .withGraphSchema(getGraphSchema())
+ .withInputPathSchema(joinInputPaths)
+
.withOutputType(VertexType.emptyVertex(getGraphSchema().getIdType()))
+ ;
}
public StepLogicalPlan sort(StepSortFunction sortFunction) {
StepSortOperator localSortOperator = new
StepSortOperator(nextPlanId(), sortFunction);
StepLogicalPlan localSortPlan = new StepLogicalPlan(this,
localSortOperator)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(this.getOutputPathSchema())
- .withOutputPathSchema(this.getOutputPathSchema())
- .withOutputType(this.getOutputType());
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(this.getOutputPathSchema())
+ .withOutputPathSchema(this.getOutputPathSchema())
+ .withOutputType(this.getOutputType());
StepLogicalPlan exchangePlan = localSortPlan.exchange(new
StepKeyFunctionImpl(new int[0], new IType[0]));
StepSortFunction globalSortFunction = ((StepSortFunctionImpl)
sortFunction).copy(true);
StepGlobalSortOperator globalSortOperator = new
StepGlobalSortOperator(nextPlanId(),
- globalSortFunction, this.getOutputType(),
this.getOutputPathSchema());
+ globalSortFunction, this.getOutputType(),
this.getOutputPathSchema());
return new StepLogicalPlan(exchangePlan, globalSortOperator)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(this.getOutputPathSchema())
- .withOutputPathSchema(this.getOutputPathSchema())
- .withOutputType(this.getOutputType());
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(this.getOutputPathSchema())
+ .withOutputPathSchema(this.getOutputPathSchema())
+ .withOutputType(this.getOutputType());
}
public StepLogicalPlan aggregate(StepAggregateFunction aggFunction) {
StepLocalSingleValueAggregateOperator localAggOp = new
StepLocalSingleValueAggregateOperator(nextPlanId(), aggFunction);
IType<?> localAggOutputType = ObjectType.INSTANCE;
StepLogicalPlan localAggPlan = new StepLogicalPlan(this, localAggOp)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(this.getOutputPathSchema())
- .withOutputPathSchema(PathType.EMPTY)
- .withOutputType(StructType.singleValue(localAggOutputType, false));
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(this.getOutputPathSchema())
+ .withOutputPathSchema(PathType.EMPTY)
+ .withOutputType(StructType.singleValue(localAggOutputType,
false));
StepLogicalPlan exchangePlan = localAggPlan.exchange(new
StepKeyFunctionImpl(new int[0], new IType[0]));
StepGlobalSingleValueAggregateOperator globalAggOp = new
StepGlobalSingleValueAggregateOperator(nextPlanId(), localAggOutputType,
- aggFunction);
+ aggFunction);
return new StepLogicalPlan(exchangePlan, globalAggOp)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(exchangePlan.getOutputPathSchema())
- .withOutputPathSchema(PathType.EMPTY)
- ;
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(exchangePlan.getOutputPathSchema())
+ .withOutputPathSchema(PathType.EMPTY)
+ ;
}
public StepLogicalPlan aggregate(PathType inputPath, PathType outputPath,
StepKeyFunction keyFunction,
StepAggregateFunction aggFn) {
StepLocalAggregateOperator localAggOp = new
StepLocalAggregateOperator(nextPlanId(),
- keyFunction, aggFn);
+ keyFunction, aggFn);
StepLogicalPlan localAggPlan = new StepLogicalPlan(this, localAggOp)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(inputPath)
- .withOutputPathSchema(inputPath)
- .withOutputType(getOutputType());
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(inputPath)
+ .withOutputPathSchema(inputPath)
+ .withOutputType(getOutputType());
StepLogicalPlan exchangePlan = localAggPlan.exchange(keyFunction);
StepGlobalAggregateOperator globalAggOp = new
StepGlobalAggregateOperator(nextPlanId(),
- keyFunction, aggFn);
+ keyFunction, aggFn);
return new StepLogicalPlan(exchangePlan, globalAggOp)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(outputPath)
- .withOutputPathSchema(outputPath)
- .withOutputType(this.getOutputType())
- ;
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(outputPath)
+ .withOutputPathSchema(outputPath)
+ .withOutputType(this.getOutputType())
+ ;
}
public StepLogicalPlan ret() {
StepReturnOperator returnOperator = new
StepReturnOperator(nextPlanId());
return new StepLogicalPlan(this, returnOperator)
- .withGraphSchema(this.getGraphSchema())
- .withInputPathSchema(this.getOutputPathSchema())
- .withOutputPathSchema(this.getOutputPathSchema())
- .withOutputType(this.getOutputType())
- ;
+ .withGraphSchema(this.getGraphSchema())
+ .withInputPathSchema(this.getOutputPathSchema())
+ .withOutputPathSchema(this.getOutputPathSchema())
+ .withOutputType(this.getOutputType())
+ ;
}
private void addOutput(StepLogicalPlan output) {
@@ -428,6 +430,13 @@ public class StepLogicalPlan implements Serializable {
outputs.add(output);
}
+ public StepLogicalPlan withFilteredFields(Set<RexFieldAccess> fields) {
+ if (operator instanceof FilteredFieldsOperator) {
+ ((FilteredFieldsOperator) operator).withFilteredFields(fields);
+ }
+ return this;
+ }
+
public StepLogicalPlan withName(String name) {
operator.withName(name);
return this;
@@ -597,8 +606,8 @@ public class StepLogicalPlan implements Serializable {
private StepLogicalPlan copy(Map<Long, StepLogicalPlan> copyPlanCache) {
List<StepLogicalPlan> inputsCopy = inputs.stream()
- .map(input -> input.copy(copyPlanCache))
- .collect(Collectors.toList());
+ .map(input -> input.copy(copyPlanCache))
+ .collect(Collectors.toList());
if (copyPlanCache.containsKey(getId())) {
return copyPlanCache.get(getId());
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/StepLogicalPlanTranslator.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/StepLogicalPlanTranslator.java
index 621c8e2d..ea74d2d2 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/StepLogicalPlanTranslator.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/StepLogicalPlanTranslator.java
@@ -232,7 +232,8 @@ public class StepLogicalPlanTranslator {
StepLogicalPlan plan = input.vertexMatch(mvf)
.withModifyGraphSchema(input.getModifyGraphSchema())
.withOutputPathSchema(outputPath)
- .withOutputType(nodeType);
+ .withOutputType(nodeType)
+ .withFilteredFields(vertexMatch.getFields());
planCache.put(label, plan);
return plan;
}
@@ -285,7 +286,8 @@ public class StepLogicalPlanTranslator {
StepLogicalPlan plan = input.edgeMatch(mef)
.withModifyGraphSchema(input.getModifyGraphSchema())
.withOutputPathSchema(outputPath)
- .withOutputType(nodeType);
+ .withOutputType(nodeType)
+ .withFilteredFields(edgeMatch.getFields());
planCache.put(label, plan);
return plan;
}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/AbstractStepOperator.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/AbstractStepOperator.java
index 2aa75bd5..4686bca3 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/AbstractStepOperator.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/AbstractStepOperator.java
@@ -86,7 +86,6 @@ public abstract class AbstractStepOperator<FUNC extends
StepFunction, IN extends
protected final long id;
protected String name;
-
protected final FUNC function;
private final Map<Long, List<EndOfData>> caller2ReceiveEods = new
HashMap<>();
protected List<PathType> inputPathSchemas;
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_match_001.sql
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/FilteredFieldsOperator.java
similarity index 71%
copy from
geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_match_001.sql
copy to
geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/FilteredFieldsOperator.java
index c0f69388..55c42731 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_match_001.sql
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/FilteredFieldsOperator.java
@@ -17,23 +17,11 @@
* under the License.
*/
-CREATE TABLE tbl_result (
- a_id bigint,
- weight double,
- b_id bigint
-) WITH (
- type='file',
- geaflow.dsl.file.path='${target}'
-);
+package org.apache.geaflow.dsl.runtime.traversal.operator;
-USE GRAPH modern;
+import java.util.Set;
+import org.apache.calcite.rex.RexFieldAccess;
-INSERT INTO tbl_result
-SELECT
- a_id,
- weight,
- b_id
-FROM (
- MATCH (a) -[e:knows]->(b:person where b.id != 1)
- RETURN a.id as a_id, e.weight as weight, b.id as b_id
-)
\ No newline at end of file
+public interface FilteredFieldsOperator {
+ StepOperator<?, ?> withFilteredFields(Set<RexFieldAccess> fields);
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/MatchEdgeOperator.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/MatchEdgeOperator.java
index 754bc759..374f618f 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/MatchEdgeOperator.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/MatchEdgeOperator.java
@@ -19,8 +19,8 @@
package org.apache.geaflow.dsl.runtime.traversal.operator;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
+import org.apache.calcite.rex.RexFieldAccess;
import org.apache.geaflow.dsl.common.data.RowEdge;
import org.apache.geaflow.dsl.runtime.function.graph.MatchEdgeFunction;
import org.apache.geaflow.dsl.runtime.function.graph.MatchEdgeFunctionImpl;
@@ -29,18 +29,29 @@ import
org.apache.geaflow.dsl.runtime.traversal.data.EdgeGroup;
import org.apache.geaflow.dsl.runtime.traversal.data.EdgeGroupRecord;
import org.apache.geaflow.dsl.runtime.traversal.data.VertexRecord;
import org.apache.geaflow.dsl.runtime.traversal.path.ITreePath;
+import org.apache.geaflow.dsl.runtime.util.EdgeProjectorUtil;
import org.apache.geaflow.dsl.sqlnode.SqlMatchEdge.EdgeDirection;
import org.apache.geaflow.metrics.common.MetricNameFormatter;
import org.apache.geaflow.metrics.common.api.Histogram;
public class MatchEdgeOperator extends AbstractStepOperator<MatchEdgeFunction,
VertexRecord, EdgeGroupRecord>
- implements LabeledStepOperator {
+ implements FilteredFieldsOperator, LabeledStepOperator {
private Histogram loadEdgeHg;
private Histogram loadEdgeRt;
private final boolean isOptionMatch;
+ private Set<RexFieldAccess> fields;
+
+ private EdgeProjectorUtil edgeProjector = null;
+
+ @Override
+ public StepOperator<VertexRecord, EdgeGroupRecord>
withFilteredFields(Set<RexFieldAccess> fields) {
+ this.fields = fields;
+ return this;
+ }
+
public MatchEdgeOperator(long id, MatchEdgeFunction function) {
super(id, function);
isOptionMatch = function instanceof MatchEdgeFunctionImpl
@@ -64,13 +75,26 @@ public class MatchEdgeOperator extends
AbstractStepOperator<MatchEdgeFunction, V
EdgeGroup edgeGroup = loadEdges;
if (!function.getEdgeTypes().isEmpty()) {
edgeGroup = loadEdges.filter(edge ->
+
function.getEdgeTypes().contains(edge.getBinaryLabel()));
}
Map<Object, ITreePath> targetTreePaths = new HashMap<>();
+
// generate new paths.
if (needAddToPath) {
int numEdge = 0;
for (RowEdge edge : edgeGroup) {
+ if (edgeProjector == null) {
+ edgeProjector = new EdgeProjectorUtil(
+ graphSchema,
+ fields,
+ getOutputType()
+ );
+ }
+ if (fields != null && !fields.isEmpty()) {
+ edge = edgeProjector.projectEdge(edge);
+ }
+
// add edge to path.
if (!targetTreePaths.containsKey(edge.getTargetId())) {
ITreePath newPath = vertex.getTreePath().extendTo(edge);
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/MatchVertexOperator.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/MatchVertexOperator.java
index 8ae305ef..47c84535 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/MatchVertexOperator.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/MatchVertexOperator.java
@@ -19,12 +19,10 @@
package org.apache.geaflow.dsl.runtime.traversal.operator;
-import java.util.Set;
-import org.apache.geaflow.dsl.common.data.RowEdge;
-import org.apache.geaflow.dsl.common.data.RowVertex;
-import org.apache.geaflow.dsl.common.data.StepRecord;
+import java.util.*;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.geaflow.dsl.common.data.*;
import org.apache.geaflow.dsl.common.data.StepRecord.StepRecordType;
-import org.apache.geaflow.dsl.common.data.VirtualId;
import org.apache.geaflow.dsl.common.data.impl.VertexEdgeFactory;
import org.apache.geaflow.dsl.common.types.VertexType;
import org.apache.geaflow.dsl.runtime.function.graph.MatchVertexFunction;
@@ -35,11 +33,12 @@ import
org.apache.geaflow.dsl.runtime.traversal.data.EdgeGroupRecord;
import org.apache.geaflow.dsl.runtime.traversal.data.IdOnlyVertex;
import org.apache.geaflow.dsl.runtime.traversal.data.VertexRecord;
import org.apache.geaflow.dsl.runtime.traversal.path.ITreePath;
+import org.apache.geaflow.dsl.runtime.util.VertexProjectorUtil;
import org.apache.geaflow.metrics.common.MetricNameFormatter;
import org.apache.geaflow.metrics.common.api.Histogram;
public class MatchVertexOperator extends
AbstractStepOperator<MatchVertexFunction, StepRecord,
- VertexRecord> implements LabeledStepOperator {
+ VertexRecord> implements FilteredFieldsOperator, LabeledStepOperator {
private Histogram loadVertexRt;
@@ -47,6 +46,16 @@ public class MatchVertexOperator extends
AbstractStepOperator<MatchVertexFunctio
private Set<Object> idSet;
+ private Set<RexFieldAccess> fields;
+
+ private VertexProjectorUtil vertexProjector = null;
+
+ @Override
+ public StepOperator<StepRecord, VertexRecord>
withFilteredFields(Set<RexFieldAccess> fields) {
+ this.fields = fields;
+ return this;
+ }
+
public MatchVertexOperator(long id, MatchVertexFunction function) {
super(id, function);
if (function instanceof MatchVertexFunctionImpl) {
@@ -74,6 +83,7 @@ public class MatchVertexOperator extends
AbstractStepOperator<MatchVertexFunctio
}
}
+
private void processVertex(VertexRecord vertexRecord) {
RowVertex vertex = vertexRecord.getVertex();
if (vertex instanceof IdOnlyVertex && needLoadVertex(vertex.getId())) {
@@ -83,6 +93,19 @@ public class MatchVertexOperator extends
AbstractStepOperator<MatchVertexFunctio
graphSchema,
addingVertexFieldTypes);
loadVertexRt.update(System.currentTimeMillis() - startTs);
+
+ if (vertexProjector == null) {
+ vertexProjector = new VertexProjectorUtil(
+ graphSchema,
+ fields,
+ addingVertexFieldNames,
+ addingVertexFieldTypes
+ );
+ }
+ if (fields != null && !fields.isEmpty()) {
+ vertex = vertexProjector.projectVertex(vertex);
+ }
+
if (vertex == null && !isOptionMatch) {
// load a non-exists vertex, just skip.
return;
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/StepOperator.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/StepOperator.java
index 48fdeabf..f0496942 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/StepOperator.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/StepOperator.java
@@ -20,9 +20,7 @@
package org.apache.geaflow.dsl.runtime.traversal.operator;
import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
+import java.util.*;
import org.apache.geaflow.common.type.IType;
import org.apache.geaflow.dsl.common.data.StepRecord;
import org.apache.geaflow.dsl.common.types.GraphSchema;
@@ -43,14 +41,12 @@ public interface StepOperator<IN extends StepRecord, OUT
extends StepRecord> ext
/**
* The init method for step operator.
- *
* @param context The context for traversal.
*/
void open(TraversalRuntimeContext context);
/**
* Process input record.
- *
* @param record The input record.
*/
void process(IN record);
@@ -67,14 +63,12 @@ public interface StepOperator<IN extends StepRecord, OUT
extends StepRecord> ext
/**
* Set the output path schema for the operator.
- *
* @param outputPath The output path schema.
*/
StepOperator<IN, OUT> withOutputPathSchema(PathType outputPath);
/**
* Set the input path schema for the operator.
- *
* @param inputPaths The input path schemas for each input.
*/
StepOperator<IN, OUT> withInputPathSchema(List<PathType> inputPaths);
@@ -87,14 +81,12 @@ public interface StepOperator<IN extends StepRecord, OUT
extends StepRecord> ext
/**
* Set the origin graph schema.
- *
* @param graphSchema The origin graph schema defined in the DDL.
*/
StepOperator<IN, OUT> withGraphSchema(GraphSchema graphSchema);
/**
* Set the modified graph schema after the let-global-statement.
- *
* @param modifyGraphSchema The modified graph schema.
*/
StepOperator<IN, OUT> withModifyGraphSchema(GraphSchema modifyGraphSchema);
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/path/AbstractTreePath.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/path/AbstractTreePath.java
index 8e7e739f..d5a9e6c5 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/path/AbstractTreePath.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/path/AbstractTreePath.java
@@ -63,10 +63,10 @@ public abstract class AbstractTreePath implements ITreePath
{
return this;
}
if (other.getNodeType() != getNodeType()
- && !(getNodeType() == NodeType.VERTEX_TREE && getVertex() == null)
- && !(other.getNodeType() == NodeType.VERTEX_TREE &&
other.getVertex() == null)) {
+ && !(getNodeType() == NodeType.VERTEX_TREE && getVertex() ==
null)
+ && !(other.getNodeType() == NodeType.VERTEX_TREE &&
other.getVertex() == null)) {
throw new GeaFlowDSLException("Merge with different tree kinds: "
+ getNodeType()
- + " and " + other.getNodeType());
+ + " and " + other.getNodeType());
}
if (this.equalNode(other) && getDepth() == other.getDepth()) {
@@ -105,8 +105,8 @@ public abstract class AbstractTreePath implements ITreePath
{
if (parent.getNodeType() == NodeType.EDGE_TREE) {
for (ITreePath mergedParent : mergedParents) {
if (mergedParent.getNodeType() == NodeType.EDGE_TREE
- && mergedParent.getEdgeSet().like(parent.getEdgeSet())
- && Objects.equals(mergedParent.getParents(),
parent.getParents())) {
+ && mergedParent.getEdgeSet().like(parent.getEdgeSet())
+ && Objects.equals(mergedParent.getParents(),
parent.getParents())) {
mergedParent.getEdgeSet().addEdges(parent.getEdgeSet());
hasMerged = true;
break;
@@ -205,7 +205,7 @@ public abstract class AbstractTreePath implements ITreePath
{
} else if (parentSize >= 1) {
for (ITreePath parent : getParents()) {
ITreePath filterTree = ((AbstractTreePath)
parent).filter(filterFunction,
- refPathIndices, fieldMapping, currentPath,
maxDepth, pathId);
+ refPathIndices, fieldMapping,
currentPath, maxDepth, pathId);
if (!filterTree.isEmpty()) {
filterTrees.add(filterTree.extendTo(edge));
}
@@ -239,7 +239,7 @@ public abstract class AbstractTreePath implements ITreePath
{
List<ITreePath> filterParents = new ArrayList<>(parentSize);
for (ITreePath parent : getParents()) {
ITreePath filterTree = ((AbstractTreePath)
parent).filter(filterFunction, refPathIndices,
- fieldMapping, currentPath, maxDepth, pathId);
+ fieldMapping, currentPath, maxDepth, pathId);
if (!filterTree.isEmpty()) {
filterParents.add(filterTree);
}
@@ -480,4 +480,4 @@ public abstract class AbstractTreePath implements ITreePath
{
public ITreePath extendTo(RowVertex vertex) {
return extendTo(null, vertex);
}
-}
+}
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/path/EmptyTreePath.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/path/EmptyTreePath.java
index 1748c6b8..5f81b9ef 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/path/EmptyTreePath.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/path/EmptyTreePath.java
@@ -151,4 +151,4 @@ public class EmptyTreePath extends AbstractSingleTreePath
implements KryoSeriali
// no fields to deserialize
}
-}
+}
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/path/UnionTreePath.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/path/UnionTreePath.java
index 2c7f1131..0f59e411 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/path/UnionTreePath.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/path/UnionTreePath.java
@@ -45,7 +45,7 @@ public class UnionTreePath extends AbstractTreePath {
public static ITreePath create(List<ITreePath> nodes) {
List<ITreePath> notEmptyTreePath =
- Objects.requireNonNull(nodes).stream().filter(n -> n.getNodeType()
!= NodeType.EMPTY_TREE).collect(Collectors.toList());
+ Objects.requireNonNull(nodes).stream().filter(n ->
n.getNodeType() != NodeType.EMPTY_TREE).collect(Collectors.toList());
if (notEmptyTreePath.isEmpty()) {
return EmptyTreePath.of();
} else if (notEmptyTreePath.size() == 1) {
@@ -126,8 +126,8 @@ public class UnionTreePath extends AbstractTreePath {
if (node.getNodeType() == NodeType.EDGE_TREE) {
for (ITreePath thisNode : nodes) {
if (thisNode.getNodeType() == NodeType.EDGE_TREE
- && thisNode.getEdgeSet().like(node.getEdgeSet())
- && Objects.equals(thisNode.getParents(),
node.getParents())) {
+ &&
thisNode.getEdgeSet().like(node.getEdgeSet())
+ && Objects.equals(thisNode.getParents(),
node.getParents())) {
thisNode.getEdgeSet().addEdges(node.getEdgeSet());
hasMerged = true;
break;
@@ -274,7 +274,7 @@ public class UnionTreePath extends AbstractTreePath {
List<ITreePath> filterNodes = new ArrayList<>();
for (ITreePath node : nodes) {
ITreePath filterNode = ((AbstractTreePath)
node).filter(filterFunction,
- refPathIndices, fieldMapping, currentPath, maxDepth, pathId);
+ refPathIndices, fieldMapping, currentPath, maxDepth,
pathId);
if (filterNode != null) {
filterNodes.add(filterNode);
}
@@ -351,4 +351,4 @@ public class UnionTreePath extends AbstractTreePath {
}
}
-}
+}
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/util/EdgeProjectorUtil.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/util/EdgeProjectorUtil.java
new file mode 100644
index 00000000..fbd1052d
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/util/EdgeProjectorUtil.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.util;
+
+import java.util.*;
+import java.util.stream.Collectors;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.geaflow.common.type.IType;
+import org.apache.geaflow.dsl.common.binary.encoder.DefaultEdgeEncoder;
+import org.apache.geaflow.dsl.common.binary.encoder.EdgeEncoder;
+import org.apache.geaflow.dsl.common.data.RowEdge;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.types.EdgeType;
+import org.apache.geaflow.dsl.common.types.GraphSchema;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.dsl.runtime.expression.*;
+import
org.apache.geaflow.dsl.runtime.expression.construct.EdgeConstructExpression;
+import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression;
+import org.apache.geaflow.dsl.runtime.expression.literal.LiteralExpression;
+import org.apache.geaflow.dsl.runtime.function.table.ProjectFunction;
+import org.apache.geaflow.dsl.runtime.function.table.ProjectFunctionImpl;
+
+/**
+ * Utility class for projecting edges with field pruning.
+ */
+public class EdgeProjectorUtil {
+
+ private static final String SOURCE_ID = "srcId";
+ private static final String TARGET_ID = "targetId";
+ private static final String LABEL = "~label";
+
+ private final Map<String, ProjectFunction> projectFunctions;
+ private final Map<String, List<TableField>> tableOutputTypes;
+ private final GraphSchema graphSchema;
+ private final Set<RexFieldAccess> fields;
+ private final IType<?> outputType;
+
+ /**
+ * Constructs an EdgeProjector with specified parameters.
+ *
+ * @param graphSchema The graph schema containing all vertex and edge type
definitions
+ * @param fields The set of fields to be included in the projection, null
means no filtering
+ * @param outputType The output type of the edge, must be an EdgeType
+ */
+ public EdgeProjectorUtil(GraphSchema graphSchema,
+ Set<RexFieldAccess> fields,
+ IType<?> outputType) {
+ this.graphSchema = graphSchema;
+ this.fields = fields;
+ this.outputType = outputType;
+ this.projectFunctions = new HashMap<>();
+ this.tableOutputTypes = new HashMap<>();
+
+ if (!(outputType instanceof EdgeType)) {
+ throw new IllegalArgumentException("Unsupported type: " +
outputType.getClass());
+ }
+ }
+
+ /**
+ * Projects an edge by filtering fields based on the required field set.
+ *
+ * @param edge The input edge to be projected
+ * @return The projected edge with only required fields, or null if input
is null
+ */
+ public RowEdge projectEdge(RowEdge edge) {
+ if (edge == null) {
+ return null;
+ }
+
+ String edgeLabel = edge.getLabel();
+
+ // Initialize project function for this edge label if not exists
+ if (this.projectFunctions.get(edgeLabel) == null) {
+ initializeProject(
+ edge, // edge: The edge instance used for schema
inference
+ edgeLabel // edgeLabel: The label of the edge for unique
identification
+ );
+ }
+
+ // Utilize project functions to filter fields
+ ProjectFunction currentProjectFunction =
this.projectFunctions.get(edgeLabel);
+ ObjectRow projectEdge = (ObjectRow)
currentProjectFunction.project(edge);
+ RowEdge edgeDecoded = (RowEdge) projectEdge.getField(0, null);
+
+ EdgeType edgeType = new EdgeType(this.tableOutputTypes.get(edgeLabel),
false);
+ EdgeEncoder encoder = new DefaultEdgeEncoder(edgeType);
+ return encoder.encode(edgeDecoded);
+ }
+
+ /**
+ * Initializes the project function for a given edge label.
+ *
+ * @param edge The edge instance used to determine the schema and label
+ * @param edgeLabel The label of the edge for unique identification
+ */
+ private void initializeProject(RowEdge edge, String edgeLabel) {
+ List<TableField> graphSchemaFieldList = graphSchema.getFields();
+
+ // Get fields of the output edge type
+ List<TableField> fieldsOfTable = ((EdgeType) outputType).getFields();
+
+ // Extract field names from RexFieldAccess list into a set
+ Set<String> fieldNames = (this.fields == null)
+ ? Collections.emptySet()
+ : this.fields.stream()
+ .map(e -> e.getField().getName())
+ .collect(Collectors.toSet());
+
+ List<Expression> expressions = new ArrayList<>();
+ List<TableField> tableOutputType = null;
+
+ // Enumerate list of fields in every table
+ for (TableField tableField : graphSchemaFieldList) {
+ if (edgeLabel.equals(tableField.getName())) {
+ List<Expression> inputs = new ArrayList<>();
+ tableOutputType = new ArrayList<>();
+
+ // Enumerate list of fields in the targeted table
+ for (int i = 0; i < fieldsOfTable.size(); i++) {
+ TableField column = fieldsOfTable.get(i);
+ String columnName = column.getName();
+
+ // Normalize: convert fields like `knowsCreationDate` to
`creationDate`
+ if (columnName.startsWith(edgeLabel)) {
+ String suffix =
columnName.substring(edgeLabel.length());
+ if (!suffix.isEmpty()) {
+ suffix = Character.toLowerCase(suffix.charAt(0)) +
suffix.substring(1);
+ columnName = suffix;
+ }
+ }
+
+ if (fieldNames.contains(columnName)
+ || columnName.equals(SOURCE_ID)
+ || columnName.equals(TARGET_ID)) {
+ // Include a field if it's in fieldNames or is
source/target ID column
+ inputs.add(new FieldExpression(null, i,
column.getType()));
+ tableOutputType.add(column);
+ } else if (columnName.equals(LABEL)) {
+ // Add edge label for LABEL column
+ inputs.add(new LiteralExpression(edge.getLabel(),
column.getType()));
+ tableOutputType.add(column);
+ } else {
+ // Use null placeholder for excluded fields
+ inputs.add(new LiteralExpression(null,
column.getType()));
+ tableOutputType.add(column);
+ }
+ }
+
+ expressions.add(new EdgeConstructExpression(inputs, new
EdgeType(tableOutputType, false)));
+ }
+ }
+
+ ProjectFunction projectFunction = new ProjectFunctionImpl(expressions);
+
+ // Store project function and output type for this edge label
+ this.projectFunctions.put(edgeLabel, projectFunction);
+ this.tableOutputTypes.put(edgeLabel, tableOutputType);
+ }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/util/VertexProjectorUtil.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/util/VertexProjectorUtil.java
new file mode 100644
index 00000000..1e87ac6f
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/util/VertexProjectorUtil.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.util;
+
+import java.util.*;
+import java.util.stream.Collectors;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.geaflow.common.type.IType;
+import org.apache.geaflow.dsl.common.binary.encoder.DefaultVertexEncoder;
+import org.apache.geaflow.dsl.common.binary.encoder.VertexEncoder;
+import org.apache.geaflow.dsl.common.data.RowVertex;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.types.GraphSchema;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.dsl.common.types.VertexType;
+import org.apache.geaflow.dsl.runtime.expression.*;
+import
org.apache.geaflow.dsl.runtime.expression.construct.VertexConstructExpression;
+import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression;
+import org.apache.geaflow.dsl.runtime.expression.literal.LiteralExpression;
+import org.apache.geaflow.dsl.runtime.function.table.ProjectFunction;
+import org.apache.geaflow.dsl.runtime.function.table.ProjectFunctionImpl;
+
+/**
+ * Utility class for projecting vertices with field pruning.
+ */
+public class VertexProjectorUtil {
+
+ private static final String ID = "id";
+ private static final String LABEL = "~label";
+
+ private final Map<String, ProjectFunction> projectFunctions;
+ private final Map<String, List<TableField>> tableOutputTypes;
+ private final GraphSchema graphSchema;
+ private final Set<RexFieldAccess> fields;
+ private final String[] addingVertexFieldNames;
+ private final IType<?>[] addingVertexFieldTypes;
+
+ /**
+ * Constructs a VertexProjector with specified parameters.
+ *
+ * @param graphSchema The graph schema containing all vertex and edge type
definitions
+ * @param fields The set of fields to be included in the projection, null
means no filtering
+ * @param addingVertexFieldNames The names of additional fields to be
added to vertices (e.g., global variables)
+ * @param addingVertexFieldTypes The types of additional fields
corresponding to addingVertexFieldNames
+ */
+ public VertexProjectorUtil(GraphSchema graphSchema,
+ Set<RexFieldAccess> fields,
+ String[] addingVertexFieldNames,
+ IType<?>[] addingVertexFieldTypes) {
+ this.graphSchema = graphSchema;
+ this.fields = fields;
+ this.addingVertexFieldNames = addingVertexFieldNames != null ?
addingVertexFieldNames : new String[0];
+ this.addingVertexFieldTypes = addingVertexFieldTypes != null ?
addingVertexFieldTypes : new IType<?>[0];
+ this.projectFunctions = new HashMap<>();
+ this.tableOutputTypes = new HashMap<>();
+ }
+
+ /**
+ * Projects a vertex by filtering fields based on the required field set.
+ *
+ * @param vertex The input vertex to be projected
+ * @return The projected vertex with only required fields, or null if
input is null
+ */
+ public RowVertex projectVertex(RowVertex vertex) {
+ if (vertex == null) {
+ return null;
+ }
+
+ // Handle the case of global variables
+ String compactedVertexLabel = vertex.getLabel();
+ for (String addingName : addingVertexFieldNames) {
+ compactedVertexLabel += "_" + addingName;
+ }
+
+ // Initialize project function for this vertex label if not exists
+ if (this.projectFunctions.get(compactedVertexLabel) == null) {
+ initializeProject(vertex, compactedVertexLabel,
addingVertexFieldTypes, addingVertexFieldNames);
+ }
+
+ // Utilize project functions to filter fields
+ ProjectFunction currentProjectFunction =
this.projectFunctions.get(compactedVertexLabel);
+ ObjectRow projectVertex = (ObjectRow)
currentProjectFunction.project(vertex);
+ RowVertex vertexDecoded = (RowVertex) projectVertex.getField(0, null);
+
+ VertexType vertexType = new
VertexType(this.tableOutputTypes.get(compactedVertexLabel));
+ VertexEncoder encoder = new DefaultVertexEncoder(vertexType);
+ return encoder.encode(vertexDecoded);
+ }
+
+ /**
+ * Initializes the project function for a given vertex label.
+ *
+ * @param vertex The vertex instance used to determine the schema and label
+ * @param compactedLabel The vertex label with additional field names
appended for unique identification
+ * @param globalTypes The types of global variables to be added to the
vertex
+ * @param globalNames The names of global variables to be added to the
vertex
+ */
+ private void initializeProject(RowVertex vertex, String compactedLabel,
+ IType<?>[] globalTypes, String[]
globalNames) {
+ List<TableField> graphSchemaFieldList = graphSchema.getFields();
+ List<TableField> fieldsOfTable;
+ List<TableField> tableOutputType = new ArrayList<>();
+
+ // Extract field names from RexFieldAccess list into a set
+ Set<String> fieldNames = (this.fields == null)
+ ? Collections.emptySet()
+ : this.fields.stream()
+ .map(e -> e.getField().getName())
+ .collect(Collectors.toSet());
+
+ List<Expression> expressions = new ArrayList<>();
+ String vertexLabel = vertex.getLabel();
+
+ for (TableField tableField : graphSchemaFieldList) {
+ if (vertexLabel.equals(tableField.getName())) {
+ List<Expression> inputs = new ArrayList<>();
+ fieldsOfTable = ((VertexType)
tableField.getType()).getFields();
+
+ for (int i = 0; i < fieldsOfTable.size(); i++) {
+ TableField column = fieldsOfTable.get(i);
+ String columnName = column.getName();
+
+ // Normalize: convert fields like `personId` to `id`
+ if (columnName.startsWith(vertexLabel)) {
+ String suffix =
columnName.substring(vertexLabel.length());
+ if (!suffix.isEmpty()) {
+ suffix = Character.toLowerCase(suffix.charAt(0)) +
suffix.substring(1);
+ columnName = suffix;
+ }
+ }
+
+ if (fieldNames.contains(columnName) ||
columnName.equals(ID)) {
+ // Include a field if it's in fieldNames or is ID
column
+ inputs.add(new FieldExpression(null, i,
column.getType()));
+ tableOutputType.add(column);
+ } else if (columnName.equals(LABEL)) {
+ // Add vertex label for LABEL column
+ inputs.add(new LiteralExpression(vertex.getLabel(),
column.getType()));
+ tableOutputType.add(column);
+ } else {
+ // Use null placeholder for excluded fields
+ inputs.add(new LiteralExpression(null,
column.getType()));
+ tableOutputType.add(column);
+ }
+ }
+
+ // Handle additional mapping when all global variables exist
+ if (globalNames.length > 0) {
+ for (int j = 0; j < globalNames.length; j++) {
+ int fieldIndex = j + fieldsOfTable.size();
+ inputs.add(new FieldExpression(null, fieldIndex,
globalTypes[j]));
+ tableOutputType.add(new TableField(globalNames[j],
globalTypes[j]));
+ }
+ }
+
+ expressions.add(new VertexConstructExpression(inputs, null,
new VertexType(tableOutputType)));
+ }
+ }
+
+ ProjectFunction projectFunction = new ProjectFunctionImpl(expressions);
+
+ // Store project functions
+ this.projectFunctions.put(compactedLabel, projectFunction);
+ this.tableOutputTypes.put(compactedLabel, tableOutputType);
+ }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_match_001.sql
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_match_001.sql
index c0f69388..d43c822b 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_match_001.sql
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_match_001.sql
@@ -34,6 +34,6 @@ SELECT
weight,
b_id
FROM (
- MATCH (a) -[e:knows]->(b:person where b.id != 1)
+ MATCH (a) -[e:knows]->(b:person where b.id <> 1)
RETURN a.id as a_id, e.weight as weight, b.id as b_id
)
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]