PHILO-HE commented on code in PR #11365:
URL: 
https://github.com/apache/incubator-gluten/pull/11365#discussion_r2688734143


##########
gluten-flink/runtime/src/main/java/org/apache/gluten/client/OffloadedJobGraphGenerator.java:
##########
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.client;
+
+import org.apache.gluten.streaming.api.operators.GlutenOperator;
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+import org.apache.gluten.table.runtime.keyselector.GlutenKeySelector;
+import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
+import org.apache.gluten.table.runtime.operators.GlutenSourceFunction;
+import org.apache.gluten.table.runtime.operators.GlutenTwoInputOperator;
+import 
org.apache.gluten.table.runtime.typeutils.GlutenStatefulRecordSerializer;
+import org.apache.gluten.util.Utils;
+
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/*
+ * Generates an offloaded JobGraph by transforming offloadable operators to 
Gluten operators.
+ * Main workflow:
+ * 1. For each JobVertex, generate an OperatorChainSliceGraph to identify 
offloadable slices.
+ * 2. Recursively visit chain slices: create offloaded operators for 
offloadable slices,
+ *    keep original operators for unoffloadable ones.
+ * 3. For offloadable operators: update input/output serializers and state 
partitioners
+ *    based on upstream/downstream operator capabilities.
+ * 4. Update stream edges and serialize all operator configurations.
+ */
+public class OffloadedJobGraphGenerator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OffloadedJobGraphGenerator.class);
+  private final JobGraph jobGraph;
+  private final ClassLoader userClassloader;
+  private boolean hasGenerated = false;
+
+  public OffloadedJobGraphGenerator(JobGraph jobGraph, ClassLoader 
userClassloader) {
+    this.jobGraph = jobGraph;
+    this.userClassloader = userClassloader;
+  }
+
+  public JobGraph generate() {
+    if (hasGenerated) {
+      throw new IllegalStateException("JobGraph has been generated.");
+    }
+    hasGenerated = true;

Review Comment:
   Nit: suggest moving this state-flag update to the end of the method, just before the result is returned.



##########
gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java:
##########
@@ -18,35 +18,40 @@
 
 import org.apache.gluten.table.runtime.config.VeloxConnectorConfig;
 import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
+import org.apache.gluten.table.runtime.metrics.SourceTaskMetrics;
 import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;
 
-import io.github.zhztheplayer.velox4j.Velox4j;
 import io.github.zhztheplayer.velox4j.connector.ConnectorSplit;
-import io.github.zhztheplayer.velox4j.data.RowVector;
 import io.github.zhztheplayer.velox4j.iterator.UpIterator;
-import io.github.zhztheplayer.velox4j.memory.AllocationListener;
-import io.github.zhztheplayer.velox4j.memory.MemoryManager;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
 import io.github.zhztheplayer.velox4j.query.Query;
 import io.github.zhztheplayer.velox4j.query.SerialTask;
-import io.github.zhztheplayer.velox4j.serde.Serde;
 import io.github.zhztheplayer.velox4j.session.Session;
 import io.github.zhztheplayer.velox4j.stateful.StatefulElement;
+import io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
+import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark;
 import io.github.zhztheplayer.velox4j.type.RowType;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.table.data.RowData;
 
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
 
-/** Gluten legacy source function, call velox plan to execute. */
-public class GlutenSourceFunction extends RichParallelSourceFunction<RowData> {
+/**
+ * Gluten legacy source function, call velox plan to execute. It sends 
RowVector to downstream
+ * instead of RowData to avoid data convert.

Review Comment:
   Nit: conversion



##########
gluten-flink/runtime/src/main/java/org/apache/gluten/client/OffloadedJobGraphGenerator.java:
##########
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.client;
+
+import org.apache.gluten.streaming.api.operators.GlutenOperator;
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+import org.apache.gluten.table.runtime.keyselector.GlutenKeySelector;
+import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
+import org.apache.gluten.table.runtime.operators.GlutenSourceFunction;
+import org.apache.gluten.table.runtime.operators.GlutenTwoInputOperator;
+import 
org.apache.gluten.table.runtime.typeutils.GlutenStatefulRecordSerializer;
+import org.apache.gluten.util.Utils;
+
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/*
+ * Generates an offloaded JobGraph by transforming offloadable operators to 
Gluten operators.
+ * Main workflow:
+ * 1. For each JobVertex, generate an OperatorChainSliceGraph to identify 
offloadable slices.
+ * 2. Recursively visit chain slices: create offloaded operators for 
offloadable slices,
+ *    keep original operators for unoffloadable ones.
+ * 3. For offloadable operators: update input/output serializers and state 
partitioners
+ *    based on upstream/downstream operator capabilities.
+ * 4. Update stream edges and serialize all operator configurations.
+ */
+public class OffloadedJobGraphGenerator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OffloadedJobGraphGenerator.class);
+  private final JobGraph jobGraph;
+  private final ClassLoader userClassloader;
+  private boolean hasGenerated = false;
+
+  public OffloadedJobGraphGenerator(JobGraph jobGraph, ClassLoader 
userClassloader) {
+    this.jobGraph = jobGraph;
+    this.userClassloader = userClassloader;
+  }
+
+  public JobGraph generate() {
+    if (hasGenerated) {
+      throw new IllegalStateException("JobGraph has been generated.");
+    }
+    hasGenerated = true;
+    for (JobVertex jobVertex : jobGraph.getVertices()) {
+      offloadJobVertex(jobVertex);
+    }
+    return jobGraph;
+  }
+
+  private void offloadJobVertex(JobVertex jobVertex) {
+    OperatorChainSliceGraphGenerator graphGenerator =
+        new OperatorChainSliceGraphGenerator(jobVertex, userClassloader);
+    OperatorChainSliceGraph chainSliceGraph = graphGenerator.getGraph();
+    LOG.info("OperatorChainSliceGraph:\n{}", chainSliceGraph);
+
+    OperatorChainSlice sourceChainSlice = chainSliceGraph.getSourceSlice();
+    OperatorChainSliceGraph offloadedChainSliceGraph = new 
OperatorChainSliceGraph();
+    visitAndOffloadChainOperators(
+        sourceChainSlice, chainSliceGraph, offloadedChainSliceGraph, 0, 
jobVertex);
+    visitAndUpdateStreamEdges(sourceChainSlice, chainSliceGraph, 
offloadedChainSliceGraph);
+    serializeAllOperatorsConfigs(offloadedChainSliceGraph);
+
+    StreamConfig sourceConfig = sourceChainSlice.getOperatorConfigs().get(0);
+    StreamConfig offloadedSourceConfig =
+        
offloadedChainSliceGraph.getSlice(sourceChainSlice.id()).getOperatorConfigs().get(0);
+
+    Map<Integer, StreamConfig> chainedConfigs =
+        collectChainedConfigs(sourceChainSlice, offloadedChainSliceGraph);
+    updateSourceConfigIfOffloadable(
+        sourceConfig, offloadedSourceConfig, sourceChainSlice, chainedConfigs);
+    sourceConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedConfigs);
+    sourceConfig.serializeAllConfigs();
+  }
+
+  // Process and offload operator chain slices recursively
+  private void visitAndOffloadChainOperators(
+      OperatorChainSlice sourceChainSlice,
+      OperatorChainSliceGraph sourceChainSliceGraph,
+      OperatorChainSliceGraph offloadedChainSliceGraph,
+      Integer chainIndex,
+      JobVertex jobVertex) {
+    OperatorChainSlice processedChainSlice;
+    if (sourceChainSlice.isOffloadable()) {
+      processedChainSlice =
+          createOffloadedOperatorChainSlice(
+              sourceChainSliceGraph, sourceChainSlice, chainIndex, jobVertex);
+      chainIndex = chainIndex + 1;
+    } else {
+      processedChainSlice = 
createUnoffloadableOperatorChainSlice(sourceChainSlice, chainIndex);
+      chainIndex = chainIndex + sourceChainSlice.getOperatorConfigs().size();
+    }
+
+    processedChainSlice.getInputs().addAll(sourceChainSlice.getInputs());
+    processedChainSlice.getOutputs().addAll(sourceChainSlice.getOutputs());
+    offloadedChainSliceGraph.addSlice(sourceChainSlice.id(), 
processedChainSlice);
+
+    // Recursively process downstream chain slices
+    for (Integer downstreamSliceId : sourceChainSlice.getOutputs()) {
+      OperatorChainSlice downstreamSourceSlice = 
sourceChainSliceGraph.getSlice(downstreamSliceId);
+      OperatorChainSlice downstreamProcessedSlice =
+          offloadedChainSliceGraph.getSlice(downstreamSliceId);
+      if (downstreamProcessedSlice == null) {
+        visitAndOffloadChainOperators(
+            downstreamSourceSlice,
+            sourceChainSliceGraph,
+            offloadedChainSliceGraph,
+            chainIndex,
+            jobVertex);
+      }
+    }
+  }
+
+  // Keep the original operator chain slice as is.
+  private OperatorChainSlice createUnoffloadableOperatorChainSlice(
+      OperatorChainSlice sourceChainSlice, Integer chainIndex) {
+    OperatorChainSlice unoffloadableChainSlice = new 
OperatorChainSlice(sourceChainSlice.id());
+    List<StreamConfig> operatorConfigs = sourceChainSlice.getOperatorConfigs();
+    for (StreamConfig opConfig : operatorConfigs) {
+      StreamConfig newOpConfig = new StreamConfig(new 
Configuration(opConfig.getConfiguration()));
+      newOpConfig.setChainIndex(chainIndex);
+      unoffloadableChainSlice.getOperatorConfigs().add(newOpConfig);
+    }
+    unoffloadableChainSlice.setOffloadable(false);
+    return unoffloadableChainSlice;
+  }
+
+  // Create offloadable operator chain slice, and update the input/output 
channel serializers
+  private OperatorChainSlice createOffloadedOperatorChainSlice(
+      OperatorChainSliceGraph chainSliceGraph,
+      OperatorChainSlice sourceChainSlice,
+      Integer chainIndex,
+      JobVertex jobVertex) {
+    OperatorChainSlice offloadedChainSlice = new 
OperatorChainSlice(sourceChainSlice.id());
+    List<StreamConfig> operatorConfigs = sourceChainSlice.getOperatorConfigs();
+
+    // May coalesce multiple operators into one in the future.
+    if (operatorConfigs.size() != 1) {
+      throw new UnsupportedOperationException(
+          "Only one operator is supported for offloaded operator chain 
slice.");
+    }
+
+    StreamConfig sourceOpConfig = operatorConfigs.get(0);
+    GlutenOperator sourceOperator = Utils.getGlutenOperator(sourceOpConfig, 
userClassloader).get();
+    StatefulPlanNode planNode = sourceOperator.getPlanNode();
+    // Create a new operator config for the offloaded operator.
+    StreamConfig offloadedOpConfig =
+        new StreamConfig(new Configuration(sourceOpConfig.getConfiguration()));
+    if (sourceOperator instanceof GlutenStreamSource) {
+      boolean canOutputRowVector = canOutputRowVector(sourceChainSlice, 
chainSliceGraph, jobVertex);
+      Class<?> outClass = canOutputRowVector ? StatefulRecord.class : 
RowData.class;
+      GlutenStreamSource newSourceOp =
+          new GlutenStreamSource(
+              new GlutenSourceFunction<>(
+                  planNode,
+                  sourceOperator.getOutputTypes(),
+                  sourceOperator.getId(),
+                  ((GlutenStreamSource) sourceOperator).getConnectorSplit(),
+                  outClass));
+      offloadedOpConfig.setStreamOperator(newSourceOp);
+      if (canOutputRowVector) {
+        setOffloadedOutputSerializer(offloadedOpConfig, sourceOperator);
+      }
+    } else if (sourceOperator instanceof GlutenOneInputOperator) {
+      createOffloadedOneInputOperator(
+          sourceChainSlice,
+          chainSliceGraph,
+          jobVertex,
+          planNode,
+          (GlutenOneInputOperator<?, ?>) sourceOperator,
+          sourceOpConfig,
+          offloadedOpConfig);
+    } else if (sourceOperator instanceof GlutenTwoInputOperator) {
+      createOffloadedTwoInputOperator(
+          sourceChainSlice,
+          chainSliceGraph,
+          jobVertex,
+          planNode,
+          (GlutenTwoInputOperator<?, ?>) sourceOperator,
+          sourceOpConfig,
+          offloadedOpConfig);
+    } else {
+      throw new UnsupportedOperationException(
+          "Unsupported operator type for offloading: " + 
sourceOperator.getClass().getName());
+    }
+
+    offloadedOpConfig.setChainIndex(chainIndex);
+    offloadedChainSlice.getOperatorConfigs().add(offloadedOpConfig);
+    offloadedChainSlice.setOffloadable(true);
+    return offloadedChainSlice;
+  }
+
+  private void createOffloadedOneInputOperator(
+      OperatorChainSlice sourceChainSlice,
+      OperatorChainSliceGraph chainSliceGraph,
+      JobVertex jobVertex,
+      StatefulPlanNode planNode,
+      GlutenOneInputOperator<?, ?> sourceOperator,
+      StreamConfig sourceOpConfig,
+      StreamConfig offloadedOpConfig) {
+    boolean canOutputRowVector = canOutputRowVector(sourceChainSlice, 
chainSliceGraph, jobVertex);
+    boolean canInputRowVector = canInputRowVector(sourceChainSlice, 
chainSliceGraph, jobVertex);
+    Class<?> inClass = canInputRowVector ? StatefulRecord.class : 
RowData.class;
+    Class<?> outClass = canOutputRowVector ? StatefulRecord.class : 
RowData.class;
+    GlutenOneInputOperator<?, ?> newOneInputOp =
+        new GlutenOneInputOperator<>(
+            planNode,
+            sourceOperator.getId(),
+            sourceOperator.getInputType(),
+            sourceOperator.getOutputTypes(),
+            inClass,
+            outClass,
+            sourceOperator.getDescription());
+    offloadedOpConfig.setStreamOperator(newOneInputOp);
+    if (canOutputRowVector) {
+      setOffloadedOutputSerializer(offloadedOpConfig, sourceOperator);
+    }
+    if (canInputRowVector) {
+      setOffloadedInputSerializer(offloadedOpConfig, sourceOperator);
+      setOffloadedStatePartitioner(
+          sourceOpConfig, offloadedOpConfig, 0, 
sourceOperator.getDescription());
+    }
+  }
+
+  private void createOffloadedTwoInputOperator(
+      OperatorChainSlice sourceChainSlice,
+      OperatorChainSliceGraph chainSliceGraph,
+      JobVertex jobVertex,
+      StatefulPlanNode planNode,
+      GlutenTwoInputOperator<?, ?> sourceOperator,
+      StreamConfig sourceOpConfig,
+      StreamConfig offloadedOpConfig) {
+    boolean canOutputRowVector = canOutputRowVector(sourceChainSlice, 
chainSliceGraph, jobVertex);
+    boolean canInputRowVector = canInputRowVector(sourceChainSlice, 
chainSliceGraph, jobVertex);
+    setOffloadedStatePartitioner(
+        sourceOpConfig, offloadedOpConfig, 0, sourceOperator.getDescription());
+    setOffloadedStatePartitioner(
+        sourceOpConfig, offloadedOpConfig, 1, sourceOperator.getDescription());
+    Class<?> inClass = canInputRowVector ? StatefulRecord.class : 
RowData.class;
+    Class<?> outClass = canOutputRowVector ? StatefulRecord.class : 
RowData.class;
+    GlutenTwoInputOperator<?, ?> newTwoInputOp =
+        new GlutenTwoInputOperator<>(
+            planNode,
+            sourceOperator.getLeftId(),
+            sourceOperator.getRightId(),
+            sourceOperator.getLeftInputType(),
+            sourceOperator.getRightInputType(),
+            sourceOperator.getOutputTypes(),
+            inClass,
+            outClass);
+    offloadedOpConfig.setStreamOperator(newTwoInputOp);
+    offloadedOpConfig.setStatePartitioner(0, new GlutenKeySelector());
+    offloadedOpConfig.setStatePartitioner(1, new GlutenKeySelector());
+    if (canOutputRowVector) {
+      setOffloadedOutputSerializer(offloadedOpConfig, sourceOperator);
+    }
+    if (canInputRowVector) {
+      setOffloadedInputSerializersForTwoInputOperator(offloadedOpConfig, 
sourceOperator);
+    }
+  }
+
+  private void setOffloadedOutputSerializer(StreamConfig opConfig, 
GlutenOperator operator) {
+    RowType rowType = 
operator.getOutputTypes().entrySet().iterator().next().getValue();
+    opConfig.setTypeSerializerOut(new GlutenStatefulRecordSerializer(rowType, 
operator.getId()));
+  }
+
+  private void setOffloadedInputSerializer(StreamConfig opConfig, 
GlutenOperator operator) {
+    opConfig.setupNetworkInputs(
+        new GlutenStatefulRecordSerializer(operator.getInputType(), 
operator.getId()));
+  }
+
+  private void setOffloadedInputSerializersForTwoInputOperator(
+      StreamConfig opConfig, GlutenTwoInputOperator<?, ?> operator) {
+    opConfig.setupNetworkInputs(
+        new GlutenStatefulRecordSerializer(operator.getLeftInputType(), 
operator.getId()),
+        new GlutenStatefulRecordSerializer(operator.getRightInputType(), 
operator.getId()));
+  }
+
+  private void setOffloadedStatePartitioner(
+      StreamConfig sourceOpConfig,
+      StreamConfig offloadedOpConfig,
+      int inputIndex,
+      String operatorDescription) {
+    KeySelector<?, ?> keySelector = 
sourceOpConfig.getStatePartitioner(inputIndex, userClassloader);
+    if (keySelector != null) {
+      LOG.info(
+          "State partitioner ({}) found in input {} of operator {}, change it 
to GlutenKeySelector.",
+          keySelector.getClass().getName(),
+          inputIndex,
+          operatorDescription);
+      offloadedOpConfig.setStatePartitioner(inputIndex, new 
GlutenKeySelector());
+    }
+  }
+
+  private StreamConfig findLastOperatorInChain(JobVertex vertex) {
+    StreamConfig rootStreamConfig = new 
StreamConfig(vertex.getConfiguration());
+    Map<Integer, StreamConfig> chainedConfigs =
+        rootStreamConfig.getTransitiveChainedTaskConfigs(userClassloader);
+    chainedConfigs.put(rootStreamConfig.getVertexID(), rootStreamConfig);
+
+    // Find the last operator (the one with no chained outputs)
+    for (StreamConfig config : chainedConfigs.values()) {
+      List<StreamEdge> chainedOutputs = 
config.getChainedOutputs(userClassloader);
+      if (chainedOutputs == null || chainedOutputs.isEmpty()) {
+        return config;
+      }
+    }
+
+    // If no last operator found, use the root config
+    return rootStreamConfig;
+  }
+
+  private StreamNode mockStreamNode(StreamConfig streamConfig) {
+    return new StreamNode(
+        streamConfig.getVertexID(),
+        null,
+        null,
+        (StreamOperatorFactory<?>) 
streamConfig.getStreamOperatorFactory(userClassloader),
+        streamConfig.getOperatorName(),
+        null);
+  }
+
+  // Update stream edges when vertices have been changed due to offloading
+  private void visitAndUpdateStreamEdges(
+      OperatorChainSlice sourceChainSlice,
+      OperatorChainSliceGraph sourceChainSliceGraph,
+      OperatorChainSliceGraph offloadedChainSliceGraph) {
+    OperatorChainSlice offloadedChainSlice =
+        offloadedChainSliceGraph.getSlice(sourceChainSlice.id());
+    if (offloadedChainSlice.isOffloadable()) {
+      updateStreamEdgesForOffloadedSlice(
+          sourceChainSlice, sourceChainSliceGraph, offloadedChainSliceGraph, 
offloadedChainSlice);
+    }
+
+    // Recursively update downstream chain slices
+    for (Integer downstreamSliceId : sourceChainSlice.getOutputs()) {
+      visitAndUpdateStreamEdges(
+          sourceChainSliceGraph.getSlice(downstreamSliceId),
+          sourceChainSliceGraph,
+          offloadedChainSliceGraph);
+    }
+  }
+
+  private void updateStreamEdgesForOffloadedSlice(
+      OperatorChainSlice sourceChainSlice,
+      OperatorChainSliceGraph sourceChainSliceGraph,
+      OperatorChainSliceGraph offloadedChainSliceGraph,
+      OperatorChainSlice offloadedChainSlice) {
+    List<Integer> downstreamSliceIds = sourceChainSlice.getOutputs();
+    if (downstreamSliceIds.isEmpty()) {
+      StreamConfig offloadedOpConfig = 
offloadedChainSlice.getOperatorConfigs().get(0);
+      offloadedOpConfig.setChainedOutputs(new ArrayList<>());
+      return;
+    }
+
+    List<StreamEdge> newOutputEdges = new ArrayList<>();
+    List<StreamConfig> sourceOperatorConfigs = 
sourceChainSlice.getOperatorConfigs();
+    StreamConfig lastSourceOpConfig = 
sourceOperatorConfigs.get(sourceOperatorConfigs.size() - 1);
+    List<StreamEdge> originalOutputEdges = 
lastSourceOpConfig.getChainedOutputs(userClassloader);
+
+    for (int i = 0; i < downstreamSliceIds.size(); i++) {
+      Integer downstreamSliceId = downstreamSliceIds.get(i);
+      OperatorChainSlice downstreamOffloadedSlice =
+          offloadedChainSliceGraph.getSlice(downstreamSliceId);
+      StreamConfig downstreamOpConfig = 
downstreamOffloadedSlice.getOperatorConfigs().get(0);
+      StreamEdge originalEdge = originalOutputEdges.get(i);
+      StreamEdge newEdge = createStreamEdge(originalEdge, downstreamOpConfig, 
offloadedChainSlice);
+      newOutputEdges.add(newEdge);
+    }
+
+    StreamConfig offloadedOpConfig = 
offloadedChainSlice.getOperatorConfigs().get(0);
+    offloadedOpConfig.setChainedOutputs(newOutputEdges);
+  }
+
+  private StreamEdge createStreamEdge(
+      StreamEdge originalEdge,
+      StreamConfig downstreamOpConfig,
+      OperatorChainSlice offloadedChainSlice) {
+    StreamConfig sourceOpConfig = 
offloadedChainSlice.getOperatorConfigs().get(0);
+    return new StreamEdge(
+        mockStreamNode(sourceOpConfig),
+        mockStreamNode(downstreamOpConfig),
+        originalEdge.getTypeNumber(),
+        originalEdge.getBufferTimeout(),
+        originalEdge.getPartitioner(),
+        originalEdge.getOutputTag(),
+        originalEdge.getExchangeMode(),
+        0, // default value
+        originalEdge.getIntermediateDatasetIdToProduce());
+  }
+
+  private void serializeAllOperatorsConfigs(OperatorChainSliceGraph 
chainSliceGraph) {
+    for (OperatorChainSlice chainSlice : chainSliceGraph.getSlices().values()) 
{
+      for (StreamConfig opConfig : chainSlice.getOperatorConfigs()) {
+        opConfig.serializeAllConfigs();
+      }
+    }
+  }
+
+  private Map<Integer, StreamConfig> collectChainedConfigs(
+      OperatorChainSlice sourceChainSlice, OperatorChainSliceGraph 
offloadedChainSliceGraph) {
+    Map<Integer, StreamConfig> chainedConfigs = new HashMap<>();
+    if (!sourceChainSlice.isOffloadable()) {
+      List<StreamConfig> operatorConfigs = 
sourceChainSlice.getOperatorConfigs();
+      for (int i = 1; i < operatorConfigs.size(); i++) {
+        StreamConfig opConfig = operatorConfigs.get(i);
+        chainedConfigs.put(opConfig.getVertexID(), opConfig);
+      }
+    }
+    for (OperatorChainSlice chainSlice : 
offloadedChainSliceGraph.getSlices().values()) {
+      if (chainSlice.id().equals(sourceChainSlice.id())) {
+        continue;
+      }
+      for (StreamConfig opConfig : chainSlice.getOperatorConfigs()) {
+        chainedConfigs.put(opConfig.getVertexID(), opConfig);
+      }
+    }
+    return chainedConfigs;
+  }
+
+  private void updateSourceConfigIfOffloadable(
+      StreamConfig sourceConfig,
+      StreamConfig offloadedSourceConfig,
+      OperatorChainSlice sourceChainSlice,
+      Map<Integer, StreamConfig> chainedConfigs) {
+    if (sourceChainSlice.isOffloadable()) {
+      // Update the first operator config
+      sourceConfig.setStreamOperatorFactory(
+          offloadedSourceConfig.getStreamOperatorFactory(userClassloader));
+      
sourceConfig.setChainedOutputs(offloadedSourceConfig.getChainedOutputs(userClassloader));
+      sourceConfig.setTypeSerializerOut(
+          offloadedSourceConfig.getTypeSerializerOut(userClassloader));
+      sourceConfig.setInputs(offloadedSourceConfig.getInputs(userClassloader));
+      updateStatePartitioners(sourceConfig, offloadedSourceConfig);
+    }
+  }
+
+  private void updateStatePartitioners(StreamConfig sourceConfig, StreamConfig 
offloadedConfig) {
+    KeySelector<?, ?> keySelector0 = offloadedConfig.getStatePartitioner(0, 
userClassloader);
+    if (keySelector0 != null) {
+      sourceConfig.setStatePartitioner(0, keySelector0);
+    }
+    KeySelector<?, ?> keySelector1 = offloadedConfig.getStatePartitioner(1, 
userClassloader);
+    if (keySelector1 != null) {
+      sourceConfig.setStatePartitioner(1, keySelector1);
+    }
+  }
+
+  private boolean areAllOffloadable(
+      OperatorChainSliceGraph chainSliceGraph, List<Integer> chainIds) {
+    for (Integer chainId : chainIds) {
+      OperatorChainSlice chainSlice = chainSliceGraph.getSlice(chainId);
+      if (!chainSlice.isOffloadable()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean canOutputRowVector(
+      OperatorChainSlice chainSlice, OperatorChainSliceGraph chainSliceGraph, 
JobVertex jobVertex) {
+    List<Integer> downstreamSliceIds = chainSlice.getOutputs();
+
+    // If chainSlice has downstream in the operator chain, check these 
downstream
+    if (!downstreamSliceIds.isEmpty()) {
+      return checkDownstreamSlicesInChain(chainSliceGraph, downstreamSliceIds);
+    } else {
+      // If chainSlice has no downstream in the operator chain, check 
downstream JobVertex from
+      // JobGraph level
+      return checkDownstreamVerticesFromJobGraph(jobVertex);
+    }
+  }
+
+  private boolean checkDownstreamSlicesInChain(
+      OperatorChainSliceGraph chainSliceGraph, List<Integer> 
downstreamSliceIds) {
+    for (Integer downstreamSliceId : downstreamSliceIds) {
+      OperatorChainSlice downstreamSlice = 
chainSliceGraph.getSlice(downstreamSliceId);
+      if (!downstreamSlice.isOffloadable()) {
+        return false;
+      }
+      List<Integer> inputSliceIds = downstreamSlice.getInputs();
+      if (!areAllOffloadable(chainSliceGraph, inputSliceIds)) {

Review Comment:
   Can we simply use the following code for brevity?
   ```java
   
inputSliceIds.stream().map(chainSliceGraph::getSlice).allMatch(OperatorChainSlice::isOffloadable)
   ```



##########
gluten-flink/runtime/src/main/java/org/apache/gluten/client/OperatorChainSliceGraph.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.client;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class OperatorChainSliceGraph {
+  private Map<Integer, OperatorChainSlice> slices;
+
+  public OperatorChainSliceGraph() {
+    slices = new HashMap<>();
+  }
+
+  public void addSlice(Integer id, OperatorChainSlice chainSlice) {
+    slices.put(id, chainSlice);
+  }
+
+  public OperatorChainSlice getSlice(Integer id) {
+    return slices.get(id);
+  }
+
+  public OperatorChainSlice getSourceSlice() {
+    List<OperatorChainSlice> sourceCandidates = new ArrayList<>();
+
+    for (OperatorChainSlice chainSlice : slices.values()) {
+      if (chainSlice.getInputs().isEmpty()) {
+        sourceCandidates.add(chainSlice);
+      }
+    }
+
+    if (sourceCandidates.isEmpty()) {
+      throw new IllegalStateException(
+          "No source operator chain slice found (no operator chain slice with 
empty inputs)");
+    } else if (sourceCandidates.size() > 1) {
+      throw new IllegalStateException(
+          "Multiple source operator chain slices found: "
+              + sourceCandidates.size()
+              + " operator chain slices have empty inputs");

Review Comment:
   Nit: simplify the check with Flink's Preconditions class.
   ```java
    Preconditions.checkState(sourceCandidates.size() == 1, "Expected exactly one source operator chain slice with empty inputs, but found %s", sourceCandidates.size());
   ```



##########
gluten-flink/runtime/src/main/java/org/apache/gluten/client/OffloadedJobGraphGenerator.java:
##########
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.client;
+
+import org.apache.gluten.streaming.api.operators.GlutenOperator;
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+import org.apache.gluten.table.runtime.keyselector.GlutenKeySelector;
+import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
+import org.apache.gluten.table.runtime.operators.GlutenSourceFunction;
+import org.apache.gluten.table.runtime.operators.GlutenTwoInputOperator;
+import 
org.apache.gluten.table.runtime.typeutils.GlutenStatefulRecordSerializer;
+import org.apache.gluten.util.Utils;
+
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/*
+ * Generates an offloaded JobGraph by transforming offloadable operators to 
Gluten operators.
+ * Main workflow:
+ * 1. For each JobVertex, generate an OperatorChainSliceGraph to identify 
offloadable slices.
+ * 2. Recursively visit chain slices: create offloaded operators for 
offloadable slices,
+ *    keep original operators for unoffloadable ones.
+ * 3. For offloadable operators: update input/output serializers and state 
partitioners
+ *    based on upstream/downstream operator capabilities.
+ * 4. Update stream edges and serialize all operator configurations.
+ */
+public class OffloadedJobGraphGenerator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OffloadedJobGraphGenerator.class);
+  private final JobGraph jobGraph;
+  private final ClassLoader userClassloader;
+  private boolean hasGenerated = false;
+
+  public OffloadedJobGraphGenerator(JobGraph jobGraph, ClassLoader 
userClassloader) {
+    this.jobGraph = jobGraph;
+    this.userClassloader = userClassloader;
+  }
+
+  public JobGraph generate() {
+    if (hasGenerated) {
+      throw new IllegalStateException("JobGraph has been generated.");
+    }
+    hasGenerated = true;
+    for (JobVertex jobVertex : jobGraph.getVertices()) {
+      offloadJobVertex(jobVertex);
+    }
+    return jobGraph;
+  }
+
+  private void offloadJobVertex(JobVertex jobVertex) {
+    OperatorChainSliceGraphGenerator graphGenerator =
+        new OperatorChainSliceGraphGenerator(jobVertex, userClassloader);
+    OperatorChainSliceGraph chainSliceGraph = graphGenerator.getGraph();
+    LOG.info("OperatorChainSliceGraph:\n{}", chainSliceGraph);
+
+    OperatorChainSlice sourceChainSlice = chainSliceGraph.getSourceSlice();
+    OperatorChainSliceGraph offloadedChainSliceGraph = new 
OperatorChainSliceGraph();
+    visitAndOffloadChainOperators(
+        sourceChainSlice, chainSliceGraph, offloadedChainSliceGraph, 0, 
jobVertex);
+    visitAndUpdateStreamEdges(sourceChainSlice, chainSliceGraph, 
offloadedChainSliceGraph);
+    serializeAllOperatorsConfigs(offloadedChainSliceGraph);
+
+    StreamConfig sourceConfig = sourceChainSlice.getOperatorConfigs().get(0);
+    StreamConfig offloadedSourceConfig =
+        
offloadedChainSliceGraph.getSlice(sourceChainSlice.id()).getOperatorConfigs().get(0);
+
+    Map<Integer, StreamConfig> chainedConfigs =
+        collectChainedConfigs(sourceChainSlice, offloadedChainSliceGraph);
+    updateSourceConfigIfOffloadable(
+        sourceConfig, offloadedSourceConfig, sourceChainSlice, chainedConfigs);
+    sourceConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedConfigs);
+    sourceConfig.serializeAllConfigs();
+  }
+
+  // Process and offload operator chain slices recursively
+  private void visitAndOffloadChainOperators(
+      OperatorChainSlice sourceChainSlice,
+      OperatorChainSliceGraph sourceChainSliceGraph,
+      OperatorChainSliceGraph offloadedChainSliceGraph,
+      Integer chainIndex,
+      JobVertex jobVertex) {
+    OperatorChainSlice processedChainSlice;
+    if (sourceChainSlice.isOffloadable()) {
+      processedChainSlice =
+          createOffloadedOperatorChainSlice(
+              sourceChainSliceGraph, sourceChainSlice, chainIndex, jobVertex);
+      chainIndex = chainIndex + 1;
+    } else {
+      processedChainSlice = 
createUnoffloadableOperatorChainSlice(sourceChainSlice, chainIndex);
+      chainIndex = chainIndex + sourceChainSlice.getOperatorConfigs().size();
+    }
+
+    processedChainSlice.getInputs().addAll(sourceChainSlice.getInputs());
+    processedChainSlice.getOutputs().addAll(sourceChainSlice.getOutputs());
+    offloadedChainSliceGraph.addSlice(sourceChainSlice.id(), 
processedChainSlice);
+
+    // Recursively process downstream chain slices
+    for (Integer downstreamSliceId : sourceChainSlice.getOutputs()) {
+      OperatorChainSlice downstreamSourceSlice = 
sourceChainSliceGraph.getSlice(downstreamSliceId);
+      OperatorChainSlice downstreamProcessedSlice =
+          offloadedChainSliceGraph.getSlice(downstreamSliceId);
+      if (downstreamProcessedSlice == null) {
+        visitAndOffloadChainOperators(
+            downstreamSourceSlice,
+            sourceChainSliceGraph,
+            offloadedChainSliceGraph,
+            chainIndex,
+            jobVertex);
+      }
+    }
+  }
+
+  // Keep the original operator chain slice as is.
+  private OperatorChainSlice createUnoffloadableOperatorChainSlice(
+      OperatorChainSlice sourceChainSlice, Integer chainIndex) {
+    OperatorChainSlice unoffloadableChainSlice = new 
OperatorChainSlice(sourceChainSlice.id());
+    List<StreamConfig> operatorConfigs = sourceChainSlice.getOperatorConfigs();
+    for (StreamConfig opConfig : operatorConfigs) {
+      StreamConfig newOpConfig = new StreamConfig(new 
Configuration(opConfig.getConfiguration()));
+      newOpConfig.setChainIndex(chainIndex);
+      unoffloadableChainSlice.getOperatorConfigs().add(newOpConfig);
+    }
+    unoffloadableChainSlice.setOffloadable(false);
+    return unoffloadableChainSlice;
+  }
+
+  // Create offloadable operator chain slice, and update the input/output 
channel serializers
+  private OperatorChainSlice createOffloadedOperatorChainSlice(
+      OperatorChainSliceGraph chainSliceGraph,
+      OperatorChainSlice sourceChainSlice,
+      Integer chainIndex,
+      JobVertex jobVertex) {
+    OperatorChainSlice offloadedChainSlice = new 
OperatorChainSlice(sourceChainSlice.id());
+    List<StreamConfig> operatorConfigs = sourceChainSlice.getOperatorConfigs();
+
+    // May coalesce multiple operators into one in the future.
+    if (operatorConfigs.size() != 1) {
+      throw new UnsupportedOperationException(
+          "Only one operator is supported for offloaded operator chain 
slice.");
+    }
+
+    StreamConfig sourceOpConfig = operatorConfigs.get(0);
+    GlutenOperator sourceOperator = Utils.getGlutenOperator(sourceOpConfig, 
userClassloader).get();
+    StatefulPlanNode planNode = sourceOperator.getPlanNode();
+    // Create a new operator config for the offloaded operator.
+    StreamConfig offloadedOpConfig =
+        new StreamConfig(new Configuration(sourceOpConfig.getConfiguration()));
+    if (sourceOperator instanceof GlutenStreamSource) {
+      boolean canOutputRowVector = canOutputRowVector(sourceChainSlice, 
chainSliceGraph, jobVertex);
+      Class<?> outClass = canOutputRowVector ? StatefulRecord.class : 
RowData.class;
+      GlutenStreamSource newSourceOp =
+          new GlutenStreamSource(
+              new GlutenSourceFunction<>(
+                  planNode,
+                  sourceOperator.getOutputTypes(),
+                  sourceOperator.getId(),
+                  ((GlutenStreamSource) sourceOperator).getConnectorSplit(),
+                  outClass));
+      offloadedOpConfig.setStreamOperator(newSourceOp);
+      if (canOutputRowVector) {
+        setOffloadedOutputSerializer(offloadedOpConfig, sourceOperator);
+      }
+    } else if (sourceOperator instanceof GlutenOneInputOperator) {
+      createOffloadedOneInputOperator(
+          sourceChainSlice,
+          chainSliceGraph,
+          jobVertex,
+          planNode,
+          (GlutenOneInputOperator<?, ?>) sourceOperator,
+          sourceOpConfig,
+          offloadedOpConfig);
+    } else if (sourceOperator instanceof GlutenTwoInputOperator) {
+      createOffloadedTwoInputOperator(
+          sourceChainSlice,
+          chainSliceGraph,
+          jobVertex,
+          planNode,
+          (GlutenTwoInputOperator<?, ?>) sourceOperator,
+          sourceOpConfig,
+          offloadedOpConfig);
+    } else {
+      throw new UnsupportedOperationException(
+          "Unsupported operator type for offloading: " + 
sourceOperator.getClass().getName());
+    }
+
+    offloadedOpConfig.setChainIndex(chainIndex);
+    offloadedChainSlice.getOperatorConfigs().add(offloadedOpConfig);
+    offloadedChainSlice.setOffloadable(true);
+    return offloadedChainSlice;
+  }
+
+  private void createOffloadedOneInputOperator(
+      OperatorChainSlice sourceChainSlice,
+      OperatorChainSliceGraph chainSliceGraph,
+      JobVertex jobVertex,
+      StatefulPlanNode planNode,
+      GlutenOneInputOperator<?, ?> sourceOperator,
+      StreamConfig sourceOpConfig,
+      StreamConfig offloadedOpConfig) {
+    boolean canOutputRowVector = canOutputRowVector(sourceChainSlice, 
chainSliceGraph, jobVertex);
+    boolean canInputRowVector = canInputRowVector(sourceChainSlice, 
chainSliceGraph, jobVertex);
+    Class<?> inClass = canInputRowVector ? StatefulRecord.class : 
RowData.class;
+    Class<?> outClass = canOutputRowVector ? StatefulRecord.class : 
RowData.class;
+    GlutenOneInputOperator<?, ?> newOneInputOp =
+        new GlutenOneInputOperator<>(
+            planNode,
+            sourceOperator.getId(),
+            sourceOperator.getInputType(),
+            sourceOperator.getOutputTypes(),
+            inClass,
+            outClass,
+            sourceOperator.getDescription());
+    offloadedOpConfig.setStreamOperator(newOneInputOp);
+    if (canOutputRowVector) {
+      setOffloadedOutputSerializer(offloadedOpConfig, sourceOperator);
+    }
+    if (canInputRowVector) {
+      setOffloadedInputSerializer(offloadedOpConfig, sourceOperator);
+      setOffloadedStatePartitioner(
+          sourceOpConfig, offloadedOpConfig, 0, 
sourceOperator.getDescription());
+    }
+  }
+
+  private void createOffloadedTwoInputOperator(
+      OperatorChainSlice sourceChainSlice,
+      OperatorChainSliceGraph chainSliceGraph,
+      JobVertex jobVertex,
+      StatefulPlanNode planNode,
+      GlutenTwoInputOperator<?, ?> sourceOperator,
+      StreamConfig sourceOpConfig,
+      StreamConfig offloadedOpConfig) {
+    boolean canOutputRowVector = canOutputRowVector(sourceChainSlice, 
chainSliceGraph, jobVertex);
+    boolean canInputRowVector = canInputRowVector(sourceChainSlice, 
chainSliceGraph, jobVertex);
+    setOffloadedStatePartitioner(
+        sourceOpConfig, offloadedOpConfig, 0, sourceOperator.getDescription());
+    setOffloadedStatePartitioner(
+        sourceOpConfig, offloadedOpConfig, 1, sourceOperator.getDescription());
+    Class<?> inClass = canInputRowVector ? StatefulRecord.class : 
RowData.class;
+    Class<?> outClass = canOutputRowVector ? StatefulRecord.class : 
RowData.class;
+    GlutenTwoInputOperator<?, ?> newTwoInputOp =
+        new GlutenTwoInputOperator<>(
+            planNode,
+            sourceOperator.getLeftId(),
+            sourceOperator.getRightId(),
+            sourceOperator.getLeftInputType(),
+            sourceOperator.getRightInputType(),
+            sourceOperator.getOutputTypes(),
+            inClass,
+            outClass);
+    offloadedOpConfig.setStreamOperator(newTwoInputOp);
+    offloadedOpConfig.setStatePartitioner(0, new GlutenKeySelector());
+    offloadedOpConfig.setStatePartitioner(1, new GlutenKeySelector());
+    if (canOutputRowVector) {
+      setOffloadedOutputSerializer(offloadedOpConfig, sourceOperator);
+    }
+    if (canInputRowVector) {
+      setOffloadedInputSerializersForTwoInputOperator(offloadedOpConfig, 
sourceOperator);
+    }
+  }
+
+  private void setOffloadedOutputSerializer(StreamConfig opConfig, 
GlutenOperator operator) {
+    RowType rowType = 
operator.getOutputTypes().entrySet().iterator().next().getValue();
+    opConfig.setTypeSerializerOut(new GlutenStatefulRecordSerializer(rowType, 
operator.getId()));
+  }
+
+  private void setOffloadedInputSerializer(StreamConfig opConfig, 
GlutenOperator operator) {
+    opConfig.setupNetworkInputs(
+        new GlutenStatefulRecordSerializer(operator.getInputType(), 
operator.getId()));
+  }
+
+  private void setOffloadedInputSerializersForTwoInputOperator(
+      StreamConfig opConfig, GlutenTwoInputOperator<?, ?> operator) {
+    opConfig.setupNetworkInputs(
+        new GlutenStatefulRecordSerializer(operator.getLeftInputType(), 
operator.getId()),
+        new GlutenStatefulRecordSerializer(operator.getRightInputType(), 
operator.getId()));
+  }
+
+  private void setOffloadedStatePartitioner(
+      StreamConfig sourceOpConfig,
+      StreamConfig offloadedOpConfig,
+      int inputIndex,
+      String operatorDescription) {
+    KeySelector<?, ?> keySelector = 
sourceOpConfig.getStatePartitioner(inputIndex, userClassloader);
+    if (keySelector != null) {
+      LOG.info(
+          "State partitioner ({}) found in input {} of operator {}, change it 
to GlutenKeySelector.",
+          keySelector.getClass().getName(),
+          inputIndex,
+          operatorDescription);
+      offloadedOpConfig.setStatePartitioner(inputIndex, new 
GlutenKeySelector());
+    }
+  }
+
+  private StreamConfig findLastOperatorInChain(JobVertex vertex) {
+    StreamConfig rootStreamConfig = new 
StreamConfig(vertex.getConfiguration());
+    Map<Integer, StreamConfig> chainedConfigs =
+        rootStreamConfig.getTransitiveChainedTaskConfigs(userClassloader);
+    chainedConfigs.put(rootStreamConfig.getVertexID(), rootStreamConfig);
+
+    // Find the last operator (the one with no chained outputs)
+    for (StreamConfig config : chainedConfigs.values()) {
+      List<StreamEdge> chainedOutputs = 
config.getChainedOutputs(userClassloader);
+      if (chainedOutputs == null || chainedOutputs.isEmpty()) {
+        return config;
+      }
+    }
+
+    // If no last operator found, use the root config
+    return rootStreamConfig;
+  }
+
+  private StreamNode mockStreamNode(StreamConfig streamConfig) {
+    return new StreamNode(
+        streamConfig.getVertexID(),
+        null,
+        null,
+        (StreamOperatorFactory<?>) 
streamConfig.getStreamOperatorFactory(userClassloader),
+        streamConfig.getOperatorName(),
+        null);
+  }
+
+  // Update stream edges when vertices have been changed due to offloading
+  private void visitAndUpdateStreamEdges(
+      OperatorChainSlice sourceChainSlice,
+      OperatorChainSliceGraph sourceChainSliceGraph,
+      OperatorChainSliceGraph offloadedChainSliceGraph) {
+    OperatorChainSlice offloadedChainSlice =
+        offloadedChainSliceGraph.getSlice(sourceChainSlice.id());
+    if (offloadedChainSlice.isOffloadable()) {
+      updateStreamEdgesForOffloadedSlice(
+          sourceChainSlice, sourceChainSliceGraph, offloadedChainSliceGraph, 
offloadedChainSlice);
+    }
+
+    // Recursively update downstream chain slices
+    for (Integer downstreamSliceId : sourceChainSlice.getOutputs()) {
+      visitAndUpdateStreamEdges(
+          sourceChainSliceGraph.getSlice(downstreamSliceId),
+          sourceChainSliceGraph,
+          offloadedChainSliceGraph);
+    }
+  }
+
+  private void updateStreamEdgesForOffloadedSlice(
+      OperatorChainSlice sourceChainSlice,
+      OperatorChainSliceGraph sourceChainSliceGraph,
+      OperatorChainSliceGraph offloadedChainSliceGraph,
+      OperatorChainSlice offloadedChainSlice) {
+    List<Integer> downstreamSliceIds = sourceChainSlice.getOutputs();
+    if (downstreamSliceIds.isEmpty()) {
+      StreamConfig offloadedOpConfig = 
offloadedChainSlice.getOperatorConfigs().get(0);
+      offloadedOpConfig.setChainedOutputs(new ArrayList<>());
+      return;
+    }
+
+    List<StreamEdge> newOutputEdges = new ArrayList<>();
+    List<StreamConfig> sourceOperatorConfigs = 
sourceChainSlice.getOperatorConfigs();
+    StreamConfig lastSourceOpConfig = 
sourceOperatorConfigs.get(sourceOperatorConfigs.size() - 1);
+    List<StreamEdge> originalOutputEdges = 
lastSourceOpConfig.getChainedOutputs(userClassloader);
+
+    for (int i = 0; i < downstreamSliceIds.size(); i++) {
+      Integer downstreamSliceId = downstreamSliceIds.get(i);
+      OperatorChainSlice downstreamOffloadedSlice =
+          offloadedChainSliceGraph.getSlice(downstreamSliceId);
+      StreamConfig downstreamOpConfig = 
downstreamOffloadedSlice.getOperatorConfigs().get(0);
+      StreamEdge originalEdge = originalOutputEdges.get(i);
+      StreamEdge newEdge = createStreamEdge(originalEdge, downstreamOpConfig, 
offloadedChainSlice);
+      newOutputEdges.add(newEdge);
+    }
+
+    StreamConfig offloadedOpConfig = 
offloadedChainSlice.getOperatorConfigs().get(0);
+    offloadedOpConfig.setChainedOutputs(newOutputEdges);
+  }
+
+  private StreamEdge createStreamEdge(
+      StreamEdge originalEdge,
+      StreamConfig downstreamOpConfig,
+      OperatorChainSlice offloadedChainSlice) {
+    StreamConfig sourceOpConfig = 
offloadedChainSlice.getOperatorConfigs().get(0);
+    return new StreamEdge(
+        mockStreamNode(sourceOpConfig),
+        mockStreamNode(downstreamOpConfig),
+        originalEdge.getTypeNumber(),
+        originalEdge.getBufferTimeout(),
+        originalEdge.getPartitioner(),
+        originalEdge.getOutputTag(),
+        originalEdge.getExchangeMode(),
+        0, // default value
+        originalEdge.getIntermediateDatasetIdToProduce());
+  }
+
+  private void serializeAllOperatorsConfigs(OperatorChainSliceGraph 
chainSliceGraph) {
+    for (OperatorChainSlice chainSlice : chainSliceGraph.getSlices().values()) 
{
+      for (StreamConfig opConfig : chainSlice.getOperatorConfigs()) {
+        opConfig.serializeAllConfigs();
+      }
+    }
+  }
+
+  private Map<Integer, StreamConfig> collectChainedConfigs(
+      OperatorChainSlice sourceChainSlice, OperatorChainSliceGraph 
offloadedChainSliceGraph) {
+    Map<Integer, StreamConfig> chainedConfigs = new HashMap<>();
+    if (!sourceChainSlice.isOffloadable()) {
+      List<StreamConfig> operatorConfigs = 
sourceChainSlice.getOperatorConfigs();
+      for (int i = 1; i < operatorConfigs.size(); i++) {
+        StreamConfig opConfig = operatorConfigs.get(i);
+        chainedConfigs.put(opConfig.getVertexID(), opConfig);
+      }
+    }
+    for (OperatorChainSlice chainSlice : 
offloadedChainSliceGraph.getSlices().values()) {
+      if (chainSlice.id().equals(sourceChainSlice.id())) {
+        continue;
+      }
+      for (StreamConfig opConfig : chainSlice.getOperatorConfigs()) {
+        chainedConfigs.put(opConfig.getVertexID(), opConfig);
+      }
+    }
+    return chainedConfigs;
+  }
+
+  private void updateSourceConfigIfOffloadable(
+      StreamConfig sourceConfig,
+      StreamConfig offloadedSourceConfig,
+      OperatorChainSlice sourceChainSlice,
+      Map<Integer, StreamConfig> chainedConfigs) {
+    if (sourceChainSlice.isOffloadable()) {
+      // Update the first operator config
+      sourceConfig.setStreamOperatorFactory(
+          offloadedSourceConfig.getStreamOperatorFactory(userClassloader));
+      
sourceConfig.setChainedOutputs(offloadedSourceConfig.getChainedOutputs(userClassloader));
+      sourceConfig.setTypeSerializerOut(
+          offloadedSourceConfig.getTypeSerializerOut(userClassloader));
+      sourceConfig.setInputs(offloadedSourceConfig.getInputs(userClassloader));
+      updateStatePartitioners(sourceConfig, offloadedSourceConfig);
+    }
+  }
+
+  private void updateStatePartitioners(StreamConfig sourceConfig, StreamConfig 
offloadedConfig) {
+    KeySelector<?, ?> keySelector0 = offloadedConfig.getStatePartitioner(0, 
userClassloader);
+    if (keySelector0 != null) {
+      sourceConfig.setStatePartitioner(0, keySelector0);
+    }
+    KeySelector<?, ?> keySelector1 = offloadedConfig.getStatePartitioner(1, 
userClassloader);
+    if (keySelector1 != null) {
+      sourceConfig.setStatePartitioner(1, keySelector1);
+    }
+  }
+
+  private boolean areAllOffloadable(
+      OperatorChainSliceGraph chainSliceGraph, List<Integer> chainIds) {
+    for (Integer chainId : chainIds) {
+      OperatorChainSlice chainSlice = chainSliceGraph.getSlice(chainId);
+      if (!chainSlice.isOffloadable()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean canOutputRowVector(
+      OperatorChainSlice chainSlice, OperatorChainSliceGraph chainSliceGraph, 
JobVertex jobVertex) {
+    List<Integer> downstreamSliceIds = chainSlice.getOutputs();
+
+    // If chainSlice has downstream in the operator chain, check these 
downstream
+    if (!downstreamSliceIds.isEmpty()) {
+      return checkDownstreamSlicesInChain(chainSliceGraph, downstreamSliceIds);
+    } else {
+      // If chainSlice has no downstream in the operator chain, check 
downstream JobVertex from
+      // JobGraph level
+      return checkDownstreamVerticesFromJobGraph(jobVertex);
+    }
+  }
+
+  private boolean checkDownstreamSlicesInChain(
+      OperatorChainSliceGraph chainSliceGraph, List<Integer> 
downstreamSliceIds) {
+    for (Integer downstreamSliceId : downstreamSliceIds) {
+      OperatorChainSlice downstreamSlice = 
chainSliceGraph.getSlice(downstreamSliceId);
+      if (!downstreamSlice.isOffloadable()) {
+        return false;
+      }
+      List<Integer> inputSliceIds = downstreamSlice.getInputs();
+      if (!areAllOffloadable(chainSliceGraph, inputSliceIds)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean checkDownstreamVerticesFromJobGraph(JobVertex jobVertex) {
+    List<JobVertex> downstreamVertices = getDownstreamJobVertices(jobVertex);
+    if (downstreamVertices.isEmpty()) {
+      // If there is no downstream JobVertex, can output RowVector (no 
downstream needs
+      // conversion)
+      return true;
+    }
+    // Check if all downstream vertices' operators are gluten operators
+    for (JobVertex downstreamVertex : downstreamVertices) {
+      StreamConfig downstreamStreamConfig = new 
StreamConfig(downstreamVertex.getConfiguration());
+      Optional<GlutenOperator> glutenOperator =
+          Utils.getGlutenOperator(downstreamStreamConfig, userClassloader);
+      if (!glutenOperator.isPresent()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean canInputRowVector(

Review Comment:
   Nit: suggest the following names, which seem more natural.
   ```
   canInputRowVector -> supportsVectorInput
   canOutputRowVector -> supportsVectorOutput
   ```



##########
gluten-flink/runtime/src/main/java/org/apache/gluten/client/OperatorChainSliceGraph.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.client;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class OperatorChainSliceGraph {
+  private Map<Integer, OperatorChainSlice> slices;
+
+  public OperatorChainSliceGraph() {
+    slices = new HashMap<>();
+  }
+
+  public void addSlice(Integer id, OperatorChainSlice chainSlice) {
+    slices.put(id, chainSlice);
+  }
+
+  public OperatorChainSlice getSlice(Integer id) {
+    return slices.get(id);
+  }
+
+  public OperatorChainSlice getSourceSlice() {
+    List<OperatorChainSlice> sourceCandidates = new ArrayList<>();
+
+    for (OperatorChainSlice chainSlice : slices.values()) {
+      if (chainSlice.getInputs().isEmpty()) {
+        sourceCandidates.add(chainSlice);
+      }
+    }
+
+    if (sourceCandidates.isEmpty()) {
+      throw new IllegalStateException(
+          "No source operator chain slice found (no operator chain slice with 
empty inputs)");
+    } else if (sourceCandidates.size() > 1) {
+      throw new IllegalStateException(
+          "Multiple source operator chain slices found: "
+              + sourceCandidates.size()
+              + " operator chain slices have empty inputs");
+    }
+
+    return sourceCandidates.get(0);
+  }
+
+  public Map<Integer, OperatorChainSlice> getSlices() {
+    return slices;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (OperatorChainSlice chainSlice : slices.values()) {
+      sb.append("Slice ID: ")
+          .append(chainSlice.id())
+          .append(", offloadable: ")
+          .append(chainSlice.isOffloadable())
+          .append("\n");
+      sb.append("  Inputs: ").append(chainSlice.getInputs()).append("\n");
+      sb.append("  Outputs: ").append(chainSlice.getOutputs()).append("\n");
+      String operatorConfigs =
+          chainSlice.getOperatorConfigs().stream()
+              .map(config -> config.getOperatorName() + "(" + 
config.getVertexID() + ")")
+              .reduce((a, b) -> a + ", " + b)

Review Comment:
   Can we simplify it with the code below?
   
   `map(xxx).collect(Collectors.joining(", "))`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to