This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 9d36fdd2ca Revert "[GLUTEN-9567][FLINK] Add container class
`Velox4JBean` for proper JSO…" (#9656)
9d36fdd2ca is described below
commit 9d36fdd2ca9387e4f2fe13886e9d1a6f48dcb7b7
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu May 15 11:35:14 2025 +0100
Revert "[GLUTEN-9567][FLINK] Add container class `Velox4JBean` for proper
JSO…" (#9656)
This reverts commit e436fa4899eee052a5ed019542d8ce5a4183ffd5.
---
.../operators/GlutenSingleInputOperator.java | 35 ++++++------
.../runtime/operators/GlutenSourceFunction.java | 27 +++++----
.../java/org/apache/gluten/util/Velox4JBean.java | 66 ----------------------
3 files changed, 32 insertions(+), 96 deletions(-)
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
index 8e888a5ef9..034768f36a 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
@@ -17,14 +17,15 @@
package org.apache.gluten.table.runtime.operators;
+import io.github.zhztheplayer.velox4j.connector.ExternalStream;
import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit;
import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
import io.github.zhztheplayer.velox4j.connector.ExternalStreams;
+import io.github.zhztheplayer.velox4j.iterator.DownIterators;
import io.github.zhztheplayer.velox4j.iterator.UpIterator;
import io.github.zhztheplayer.velox4j.query.SerialTask;
import io.github.zhztheplayer.velox4j.type.RowType;
import org.apache.gluten.streaming.api.operators.GlutenOperator;
-import org.apache.gluten.util.Velox4JBean;
import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;
import io.github.zhztheplayer.velox4j.Velox4j;
@@ -49,6 +50,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
/** Calculate operator in gluten, which will call Velox to run. */
public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
@@ -56,10 +59,10 @@ public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
private static final Logger LOG =
LoggerFactory.getLogger(GlutenSingleInputOperator.class);
- private final Velox4JBean<PlanNode> glutenPlan;
+ private final PlanNode glutenPlan;
private final String id;
- private final Velox4JBean<RowType> inputType;
- private final Velox4JBean<RowType> outputType;
+ private final RowType inputType;
+ private final RowType outputType;
private StreamRecord<RowData> outElement = null;
@@ -71,10 +74,10 @@ public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
private SerialTask task;
public GlutenSingleInputOperator(PlanNode plan, String id, RowType inputType, RowType outputType) {
- this.glutenPlan = Velox4JBean.of(plan);
+ this.glutenPlan = plan;
this.id = id;
- this.inputType = Velox4JBean.of(inputType);
- this.outputType = Velox4JBean.of(outputType);
+ this.inputType = inputType;
+ this.outputType = outputType;
}
@Override
@@ -88,12 +91,12 @@ public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
// add a mock input as velox not allow the source is empty.
PlanNode mockInput = new TableScanNode(
id,
- inputType.get(),
+ inputType,
new ExternalStreamTableHandle("connector-external-stream"),
List.of());
- glutenPlan.get().setSources(List.of(mockInput));
- LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan.get()));
- query = new Query(glutenPlan.get(), Config.empty(), ConnectorConfig.empty());
+ glutenPlan.setSources(List.of(mockInput));
+ LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan));
+ query = new Query(glutenPlan, Config.empty(), ConnectorConfig.empty());
allocator = new RootAllocator(Long.MAX_VALUE);
task = session.queryOps().execute(query);
ExternalStreamConnectorSplit split = new
ExternalStreamConnectorSplit("connector-external-stream", inputQueue.id());
@@ -107,7 +110,7 @@ public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
element.getValue(),
allocator,
session,
- inputType.get());
+ inputType);
inputQueue.put(inRv);
UpIterator.State state = task.advance();
if (state == UpIterator.State.AVAILABLE) {
@@ -115,7 +118,7 @@ public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
List<RowData> rows = FlinkRowToVLVectorConvertor.toRowData(
outRv,
allocator,
- outputType.get());
+ outputType);
for (RowData row : rows) {
output.collect(outElement.replace(row));
}
@@ -135,17 +138,17 @@ public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
@Override
public PlanNode getPlanNode() {
- return glutenPlan.get();
+ return glutenPlan;
}
@Override
public RowType getInputType() {
- return inputType.get();
+ return inputType;
}
@Override
public RowType getOutputType() {
- return outputType.get();
+ return outputType;
}
@Override
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
index 690cf29a6e..1d80806842 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
@@ -18,7 +18,6 @@
package org.apache.gluten.table.runtime.operators;
import io.github.zhztheplayer.velox4j.query.SerialTask;
-import org.apache.gluten.util.Velox4JBean;
import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;
import io.github.zhztheplayer.velox4j.Velox4j;
@@ -48,10 +47,10 @@ import java.util.List;
public class GlutenSourceFunction extends RichParallelSourceFunction<RowData> {
private static final Logger LOG =
LoggerFactory.getLogger(GlutenSourceFunction.class);
- private final Velox4JBean<PlanNode> planNode;
- private final Velox4JBean<RowType> outputType;
+ private final PlanNode planNode;
+ private final RowType outputType;
private final String id;
- private final Velox4JBean<ConnectorSplit> split;
+ private final ConnectorSplit split;
private volatile boolean isRunning = true;
private Session session;
@@ -64,32 +63,32 @@ public class GlutenSourceFunction extends RichParallelSourceFunction<RowData> {
RowType outputType,
String id,
ConnectorSplit split) {
- this.planNode = Velox4JBean.of(planNode);
- this.outputType = Velox4JBean.of(outputType);
+ this.planNode = planNode;
+ this.outputType = outputType;
this.id = id;
- this.split = Velox4JBean.of(split);
+ this.split = split;
}
public PlanNode getPlanNode() {
- return planNode.get();
+ return planNode;
}
- public RowType getOutputType() { return outputType.get(); }
+ public RowType getOutputType() { return outputType; }
public String getId() { return id; }
- public ConnectorSplit getConnectorSplit() { return split.get(); }
+ public ConnectorSplit getConnectorSplit() { return split; }
@Override
public void run(SourceContext<RowData> sourceContext) throws Exception {
- LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode.get()));
+ LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode));
memoryManager = MemoryManager.create(AllocationListener.NOOP);
session = Velox4j.newSession(memoryManager);
- query = new Query(planNode.get(), Config.empty(), ConnectorConfig.empty());
+ query = new Query(planNode, Config.empty(), ConnectorConfig.empty());
allocator = new RootAllocator(Long.MAX_VALUE);
SerialTask task = session.queryOps().execute(query);
- task.addSplit(id, split.get());
+ task.addSplit(id, split);
task.noMoreSplits(id);
while (isRunning) {
UpIterator.State state = task.advance();
@@ -98,7 +97,7 @@ public class GlutenSourceFunction extends RichParallelSourceFunction<RowData> {
List<RowData> rows = FlinkRowToVLVectorConvertor.toRowData(
outRv,
allocator,
- outputType.get());
+ outputType);
for (RowData row : rows) {
sourceContext.collect(row);
}
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/Velox4JBean.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/Velox4JBean.java
deleted file mode 100644
index 7cb77cfffd..0000000000
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/Velox4JBean.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gluten.util;
-
-import com.google.common.base.Preconditions;
-import io.github.zhztheplayer.velox4j.serde.NativeBean;
-import io.github.zhztheplayer.velox4j.serde.Serde;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-/**
- * A serializable container for an instance of Velox4J's bean class, with the Java
- * Serializable interface implemented.
- * <p>
- * Velox4J's JSON serde is well-maintained so we simply route the serialization
- * calls to the JSON serdes.
- */
-public class Velox4JBean<T extends NativeBean> implements Serializable {
- private transient String className;
- private transient T bean;
-
- private Velox4JBean(T bean) {
- this.className = bean.getClass().getName();
- this.bean = bean;
- }
-
- public static <T extends NativeBean> Velox4JBean<T> of(T bean) {
- return new Velox4JBean<>(bean);
- }
-
- public T get() {
- return bean;
- }
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- final String json = Serde.toJson(bean);
- out.writeUTF(className);
- out.writeUTF(json);
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- className = in.readUTF();
- final Class<?> clazz = Class.forName(className);
- Preconditions.checkState(NativeBean.class.isAssignableFrom(clazz),
- "Class %s is not a NativeBean", className);
- bean = (T) Serde.fromJson(in.readUTF(), (Class<? extends NativeBean>) clazz);
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]