PHILO-HE commented on code in PR #9940:
URL: https://github.com/apache/incubator-gluten/pull/9940#discussion_r2160797819
##########
gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java:
##########
@@ -96,71 +104,121 @@ private JobGraph mergeGlutenOperators(JobGraph jobGraph) {
return jobGraph;
}
- // A JobVertex may contain several operators chained like this:
Source-->Op1-->Op2-->Sink.
+ // A JobVertex may contain several operators chained like this:
Source-->Op1-->Op2-->Sink1.
+ //
-->Sink2.
// If the operators connected all support translated to gluten, we merge
them into
// a single GlutenOperator to avoid data transferred between flink and
native.
- // Now we only support that one operator followed by at most one other
operator.
+ // One operator may be followed by several other operators.
private void buildGlutenChains(StreamConfig vertexConfig) {
Map<Integer, StreamConfig> serializedTasks =
vertexConfig.getTransitiveChainedTaskConfigs(userClassloader);
Map<Integer, StreamConfig> chainedTasks = new
HashMap<>(serializedTasks.size());
serializedTasks.forEach(
(id, config) -> chainedTasks.put(id, new
StreamConfig(config.getConfiguration())));
- StreamConfig taskConfig = vertexConfig;
- while (true) {
- List<StreamEdge> outEdges =
taskConfig.getChainedOutputs(userClassloader);
- if (outEdges == null || outEdges.size() != 1) {
- // only support operators have one output.
- LOG.debug("{} has no or more than one chained task.",
taskConfig.getOperatorName());
- break;
- }
+ buildGlutenChains(vertexConfig, chainedTasks);
+ // TODO: may need fallback if failed.
+ vertexConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedTasks);
+ }
- StreamEdge outEdge = outEdges.get(0);
+ private void buildGlutenChains(StreamConfig taskConfig, Map<Integer,
StreamConfig> chainedTasks) {
+ boolean isGlutenOp = isGlutenOperator(taskConfig);
+ List<StreamEdge> outEdges = taskConfig.getChainedOutputs(userClassloader);
+ GlutenOperator sourceOperator = isGlutenOp ? getGlutenOperator(taskConfig)
: null;
+ if (outEdges == null || outEdges.isEmpty()) {
+ LOG.debug("{} has no chained task.", taskConfig.getOperatorName());
+ // TODO: judge whether can set?
+ if (isGlutenOp && taskConfig.getOperatorName().equals("exchange-hash")) {
+ taskConfig.setTypeSerializerOut(new GlutenRowVectorSerializer(null));
+ taskConfig.serializeAllConfigs();
+ }
+ return;
+ }
+ Map<String, Integer> nodeToChainedOuts = new HashMap<>(outEdges.size());
+ Map<IntermediateDataSetID, String> nodeToNonChainedOuts = new
HashMap<>(outEdges.size());
+ Map<String, RowType> nodeToOutTypes = new HashMap<>(outEdges.size());
+ List<StreamEdge> chainedOutputs = new ArrayList<>(outEdges.size());
+ List<NonChainedOutput> nonChainedOutputs = new
ArrayList<>(outEdges.size());
+ StatefulPlanNode sourceNode = isGlutenOp ? sourceOperator.getPlanNode() :
null;
+ boolean allGluten = true;
+ LOG.debug("Edge size {}, OP {}", outEdges.size(), sourceOperator);
+ for (StreamEdge outEdge : outEdges) {
StreamConfig outTask = chainedTasks.get(outEdge.getTargetId());
if (outTask == null) {
- LOG.warn("Not find task {} in Chained tasks", outEdge.getTargetId());
+ LOG.error("Not find task {} in Chained tasks", outEdge.getTargetId());
+ allGluten = false;
break;
}
- if (isGlutenOperator(taskConfig) && isGlutenOperator(outTask)) {
+ buildGlutenChains(outTask, chainedTasks);
+ if (isGlutenOp && isGlutenOperator(outTask)) {
GlutenOperator outOperator = getGlutenOperator(outTask);
- PlanNode outNode = outOperator.getPlanNode();
- GlutenOperator sourceOperator = getGlutenOperator(taskConfig);
- if (outNode != null) {
- outNode.setSources(List.of(sourceOperator.getPlanNode()));
- LOG.debug("Set {} source to {}", outNode,
sourceOperator.getPlanNode());
+ StatefulPlanNode outNode = outOperator.getPlanNode();
+ if (sourceNode != null) {
Review Comment:
Judging from the code below, is sourceNode always assigned a non-null value
when it's a Gluten op?
```
StatefulPlanNode sourceNode = isGlutenOp ? sourceOperator.getPlanNode() :
null;
```
If so, sourceNode cannot be null inside the `if (isGlutenOp && ...)` block,
so do we still need the code that handles sourceNode == null?
##########
gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/DataTimeRexCallConvertor.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode.functions;
+
+import org.apache.gluten.rexnode.RexConversionContext;
+import org.apache.gluten.rexnode.RexNodeConverter;
+
+import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.CastTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.TimestampType;
+import io.github.zhztheplayer.velox4j.type.Type;
+
+import org.apache.calcite.rex.RexCall;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+class TimeStampIntervalRexCallConverter extends BaseRexCallConverter {
+
+ public TimeStampIntervalRexCallConverter(String functionName) {
+ super(functionName);
+ }
+
+ @Override
+ public boolean isSupported(RexCall callNode, RexConversionContext context) {
+ // This converter supports timestamp and interval day-time operations.
+ List<Type> operandTypes =
+ callNode.getOperands().stream()
+ .map(param -> RexNodeConverter.toType(param.getType()))
+ .collect(Collectors.toList());
+ return (operandTypes.get(0) instanceof TimestampType
+ // && TypeUtils.isTimeInterval(operandTypes.get(1)))
+ || // (TypeUtils.isTimeInterval(operandTypes.get(0)) &&
+ operandTypes.get(1) instanceof TimestampType);
Review Comment:
It seems no interval type is actually checked here (the interval checks are commented out). Could you clarify?
##########
gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java:
##########
@@ -96,71 +104,121 @@ private JobGraph mergeGlutenOperators(JobGraph jobGraph) {
return jobGraph;
}
- // A JobVertex may contain several operators chained like this:
Source-->Op1-->Op2-->Sink.
+ // A JobVertex may contain several operators chained like this:
Source-->Op1-->Op2-->Sink1.
+ //
-->Sink2.
// If the operators connected all support translated to gluten, we merge
them into
// a single GlutenOperator to avoid data transferred between flink and
native.
- // Now we only support that one operator followed by at most one other
operator.
+ // One operator may be followed by several other operators.
private void buildGlutenChains(StreamConfig vertexConfig) {
Map<Integer, StreamConfig> serializedTasks =
vertexConfig.getTransitiveChainedTaskConfigs(userClassloader);
Map<Integer, StreamConfig> chainedTasks = new
HashMap<>(serializedTasks.size());
serializedTasks.forEach(
(id, config) -> chainedTasks.put(id, new
StreamConfig(config.getConfiguration())));
- StreamConfig taskConfig = vertexConfig;
- while (true) {
- List<StreamEdge> outEdges =
taskConfig.getChainedOutputs(userClassloader);
- if (outEdges == null || outEdges.size() != 1) {
- // only support operators have one output.
- LOG.debug("{} has no or more than one chained task.",
taskConfig.getOperatorName());
- break;
- }
+ buildGlutenChains(vertexConfig, chainedTasks);
+ // TODO: may need fallback if failed.
+ vertexConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedTasks);
+ }
- StreamEdge outEdge = outEdges.get(0);
+ private void buildGlutenChains(StreamConfig taskConfig, Map<Integer,
StreamConfig> chainedTasks) {
+ boolean isGlutenOp = isGlutenOperator(taskConfig);
+ List<StreamEdge> outEdges = taskConfig.getChainedOutputs(userClassloader);
+ GlutenOperator sourceOperator = isGlutenOp ? getGlutenOperator(taskConfig)
: null;
Review Comment:
Would it be better to change `getGlutenOperator` to return `Optional<GlutenOperator>`?
Then the result itself indicates whether it's a Gluten operator, and
`isGlutenOperator` can be removed.
##########
gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/TypeUtils.java:
##########
@@ -52,6 +52,10 @@ public static boolean isStringType(Type type) {
return type instanceof VarCharType;
}
+ public static boolean isTimeInterval(Type type) {
Review Comment:
Nit:
name suggestion: `isIntervalType`, for consistency with `isStringType`.
##########
gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/typeutils/GlutenRowVectorSerializer.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.table.runtime.typeutils;
+
+import io.github.zhztheplayer.velox4j.Velox4j;
+import io.github.zhztheplayer.velox4j.data.RowVector;
+import io.github.zhztheplayer.velox4j.memory.AllocationListener;
+import io.github.zhztheplayer.velox4j.memory.MemoryManager;
+import io.github.zhztheplayer.velox4j.session.Session;
+import io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/** Serializer for {@link RowVector}. */
+@Internal
+public class GlutenRowVectorSerializer extends TypeSerializer<StatefulRecord> {
+ private static final long serialVersionUID = 1L;
+ private final RowType rowType;
+ private MemoryManager memoryManager;
+ private Session session;
+
+ public GlutenRowVectorSerializer(RowType rowType) {
+ this.rowType = rowType;
+ }
+
+ @Override
+ public TypeSerializer<StatefulRecord> duplicate() {
+ return new GlutenRowVectorSerializer(rowType);
+ }
+
+ @Override
+ public StatefulRecord createInstance() {
+ throw new RuntimeException("Not implemented for gluten");
+ }
+
+ @Override
+ public void serialize(StatefulRecord record, DataOutputView target) throws
IOException {
+ // memoryManager = MemoryManager.create(AllocationListener.NOOP);
+ // session = Velox4j.newSession(memoryManager);
+ String vectorStr =
+ record.getRowVector().serialize(); //
session.baseVectorOps().serializeOne(row);
Review Comment:
Nit: please remove this commented-out code if it's no longer needed.
##########
gluten-flink/runtime/src/main/java/org/apache/gluten/util/Utils.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.util;
+
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.Map;
+
+/** Generate a unique id for each velox PlanNode */
Review Comment:
Please remove this comment; it doesn't appear to describe this `Utils` class.
##########
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -555,7 +561,8 @@ private Transformation<RowData> applyRowtimeTransformation(
"StreamRecordTimestampInserter",
config),
// TODO: support it
- new GlutenSingleInputOperator(null, PlanNodeIdGenerator.newId(), null,
null),
+ new GlutenSingleInputOperator(
+ null, PlanNodeIdGenerator.newId(), null, Map.of("1", outputType)),
Review Comment:
Is "1" an arbitrary key here? If it has a specific meaning, please add a comment to clarify it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]