This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d36fdd2ca Revert "[GLUTEN-9567][FLINK] Add container class `Velox4JBean` for proper JSO…" (#9656)
9d36fdd2ca is described below

commit 9d36fdd2ca9387e4f2fe13886e9d1a6f48dcb7b7
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu May 15 11:35:14 2025 +0100

    Revert "[GLUTEN-9567][FLINK] Add container class `Velox4JBean` for proper JSO…" (#9656)
    
    This reverts commit e436fa4899eee052a5ed019542d8ce5a4183ffd5.
---
 .../operators/GlutenSingleInputOperator.java       | 35 ++++++------
 .../runtime/operators/GlutenSourceFunction.java    | 27 +++++----
 .../java/org/apache/gluten/util/Velox4JBean.java   | 66 ----------------------
 3 files changed, 32 insertions(+), 96 deletions(-)

diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
index 8e888a5ef9..034768f36a 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
@@ -17,14 +17,15 @@
 
 package org.apache.gluten.table.runtime.operators;
 
+import io.github.zhztheplayer.velox4j.connector.ExternalStream;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreams;
+import io.github.zhztheplayer.velox4j.iterator.DownIterators;
 import io.github.zhztheplayer.velox4j.iterator.UpIterator;
 import io.github.zhztheplayer.velox4j.query.SerialTask;
 import io.github.zhztheplayer.velox4j.type.RowType;
 import org.apache.gluten.streaming.api.operators.GlutenOperator;
-import org.apache.gluten.util.Velox4JBean;
 import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;
 
 import io.github.zhztheplayer.velox4j.Velox4j;
@@ -49,6 +50,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /** Calculate operator in gluten, which will call Velox to run. */
 public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
@@ -56,10 +59,10 @@ public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
 
     private static final Logger LOG = 
LoggerFactory.getLogger(GlutenSingleInputOperator.class);
 
-    private final Velox4JBean<PlanNode> glutenPlan;
+    private final PlanNode glutenPlan;
     private final String id;
-    private final Velox4JBean<RowType> inputType;
-    private final Velox4JBean<RowType> outputType;
+    private final RowType inputType;
+    private final RowType outputType;
 
     private StreamRecord<RowData> outElement = null;
 
@@ -71,10 +74,10 @@ public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
     private SerialTask task;
 
     public GlutenSingleInputOperator(PlanNode plan, String id, RowType 
inputType, RowType outputType) {
-        this.glutenPlan = Velox4JBean.of(plan);
+        this.glutenPlan = plan;
         this.id = id;
-        this.inputType = Velox4JBean.of(inputType);
-        this.outputType = Velox4JBean.of(outputType);
+        this.inputType = inputType;
+        this.outputType = outputType;
     }
 
     @Override
@@ -88,12 +91,12 @@ public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
         // add a mock input as velox not allow the source is empty.
         PlanNode mockInput = new TableScanNode(
                 id,
-                inputType.get(),
+                inputType,
                 new ExternalStreamTableHandle("connector-external-stream"),
                 List.of());
-        glutenPlan.get().setSources(List.of(mockInput));
-        LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan.get()));
-        query = new Query(glutenPlan.get(), Config.empty(), 
ConnectorConfig.empty());
+        glutenPlan.setSources(List.of(mockInput));
+        LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan));
+        query = new Query(glutenPlan, Config.empty(), ConnectorConfig.empty());
         allocator = new RootAllocator(Long.MAX_VALUE);
         task = session.queryOps().execute(query);
         ExternalStreamConnectorSplit split = new 
ExternalStreamConnectorSplit("connector-external-stream", inputQueue.id());
@@ -107,7 +110,7 @@ public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
             element.getValue(),
             allocator,
             session,
-            inputType.get());
+            inputType);
         inputQueue.put(inRv);
         UpIterator.State state = task.advance();
         if (state == UpIterator.State.AVAILABLE) {
@@ -115,7 +118,7 @@ public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
             List<RowData> rows = FlinkRowToVLVectorConvertor.toRowData(
                     outRv,
                     allocator,
-                    outputType.get());
+                    outputType);
             for (RowData row : rows) {
                 output.collect(outElement.replace(row));
             }
@@ -135,17 +138,17 @@ public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
 
     @Override
     public PlanNode getPlanNode() {
-        return glutenPlan.get();
+        return glutenPlan;
     }
 
     @Override
     public RowType getInputType() {
-        return inputType.get();
+        return inputType;
     }
 
     @Override
     public RowType getOutputType() {
-        return outputType.get();
+        return outputType;
     }
 
     @Override
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
index 690cf29a6e..1d80806842 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
@@ -18,7 +18,6 @@
 package org.apache.gluten.table.runtime.operators;
 
 import io.github.zhztheplayer.velox4j.query.SerialTask;
-import org.apache.gluten.util.Velox4JBean;
 import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;
 
 import io.github.zhztheplayer.velox4j.Velox4j;
@@ -48,10 +47,10 @@ import java.util.List;
 public class GlutenSourceFunction extends RichParallelSourceFunction<RowData> {
     private static final Logger LOG = 
LoggerFactory.getLogger(GlutenSourceFunction.class);
 
-    private final Velox4JBean<PlanNode> planNode;
-    private final Velox4JBean<RowType> outputType;
+    private final PlanNode planNode;
+    private final RowType outputType;
     private final String id;
-    private final Velox4JBean<ConnectorSplit> split;
+    private final ConnectorSplit split;
     private volatile boolean isRunning = true;
 
     private Session session;
@@ -64,32 +63,32 @@ public class GlutenSourceFunction extends RichParallelSourceFunction<RowData> {
             RowType outputType,
             String id,
             ConnectorSplit split) {
-        this.planNode = Velox4JBean.of(planNode);
-        this.outputType = Velox4JBean.of(outputType);
+        this.planNode = planNode;
+        this.outputType = outputType;
         this.id = id;
-        this.split = Velox4JBean.of(split);
+        this.split = split;
     }
 
     public PlanNode getPlanNode() {
-        return planNode.get();
+        return planNode;
     }
 
-    public RowType getOutputType() { return outputType.get(); }
+    public RowType getOutputType() { return outputType; }
 
     public String getId() { return id; }
 
-    public ConnectorSplit getConnectorSplit() { return split.get(); }
+    public ConnectorSplit getConnectorSplit() { return split; }
 
     @Override
     public void run(SourceContext<RowData> sourceContext) throws Exception {
-        LOG.debug("Running GlutenSourceFunction: " + 
Serde.toJson(planNode.get()));
+        LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode));
         memoryManager = MemoryManager.create(AllocationListener.NOOP);
         session = Velox4j.newSession(memoryManager);
-        query = new Query(planNode.get(), Config.empty(), 
ConnectorConfig.empty());
+        query = new Query(planNode, Config.empty(), ConnectorConfig.empty());
         allocator = new RootAllocator(Long.MAX_VALUE);
 
         SerialTask task = session.queryOps().execute(query);
-        task.addSplit(id, split.get());
+        task.addSplit(id, split);
         task.noMoreSplits(id);
         while (isRunning) {
             UpIterator.State state = task.advance();
@@ -98,7 +97,7 @@ public class GlutenSourceFunction extends RichParallelSourceFunction<RowData> {
                 List<RowData> rows = FlinkRowToVLVectorConvertor.toRowData(
                         outRv,
                         allocator,
-                        outputType.get());
+                        outputType);
                 for (RowData row : rows) {
                     sourceContext.collect(row);
                 }
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/Velox4JBean.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/Velox4JBean.java
deleted file mode 100644
index 7cb77cfffd..0000000000
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/Velox4JBean.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gluten.util;
-
-import com.google.common.base.Preconditions;
-import io.github.zhztheplayer.velox4j.serde.NativeBean;
-import io.github.zhztheplayer.velox4j.serde.Serde;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-/**
- * A serializable container for an instance of Velox4J's bean class, with the 
Java
- * Serializable interface implemented.
- * <p>
- * Velox4J's JSON serde is well-maintained so we simply route the serialization
- * calls to the JSON serdes.
- */
-public class Velox4JBean<T extends NativeBean> implements Serializable {
-  private transient String className;
-  private transient T bean;
-
-  private Velox4JBean(T bean) {
-    this.className = bean.getClass().getName();
-    this.bean = bean;
-  }
-
-  public static <T extends NativeBean> Velox4JBean<T> of(T bean) {
-    return new Velox4JBean<>(bean);
-  }
-
-  public T get() {
-    return bean;
-  }
-
-  private void writeObject(ObjectOutputStream out) throws IOException {
-    final String json = Serde.toJson(bean);
-    out.writeUTF(className);
-    out.writeUTF(json);
-  }
-
-  private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-    className = in.readUTF();
-    final Class<?> clazz = Class.forName(className);
-    Preconditions.checkState(NativeBean.class.isAssignableFrom(clazz),
-        "Class %s is not a NativeBean", className);
-    bean = (T) Serde.fromJson(in.readUTF(), (Class<? extends NativeBean>) 
clazz);
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to