This is an automated email from the ASF dual-hosted git repository.

shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new a02d726c62 [GLUTEN-9950][FLINK] Make nexmark source num events 
configurable (#9978)
a02d726c62 is described below

commit a02d726c62d6c3ec7093bb348f1207d3964af9e9
Author: kevinyhzou <[email protected]>
AuthorDate: Tue Jun 17 16:23:24 2025 +0800

    [GLUTEN-9950][FLINK] Make nexmark source num events configurable (#9978)
    
    * Make nexmark source num events configurable
---
 .../SourceTransformationTranslator.java            | 27 +++++++++++--
 .../java/org/apache/gluten/util/ReflectUtils.java  | 44 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 3 deletions(-)

diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
index 1966be8fb6..913992c843 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
@@ -20,6 +20,7 @@ import 
org.apache.gluten.streaming.api.operators.GlutenStreamSource;
 import org.apache.gluten.table.runtime.operators.GlutenSourceFunction;
 import org.apache.gluten.util.LogicalTypeConverter;
 import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.util.ReflectUtils;
 
 import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit;
 import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
@@ -81,12 +82,29 @@ public class SourceTransformationTranslator<OUT, SplitT 
extends SourceSplit, Enu
     final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
 
     // --- Begin Gluten-specific code changes ---
-    if 
(transformation.getSource().getClass().getSimpleName().equals("NexmarkSource")) 
{
+    Class<?> sourceClazz = transformation.getSource().getClass();
+    if (sourceClazz.getSimpleName().equals("NexmarkSource")) {
       RowType outputType =
           (RowType)
               LogicalTypeConverter.toVLType(
                   ((InternalTypeInfo) 
transformation.getOutputType()).toLogicalType());
       String id = PlanNodeIdGenerator.newId();
+      Object nexmarkSource = transformation.getSource();
+      List<?> nexmarkSourceSplits =
+          (List<?>)
+              ReflectUtils.invokeObjectMethod(
+                  sourceClazz,
+                  nexmarkSource,
+                  "getSplits",
+                  new Class<?>[] {int.class},
+                  new Object[] {transformation.getParallelism()});
+      Object nexmarkSourceSplit = nexmarkSourceSplits.get(0);
+      Object generatorConfig =
+          ReflectUtils.getObjectField(
+              nexmarkSourceSplit.getClass(), nexmarkSourceSplit, 
"generatorConfig");
+      Long maxEvents =
+          (Long)
+              ReflectUtils.getObjectField(generatorConfig.getClass(), 
generatorConfig, "maxEvents");
       StreamOperatorFactory<OUT> operatorFactory =
           SimpleOperatorFactory.of(
               new GlutenStreamSource(
@@ -95,8 +113,11 @@ public class SourceTransformationTranslator<OUT, SplitT 
extends SourceSplit, Enu
                           id, outputType, new 
NexmarkTableHandle("connector-nexmark"), List.of()),
                       outputType,
                       id,
-                      // TODO: should use config to get parameters
-                      new NexmarkConnectorSplit("connector-nexmark", 
100000000))));
+                      new NexmarkConnectorSplit(
+                          "connector-nexmark",
+                          maxEvents > Integer.MAX_VALUE
+                              ? Integer.MAX_VALUE
+                              : maxEvents.intValue()))));
       streamGraph.addLegacySource(
           transformationId,
           slotSharingGroup,
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
new file mode 100644
index 0000000000..c3a34b45ec
--- /dev/null
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.util;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+public class ReflectUtils {
+
+  public static Object getObjectField(Class<?> clazz, Object obj, String 
fieldName) {
+    try {
+      Field f = clazz.getDeclaredField(fieldName);
+      f.setAccessible(true);
+      return f.get(obj);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static Object invokeObjectMethod(
+      Class<?> clazz, Object obj, String methodName, Class<?>[] paramTypes, 
Object[] paramValues) {
+    try {
+      Method m = clazz.getDeclaredMethod(methodName, paramTypes);
+      m.setAccessible(true);
+      return m.invoke(obj, paramValues);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to