yaozhongq commented on code in PR #629:
URL: https://github.com/apache/geaflow/pull/629#discussion_r2453990202
##########
build.sh:
##########
@@ -152,7 +152,8 @@ function buildJarPackage() {
checkMaven || return 1
cd $MVN_BUILD_DIR
- mvn clean install -DskipTests -Dcheckstyle.skip -T4 || return 1
Review Comment:
Could you restore the original build command (`mvn clean install -DskipTests -Dcheckstyle.skip -T4`)?
##########
geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/optimize/rule/SelectFieldPruneRule.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.optimize.rule;
+
+import java.util.*;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.*;
+import org.apache.geaflow.dsl.rel.GraphMatch;
+import org.apache.geaflow.dsl.rel.PathModify.PathModifyExpression;
+import org.apache.geaflow.dsl.rel.logical.LogicalGraphMatch;
+import org.apache.geaflow.dsl.rel.match.*;
+import org.apache.geaflow.dsl.rex.PathInputRef;
+import org.apache.geaflow.dsl.rex.RexObjectConstruct;
+import org.apache.geaflow.dsl.rex.RexParameterRef;
+
+public class SelectFieldPruneRule extends RelOptRule {
+
+ public static final SelectFieldPruneRule PROJECT_INSTANCE;
+ public static final SelectFieldPruneRule GRAPH_MATCH_INSTANCE;
+
+ static {
+ PROJECT_INSTANCE = new ProjectPruneRule(LogicalProject.class);
+ GRAPH_MATCH_INSTANCE = new
GraphMatchPruneRule(LogicalGraphMatch.class);
+ }
+
+ //尝试通过thread判断是否访问过
Review Comment:
Comments need to be in English.
##########
geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml:
##########
@@ -153,6 +153,18 @@
<groupId>org.apache.geaflow</groupId>
<artifactId>geaflow-view-meta</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.hibernate</groupId>
Review Comment:
Is this dependency necessary?
##########
geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/MatchVertexOperator.java:
##########
@@ -74,6 +88,104 @@ protected void processRecord(StepRecord record) {
}
}
+ private RowVertex projectVertex(RowVertex vertex) {
+ if (vertex == null) { //找不到符合条件节点,无法映射
+ return null;
+ }
+
+ String compactedVertexLabel = vertex.getLabel(); // 原始变量
+ for (String addingName: addingVertexFieldNames) {
+ compactedVertexLabel += "_" + addingName; //
将addingVariable后的label设为新的一类
+ }
+
+ // 当前表projectFunction未定义
+ if (this.projectFunctions.get(compactedVertexLabel) == null) {
+ initializeProject(vertex, compactedVertexLabel,
addingVertexFieldTypes, addingVertexFieldNames);
+ }
+
+ //进行projection
+ ProjectFunction currentProjectFunction =
this.projectFunctions.get(compactedVertexLabel);
+ ObjectRow projectVertex = (ObjectRow)
currentProjectFunction.project(vertex); //通过project进行属性筛选
+ RowVertex vertexDecoded = (RowVertex) projectVertex.getField(0, null);
+
+
+ //需要重构Fields,以定义VertexType,然后再进行encode
+ VertexType vertexType = new
VertexType(this.tableOutputTypes.get(compactedVertexLabel));
+ VertexEncoder encoder = new DefaultVertexEncoder(vertexType);
+ return encoder.encode(vertexDecoded);
+ }
+
+ private void initializeProject(RowVertex vertex, String compactedLabel,
+ IType<?>[] globalTypes, String[]
globalNames) {
+ List<TableField> graphSchemaFieldList = graphSchema.getFields();
//这里是图中的所有表集合
+
+ List<TableField> fieldsOfTable; //这里的是一张表里的所有字段
+
+ List<TableField> tableOutputType = new ArrayList<>();
//记录新表格所有字段所包括的输出Type
+
+ //提取当前表格内,使用到的字段集合。
+ Set<String> fieldNames = (this.fields == null)
+ ? Collections.emptySet()
+ : this.fields.stream()
+ .map(e -> e.getField().getName())
+ .collect(Collectors.toSet());
+
+
+ //对于每个表,都需要一个expression
+ List<Expression> expressions = new ArrayList<>();
//对于每个表,都需要一个expression
+ String vertexLabel = vertex.getLabel();
+
+ for (TableField tableField : graphSchemaFieldList) {
//枚举所有table,并构造List<Expression>
+ if (vertexLabel.equals(tableField.getName())) { //table名匹配
(如都为`person`)
+
+ List<Expression> inputs = new ArrayList<>();
+ fieldsOfTable = ((VertexType)tableField.getType()).getFields();
+
+ for (int i = 0; i < fieldsOfTable.size(); i++) {
//枚举表格内不同字段,并做属性筛选
+ TableField column = fieldsOfTable.get(i);
+ String columnName = column.getName();
+
+ //标准化,将形如personId改为id
+ if (columnName.startsWith(vertexLabel)) {
+ String suffix =
columnName.substring(vertexLabel.length());
+ if (!suffix.isEmpty()) {
+ suffix = Character.toLowerCase(suffix.charAt(0)) +
suffix.substring(1);
+ columnName = suffix;
+ }
+ }
+
+ if (fieldNames.contains(columnName) ||
columnName.equals("id")) { //存在已经筛选出的字段或是特殊的Id字段
Review Comment:
Please define constants for these field names instead of hardcoding them.
##########
geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/AbstractStepOperator.java:
##########
@@ -87,6 +89,8 @@ public abstract class AbstractStepOperator<FUNC extends
StepFunction, IN extends
protected String name;
+ protected Set<RexFieldAccess> fields;
Review Comment:
Don't put this field in the base class.
##########
geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/MatchVertexOperator.java:
##########
@@ -74,6 +88,104 @@ protected void processRecord(StepRecord record) {
}
}
+ private RowVertex projectVertex(RowVertex vertex) {
Review Comment:
Consider moving this into a new utility class and adding fine-grained unit tests.
##########
geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/MatchEdgeOperator.java:
##########
@@ -54,6 +70,99 @@ public void open(TraversalRuntimeContext context) {
this.loadEdgeRt =
metricGroup.histogram(MetricNameFormatter.loadEdgeTimeRtName(getName()));
}
+ private RowEdge projectEdge(RowEdge edge) {
+ if (edge == null) { //找不到符合条件节点,无法映射
+ return null;
+ }
+
+ if (this.projectFunction == null) {
+ initializeProject(edge);
+ }
+
+ //进行projection
+ ObjectRow projectEdge = (ObjectRow)
this.projectFunction.project(edge); //通过project进行属性筛选
+ RowEdge edgeDecoded = (RowEdge) projectEdge.getField(0, null);
+
+ //需要重构Fields,以定义EdgeType,然后再进行encode
+ EdgeType edgeType = new EdgeType(this.tableOutputType, false);
+ EdgeEncoder encoder = new DefaultEdgeEncoder(edgeType);
+ return encoder.encode(edgeDecoded);
+ }
+
+
+ private void initializeProject(RowEdge edge) {
+ List<TableField> graphSchemaFieldList = graphSchema.getFields();
//这里是图中的所有表集合
+
+ IType<?> outputType = this.getOutputType();
+ List<TableField> fieldsOfTable; //这里的是一张表里的所有字段
+ if (outputType instanceof EdgeType) {
+ fieldsOfTable = ((EdgeType) outputType).getFields();
+ } else {
+ throw new IllegalArgumentException("Unsupported type: " +
outputType.getClass());
+ }
+
+ //提取当前表格内,使用到的字段集合。
+ Set<String> fieldNames = (this.fields == null)
+ ? Collections.emptySet()
+ : this.fields.stream()
+ .map(e -> e.getField().getName())
+ .collect(Collectors.toSet());
+
+
+ List<Expression> expressions = new ArrayList<>();
//对于每个表,都需要一个expression
+ int[] currentIndicesMapping = new int[graphSchemaFieldList.size()];
//在当前匹配下,原index到裁剪后index的映射
+ Arrays.fill(currentIndicesMapping, -1);
+
+ List<TableField> tableOutputType = null;
+ for (TableField tableField : graphSchemaFieldList) {
//枚举所有table,并构造List<Expression>
+ if (edge.getLabel().equals(tableField.getName())) { //table名匹配
(如都为`knows`)
+
+ List<Expression> inputs = new ArrayList<>();
+ tableOutputType = new ArrayList<>();
+ String edgeLabel = edge.getLabel();
+
+ for (int i = 0; i < fieldsOfTable.size(); i++) {
//枚举表格内不同字段,并做属性筛选
+ TableField column = fieldsOfTable.get(i);
+ String columnName = column.getName();
+
+ //标准化,将形如personId改为id
+ if (columnName.startsWith(edgeLabel)) {
+ String suffix =
columnName.substring(edgeLabel.length());
+ if (!suffix.isEmpty()) {
+ suffix = Character.toLowerCase(suffix.charAt(0)) +
suffix.substring(1);
+ columnName = suffix;
+ }
+ }
+
+
+ if (fieldNames.contains(columnName) ||
columnName.equals("srcId")
Review Comment:
Use constants.
##########
geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/optimize/rule/SelectFieldPruneRule.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.optimize.rule;
+
+import java.util.*;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.*;
+import org.apache.geaflow.dsl.rel.GraphMatch;
+import org.apache.geaflow.dsl.rel.PathModify.PathModifyExpression;
+import org.apache.geaflow.dsl.rel.logical.LogicalGraphMatch;
+import org.apache.geaflow.dsl.rel.match.*;
+import org.apache.geaflow.dsl.rex.PathInputRef;
+import org.apache.geaflow.dsl.rex.RexObjectConstruct;
+import org.apache.geaflow.dsl.rex.RexParameterRef;
+
+public class SelectFieldPruneRule extends RelOptRule {
+
+ public static final SelectFieldPruneRule PROJECT_INSTANCE;
+ public static final SelectFieldPruneRule GRAPH_MATCH_INSTANCE;
+
+ static {
+ PROJECT_INSTANCE = new ProjectPruneRule(LogicalProject.class);
+ GRAPH_MATCH_INSTANCE = new
GraphMatchPruneRule(LogicalGraphMatch.class);
+ }
+
+ //尝试通过thread判断是否访问过
+ public static void resetState() {
+ ProjectPruneRule.visitedProjectsThreadLocal.remove();
+ }
+
+ //将只有index作为索引转换为带label的完整fields
+ private static Set<RexFieldAccess> convertToPathRefs(Set<RexFieldAccess>
fieldAccesses, RelNode node) {
+ Set<RexFieldAccess> convertedFieldAccesses = new HashSet<>(); //
储存已经转换后的index
+ RelDataType pathRecordType = node.getRowType(); // 获取当前层级下的表类型
+ RexBuilder rexBuilder = node.getCluster().getRexBuilder(); //
通过构建函数新建新fileds
+
+ for (RexFieldAccess fieldAccess: fieldAccesses) {
+ RexNode referenceExpr = fieldAccess.getReferenceExpr();
+
+ // 只处理输入引用类型的字段访问
+ if (referenceExpr instanceof RexInputRef) {
+ RexInputRef inputRef = (RexInputRef) referenceExpr;
+
+ // 如果大于index,说明来源子查询,跳过
+ if (pathRecordType.getFieldList().size() <=
inputRef.getIndex()) {
+ continue;
+ }
+
+ // 从 PathRecordType 中获取对应的路径字段信息
+ RelDataTypeField pathField =
pathRecordType.getFieldList().get(inputRef.getIndex());
+
+ // 创建真正的 PathInputRef
+ PathInputRef pathInputRef = new PathInputRef(
+ pathField.getName(), // 路径变量名 (如 "a", "b", "c")
+ pathField.getIndex(), // 字段索引
+ pathField.getType() // 字段类型
+ );
+
+ // 用新的路径引用重新创建 RexFieldAccess,替换原来的元素
+ RexFieldAccess newFieldAccess = (RexFieldAccess)
rexBuilder.makeFieldAccess(
+ pathInputRef,
+ fieldAccess.getField().getIndex()
+ );
+ convertedFieldAccesses.add(newFieldAccess);
+ }
+ }
+
+ return convertedFieldAccesses;
+ }
+
+ // 从RexNode向下递归,挖掘所有可能的特征
+ private static Set<RexFieldAccess> collectAllFieldAccesses(RexBuilder
rexBuilder, RexNode rootNode) {
+ Set<RexFieldAccess> fieldAccesses = new HashSet<>();
+ Queue<RexNode> queue = new LinkedList<>();
+ queue.offer(rootNode);
+
+ while (!queue.isEmpty()) {
+ RexNode node = queue.poll();
+
+ if (node instanceof RexFieldAccess) {
+ // 如果可以直接转换
+ fieldAccesses.add((RexFieldAccess) node);
+
+ } else if (node instanceof RexCall) {
+ // 自定义方法,需要提取元素转换
+ RexCall rexCall = (RexCall) node;
+
+ // 检查是否是字段访问类型的调用(operand[0]是ref,operator是字段名)
+ if (rexCall.getOperands().size() > 0) {
+ RexNode ref = rexCall.getOperands().get(0);
+ String fieldName = rexCall.getOperator().getName();
+
+ // 如果是特殊字段,按原来的映射处理
+ if ("id".equals(fieldName)
+ || "label".equals(fieldName)
+ || "srcId".equals(fieldName)
+ || "targetId".equals(fieldName)) {
+
+ String mappedFieldName = "id".equals(fieldName) ?
"~id" :
+ "label".equals(fieldName) ? "~label" :
+ "srcId".equals(fieldName) ? "~srcId" :
+ "~targetId";
+ fieldAccesses.add((RexFieldAccess)
rexBuilder.makeFieldAccess(ref, mappedFieldName, false));
+
+ } else if (ref instanceof RexInputRef) {
+ // 其他非嵌套自定义函数:枚举ref的所有字段并全部加入
+ RelDataType refType = ref.getType();
+ List<RelDataTypeField> refFields =
refType.getFieldList();
+
+ for (RelDataTypeField field : refFields) {
+ RexFieldAccess fieldAccess = (RexFieldAccess)
rexBuilder.makeFieldAccess(
+ ref,
+ field.getName(),
+ false
+ );
+ fieldAccesses.add(fieldAccess);
+ }
+
+ } else {
+ // ref本身可能是复杂表达式,继续递归处理
+ queue.add(ref);
+ }
+
+ // 将其他操作数也加入队列继续处理
+ for (int i = 1; i < rexCall.getOperands().size(); i++) {
+ queue.add(rexCall.getOperands().get(i));
+ }
+ }
+
+ } else if (node instanceof RexInputRef) {
+ // RexInputRef 直接引用输入,枚举其所有字段
+ RelDataType refType = node.getType();
+ List<RelDataTypeField> refFields = refType.getFieldList();
+
+ for (RelDataTypeField field : refFields) {
+ RexFieldAccess fieldAccess = (RexFieldAccess)
rexBuilder.makeFieldAccess(
+ node,
+ field.getName(),
+ false
+ );
+ fieldAccesses.add(fieldAccess);
+ }
+
+ } else if (node instanceof RexLiteral || node instanceof
RexParameterRef) {
+ // 字面量,跳过
+ continue;
+
+ } else {
+ // 其他未知类型,可以选择抛异常或记录日志
+ throw new IllegalArgumentException("Unsupported type: " +
node.getClass());
+ }
+ }
+
+ return fieldAccesses;
+ }
+
+ // 内部类:处理 LogicalProject 的规则并下推
+ private SelectFieldPruneRule(RelOptRuleOperand operand, String
description) {
+ super(operand, description);
+ }
+
+ private static void transverseFilteredElements(Set<RexFieldAccess> fields,
IMatchNode pathPattern) {
+ Queue<IMatchNode> queue = new LinkedList<>(); //记录待访问队列
+ Set<IMatchNode> visited = new HashSet<>(); //标记已经访问过的点
+
+ queue.offer(pathPattern);
+ visited.add(pathPattern);
+
+ //逐个访问该Path的所有点,同时遍历字段列表:若Label匹配则将该字段特征加入.fields
+ while (!queue.isEmpty()) {
+ IMatchNode currentPathPattern = queue.poll();
+
+ if (currentPathPattern instanceof VertexMatch) {
+ VertexMatch vertexMatch = (VertexMatch) currentPathPattern;
+ String vertexLabel = vertexMatch.getLabel();
+ for (RexFieldAccess fieldElement: fields) {
+ PathInputRef inputRef = (PathInputRef)
fieldElement.getReferenceExpr();
+ if (inputRef.getLabel().equals(vertexLabel)) {
+ vertexMatch.addField(fieldElement);
+ }
+ }
+ }
+ if (currentPathPattern instanceof EdgeMatch) {
+ EdgeMatch edgeMatch = (EdgeMatch) currentPathPattern;
+ String edgeLabel = edgeMatch.getLabel();
+ for (RexFieldAccess fieldElement : fields) {
+ PathInputRef inputRef = (PathInputRef)
fieldElement.getReferenceExpr();
+ if (inputRef.getLabel().equals(edgeLabel)) {
+ edgeMatch.addField(fieldElement);
+ }
+ }
+ }
+
+
+ //循环遍历可能存在的字段
+ List<RelNode> inputs = currentPathPattern.getInputs();
+ for (RelNode candidateInput : inputs) {
+ if (candidateInput != null && !visited.contains((IMatchNode)
candidateInput)) {
+ queue.offer((IMatchNode) candidateInput);
+ visited.add((IMatchNode) candidateInput);
+ }
+ }
+ }
+
+ }
+
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ // 基类的onMatch方法,由子类重写具体逻辑
+ }
+
+ // 内部类:处理 LogicalProject 的规则
+ private static class ProjectPruneRule extends SelectFieldPruneRule {
Review Comment:
1. Don't put all of these classes in one file.
2. Add the optimization rules and test each of them separately.
##########
geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/optimize/rule/SelectFieldPruneRule.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.optimize.rule;
+
+import java.util.*;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.*;
+import org.apache.geaflow.dsl.rel.GraphMatch;
+import org.apache.geaflow.dsl.rel.PathModify.PathModifyExpression;
+import org.apache.geaflow.dsl.rel.logical.LogicalGraphMatch;
+import org.apache.geaflow.dsl.rel.match.*;
+import org.apache.geaflow.dsl.rex.PathInputRef;
+import org.apache.geaflow.dsl.rex.RexObjectConstruct;
+import org.apache.geaflow.dsl.rex.RexParameterRef;
+
+public class SelectFieldPruneRule extends RelOptRule {
+
+ public static final SelectFieldPruneRule PROJECT_INSTANCE;
+ public static final SelectFieldPruneRule GRAPH_MATCH_INSTANCE;
+
+ static {
+ PROJECT_INSTANCE = new ProjectPruneRule(LogicalProject.class);
+ GRAPH_MATCH_INSTANCE = new
GraphMatchPruneRule(LogicalGraphMatch.class);
+ }
+
+ //尝试通过thread判断是否访问过
+ public static void resetState() {
Review Comment:
Optimization rules must be stateless.
##########
geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/traversal/operator/MatchEdgeOperator.java:
##########
@@ -54,6 +70,99 @@ public void open(TraversalRuntimeContext context) {
this.loadEdgeRt =
metricGroup.histogram(MetricNameFormatter.loadEdgeTimeRtName(getName()));
}
+ private RowEdge projectEdge(RowEdge edge) {
Review Comment:
Consider moving this into a new utility class and adding fine-grained unit tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]