This is an automated email from the ASF dual-hosted git repository.
shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a02d726c62 [GLUTEN-9950][FLINK] Make nexmark source num events
configurable (#9978)
a02d726c62 is described below
commit a02d726c62d6c3ec7093bb348f1207d3964af9e9
Author: kevinyhzou <[email protected]>
AuthorDate: Tue Jun 17 16:23:24 2025 +0800
[GLUTEN-9950][FLINK] Make nexmark source num events configurable (#9978)
* Make nexmark source num events configurable
---
.../SourceTransformationTranslator.java | 27 +++++++++++--
.../java/org/apache/gluten/util/ReflectUtils.java | 44 ++++++++++++++++++++++
2 files changed, 68 insertions(+), 3 deletions(-)
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
index 1966be8fb6..913992c843 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
@@ -20,6 +20,7 @@ import
org.apache.gluten.streaming.api.operators.GlutenStreamSource;
import org.apache.gluten.table.runtime.operators.GlutenSourceFunction;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.util.ReflectUtils;
import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit;
import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
@@ -81,12 +82,29 @@ public class SourceTransformationTranslator<OUT, SplitT
extends SourceSplit, Enu
final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
// --- Begin Gluten-specific code changes ---
- if
(transformation.getSource().getClass().getSimpleName().equals("NexmarkSource"))
{
+ Class<?> sourceClazz = transformation.getSource().getClass();
+ if (sourceClazz.getSimpleName().equals("NexmarkSource")) {
RowType outputType =
(RowType)
LogicalTypeConverter.toVLType(
((InternalTypeInfo)
transformation.getOutputType()).toLogicalType());
String id = PlanNodeIdGenerator.newId();
+ Object nexmarkSource = transformation.getSource();
+ List<?> nexmarkSourceSplits =
+ (List<?>)
+ ReflectUtils.invokeObjectMethod(
+ sourceClazz,
+ nexmarkSource,
+ "getSplits",
+ new Class<?>[] {int.class},
+ new Object[] {transformation.getParallelism()});
+ Object nexmarkSourceSplit = nexmarkSourceSplits.get(0);
+ Object generatorConfig =
+ ReflectUtils.getObjectField(
+ nexmarkSourceSplit.getClass(), nexmarkSourceSplit,
"generatorConfig");
+ Long maxEvents =
+ (Long)
+ ReflectUtils.getObjectField(generatorConfig.getClass(),
generatorConfig, "maxEvents");
StreamOperatorFactory<OUT> operatorFactory =
SimpleOperatorFactory.of(
new GlutenStreamSource(
@@ -95,8 +113,11 @@ public class SourceTransformationTranslator<OUT, SplitT
extends SourceSplit, Enu
id, outputType, new
NexmarkTableHandle("connector-nexmark"), List.of()),
outputType,
id,
- // TODO: should use config to get parameters
- new NexmarkConnectorSplit("connector-nexmark",
100000000))));
+ new NexmarkConnectorSplit(
+ "connector-nexmark",
+ maxEvents > Integer.MAX_VALUE
+ ? Integer.MAX_VALUE
+ : maxEvents.intValue()))));
streamGraph.addLegacySource(
transformationId,
slotSharingGroup,
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
new file mode 100644
index 0000000000..c3a34b45ec
--- /dev/null
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.util;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+public class ReflectUtils {
+
+ public static Object getObjectField(Class<?> clazz, Object obj, String
fieldName) {
+ try {
+ Field f = clazz.getDeclaredField(fieldName);
+ f.setAccessible(true);
+ return f.get(obj);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static Object invokeObjectMethod(
+ Class<?> clazz, Object obj, String methodName, Class<?>[] paramTypes,
Object[] paramValues) {
+ try {
+ Method m = clazz.getDeclaredMethod(methodName, paramTypes);
+ m.setAccessible(true);
+ return m.invoke(obj, paramValues);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]