This is an automated email from the ASF dual-hosted git repository.

shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 41fbbecbf9 [GLUTEN-10212][FLINK] Fix resources not being released in 
GlutenRowVectorSerializer (#10265)
41fbbecbf9 is described below

commit 41fbbecbf97641bd63f33a4214799994ff934183
Author: shuai.xu <[email protected]>
AuthorDate: Mon Jul 28 18:00:21 2025 +0800

    [GLUTEN-10212][FLINK] Fix resources not being released in 
GlutenRowVectorSerializer (#10265)
---
 .../runtime/io/AbstractStreamTaskNetworkInput.java | 255 +++++++++++++++++++++
 .../typeutils/GlutenRowVectorSerializer.java       |  25 +-
 2 files changed, 275 insertions(+), 5 deletions(-)

diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
 
b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
new file mode 100644
index 0000000000..e3e1caabc6
--- /dev/null
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.EndOfData;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import 
org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecordsChecker;
+import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base class for network-based StreamTaskInput where each channel has a 
designated {@link
+ * RecordDeserializer} for spanning records. Specific implementations bind it 
to a specific {@link
+ * RecordDeserializer}.
+ */
+public abstract class AbstractStreamTaskNetworkInput<
+        T, R extends 
RecordDeserializer<DeserializationDelegate<StreamElement>>>
+    implements StreamTaskInput<T> {
+  protected final CheckpointedInputGate checkpointedInputGate;
+  protected final DeserializationDelegate<StreamElement> 
deserializationDelegate;
+  protected final TypeSerializer<T> inputSerializer;
+  protected final Map<InputChannelInfo, R> recordDeserializers;
+  protected final Map<InputChannelInfo, Integer> flattenedChannelIndices = new 
HashMap<>();
+  /** Valve that controls how watermarks and watermark statuses are forwarded. 
*/
+  protected final StatusWatermarkValve statusWatermarkValve;
+
+  protected final int inputIndex;
+  private final RecordAttributesCombiner recordAttributesCombiner;
+  private InputChannelInfo lastChannel = null;
+  private R currentRecordDeserializer = null;
+
+  protected final CanEmitBatchOfRecordsChecker canEmitBatchOfRecords;
+
+  public AbstractStreamTaskNetworkInput(
+      CheckpointedInputGate checkpointedInputGate,
+      TypeSerializer<T> inputSerializer,
+      StatusWatermarkValve statusWatermarkValve,
+      int inputIndex,
+      Map<InputChannelInfo, R> recordDeserializers,
+      CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
+    super();
+    this.checkpointedInputGate = checkpointedInputGate;
+    deserializationDelegate =
+        new NonReusingDeserializationDelegate<>(new 
StreamElementSerializer<>(inputSerializer));
+    this.inputSerializer = inputSerializer;
+
+    for (InputChannelInfo i : checkpointedInputGate.getChannelInfos()) {
+      flattenedChannelIndices.put(i, flattenedChannelIndices.size());
+    }
+
+    this.statusWatermarkValve = checkNotNull(statusWatermarkValve);
+    this.inputIndex = inputIndex;
+    this.recordDeserializers = checkNotNull(recordDeserializers);
+    this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords);
+    this.recordAttributesCombiner =
+        new 
RecordAttributesCombiner(checkpointedInputGate.getNumberOfInputChannels());
+  }
+
+  @Override
+  public DataInputStatus emitNext(DataOutput<T> output) throws Exception {
+
+    while (true) {
+      // get the stream element from the deserializer
+      if (currentRecordDeserializer != null) {
+        RecordDeserializer.DeserializationResult result;
+        try {
+          result = 
currentRecordDeserializer.getNextRecord(deserializationDelegate);
+        } catch (IOException e) {
+          throw new IOException(
+              String.format("Can't get next record for channel %s", 
lastChannel), e);
+        }
+        if (result.isBufferConsumed()) {
+          currentRecordDeserializer = null;
+        }
+
+        if (result.isFullRecord()) {
+          final boolean breakBatchEmitting =
+              processElement(deserializationDelegate.getInstance(), output);
+          if (canEmitBatchOfRecords.check() && !breakBatchEmitting) {
+            continue;
+          }
+          return DataInputStatus.MORE_AVAILABLE;
+        }
+      }
+
+      Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
+      if (bufferOrEvent.isPresent()) {
+        // return to the mailbox after receiving a checkpoint barrier to avoid 
processing of
+        // data after the barrier before checkpoint is performed for unaligned 
checkpoint
+        // mode
+        if (bufferOrEvent.get().isBuffer()) {
+          processBuffer(bufferOrEvent.get());
+        } else {
+          DataInputStatus status = processEvent(bufferOrEvent.get());
+          if (status == DataInputStatus.MORE_AVAILABLE && 
canEmitBatchOfRecords.check()) {
+            continue;
+          }
+          return status;
+        }
+      } else {
+        if (checkpointedInputGate.isFinished()) {
+          checkState(
+              checkpointedInputGate.getAvailableFuture().isDone(),
+              "Finished BarrierHandler should be available");
+          return DataInputStatus.END_OF_INPUT;
+        }
+        return DataInputStatus.NOTHING_AVAILABLE;
+      }
+    }
+  }
+
+  /**
+   * Process the given stream element and returns whether to stop processing 
and return from the
+   * emitNext method so that the emitNext is invoked again right after 
processing the element to
+   * allow behavior change in emitNext method. For example, the behavior of 
emitNext may need to
+   * change right after process a RecordAttributes.
+   */
+  private boolean processElement(StreamElement streamElement, DataOutput<T> 
output)
+      throws Exception {
+    if (streamElement.isRecord()) {
+      output.emitRecord(streamElement.asRecord());
+      return false;
+    } else if (streamElement.isWatermark()) {
+      statusWatermarkValve.inputWatermark(
+          streamElement.asWatermark(), 
flattenedChannelIndices.get(lastChannel), output);
+      return false;
+    } else if (streamElement.isLatencyMarker()) {
+      output.emitLatencyMarker(streamElement.asLatencyMarker());
+      return false;
+    } else if (streamElement.isWatermarkStatus()) {
+      statusWatermarkValve.inputWatermarkStatus(
+          streamElement.asWatermarkStatus(), 
flattenedChannelIndices.get(lastChannel), output);
+      return false;
+    } else if (streamElement.isRecordAttributes()) {
+      recordAttributesCombiner.inputRecordAttributes(
+          streamElement.asRecordAttributes(), 
flattenedChannelIndices.get(lastChannel), output);
+      return true;
+    } else {
+      throw new UnsupportedOperationException("Unknown type of StreamElement");
+    }
+  }
+
+  protected DataInputStatus processEvent(BufferOrEvent bufferOrEvent) {
+    // Event received
+    final AbstractEvent event = bufferOrEvent.getEvent();
+    if (event.getClass() == EndOfData.class) {
+      switch (checkpointedInputGate.hasReceivedEndOfData()) {
+        case NOT_END_OF_DATA:
+          // skip
+          break;
+        case DRAINED:
+          return DataInputStatus.END_OF_DATA;
+        case STOPPED:
+          return DataInputStatus.STOPPED;
+      }
+    } else if (event.getClass() == EndOfPartitionEvent.class) {
+      // release the record deserializer immediately,
+      // which is very valuable in case of bounded stream
+      releaseDeserializer(bufferOrEvent.getChannelInfo());
+      if (checkpointedInputGate.isFinished()) {
+        return DataInputStatus.END_OF_INPUT;
+      }
+    } else if (event.getClass() == EndOfChannelStateEvent.class) {
+      if (checkpointedInputGate.allChannelsRecovered()) {
+        return DataInputStatus.END_OF_RECOVERY;
+      }
+    }
+    return DataInputStatus.MORE_AVAILABLE;
+  }
+
+  protected void processBuffer(BufferOrEvent bufferOrEvent) throws IOException 
{
+    lastChannel = bufferOrEvent.getChannelInfo();
+    checkState(lastChannel != null);
+    currentRecordDeserializer = 
getActiveSerializer(bufferOrEvent.getChannelInfo());
+    checkState(
+        currentRecordDeserializer != null, "currentRecordDeserializer has 
already been released");
+
+    currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+  }
+
+  protected R getActiveSerializer(InputChannelInfo channelInfo) {
+    return recordDeserializers.get(channelInfo);
+  }
+
+  @Override
+  public int getInputIndex() {
+    return inputIndex;
+  }
+
+  @Override
+  public CompletableFuture<?> getAvailableFuture() {
+    if (currentRecordDeserializer != null) {
+      return AVAILABLE;
+    }
+    return checkpointedInputGate.getAvailableFuture();
+  }
+
+  @Override
+  public void close() throws IOException {
+    // --- Begin Gluten-specific code changes ---
+    if (inputSerializer instanceof Closeable) {
+      // GlutenRowVectorSerializer needs to release native resources.
+      ((Closeable) inputSerializer).close();
+    }
+    // --- End Gluten-specific code changes ---
+    // release the deserializers. This part should never fail.
+    for (InputChannelInfo channelInfo : new 
ArrayList<>(recordDeserializers.keySet())) {
+      releaseDeserializer(channelInfo);
+    }
+  }
+
+  protected void releaseDeserializer(InputChannelInfo channelInfo) {
+    R deserializer = recordDeserializers.get(channelInfo);
+    if (deserializer != null) {
+      // recycle buffers and clear the deserializer.
+      deserializer.clear();
+      recordDeserializers.remove(channelInfo);
+    }
+  }
+}
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/typeutils/GlutenRowVectorSerializer.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/typeutils/GlutenRowVectorSerializer.java
index e59128f2d5..db17a47a2d 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/typeutils/GlutenRowVectorSerializer.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/typeutils/GlutenRowVectorSerializer.java
@@ -31,15 +31,16 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
+import java.io.Closeable;
 import java.io.IOException;
 
 /** Serializer for {@link RowVector}. */
 @Internal
-public class GlutenRowVectorSerializer extends TypeSerializer<StatefulRecord> {
+public class GlutenRowVectorSerializer extends TypeSerializer<StatefulRecord> 
implements Closeable {
   private static final long serialVersionUID = 1L;
   private final RowType rowType;
-  private MemoryManager memoryManager;
-  private Session session;
+  private transient MemoryManager memoryManager;
+  private transient Session session;
 
   public GlutenRowVectorSerializer(RowType rowType) {
     this.rowType = rowType;
@@ -100,8 +101,11 @@ public class GlutenRowVectorSerializer extends 
TypeSerializer<StatefulRecord> {
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof GlutenRowVectorSerializer) {
-      GlutenRowVectorSerializer other = (GlutenRowVectorSerializer) obj;
-      return rowType.equals(other.rowType);
+      if (rowType != null) {
+        GlutenRowVectorSerializer other = (GlutenRowVectorSerializer) obj;
+        return rowType.equals(other.rowType);
+      }
+      return true;
     }
 
     return false;
@@ -109,6 +113,9 @@ public class GlutenRowVectorSerializer extends 
TypeSerializer<StatefulRecord> {
 
   @Override
   public int hashCode() {
+    if (rowType == null) {
+      return 0;
+    }
     return rowType.hashCode();
   }
 
@@ -127,6 +134,14 @@ public class GlutenRowVectorSerializer extends 
TypeSerializer<StatefulRecord> {
     return new RowVectorSerializerSnapshot(rowType);
   }
 
+  @Override
+  public void close() {
+    if (memoryManager != null) {
+      memoryManager.close();
+      session.close();
+    }
+  }
+
+    /** {@link TypeSerializerSnapshot} for Gluten RowVector. */
   public static final class RowVectorSerializerSnapshot
       implements TypeSerializerSnapshot<StatefulRecord> {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to