This is an automated email from the ASF dual-hosted git repository.

zhanglistar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 22cefd5264 [FLINK] Support Apache Pulsar as DataSource (#12297)
22cefd5264 is described below

commit 22cefd5264557a6d42dd36f1f8ab8e7ea8a1627c
Author: zhanglistar <[email protected]>
AuthorDate: Mon Jun 29 17:08:50 2026 +0800

    [FLINK] Support Apache Pulsar as DataSource (#12297)
    
    * Fix Flink Nexmark window aggregate tests
    
    * [VL] Add Flink Pulsar source support
    
    * [FLINK] Point CI to Pulsar velox4j commit
    
    * [FLINK] Update velox4j pin for stateful join fix
    
    * [FLINK] Point CI to Pulsar velox4j commit
    
    * [FLINK] Update velox4j pin for stateful join fix
    
    * [FLINK] Adapt to latest velox4j API and simplify Pulsar source detection
    
    - Simplify isPulsarSource to use class simple name check (like Kafka)
    - Replace hand-rolled reflection with ReflectUtils.getObjectField
    - Adapt NexmarkSourceFactory to new NexmarkGeneratorConfig API
    - Adapt PrintSinkFactory to new PrintTableHandle constructor (isStdErr)
    - Update CI velox4j pin to gluten-0530 latest commit
    
    * Update flink.yml
    
    * [FLINK] Update velox4j CI pin to latest gluten-0530
    
    * [FLINK] Fix isPulsarSource to handle wrapped sources
    
    * [FLINK] Add --add-opens for jdk.internal.reflect to fix NexmarkTest
    
    * [FLINK] Fix NexmarkSourceFactoryTest GeneratorConfig constructor arity
    
    * Fix unit tests
    
    * [FLINK] Add --add-opens jdk.internal.reflect to surefire extraJavaTestArgs
    
    The NexmarkTest was failing with InaccessibleObjectException because
    surefire forks a new JVM that reads argLine (not JAVA_TOOL_OPTIONS).
    Adding --add-opens=java.base/jdk.internal.reflect=ALL-UNNAMED to the
    POM's extraJavaTestArgs ensures the forked JVM always has the required
    opens regardless of environment variables.
    
    * [FLINK] Add --add-opens sun.reflect.generics.repository for NexmarkTest q18
    
    * [FLINK] Remove dead getWindowPropertyIndex method from StreamExecWindowAggregate
    
    * [FLINK] Fix spotless format violation in StreamExecWindowAggregate
---
 .github/workflows/flink.yml                        |   4 +
 .../plan/nodes/exec/common/CommonExecSink.java     |   4 +-
 .../stream/StreamExecGlobalWindowAggregate.java    |   2 +-
 .../stream/StreamExecLocalWindowAggregate.java     |   2 +-
 .../exec/stream/StreamExecTableSourceScan.java     |   4 +-
 .../org/apache/gluten/rexnode/WindowUtils.java     |   2 +
 .../gluten/velox/PulsarSourceSinkFactory.java      | 337 +++++++++++++++++++++
 .../org.apache.gluten.velox.VeloxSourceSinkFactory |   1 +
 .../table/runtime/config/VeloxConnectorConfig.java |   1 +
 .../runtime/operators/GlutenSourceFunction.java    |   1 +
 .../gluten/velox/VeloxSourceSinkFactory.java       |  73 ++++-
 gluten-flink/ut/pom.xml                            |  33 +-
 .../table/runtime/stream/custom/NexmarkTest.java   |  14 +-
 .../gluten/velox/NexmarkSourceFactoryTest.java     |   2 +-
 .../gluten/velox/PulsarSourceSinkFactoryTest.java  |  96 ++++++
 15 files changed, 557 insertions(+), 19 deletions(-)

diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index 1df73f3119..66c5734923 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -36,6 +36,8 @@ env:
     --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
     --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
     --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED
+    --add-opens=java.base/jdk.internal.reflect=ALL-UNNAMED
+    --add-opens=java.base/sun.reflect.generics.repository=ALL-UNNAMED
   JAVA_TOOL_OPTIONS: >-
     --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED
     --add-opens=java.base/sun.nio.ch=org.apache.arrow.memory.core,ALL-UNNAMED
@@ -48,6 +50,8 @@ env:
     --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
     --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
     --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED
+    --add-opens=java.base/jdk.internal.reflect=ALL-UNNAMED
+    --add-opens=java.base/sun.reflect.generics.repository=ALL-UNNAMED
   CCACHE_DIR: "${{ github.workspace }}/.ccache"
 
 jobs:
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index fab420c481..1f3bb41e45 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -222,7 +222,9 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
             Configuration.class.getName(),
             streamExecEnv.getConfiguration(),
             ResolvedSchema.class.getName(),
-            schema));
+            schema,
+            VeloxSourceSinkFactory.FACTORY_CLASS_LOADER_KEY,
+            CommonExecSink.class.getClassLoader()));
     // --- End Gluten-specific code changes ---
   }
 
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
index e5dca4ee8d..1e657950ef 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
@@ -259,7 +259,7 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
             offset,
             windowType,
             outputType,
-            true,
+            windowing.isRowtime(),
             rowtimeIndex,
             windowStartIndex,
             windowEndIndex);
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
index d67d61709e..63e51797ea 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
@@ -220,7 +220,7 @@ public class StreamExecLocalWindowAggregate extends 
StreamExecWindowAggregateBas
             offset,
             windowType,
             outputType,
-            false,
+            windowing.isRowtime(),
             rowtimeIndex,
             -1,
             -1);
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
index e7ec714e70..91f59fd636 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
@@ -183,7 +183,9 @@ public class StreamExecTableSourceScan extends 
CommonExecTableSourceScan
             "checkpoint.enabled",
             
planner.getExecEnv().getCheckpointConfig().isCheckpointingEnabled(),
             "watermarkPushDownSpec",
-            watermarkPushDownSpec));
+            watermarkPushDownSpec,
+            VeloxSourceSinkFactory.FACTORY_CLASS_LOADER_KEY,
+            StreamExecTableSourceScan.class.getClassLoader()));
     // --- End Gluten-specific code changes ---
   }
 }
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
index 021969f2a2..b42635f06b 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
@@ -48,6 +48,7 @@ public class WindowUtils {
     int windowType = -1;
     WindowSpec windowSpec = windowing.getWindow();
     if (windowSpec instanceof HoppingWindowSpec) {
+      windowType = 0;
       size = ((HoppingWindowSpec) windowSpec).getSize().toMillis();
       slide = ((HoppingWindowSpec) windowSpec).getSlide().toMillis();
       if (size % slide != 0) {
@@ -63,6 +64,7 @@ public class WindowUtils {
       }
       windowType = 0;
     } else if (windowSpec instanceof TumblingWindowSpec) {
+      windowType = 1;
       size = ((TumblingWindowSpec) windowSpec).getSize().toMillis();
       Duration windowOffset = ((TumblingWindowSpec) windowSpec).getOffset();
       if (windowOffset != null) {
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PulsarSourceSinkFactory.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PulsarSourceSinkFactory.java
new file mode 100644
index 0000000000..354945c78c
--- /dev/null
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PulsarSourceSinkFactory.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+import org.apache.gluten.table.runtime.operators.GlutenSourceFunction;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.util.ReflectUtils;
+
+import io.github.zhztheplayer.velox4j.connector.PulsarConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.PulsarTableHandle;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.dag.Transformation;
+import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Offloads Flink Pulsar sources to a Velox {@code connector-pulsar} table scan.
+ *
+ * <p>Sources are recognized by class simple name, and their settings are read reflectively from
+ * the planner's {@link ScanTableSource} and the Flink {@link Source}. Presumably this keeps the
+ * Flink Pulsar connector off this module's compile classpath. Sinks are not supported.
+ */
+public class PulsarSourceSinkFactory implements VeloxSourceSinkFactory {
+
+  private static final String CONNECTOR_ID = "connector-pulsar";
+
+  /** How many levels of instance fields {@link #isPulsarSource(Object)} searches. */
+  private static final int MAX_WRAPPER_DEPTH = 3;
+
+  /** Matches source transformations whose source is, or wraps, a {@code PulsarSource}. */
+  @SuppressWarnings("rawtypes")
+  @Override
+  public boolean match(Transformation<RowData> transformation) {
+    if (transformation instanceof SourceTransformation) {
+      Source source = ((SourceTransformation) transformation).getSource();
+      return isPulsarSource(source);
+    }
+    return false;
+  }
+
+  /**
+   * Replaces the Flink Pulsar source with a {@link GlutenStreamSource} running a Velox table scan.
+   *
+   * @param transformation the matched {@link SourceTransformation}
+   * @param parameters planner parameters; the {@link ScanTableSource} is read from the entry keyed
+   *     by {@code ScanTableSource.class.getName()}
+   * @return a {@link LegacySourceTransformation} that keeps the original name, output type,
+   *     parallelism and boundedness
+   * @throws FlinkRuntimeException if a required option ({@code topic}, {@code service.url},
+   *     {@code subscription.name}) is missing or the Velox plan cannot be built
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @Override
+  public Transformation<RowData> buildVeloxSource(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    RowType outputType =
+        (RowType)
+            LogicalTypeConverter.toVLType(
+                ((InternalTypeInfo<?>) transformation.getOutputType()).toLogicalType());
+    try {
+      ScanTableSource tableSource =
+          (ScanTableSource) parameters.get(ScanTableSource.class.getName());
+      SourceTransformation sourceTransformation = (SourceTransformation) transformation;
+      Source source = sourceTransformation.getSource();
+      Map<String, String> pulsarTableParameters = buildTableParameters(tableSource, source);
+      String topic = required(pulsarTableParameters, "topic");
+      String serviceUrl = required(pulsarTableParameters, "service.url");
+      String subscriptionName = required(pulsarTableParameters, "subscription.name");
+      String format = pulsarTableParameters.getOrDefault("format", "raw");
+
+      String planId = PlanNodeIdGenerator.newId();
+      PulsarTableHandle pulsarTableHandle =
+          new PulsarTableHandle(CONNECTOR_ID, topic, outputType, pulsarTableParameters);
+      PulsarConnectorSplit connectorSplit =
+          new PulsarConnectorSplit(CONNECTOR_ID, serviceUrl, topic, subscriptionName, format);
+      TableScanNode pulsarScan =
+          new TableScanNode(planId, outputType, pulsarTableHandle, List.of());
+      GlutenStreamSource sourceOp =
+          new GlutenStreamSource(
+              new GlutenSourceFunction(
+                  new StatefulPlanNode(pulsarScan.getId(), pulsarScan),
+                  Map.of(pulsarScan.getId(), outputType),
+                  pulsarScan.getId(),
+                  connectorSplit,
+                  RowData.class),
+              "PulsarSource");
+      return new LegacySourceTransformation<RowData>(
+          sourceTransformation.getName(),
+          sourceOp,
+          transformation.getOutputType(),
+          sourceTransformation.getParallelism(),
+          sourceTransformation.getBoundedness(),
+          false);
+    } catch (FlinkRuntimeException e) {
+      // Already descriptive (e.g. "Missing Pulsar option: topic"); don't bury it in another wrapper.
+      throw e;
+    } catch (Exception e) {
+      throw new FlinkRuntimeException(e);
+    }
+  }
+
+  /** Pulsar sinks are not offloaded; always throws {@link FlinkRuntimeException}. */
+  @Override
+  public Transformation<RowData> buildVeloxSink(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    throw new FlinkRuntimeException("Unimplemented method 'buildSink'");
+  }
+
+  /**
+   * Collects the string options passed to the Velox Pulsar connector.
+   *
+   * <p>Raw options are first copied from the {@code properties}, {@code tableOptions},
+   * {@code options} or {@code configuration} field of {@code tableSource}, then of {@code source}.
+   * Later copies win. Values are then normalized onto the keys {@code service.url},
+   * {@code admin.url}, {@code topic}, {@code subscription.name}, {@code subscription.type},
+   * {@code format} and {@code initial.position}. Defaults are a random
+   * {@code gluten-pulsar-<uuid>} subscription name, format {@code raw} and initial position
+   * {@code latest}.
+   *
+   * @param tableSource the planner's table source; may be {@code null}
+   * @param source the Flink source; may be {@code null}
+   * @return a mutable map of connector options
+   */
+  static Map<String, String> buildTableParameters(Object tableSource, Object source) {
+    Map<String, String> options = new HashMap<>();
+    putAllStringOptions(options, tableSource);
+    putAllStringOptions(options, source);
+
+    // For each key below, a value read from a field is written first and may then be replaced by
+    // an explicit option. NOTE(review): the alias lists include the target key itself, so once a
+    // field value has been written under that key, option(...) finds it ahead of any later alias.
+    firstString(tableSource, source, "serviceUrl", "serviceURL", "pulsarServiceUrl")
+        .ifPresent(value -> options.put("service.url", value));
+    option(
+            options,
+            "service-url",
+            "pulsar.service.url",
+            "service.url",
+            "pulsar.client.serviceUrl")
+        .ifPresent(value -> options.put("service.url", value));
+
+    firstString(tableSource, source, "adminUrl", "adminURL", "pulsarAdminUrl")
+        .ifPresent(value -> options.put("admin.url", value));
+    option(options, "admin-url", "pulsar.admin.url", "admin.url")
+        .ifPresent(value -> options.put("admin.url", value));
+
+    firstTopic(tableSource, source).ifPresent(value -> options.put("topic", value));
+    option(options, "topic", "topics").ifPresent(value -> options.put("topic", value));
+
+    firstString(tableSource, source, "subscriptionName")
+        .ifPresent(value -> options.put("subscription.name", value));
+    option(
+            options,
+            "subscription-name",
+            "subscription.name",
+            "pulsar.subscription.name",
+            "pulsar.consumer.subscriptionName",
+            "properties.subscription.name",
+            "source.subscription-name")
+        .ifPresent(value -> options.put("subscription.name", value));
+    // A fresh random name is generated on every call when no subscription name is configured.
+    options.putIfAbsent("subscription.name", "gluten-pulsar-" + UUID.randomUUID().toString());
+
+    // Unlike the keys above, a subscriptionType field takes precedence over the options.
+    firstString(tableSource, source, "subscriptionType")
+        .or(
+            () ->
+                option(
+                    options,
+                    "source.subscription-type",
+                    "subscription.type",
+                    "pulsar.subscription.type"))
+        .map(PulsarSourceSinkFactory::normalizeSubscriptionType)
+        .ifPresent(value -> options.put("subscription.type", value));
+
+    // Format: the "value.format" option, else a non-raw format detected on the table source, else
+    // a non-raw "format" option; falls back to "raw" when none is set.
+    option(options, "value.format")
+        .or(
+            () -> {
+              String resolvedFormat = resolveFormat(tableSource);
+              if (!"raw".equals(resolvedFormat)) {
+                return Optional.of(resolvedFormat);
+              }
+              return option(options, "format").filter(value -> !"raw".equals(value));
+            })
+        .ifPresent(value -> options.put("format", value));
+    options.putIfAbsent("format", "raw");
+
+    option(
+            options,
+            "scan.startup.mode",
+            "startup.mode",
+            "initial.position",
+            "source.start.message-id")
+        .map(PulsarSourceSinkFactory::toInitialPosition)
+        .ifPresent(value -> options.put("initial.position", value));
+    options.putIfAbsent("initial.position", "latest");
+    return options;
+  }
+
+  /**
+   * Returns whether {@code source} is an instance of a class whose simple name is
+   * {@code PulsarSource}, or holds one in its instance fields (searched up to three levels deep).
+   */
+  static boolean isPulsarSource(Object source) {
+    return isPulsarSource(source, 0);
+  }
+
+  private static boolean isPulsarSource(Object source, int depth) {
+    if (source == null || depth > MAX_WRAPPER_DEPTH) {
+      return false;
+    }
+    if (source.getClass().getSimpleName().equals("PulsarSource")) {
+      return true;
+    }
+    // Check wrapped sources held in fields (e.g. a Source that delegates to a PulsarSource).
+    for (java.lang.reflect.Field f : source.getClass().getDeclaredFields()) {
+      Class<?> fieldType = f.getType();
+      // Only recurse into instance fields whose type could wrap a Source. Static fields never hold
+      // the wrapped instance and mostly lead into unrelated (often JDK) objects.
+      if (java.lang.reflect.Modifier.isStatic(f.getModifiers())
+          || fieldType.isPrimitive()
+          || fieldType == String.class
+          || Collection.class.isAssignableFrom(fieldType)
+          || Map.class.isAssignableFrom(fieldType)
+          || fieldType.isEnum()) {
+        continue;
+      }
+      // trySetAccessible returns false instead of throwing InaccessibleObjectException for fields
+      // in packages that are not opened to us (e.g. java.time), so matching never blows up on
+      // unrelated sources.
+      if (!f.trySetAccessible()) {
+        continue;
+      }
+      try {
+        Object inner = f.get(source);
+        if (inner != null && isPulsarSource(inner, depth + 1)) {
+          return true;
+        }
+      } catch (IllegalAccessException ignored) {
+        // Unreadable field: it cannot be inspected, so treat it as not wrapping a Pulsar source.
+      }
+    }
+    return false;
+  }
+
+  /** Returns the non-empty value of {@code key}, or throws {@link FlinkRuntimeException}. */
+  private static String required(Map<String, String> options, String key) {
+    String value = options.get(key);
+    if (value == null || value.isEmpty()) {
+      throw new FlinkRuntimeException("Missing Pulsar option: " + key);
+    }
+    return value;
+  }
+
+  /**
+   * Guesses the value format from the table source's {@link DecodingFormat}. Returns
+   * {@code json} or {@code csv} when its class name mentions the JSON or CSV format factory,
+   * otherwise {@code raw}.
+   */
+  private static String resolveFormat(Object tableSource) {
+    Optional<Object> decodingFormat = firstField(tableSource, "valueDecodingFormat", "format");
+    if (decodingFormat.isEmpty()) {
+      decodingFormat =
+          firstField(tableSource, "deserializationSchemaFactory")
+              .flatMap(factory -> firstField(factory, "valueDecodingFormat"));
+    }
+    if (decodingFormat.isPresent() && decodingFormat.get() instanceof DecodingFormat) {
+      String className = decodingFormat.get().getClass().getName();
+      if (className.contains("JsonFormatFactory")) {
+        return "json";
+      } else if (className.contains("CsvFormatFactory")) {
+        return "csv";
+      }
+    }
+    return "raw";
+  }
+
+  /**
+   * Maps a startup mode to {@code earliest} or {@code latest} when it mentions either. Any other
+   * value is returned lower-cased, with underscores replaced by dashes.
+   */
+  private static String toInitialPosition(String startupMode) {
+    // Locale.ROOT: under e.g. a Turkish default locale, "EARLIEST".toLowerCase() yields a dotless
+    // 'i', which would defeat the contains("earliest") check below.
+    String normalized = startupMode.toLowerCase(Locale.ROOT).replace("_", "-");
+    if (normalized.contains("earliest")) {
+      return "earliest";
+    }
+    if (normalized.contains("latest")) {
+      return "latest";
+    }
+    return normalized;
+  }
+
+  /** Lower-cases a subscription type and replaces dashes with underscores. */
+  private static String normalizeSubscriptionType(String subscriptionType) {
+    return subscriptionType.toLowerCase(Locale.ROOT).replace("-", "_");
+  }
+
+  /**
+   * Returns the first non-empty topic name found in a {@code topics}, {@code topic} or
+   * {@code topicName} field. Targets are tried in order, moving on when a target yields nothing.
+   */
+  private static Optional<String> firstTopic(Object... targets) {
+    for (Object target : targets) {
+      Optional<String> topic =
+          firstField(target, "topics", "topic", "topicName")
+              .flatMap(PulsarSourceSinkFactory::firstTopicName);
+      if (topic.isPresent()) {
+        return topic;
+      }
+    }
+    return Optional.empty();
+  }
+
+  /** Extracts the first non-null, non-empty topic name from a collection, array or scalar value. */
+  private static Optional<String> firstTopicName(Object value) {
+    if (value instanceof Collection) {
+      return ((Collection<?>) value)
+          .stream()
+              .filter(topic -> topic != null)
+              .map(String::valueOf)
+              .filter(topic -> !topic.isEmpty())
+              .findFirst();
+    }
+    if (value instanceof String[]) {
+      return Arrays.stream((String[]) value)
+          .filter(topic -> topic != null && !topic.isEmpty())
+          .findFirst();
+    }
+    String text = String.valueOf(value);
+    return text.isEmpty() ? Optional.empty() : Optional.of(text);
+  }
+
+  /**
+   * Returns the first non-empty string value of {@code fieldNames} on {@code first}. If there is
+   * none, the same lookup is done on {@code second}.
+   */
+  private static Optional<String> firstString(Object first, Object second, String... fieldNames) {
+    return firstNonEmptyString(first, fieldNames)
+        .or(() -> firstNonEmptyString(second, fieldNames));
+  }
+
+  /** Reads the first present field of {@code target} as a string, ignoring empty values. */
+  private static Optional<String> firstNonEmptyString(Object target, String... fieldNames) {
+    return firstField(target, fieldNames).map(String::valueOf).filter(value -> !value.isEmpty());
+  }
+
+  /** Returns the value of the first of {@code keys} that maps to a non-empty string. */
+  private static Optional<String> option(Map<String, String> options, String... keys) {
+    for (String key : keys) {
+      String value = options.get(key);
+      if (value != null && !value.isEmpty()) {
+        return Optional.of(value);
+      }
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Copies string entries from {@code target} into {@code output}. It first recurses into the
+   * target's {@code properties}, {@code tableOptions}, {@code options} or {@code configuration}
+   * field, then copies the target's own entries when it is a {@link Properties} or {@link Map}.
+   */
+  private static void putAllStringOptions(Map<String, String> output, Object target) {
+    firstField(target, "properties", "tableOptions", "options", "configuration")
+        .ifPresent(value -> putAllStringOptions(output, value));
+    if (target instanceof Properties) {
+      Properties properties = (Properties) target;
+      for (String key : properties.stringPropertyNames()) {
+        output.put(key, properties.getProperty(key));
+      }
+    } else if (target instanceof Map) {
+      ((Map<?, ?>) target)
+          .forEach(
+              (key, value) -> {
+                if (key != null && value != null) {
+                  output.put(String.valueOf(key), String.valueOf(value));
+                }
+              });
+    }
+  }
+
+  /** Returns the first non-null value among {@code fieldNames} on {@code target} (null-safe). */
+  private static Optional<Object> firstField(Object target, String... fieldNames) {
+    if (target == null) {
+      return Optional.empty();
+    }
+    for (String fieldName : fieldNames) {
+      Optional<Object> value = field(target, fieldName);
+      if (value.isPresent()) {
+        return value;
+      }
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Reads {@code fieldName} from {@code target}, moving to the superclass while
+   * {@code ReflectUtils.getObjectField} throws {@link FlinkRuntimeException}. A field that exists
+   * but holds {@code null} yields empty.
+   */
+  private static Optional<Object> field(Object target, String fieldName) {
+    Class<?> clazz = target.getClass();
+    while (clazz != null) {
+      try {
+        return Optional.ofNullable(ReflectUtils.getObjectField(clazz, target, fieldName));
+      } catch (FlinkRuntimeException e) {
+        // Presumably "field not declared on clazz"; try the superclass.
+        clazz = clazz.getSuperclass();
+      }
+    }
+    return Optional.empty();
+  }
+}
diff --git 
a/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
 
b/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
index b2d824ee12..db57ce53dc 100644
--- 
a/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
+++ 
b/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
@@ -1,5 +1,6 @@
 org.apache.gluten.velox.FromElementsSourceFactory
 org.apache.gluten.velox.KafkaSourceSinkFactory
+org.apache.gluten.velox.PulsarSourceSinkFactory
 org.apache.gluten.velox.PrintSinkFactory
 org.apache.gluten.velox.NexmarkSourceFactory
 org.apache.gluten.velox.FileSystemSinkFactory
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
index df9da7512a..967b8de73c 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
@@ -33,6 +33,7 @@ public class VeloxConnectorConfig {
       List.of(
           "connector-nexmark",
           "connector-kafka",
+          "connector-pulsar",
           "connector-fuzzer",
           "connector-filesystem",
           "connector-from-elements",
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
index 76cced93f1..19c7a69e99 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
@@ -110,6 +110,7 @@ public class GlutenSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
           processAvailableElement(sourceContext);
           break;
         case BLOCKED:
+          task.waitFor();
           break;
         default:
           return;
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java
index aff1bb7795..f3240bc7d9 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java
@@ -22,14 +22,29 @@ import org.apache.flink.table.data.RowData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.ServiceConfigurationError;
 import java.util.ServiceLoader;
 
 public interface VeloxSourceSinkFactory {
 
   static final Logger LOG = 
LoggerFactory.getLogger(VeloxSourceSinkFactory.class);
 
+  String FACTORY_CLASS_LOADER_KEY = "velox.source-sink.factory.classloader";
+
+  List<String> FACTORY_CLASS_NAMES =
+      List.of(
+          "org.apache.gluten.velox.FromElementsSourceFactory",
+          "org.apache.gluten.velox.KafkaSourceSinkFactory",
+          "org.apache.gluten.velox.PulsarSourceSinkFactory",
+          "org.apache.gluten.velox.PrintSinkFactory",
+          "org.apache.gluten.velox.NexmarkSourceFactory",
+          "org.apache.gluten.velox.FileSystemSinkFactory",
+          "org.apache.gluten.velox.FuzzerSourceSinkFactory");
+
   /** Match the conditions to determine if the operator can be offloaded to 
velox. */
   boolean match(Transformation<RowData> transformation);
 
@@ -43,21 +58,67 @@ public interface VeloxSourceSinkFactory {
 
   /** Choose the matched source/sink factory by given transformation. */
   private static Optional<VeloxSourceSinkFactory> getFactory(
-      Transformation<RowData> transformation) {
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
     ServiceLoader<VeloxSourceSinkFactory> factories =
         ServiceLoader.load(VeloxSourceSinkFactory.class);
-    for (VeloxSourceSinkFactory factory : factories) {
-      if (factory.match(transformation)) {
-        return Optional.of(factory);
+    // First pass: providers registered under META-INF/services. A ServiceConfigurationError is
+    // logged and ends this pass early (the try encloses the whole loop).
+    try {
+      for (VeloxSourceSinkFactory factory : factories) {
+        if (factory.match(transformation)) {
+          return Optional.of(factory);
+        }
+      }
+    } catch (ServiceConfigurationError e) {
+      LOG.warn("Failed to load Velox source/sink factory", e);
+    }
+    // Second pass: instantiate each FACTORY_CLASS_NAMES entry through loadFactory, which can use
+    // the class loader passed in parameters. Presumably this covers providers that ServiceLoader
+    // could not see. NOTE(review): factories already tried above are instantiated and matched
+    // again here.
+    for (String factoryClassName : FACTORY_CLASS_NAMES) {
+      Optional<VeloxSourceSinkFactory> factory = loadFactory(factoryClassName, 
parameters);
+      if (factory.isPresent() && factory.get().match(transformation)) {
+        return factory;
       }
     }
     return Optional.empty();
   }
 
+  /**
+   * Tries to instantiate {@code factoryClassName} from several class loaders and returns the first
+   * success. The loaders are tried in this order: the one stored under
+   * {@link #FACTORY_CLASS_LOADER_KEY} in {@code parameters} (only if it is a ClassLoader), the
+   * current thread's context class loader, and the loader of this interface.
+   */
+  private static Optional<VeloxSourceSinkFactory> loadFactory(
+      String factoryClassName, Map<String, Object> parameters) {
+    Object configuredLoader = parameters.get(FACTORY_CLASS_LOADER_KEY);
+    Optional<VeloxSourceSinkFactory> loaded =
+        configuredLoader instanceof ClassLoader
+            ? loadFactory(factoryClassName, (ClassLoader) configuredLoader)
+            : Optional.empty();
+    if (loaded.isEmpty()) {
+      loaded = loadFactory(factoryClassName, Thread.currentThread().getContextClassLoader());
+    }
+    if (loaded.isEmpty()) {
+      loaded = loadFactory(factoryClassName, VeloxSourceSinkFactory.class.getClassLoader());
+    }
+    return loaded;
+  }
+
+  private static Optional<VeloxSourceSinkFactory> loadFactory(
+      String factoryClassName, ClassLoader classLoader) {
+    try {
+      return Optional.of(
+          (VeloxSourceSinkFactory)
+              Class.forName(factoryClassName, true, classLoader)
+                  .getDeclaredConstructor()
+                  .newInstance());
+    } catch (ClassNotFoundException e) {
+      return Optional.empty();
+    } catch (InstantiationException
+        | IllegalAccessException
+        | InvocationTargetException
+        | NoSuchMethodException e) {
+      throw new RuntimeException("Failed to instantiate Velox source/sink 
factory", e);
+    }
+  }
+
   /** Build Velox source, or fallback to flink orignal source . */
   static Transformation<RowData> buildSource(
       Transformation<RowData> transformation, Map<String, Object> parameters) {
-    Optional<VeloxSourceSinkFactory> factory = getFactory(transformation);
+    Optional<VeloxSourceSinkFactory> factory = getFactory(transformation, 
parameters);
     if (factory.isEmpty()) {
       LOG.warn(
           "Not find matched factory to build velox source transformation, and 
we will use flink original transformation {} instead.",
@@ -71,7 +132,7 @@ public interface VeloxSourceSinkFactory {
   /** Build Velox sink, or fallback to flink original sink. */
   static Transformation<RowData> buildSink(
       Transformation<RowData> transformation, Map<String, Object> parameters) {
-    Optional<VeloxSourceSinkFactory> factory = getFactory(transformation);
+    Optional<VeloxSourceSinkFactory> factory = getFactory(transformation, 
parameters);
     if (factory.isEmpty()) {
       LOG.warn(
           "Not find matched factory to build velox sink transformation, and we 
will use flink original transformation {} instead.",
diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml
index 03bda07034..f94d743e3e 100644
--- a/gluten-flink/ut/pom.xml
+++ b/gluten-flink/ut/pom.xml
@@ -39,6 +39,8 @@
             --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
             --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
             --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED
+            --add-opens=java.base/jdk.internal.reflect=ALL-UNNAMED
+            --add-opens=java.base/sun.reflect.generics.repository=ALL-UNNAMED
             --add-opens=java.base/sun.nio.ch=ALL-UNNAMED</extraJavaTestArgs>
   </properties>
 
@@ -53,6 +55,11 @@
       <artifactId>gluten-flink-runtime</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-flink-planner</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-streaming-java</artifactId>
@@ -96,12 +103,6 @@
       <artifactId>flink-clients</artifactId>
       <version>${flink.version}</version>
     </dependency>
-    <dependency>
-      <groupId>com.github.nexmark</groupId>
-      <artifactId>nexmark-flink</artifactId>
-      <version>0.3-SNAPSHOT</version>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>io.github.zhztheplayer</groupId>
       <artifactId>velox4j</artifactId>
@@ -225,4 +226,24 @@
       </plugin>
     </plugins>
   </build>
+
+  <profiles>
+    <profile>
+      <id>nexmark-tests</id>
+      <activation>
+        <property>
+          <name>maven.test.skip</name>
+          <value>!true</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>com.github.nexmark</groupId>
+          <artifactId>nexmark-flink</artifactId>
+          <version>0.3-SNAPSHOT</version>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
 </project>
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
index 3a40bda78c..14b250480b 100644
--- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
@@ -71,7 +71,7 @@ public class NexmarkTest {
         }
       };
 
-  private static final int KAFKA_PORT = 9092;
+  private static final int KAFKA_PORT = 19092;
   private static String topicName = "nexmark";
 
   @RegisterExtension
@@ -83,7 +83,7 @@ public class NexmarkTest {
   private static final Map<String, String> KAFKA_VARIABLES =
       new HashMap<>() {
         {
-          put("BOOTSTRAP_SERVERS", "localhost:9092");
+          put("BOOTSTRAP_SERVERS", "localhost:" + KAFKA_PORT);
           put("NEXMARK_TABLE", "kafka");
         }
       };
@@ -284,6 +284,16 @@ public class NexmarkTest {
         }
       }
 
+      String queryFilter = System.getProperty("nexmark.queries");
+      if (queryFilter != null && !queryFilter.trim().isEmpty()) {
+        List<String> selectedQueries =
+            Arrays.stream(queryFilter.split(","))
+                .map(String::trim)
+                .filter(query -> !query.isEmpty())
+                .collect(Collectors.toList());
+        queryFiles.retainAll(selectedQueries);
+      }
+
       return queryFiles.stream().sorted().collect(Collectors.toList());
 
     } catch (URISyntaxException | IOException e) {
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/velox/NexmarkSourceFactoryTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/NexmarkSourceFactoryTest.java
index 8f149cfb73..eda06f2711 100644
--- a/gluten-flink/ut/src/test/java/org/apache/gluten/velox/NexmarkSourceFactoryTest.java
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/NexmarkSourceFactoryTest.java
@@ -141,7 +141,7 @@ class NexmarkSourceFactoryTest {
             long.class,
             long.class);
     Object generatorConfig =
-        generatorConfigCtor.newInstance(nexmarkConfig, 0L, 0L, maxEvents, 
maxEvents, 0L);
+        generatorConfigCtor.newInstance(nexmarkConfig, 0L, 0L, maxEvents, 0L, 
0L);
 
     Class<?> nexmarkSourceCls = Class.forName(NEXMARK_SOURCE_CN);
     Constructor<?> nexmarkSourceCtor =
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/velox/PulsarSourceSinkFactoryTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/PulsarSourceSinkFactoryTest.java
new file mode 100644
index 0000000000..33acfb8f0f
--- /dev/null
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/PulsarSourceSinkFactoryTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.table.runtime.config.VeloxConnectorConfig;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link PulsarSourceSinkFactory}: ServiceLoader discovery, connector registration,
+ * Flink SQL to Velox option mapping, and detection of a wrapped Pulsar source.
+ */
+class PulsarSourceSinkFactoryTest {
+
+  /** The factory must be discoverable via {@link ServiceLoader}. */
+  @Test
+  void serviceLoaderDiscoversPulsarFactory() {
+    List<String> factoryNames =
+        StreamSupport.stream(ServiceLoader.load(VeloxSourceSinkFactory.class).spliterator(), false)
+            .map(factory -> factory.getClass().getName())
+            .collect(Collectors.toList());
+
+    assertThat(factoryNames).contains("org.apache.gluten.velox.PulsarSourceSinkFactory");
+  }
+
+  /** The static {@code CONNECTORS} list, read reflectively, must contain the Pulsar connector. */
+  @Test
+  @SuppressWarnings("unchecked")
+  void runtimeConnectorConfigIncludesPulsarConnector() throws Exception {
+    Field connectors = VeloxConnectorConfig.class.getDeclaredField("CONNECTORS");
+    connectors.setAccessible(true);
+
+    assertThat((List<String>) connectors.get(null)).contains("connector-pulsar");
+  }
+
+  /**
+   * Flink SQL options on a fake Pulsar source map to Velox Pulsar table parameters; note that
+   * {@code value.format} wins over {@code format} and the subscription type is lower-cased.
+   */
+  @Test
+  void tableParametersMapFlinkSqlOptionsToVeloxPulsarOptions() {
+    PulsarSource source = new PulsarSource();
+    source.options.put("pulsar.client.serviceUrl", "pulsar://127.0.0.1:16650");
+    source.options.put("admin-url", "http://127.0.0.1:18080");
+    source.options.put("topics", "persistent://public/default/gluten-pulsar-smoke");
+    source.options.put("pulsar.consumer.subscriptionName", "gluten-test-sub");
+    source.options.put("format", "raw");
+    source.options.put("value.format", "json");
+    source.options.put("source.start.message-id", "earliest");
+    source.subscriptionType = FakeSubscriptionType.Shared;
+
+    Map<String, String> tableParameters =
+        PulsarSourceSinkFactory.buildTableParameters(null, source);
+
+    assertThat(tableParameters)
+        .containsEntry("service.url", "pulsar://127.0.0.1:16650")
+        .containsEntry("admin.url", "http://127.0.0.1:18080")
+        .containsEntry("topic", "persistent://public/default/gluten-pulsar-smoke")
+        .containsEntry("subscription.name", "gluten-test-sub")
+        .containsEntry("subscription.type", "shared")
+        .containsEntry("format", "json")
+        .containsEntry("initial.position", "earliest");
+  }
+
+  /** A Pulsar source held in a wrapper's {@code source} field is still recognized. */
+  @Test
+  void detectsWrappedPulsarSource() {
+    assertThat(PulsarSourceSinkFactory.isPulsarSource(new WrappedSource())).isTrue();
+  }
+
+  /**
+   * Stand-in for the Pulsar source; presumably the factory matches it by simple class name and
+   * reads its {@code options} and {@code subscriptionType} fields reflectively.
+   */
+  private static class PulsarSource {
+    private final Map<String, String> options = new HashMap<>();
+    private FakeSubscriptionType subscriptionType;
+  }
+
+  /** Wraps a fake {@code PulsarSource} in a field named {@code source}. */
+  private static class WrappedSource {
+    private final Object source = new PulsarSource();
+  }
+
+  /** Stand-in for Pulsar's subscription type; {@code Shared} is expected to map to "shared". */
+  private enum FakeSubscriptionType {
+    Shared
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to