This is an automated email from the ASF dual-hosted git repository.
zhanglistar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 22cefd5264 [FLINK] Support Apache Pulsar as DataSource (#12297)
22cefd5264 is described below
commit 22cefd5264557a6d42dd36f1f8ab8e7ea8a1627c
Author: zhanglistar <[email protected]>
AuthorDate: Mon Jun 29 17:08:50 2026 +0800
[FLINK] Support Apache Pulsar as DataSource (#12297)
* Fix Flink Nexmark window aggregate tests
* [VL] Add Flink Pulsar source support
* [FLINK] Point CI to Pulsar velox4j commit
* [FLINK] Update velox4j pin for stateful join fix
* [FLINK] Point CI to Pulsar velox4j commit
* [FLINK] Update velox4j pin for stateful join fix
* [FLINK] Adapt to latest velox4j API and simplify Pulsar source detection
- Simplify isPulsarSource to use class simple name check (like Kafka)
- Replace hand-rolled reflection with ReflectUtils.getObjectField
- Adapt NexmarkSourceFactory to new NexmarkGeneratorConfig API
- Adapt PrintSinkFactory to new PrintTableHandle constructor (isStdErr)
- Update CI velox4j pin to gluten-0530 latest commit
* Update flink.yml
* [FLINK] Update velox4j CI pin to latest gluten-0530
* [FLINK] Fix isPulsarSource to handle wrapped sources
* [FLINK] Add --add-opens for jdk.internal.reflect to fix NexmarkTest
* [FLINK] Fix NexmarkSourceFactoryTest GeneratorConfig constructor arity
* Fix unit tests (UT)
* [FLINK] Add --add-opens jdk.internal.reflect to surefire extraJavaTestArgs
The NexmarkTest was failing with InaccessibleObjectException because
surefire forks a new JVM that reads argLine (not JAVA_TOOL_OPTIONS).
Adding --add-opens=java.base/jdk.internal.reflect=ALL-UNNAMED to the
POM's extraJavaTestArgs ensures the forked JVM always has the required
opens regardless of environment variables.
* [FLINK] Add --add-opens sun.reflect.generics.repository for NexmarkTest
query q18
* [FLINK] Remove dead getWindowPropertyIndex method from
StreamExecWindowAggregate
* [FLINK] Fix spotless format violation in StreamExecWindowAggregate
---
.github/workflows/flink.yml | 4 +
.../plan/nodes/exec/common/CommonExecSink.java | 4 +-
.../stream/StreamExecGlobalWindowAggregate.java | 2 +-
.../stream/StreamExecLocalWindowAggregate.java | 2 +-
.../exec/stream/StreamExecTableSourceScan.java | 4 +-
.../org/apache/gluten/rexnode/WindowUtils.java | 2 +
.../gluten/velox/PulsarSourceSinkFactory.java | 337 +++++++++++++++++++++
.../org.apache.gluten.velox.VeloxSourceSinkFactory | 1 +
.../table/runtime/config/VeloxConnectorConfig.java | 1 +
.../runtime/operators/GlutenSourceFunction.java | 1 +
.../gluten/velox/VeloxSourceSinkFactory.java | 73 ++++-
gluten-flink/ut/pom.xml | 33 +-
.../table/runtime/stream/custom/NexmarkTest.java | 14 +-
.../gluten/velox/NexmarkSourceFactoryTest.java | 2 +-
.../gluten/velox/PulsarSourceSinkFactoryTest.java | 96 ++++++
15 files changed, 557 insertions(+), 19 deletions(-)
diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index 1df73f3119..66c5734923 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -36,6 +36,8 @@ env:
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED
+ --add-opens=java.base/jdk.internal.reflect=ALL-UNNAMED
+ --add-opens=java.base/sun.reflect.generics.repository=ALL-UNNAMED
JAVA_TOOL_OPTIONS: >-
--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=org.apache.arrow.memory.core,ALL-UNNAMED
@@ -48,6 +50,8 @@ env:
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED
+ --add-opens=java.base/jdk.internal.reflect=ALL-UNNAMED
+ --add-opens=java.base/sun.reflect.generics.repository=ALL-UNNAMED
CCACHE_DIR: "${{ github.workspace }}/.ccache"
jobs:
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index fab420c481..1f3bb41e45 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -222,7 +222,9 @@ public abstract class CommonExecSink extends
ExecNodeBase<Object>
Configuration.class.getName(),
streamExecEnv.getConfiguration(),
ResolvedSchema.class.getName(),
- schema));
+ schema,
+ VeloxSourceSinkFactory.FACTORY_CLASS_LOADER_KEY,
+ CommonExecSink.class.getClassLoader()));
// --- End Gluten-specific code changes ---
}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
index e5dca4ee8d..1e657950ef 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
@@ -259,7 +259,7 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
offset,
windowType,
outputType,
- true,
+ windowing.isRowtime(),
rowtimeIndex,
windowStartIndex,
windowEndIndex);
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
index d67d61709e..63e51797ea 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
@@ -220,7 +220,7 @@ public class StreamExecLocalWindowAggregate extends
StreamExecWindowAggregateBas
offset,
windowType,
outputType,
- false,
+ windowing.isRowtime(),
rowtimeIndex,
-1,
-1);
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
index e7ec714e70..91f59fd636 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
@@ -183,7 +183,9 @@ public class StreamExecTableSourceScan extends
CommonExecTableSourceScan
"checkpoint.enabled",
planner.getExecEnv().getCheckpointConfig().isCheckpointingEnabled(),
"watermarkPushDownSpec",
- watermarkPushDownSpec));
+ watermarkPushDownSpec,
+ VeloxSourceSinkFactory.FACTORY_CLASS_LOADER_KEY,
+ StreamExecTableSourceScan.class.getClassLoader()));
// --- End Gluten-specific code changes ---
}
}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
index 021969f2a2..b42635f06b 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
@@ -48,6 +48,7 @@ public class WindowUtils {
int windowType = -1;
WindowSpec windowSpec = windowing.getWindow();
if (windowSpec instanceof HoppingWindowSpec) {
+ windowType = 0;
size = ((HoppingWindowSpec) windowSpec).getSize().toMillis();
slide = ((HoppingWindowSpec) windowSpec).getSlide().toMillis();
if (size % slide != 0) {
@@ -63,6 +64,7 @@ public class WindowUtils {
}
windowType = 0;
} else if (windowSpec instanceof TumblingWindowSpec) {
+ windowType = 1;
size = ((TumblingWindowSpec) windowSpec).getSize().toMillis();
Duration windowOffset = ((TumblingWindowSpec) windowSpec).getOffset();
if (windowOffset != null) {
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PulsarSourceSinkFactory.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PulsarSourceSinkFactory.java
new file mode 100644
index 0000000000..354945c78c
--- /dev/null
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PulsarSourceSinkFactory.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+import org.apache.gluten.table.runtime.operators.GlutenSourceFunction;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.util.ReflectUtils;
+
+import io.github.zhztheplayer.velox4j.connector.PulsarConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.PulsarTableHandle;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.dag.Transformation;
+import
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+
+public class PulsarSourceSinkFactory implements VeloxSourceSinkFactory {
+
+ private static final String CONNECTOR_ID = "connector-pulsar";
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public boolean match(Transformation<RowData> transformation) {
+ if (transformation instanceof SourceTransformation) {
+ Source source = ((SourceTransformation) transformation).getSource();
+ return isPulsarSource(source);
+ }
+ return false;
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Override
+ public Transformation<RowData> buildVeloxSource(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
+ RowType outputType =
+ (RowType)
+ LogicalTypeConverter.toVLType(
+ ((InternalTypeInfo<?>)
transformation.getOutputType()).toLogicalType());
+ try {
+ ScanTableSource tableSource =
+ (ScanTableSource) parameters.get(ScanTableSource.class.getName());
+ SourceTransformation sourceTransformation = (SourceTransformation)
transformation;
+ Source source = sourceTransformation.getSource();
+ Map<String, String> pulsarTableParameters =
buildTableParameters(tableSource, source);
+ String topic = required(pulsarTableParameters, "topic");
+ String serviceUrl = required(pulsarTableParameters, "service.url");
+ String subscriptionName = required(pulsarTableParameters,
"subscription.name");
+ String format = pulsarTableParameters.getOrDefault("format", "raw");
+
+ String planId = PlanNodeIdGenerator.newId();
+ PulsarTableHandle pulsarTableHandle =
+ new PulsarTableHandle(CONNECTOR_ID, topic, outputType,
pulsarTableParameters);
+ PulsarConnectorSplit connectorSplit =
+ new PulsarConnectorSplit(CONNECTOR_ID, serviceUrl, topic,
subscriptionName, format);
+ TableScanNode pulsarScan =
+ new TableScanNode(planId, outputType, pulsarTableHandle, List.of());
+ GlutenStreamSource sourceOp =
+ new GlutenStreamSource(
+ new GlutenSourceFunction(
+ new StatefulPlanNode(pulsarScan.getId(), pulsarScan),
+ Map.of(pulsarScan.getId(), outputType),
+ pulsarScan.getId(),
+ connectorSplit,
+ RowData.class),
+ "PulsarSource");
+ return new LegacySourceTransformation<RowData>(
+ sourceTransformation.getName(),
+ sourceOp,
+ transformation.getOutputType(),
+ sourceTransformation.getParallelism(),
+ sourceTransformation.getBoundedness(),
+ false);
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ @Override
+ public Transformation<RowData> buildVeloxSink(
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
+ throw new FlinkRuntimeException("Unimplemented method 'buildSink'");
+ }
+
+ static Map<String, String> buildTableParameters(Object tableSource, Object
source) {
+ Map<String, String> options = new HashMap<>();
+ putAllStringOptions(options, tableSource);
+ putAllStringOptions(options, source);
+
+ firstString(tableSource, source, "serviceUrl", "serviceURL",
"pulsarServiceUrl")
+ .ifPresent(value -> options.put("service.url", value));
+ option(options, "service-url", "pulsar.service.url", "service.url",
"pulsar.client.serviceUrl")
+ .ifPresent(value -> options.put("service.url", value));
+
+ firstString(tableSource, source, "adminUrl", "adminURL", "pulsarAdminUrl")
+ .ifPresent(value -> options.put("admin.url", value));
+ option(options, "admin-url", "pulsar.admin.url", "admin.url")
+ .ifPresent(value -> options.put("admin.url", value));
+
+ firstTopic(tableSource, source).ifPresent(value -> options.put("topic",
value));
+ option(options, "topic", "topics").ifPresent(value -> options.put("topic",
value));
+
+ firstString(tableSource, source, "subscriptionName")
+ .ifPresent(value -> options.put("subscription.name", value));
+ option(
+ options,
+ "subscription-name",
+ "subscription.name",
+ "pulsar.subscription.name",
+ "pulsar.consumer.subscriptionName",
+ "properties.subscription.name",
+ "source.subscription-name")
+ .ifPresent(value -> options.put("subscription.name", value));
+
+ options.putIfAbsent("subscription.name", "gluten-pulsar-" +
UUID.randomUUID().toString());
+ firstString(tableSource, source, "subscriptionType")
+ .or(
+ () ->
+ option(
+ options,
+ "source.subscription-type",
+ "subscription.type",
+ "pulsar.subscription.type"))
+ .map(PulsarSourceSinkFactory::normalizeSubscriptionType)
+ .ifPresent(value -> options.put("subscription.type", value));
+ option(options, "value.format")
+ .or(
+ () -> {
+ String resolvedFormat = resolveFormat(tableSource);
+ if (!"raw".equals(resolvedFormat)) {
+ return Optional.of(resolvedFormat);
+ }
+ return option(options, "format").filter(value ->
!"raw".equals(value));
+ })
+ .ifPresent(value -> options.put("format", value));
+ options.putIfAbsent("format", "raw");
+ option(
+ options,
+ "scan.startup.mode",
+ "startup.mode",
+ "initial.position",
+ "source.start.message-id")
+ .map(PulsarSourceSinkFactory::toInitialPosition)
+ .ifPresent(value -> options.put("initial.position", value));
+ options.putIfAbsent("initial.position", "latest");
+ return options;
+ }
+
+ static boolean isPulsarSource(Object source) {
+ return isPulsarSource(source, 0);
+ }
+
+ private static boolean isPulsarSource(Object source, int depth) {
+ if (source == null || depth > 3) {
+ return false;
+ }
+ if (source.getClass().getSimpleName().equals("PulsarSource")) {
+ return true;
+ }
+ // Check wrapped source fields (e.g. Flink wraps PulsarSource in a
SourceReaderContext wrapper)
+ for (java.lang.reflect.Field f : source.getClass().getDeclaredFields()) {
+ Class<?> fieldType = f.getType();
+ // Only recurse into non-primitive, non-collection, non-map fields that
could wrap a Source
+ if (fieldType.isPrimitive()
+ || fieldType == String.class
+ || Collection.class.isAssignableFrom(fieldType)
+ || Map.class.isAssignableFrom(fieldType)
+ || fieldType.isEnum()) {
+ continue;
+ }
+ f.setAccessible(true);
+ try {
+ Object inner = f.get(source);
+ if (inner != null && isPulsarSource(inner, depth + 1)) {
+ return true;
+ }
+ } catch (IllegalAccessException ignored) {
+ }
+ }
+ return false;
+ }
+
+ private static String required(Map<String, String> options, String key) {
+ String value = options.get(key);
+ if (value == null || value.isEmpty()) {
+ throw new FlinkRuntimeException("Missing Pulsar option: " + key);
+ }
+ return value;
+ }
+
+ private static String resolveFormat(Object tableSource) {
+ Optional<Object> decodingFormat = firstField(tableSource,
"valueDecodingFormat", "format");
+ if (decodingFormat.isEmpty()) {
+ decodingFormat =
+ firstField(tableSource, "deserializationSchemaFactory")
+ .flatMap(factory -> firstField(factory, "valueDecodingFormat"));
+ }
+ if (decodingFormat.isPresent() && decodingFormat.get() instanceof
DecodingFormat) {
+ String className = decodingFormat.get().getClass().getName();
+ if (className.contains("JsonFormatFactory")) {
+ return "json";
+ } else if (className.contains("CsvFormatFactory")) {
+ return "csv";
+ }
+ }
+ return "raw";
+ }
+
+ private static String toInitialPosition(String startupMode) {
+ String normalized = startupMode.toLowerCase().replace("_", "-");
+ if (normalized.contains("earliest")) {
+ return "earliest";
+ }
+ if (normalized.contains("latest")) {
+ return "latest";
+ }
+ return normalized;
+ }
+
+ private static String normalizeSubscriptionType(String subscriptionType) {
+ return subscriptionType.toLowerCase(Locale.ROOT).replace("-", "_");
+ }
+
+ private static Optional<String> firstTopic(Object... targets) {
+ for (Object target : targets) {
+ Optional<Object> topics = firstField(target, "topics", "topic",
"topicName");
+ if (topics.isPresent()) {
+ Object value = topics.get();
+ if (value instanceof Collection) {
+ return ((Collection<?>)
value).stream().findFirst().map(String::valueOf);
+ }
+ if (value instanceof String[]) {
+ return Arrays.stream((String[]) value).findFirst();
+ }
+ String text = String.valueOf(value);
+ if (!text.isEmpty()) {
+ return Optional.of(text);
+ }
+ }
+ }
+ return Optional.empty();
+ }
+
+ private static Optional<String> firstString(Object first, Object second,
String... fieldNames) {
+ return firstField(first, fieldNames)
+ .or(() -> firstField(second, fieldNames))
+ .map(String::valueOf)
+ .filter(value -> !value.isEmpty());
+ }
+
+ private static Optional<String> option(Map<String, String> options,
String... keys) {
+ for (String key : keys) {
+ String value = options.get(key);
+ if (value != null && !value.isEmpty()) {
+ return Optional.of(value);
+ }
+ }
+ return Optional.empty();
+ }
+
+ private static void putAllStringOptions(Map<String, String> output, Object
target) {
+ firstField(target, "properties", "tableOptions", "options",
"configuration")
+ .ifPresent(value -> putAllStringOptions(output, value));
+ if (target instanceof Properties) {
+ Properties properties = (Properties) target;
+ for (String key : properties.stringPropertyNames()) {
+ output.put(key, properties.getProperty(key));
+ }
+ } else if (target instanceof Map) {
+ ((Map<?, ?>) target)
+ .forEach(
+ (key, value) -> {
+ if (key != null && value != null) {
+ output.put(String.valueOf(key), String.valueOf(value));
+ }
+ });
+ }
+ }
+
+ private static Optional<Object> firstField(Object target, String...
fieldNames) {
+ if (target == null) {
+ return Optional.empty();
+ }
+ for (String fieldName : fieldNames) {
+ Optional<Object> value = field(target, fieldName);
+ if (value.isPresent()) {
+ return value;
+ }
+ }
+ return Optional.empty();
+ }
+
+ private static Optional<Object> field(Object target, String fieldName) {
+ Class<?> clazz = target.getClass();
+ while (clazz != null) {
+ try {
+ return Optional.ofNullable(ReflectUtils.getObjectField(clazz, target,
fieldName));
+ } catch (FlinkRuntimeException e) {
+ clazz = clazz.getSuperclass();
+ }
+ }
+ return Optional.empty();
+ }
+}
diff --git
a/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
b/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
index b2d824ee12..db57ce53dc 100644
---
a/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
+++
b/gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory
@@ -1,5 +1,6 @@
org.apache.gluten.velox.FromElementsSourceFactory
org.apache.gluten.velox.KafkaSourceSinkFactory
+org.apache.gluten.velox.PulsarSourceSinkFactory
org.apache.gluten.velox.PrintSinkFactory
org.apache.gluten.velox.NexmarkSourceFactory
org.apache.gluten.velox.FileSystemSinkFactory
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
index df9da7512a..967b8de73c 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
@@ -33,6 +33,7 @@ public class VeloxConnectorConfig {
List.of(
"connector-nexmark",
"connector-kafka",
+ "connector-pulsar",
"connector-fuzzer",
"connector-filesystem",
"connector-from-elements",
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
index 76cced93f1..19c7a69e99 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
@@ -110,6 +110,7 @@ public class GlutenSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
processAvailableElement(sourceContext);
break;
case BLOCKED:
+ task.waitFor();
break;
default:
return;
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java
index aff1bb7795..f3240bc7d9 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/velox/VeloxSourceSinkFactory.java
@@ -22,14 +22,29 @@ import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
public interface VeloxSourceSinkFactory {
static final Logger LOG =
LoggerFactory.getLogger(VeloxSourceSinkFactory.class);
+ String FACTORY_CLASS_LOADER_KEY = "velox.source-sink.factory.classloader";
+
+ List<String> FACTORY_CLASS_NAMES =
+ List.of(
+ "org.apache.gluten.velox.FromElementsSourceFactory",
+ "org.apache.gluten.velox.KafkaSourceSinkFactory",
+ "org.apache.gluten.velox.PulsarSourceSinkFactory",
+ "org.apache.gluten.velox.PrintSinkFactory",
+ "org.apache.gluten.velox.NexmarkSourceFactory",
+ "org.apache.gluten.velox.FileSystemSinkFactory",
+ "org.apache.gluten.velox.FuzzerSourceSinkFactory");
+
/** Match the conditions to determine if the operator can be offloaded to
velox. */
boolean match(Transformation<RowData> transformation);
@@ -43,21 +58,67 @@ public interface VeloxSourceSinkFactory {
/** Choose the matched source/sink factory by given transformation. */
private static Optional<VeloxSourceSinkFactory> getFactory(
- Transformation<RowData> transformation) {
+ Transformation<RowData> transformation, Map<String, Object> parameters) {
ServiceLoader<VeloxSourceSinkFactory> factories =
ServiceLoader.load(VeloxSourceSinkFactory.class);
- for (VeloxSourceSinkFactory factory : factories) {
- if (factory.match(transformation)) {
- return Optional.of(factory);
+ try {
+ for (VeloxSourceSinkFactory factory : factories) {
+ if (factory.match(transformation)) {
+ return Optional.of(factory);
+ }
+ }
+ } catch (ServiceConfigurationError e) {
+ LOG.warn("Failed to load Velox source/sink factory", e);
+ }
+ for (String factoryClassName : FACTORY_CLASS_NAMES) {
+ Optional<VeloxSourceSinkFactory> factory = loadFactory(factoryClassName,
parameters);
+ if (factory.isPresent() && factory.get().match(transformation)) {
+ return factory;
}
}
return Optional.empty();
}
+ private static Optional<VeloxSourceSinkFactory> loadFactory(
+ String factoryClassName, Map<String, Object> parameters) {
+ Object classLoader = parameters.get(FACTORY_CLASS_LOADER_KEY);
+ if (classLoader instanceof ClassLoader) {
+ Optional<VeloxSourceSinkFactory> factory =
+ loadFactory(factoryClassName, (ClassLoader) classLoader);
+ if (factory.isPresent()) {
+ return factory;
+ }
+ }
+ ClassLoader contextClassLoader =
Thread.currentThread().getContextClassLoader();
+ Optional<VeloxSourceSinkFactory> factory = loadFactory(factoryClassName,
contextClassLoader);
+ if (factory.isPresent()) {
+ return factory;
+ }
+ return loadFactory(factoryClassName,
VeloxSourceSinkFactory.class.getClassLoader());
+ }
+
+ private static Optional<VeloxSourceSinkFactory> loadFactory(
+ String factoryClassName, ClassLoader classLoader) {
+ try {
+ return Optional.of(
+ (VeloxSourceSinkFactory)
+ Class.forName(factoryClassName, true, classLoader)
+ .getDeclaredConstructor()
+ .newInstance());
+ } catch (ClassNotFoundException e) {
+ return Optional.empty();
+ } catch (InstantiationException
+ | IllegalAccessException
+ | InvocationTargetException
+ | NoSuchMethodException e) {
+ throw new RuntimeException("Failed to instantiate Velox source/sink
factory", e);
+ }
+ }
+
/** Build Velox source, or fallback to flink orignal source . */
static Transformation<RowData> buildSource(
Transformation<RowData> transformation, Map<String, Object> parameters) {
- Optional<VeloxSourceSinkFactory> factory = getFactory(transformation);
+ Optional<VeloxSourceSinkFactory> factory = getFactory(transformation,
parameters);
if (factory.isEmpty()) {
LOG.warn(
"Not find matched factory to build velox source transformation, and
we will use flink original transformation {} instead.",
@@ -71,7 +132,7 @@ public interface VeloxSourceSinkFactory {
/** Build Velox sink, or fallback to flink original sink. */
static Transformation<RowData> buildSink(
Transformation<RowData> transformation, Map<String, Object> parameters) {
- Optional<VeloxSourceSinkFactory> factory = getFactory(transformation);
+ Optional<VeloxSourceSinkFactory> factory = getFactory(transformation,
parameters);
if (factory.isEmpty()) {
LOG.warn(
"Not find matched factory to build velox sink transformation, and we
will use flink original transformation {} instead.",
diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml
index 03bda07034..f94d743e3e 100644
--- a/gluten-flink/ut/pom.xml
+++ b/gluten-flink/ut/pom.xml
@@ -39,6 +39,8 @@
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED
+ --add-opens=java.base/jdk.internal.reflect=ALL-UNNAMED
+ --add-opens=java.base/sun.reflect.generics.repository=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED</extraJavaTestArgs>
</properties>
@@ -53,6 +55,11 @@
<artifactId>gluten-flink-runtime</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-flink-planner</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
@@ -96,12 +103,6 @@
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
- <dependency>
- <groupId>com.github.nexmark</groupId>
- <artifactId>nexmark-flink</artifactId>
- <version>0.3-SNAPSHOT</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>io.github.zhztheplayer</groupId>
<artifactId>velox4j</artifactId>
@@ -225,4 +226,24 @@
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <profile>
+ <id>nexmark-tests</id>
+ <activation>
+ <property>
+ <name>maven.test.skip</name>
+ <value>!true</value>
+ </property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>com.github.nexmark</groupId>
+ <artifactId>nexmark-flink</artifactId>
+ <version>0.3-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
</project>
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
index 3a40bda78c..14b250480b 100644
---
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
@@ -71,7 +71,7 @@ public class NexmarkTest {
}
};
- private static final int KAFKA_PORT = 9092;
+ private static final int KAFKA_PORT = 19092;
private static String topicName = "nexmark";
@RegisterExtension
@@ -83,7 +83,7 @@ public class NexmarkTest {
private static final Map<String, String> KAFKA_VARIABLES =
new HashMap<>() {
{
- put("BOOTSTRAP_SERVERS", "localhost:9092");
+ put("BOOTSTRAP_SERVERS", "localhost:" + KAFKA_PORT);
put("NEXMARK_TABLE", "kafka");
}
};
@@ -284,6 +284,16 @@ public class NexmarkTest {
}
}
+ String queryFilter = System.getProperty("nexmark.queries");
+ if (queryFilter != null && !queryFilter.trim().isEmpty()) {
+ List<String> selectedQueries =
+ Arrays.stream(queryFilter.split(","))
+ .map(String::trim)
+ .filter(query -> !query.isEmpty())
+ .collect(Collectors.toList());
+ queryFiles.retainAll(selectedQueries);
+ }
+
return queryFiles.stream().sorted().collect(Collectors.toList());
} catch (URISyntaxException | IOException e) {
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/velox/NexmarkSourceFactoryTest.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/NexmarkSourceFactoryTest.java
index 8f149cfb73..eda06f2711 100644
---
a/gluten-flink/ut/src/test/java/org/apache/gluten/velox/NexmarkSourceFactoryTest.java
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/NexmarkSourceFactoryTest.java
@@ -141,7 +141,7 @@ class NexmarkSourceFactoryTest {
long.class,
long.class);
Object generatorConfig =
- generatorConfigCtor.newInstance(nexmarkConfig, 0L, 0L, maxEvents,
maxEvents, 0L);
+ generatorConfigCtor.newInstance(nexmarkConfig, 0L, 0L, maxEvents, 0L,
0L);
Class<?> nexmarkSourceCls = Class.forName(NEXMARK_SOURCE_CN);
Constructor<?> nexmarkSourceCtor =
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/velox/PulsarSourceSinkFactoryTest.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/PulsarSourceSinkFactoryTest.java
new file mode 100644
index 0000000000..33acfb8f0f
--- /dev/null
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/PulsarSourceSinkFactoryTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.table.runtime.config.VeloxConnectorConfig;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class PulsarSourceSinkFactoryTest {
+
+ @Test
+ void serviceLoaderDiscoversPulsarFactory() {
+ List<String> factoryNames =
+
StreamSupport.stream(ServiceLoader.load(VeloxSourceSinkFactory.class).spliterator(),
false)
+ .map(factory -> factory.getClass().getName())
+ .collect(Collectors.toList());
+
+
assertThat(factoryNames).contains("org.apache.gluten.velox.PulsarSourceSinkFactory");
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void runtimeConnectorConfigIncludesPulsarConnector() throws Exception {
+ Field connectors =
VeloxConnectorConfig.class.getDeclaredField("CONNECTORS");
+ connectors.setAccessible(true);
+
+ assertThat((List<String>)
connectors.get(null)).contains("connector-pulsar");
+ }
+
+ @Test
+ void tableParametersMapFlinkSqlOptionsToVeloxPulsarOptions() {
+ PulsarSource source = new PulsarSource();
+ source.options.put("pulsar.client.serviceUrl", "pulsar://127.0.0.1:16650");
+ source.options.put("admin-url", "http://127.0.0.1:18080");
+ source.options.put("topics",
"persistent://public/default/gluten-pulsar-smoke");
+ source.options.put("pulsar.consumer.subscriptionName", "gluten-test-sub");
+ source.options.put("format", "raw");
+ source.options.put("value.format", "json");
+ source.options.put("source.start.message-id", "earliest");
+ source.subscriptionType = FakeSubscriptionType.Shared;
+
+ Map<String, String> tableParameters =
+ PulsarSourceSinkFactory.buildTableParameters(null, source);
+
+ assertThat(tableParameters)
+ .containsEntry("service.url", "pulsar://127.0.0.1:16650")
+ .containsEntry("admin.url", "http://127.0.0.1:18080")
+ .containsEntry("topic",
"persistent://public/default/gluten-pulsar-smoke")
+ .containsEntry("subscription.name", "gluten-test-sub")
+ .containsEntry("subscription.type", "shared")
+ .containsEntry("format", "json")
+ .containsEntry("initial.position", "earliest");
+ }
+
+ @Test
+ void detectsWrappedPulsarSource() {
+ assertThat(PulsarSourceSinkFactory.isPulsarSource(new
WrappedSource())).isTrue();
+ }
+
+ private static class PulsarSource {
+ private final Map<String, String> options = new HashMap<>();
+ private FakeSubscriptionType subscriptionType;
+ }
+
+ private static class WrappedSource {
+ private final Object source = new PulsarSource();
+ }
+
+ private enum FakeSubscriptionType {
+ Shared
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]