This is an automated email from the ASF dual-hosted git repository.
Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 001093e5e0b [mqtt] Add portable MqttIO Read/Write transforms for batch
and streaming (revives #32385) (#38493)
001093e5e0b is described below
commit 001093e5e0bdeb0a360b3b0043f4dd2729550b0d
Author: Tobias Kaymak <[email protected]>
AuthorDate: Sat Jun 13 20:55:05 2026 +0200
[mqtt] Add portable MqttIO Read/Write transforms for batch and streaming
(revives #32385) (#38493)
* [mqtt] Add SchemaTransform providers for MqttIO Read/Write
Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider
so MqttIO can be used through the portable SchemaTransform API and exposed
as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with
@DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the
config round-trips through Beam Schemas.
Both batch and streaming are supported on the read side: omitting
maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read,
while setting either bounds it to a batch read. The provider descriptions
document this and note that streaming requires a portable streaming runner
(e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does
not execute portable streaming cross-language reads.
Tests cover read-with-timeout-no-data, an unbounded streaming read
(publish/collect/cancel), and a write-then-read round trip against an
embedded ActiveMQ broker.
Revives the approved diff from PR #32385 (ahmedabu98, twosom) and adapts
it to the post-#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).
* [mqtt] Add messaging expansion service and wire MqttIO into Python xlang
Adds a new :sdks:java:io:messaging-expansion-service module that serves
messaging IOs (MQTT for now, with room for JMS/Solace later) instead of
adding MqttIO to the shared :sdks:java:io:expansion-service, per review
feedback from @Abacn and @chamikaramj.
Registers MqttIO's SchemaTransforms in standard_expansion_services.yaml
under the new service with kafka-style names (ReadFromMqtt / WriteToMqtt),
skipping the core SchemaTransforms it bundles transitively (those are
generated from the Java IO expansion service). Regenerates
standard_external_transforms.yaml so the generated Python wrappers are
served by the messaging expansion service, and registers the new target in
the generateExternalTransformsConfig task and the xlang wrapper-validation
list.
The CHANGES.md announcement is deferred to the follow-up PR that sets up
the Xlang Messaging PostCommit, per review feedback.
* [expansion-service] Remove obsolete upToDateWhen workaround
outputs.upToDateWhen { false } in the shadowJar block was a workaround for
a corrupted gradle cache and is no longer needed (review feedback on
PR #38493).
---
sdks/java/io/expansion-service/build.gradle | 1 -
.../io/messaging-expansion-service/build.gradle | 52 +++++
.../java/org/apache/beam/sdk/io/mqtt/MqttIO.java | 7 +
.../io/mqtt/MqttReadSchemaTransformProvider.java | 150 ++++++++++++
.../io/mqtt/MqttWriteSchemaTransformProvider.java | 132 +++++++++++
.../io/mqtt/MqttSchemaTransformProviderTest.java | 252 +++++++++++++++++++++
sdks/python/build.gradle | 1 +
sdks/python/test-suites/xlang/build.gradle | 3 +-
sdks/standard_expansion_services.yaml | 15 ++
sdks/standard_external_transforms.yaml | 57 ++++-
settings.gradle.kts | 1 +
11 files changed, 668 insertions(+), 3 deletions(-)
diff --git a/sdks/java/io/expansion-service/build.gradle
b/sdks/java/io/expansion-service/build.gradle
index 60ef89ed223..70a3fce538b 100644
--- a/sdks/java/io/expansion-service/build.gradle
+++ b/sdks/java/io/expansion-service/build.gradle
@@ -72,7 +72,6 @@ shadowJar {
attributes(["Multi-Release": true])
}
mergeServiceFiles()
- outputs.upToDateWhen { false }
}
description = "Apache Beam :: SDKs :: Java :: IO :: Expansion Service"
diff --git a/sdks/java/io/messaging-expansion-service/build.gradle
b/sdks/java/io/messaging-expansion-service/build.gradle
new file mode 100644
index 00000000000..6cdf67b86d6
--- /dev/null
+++ b/sdks/java/io/messaging-expansion-service/build.gradle
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Build script for the messaging expansion service: it packages the shared
+// ExpansionService entry point together with the MQTT IO module into a shadow jar.
+apply plugin: 'org.apache.beam.module'
+apply plugin: 'application'
+// Entry point used by the 'application' plugin; the same class is launched by the
+// runExpansionService task at the bottom of this file.
+mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"
+
+applyJavaNature(
+ automaticModuleName: 'org.apache.beam.sdk.io.messaging.expansion.service',
+ exportJavadoc: false,
+ validateShadowJar: false,
+ shadowClosure: {},
+)
+
+shadowJar {
+ manifest {
+ attributes(["Multi-Release": true])
+ }
+ // NOTE(review): presumably required so the service registrations of the bundled
+ // modules (e.g. the @AutoService SchemaTransformProviders in :sdks:java:io:mqtt)
+ // are combined rather than overwritten in the shaded jar - confirm.
+ mergeServiceFiles()
+}
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Messaging Expansion
Service"
+ext.summary = "Expansion service serving messaging IOs (e.g. MQTT)"
+
+dependencies {
+ // Each dependency is declared as implementation and additionally marked with
+ // permitUnusedDeclared (tracked under BEAM-11761).
+ // NOTE(review): presumably because nothing in this module references them from
+ // source; they are bundled for their runtime contents - confirm.
+ implementation project(":sdks:java:expansion-service")
+ permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
+ implementation project(":sdks:java:io:mqtt")
+ permitUnusedDeclared project(":sdks:java:io:mqtt") // BEAM-11761
+ runtimeOnly library.java.slf4j_jdk14
+}
+
+// Runs the expansion service from Gradle. The port is taken from the
+// 'constructionService.port' project property and falls back to "8097".
+// NOTE(review): the task uses the *test* runtime classpath - confirm this is intended.
+task runExpansionService (type: JavaExec) {
+ mainClass = "org.apache.beam.sdk.expansion.service.ExpansionService"
+ classpath = sourceSets.test.runtimeClasspath
+ args = [project.findProperty("constructionService.port") ?: "8097"]
+}
diff --git
a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index 78876eb6534..72449c0697a 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -37,7 +37,10 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -205,13 +208,17 @@ public class MqttIO {
private MqttIO() {}
/** A POJO describing a MQTT connection. */
+ @DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class ConnectionConfiguration implements Serializable
{
+ @SchemaFieldDescription("The MQTT broker URI.")
abstract String getServerUri();
+ @SchemaFieldDescription("The MQTT topic pattern.")
abstract @Nullable String getTopic();
+ @SchemaFieldDescription("The client ID prefix, which is used to construct
a unique client ID.")
abstract @Nullable String getClientId();
abstract @Nullable String getUsername();
diff --git
a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java
new file mode 100644
index 00000000000..b83d9bba9f4
--- /dev/null
+++
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mqtt;
+
+import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration;
+import static
org.apache.beam.sdk.io.mqtt.MqttReadSchemaTransformProvider.ReadConfiguration;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.joda.time.Duration;
+
+/**
+ * A {@link SchemaTransformProvider} that exposes {@code MqttIO.read()} as a SchemaTransform under
+ * the identifier {@code beam:schematransform:org.apache.beam:mqtt_read:v1}.
+ *
+ * <p>The produced transform accepts no input PCollections and emits, under the {@code "output"}
+ * tag, rows with a single byte-array field named {@code bytes} holding each element read.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class MqttReadSchemaTransformProvider
+ extends TypedSchemaTransformProvider<ReadConfiguration> {
+ /**
+ * Configuration of the read transform: a required connection configuration plus two optional
+ * limits ({@code maxNumRecords}, {@code maxReadTimeSeconds}) that are only applied when non-null.
+ */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class ReadConfiguration implements Serializable {
+ /** Returns a new builder backed by the AutoValue-generated implementation. */
+ public static Builder builder() {
+ return new
AutoValue_MqttReadSchemaTransformProvider_ReadConfiguration.Builder();
+ }
+
+ @SchemaFieldDescription("Configuration options to set up the MQTT
connection.")
+ public abstract ConnectionConfiguration getConnectionConfiguration();
+
+ @SchemaFieldDescription(
+ "The max number of records to receive. Setting this will result in a
bounded PCollection.")
+ @Nullable
+ public abstract Long getMaxNumRecords();
+
+ @SchemaFieldDescription(
+ "The maximum time for this source to read messages. Setting this will
result in a bounded PCollection.")
+ @Nullable
+ public abstract Long getMaxReadTimeSeconds();
+
+ /** Builder for {@link ReadConfiguration}; one setter per configuration field. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setConnectionConfiguration(
+ ConnectionConfiguration connectionConfiguration);
+
+ public abstract Builder setMaxNumRecords(Long maxNumRecords);
+
+ public abstract Builder setMaxReadTimeSeconds(Long maxReadTimeSeconds);
+
+ public abstract ReadConfiguration build();
+ }
+ }
+
+ /** Returns the URN under which this read transform is registered. */
+ @Override
+ public String identifier() {
+ return "beam:schematransform:org.apache.beam:mqtt_read:v1";
+ }
+
+ /** Returns the user-facing description of the transform, including its streaming/batch modes. */
+ @Override
+ public String description() {
+ return "Reads messages from an MQTT broker and outputs each payload as a
single `bytes` "
+ + "field.\n"
+ + "\n"
+ + "By default the read is unbounded (streaming): it keeps consuming
messages from the "
+ + "subscribed topic until the pipeline is stopped. Setting
`maxNumRecords` and/or "
+ + "`maxReadTimeSeconds` bounds the read, producing a bounded (batch)
PCollection.\n"
+ + "\n"
+ + "Note: streaming reads require a runner that supports portable
streaming (e.g. Prism, "
+ + "Flink, or Dataflow). The legacy local Python DirectRunner does not
execute portable "
+ + "streaming cross-language reads.";
+ }
+
+ /** Creates the transform for the given configuration. */
+ @Override
+ protected SchemaTransform from(ReadConfiguration configuration) {
+ return new MqttReadSchemaTransform(configuration);
+ }
+
+ /** The SchemaTransform that performs the read described by a {@link ReadConfiguration}. */
+ private static class MqttReadSchemaTransform extends SchemaTransform {
+ private final ReadConfiguration config;
+
+ MqttReadSchemaTransform(ReadConfiguration configuration) {
+ this.config = configuration;
+ }
+
+ /**
+ * Builds the read and converts its {@code byte[]} elements into single-field rows.
+ *
+ * @param input must contain no PCollections; otherwise an IllegalStateException is thrown by
+ * the precondition check
+ * @return a tuple holding the row PCollection under the {@code "output"} tag
+ */
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ // This is a source: reject any upstream PCollection.
+ Preconditions.checkState(
+ input.getAll().isEmpty(),
+ "Expected zero input PCollections for this source, but found: %s",
+ input.getAll().keySet());
+
+ MqttIO.Read<byte[]> readTransform =
+
MqttIO.read().withConnectionConfiguration(config.getConnectionConfiguration());
+
+ // Both limits are optional and independent; each is applied only when it was configured.
+ Long maxRecords = config.getMaxNumRecords();
+ Long maxReadTime = config.getMaxReadTimeSeconds();
+ if (maxRecords != null) {
+ readTransform = readTransform.withMaxNumRecords(maxRecords);
+ }
+ if (maxReadTime != null) {
+ // The configuration carries whole seconds; the read transform takes a Joda Duration.
+ readTransform =
readTransform.withMaxReadTime(Duration.standardSeconds(maxReadTime));
+ }
+
+ // Output rows carry exactly one byte-array field named "bytes".
+ Schema outputSchema =
Schema.builder().addByteArrayField("bytes").build();
+
+ PCollection<Row> outputRows =
+ input
+ .getPipeline()
+ .apply(readTransform)
+ .apply(
+ "Wrap in Beam Rows",
+ ParDo.of(
+ new DoFn<byte[], Row>() {
+ @ProcessElement
+ public void processElement(
+ @Element byte[] data, OutputReceiver<Row>
outputReceiver) {
+ outputReceiver.output(
+
Row.withSchema(outputSchema).addValue(data).build());
+ }
+ }))
+ .setRowSchema(outputSchema);
+
+ return PCollectionRowTuple.of("output", outputRows);
+ }
+ }
+}
diff --git
a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java
new file mode 100644
index 00000000000..95ee00c8c3a
--- /dev/null
+++
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mqtt;
+
+import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration;
+import static
org.apache.beam.sdk.io.mqtt.MqttWriteSchemaTransformProvider.WriteConfiguration;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+
+/**
+ * A {@link SchemaTransformProvider} that exposes {@code MqttIO.write()} as a SchemaTransform under
+ * the identifier {@code beam:schematransform:org.apache.beam:mqtt_write:v1}.
+ *
+ * <p>The produced transform takes a single input PCollection of rows whose schema has exactly one
+ * field of type BYTES, writes each row's bytes through {@code MqttIO.write()}, and returns an
+ * empty output tuple.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class MqttWriteSchemaTransformProvider
+ extends TypedSchemaTransformProvider<WriteConfiguration> {
+ /**
+ * Configuration of the write transform: a required connection configuration and an optional
+ * {@code retained} flag that is only applied when non-null.
+ */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class WriteConfiguration implements Serializable {
+ /** Returns a new builder backed by the AutoValue-generated implementation. */
+ public static Builder builder() {
+ return new
AutoValue_MqttWriteSchemaTransformProvider_WriteConfiguration.Builder();
+ }
+
+ @SchemaFieldDescription("Configuration options to set up the MQTT
connection.")
+ public abstract ConnectionConfiguration getConnectionConfiguration();
+
+ @SchemaFieldDescription(
+ "Whether or not the publish message should be retained by the
messaging engine. "
+ + "When a subscriber connects, it gets the latest retained
message. "
+ + "Defaults to `False`, which will clear the retained message from
the server.")
+ @Nullable
+ public abstract Boolean getRetained();
+
+ /** Builder for {@link WriteConfiguration}; one setter per configuration field. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setConnectionConfiguration(
+ ConnectionConfiguration connectionConfiguration);
+
+ public abstract Builder setRetained(Boolean retained);
+
+ public abstract WriteConfiguration build();
+ }
+ }
+
+ /** Returns the URN under which this write transform is registered. */
+ @Override
+ public String identifier() {
+ return "beam:schematransform:org.apache.beam:mqtt_write:v1";
+ }
+
+ /** Returns the user-facing description of the transform and its expected input shape. */
+ @Override
+ public String description() {
+ return "Publishes messages to an MQTT broker. Expects an input PCollection
of rows with a "
+ + "single `bytes` field, each of which is published as one MQTT
message.\n"
+ + "\n"
+ + "Works with both bounded (batch) and unbounded (streaming) input
PCollections.";
+ }
+
+ /** Creates the transform for the given configuration. */
+ @Override
+ protected SchemaTransform from(WriteConfiguration configuration) {
+ return new MqttWriteSchemaTransform(configuration);
+ }
+
+ /** The SchemaTransform that performs the write described by a {@link WriteConfiguration}. */
+ private static class MqttWriteSchemaTransform extends SchemaTransform {
+ private final WriteConfiguration config;
+
+ MqttWriteSchemaTransform(WriteConfiguration configuration) {
+ this.config = configuration;
+ }
+
+ /**
+ * Validates the input schema, unwraps each row to its bytes and applies the MQTT write.
+ *
+ * @param input a tuple holding a single PCollection of rows with exactly one BYTES field;
+ * any other schema fails the precondition check with an IllegalStateException
+ * @return an empty tuple bound to the input's pipeline
+ */
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ PCollection<Row> inputRows = input.getSinglePCollection();
+
+ // Only the field count and type are checked; the field's name is not.
+ Preconditions.checkState(
+ inputRows.getSchema().getFieldCount() == 1
+ &&
inputRows.getSchema().getField(0).getType().equals(Schema.FieldType.BYTES),
+ "Expected only one Schema field containing bytes, but instead
received: %s",
+ inputRows.getSchema());
+
+ MqttIO.Write<byte[]> writeTransform =
+
MqttIO.write().withConnectionConfiguration(config.getConnectionConfiguration());
+ // `retained` is optional; when unset the write transform's own default is left in place.
+ Boolean retained = config.getRetained();
+ if (retained != null) {
+ writeTransform = writeTransform.withRetained(retained);
+ }
+
+ inputRows
+ .apply(
+ "Extract bytes",
+ ParDo.of(
+ new DoFn<Row, byte[]>() {
+ @ProcessElement
+ public void processElement(
+ @Element Row row, OutputReceiver<byte[]>
outputReceiver) {
+ // Fully qualified because the simple name Preconditions is already taken by
+ // the vendored Guava import; a null value in field 0 fails here.
+ outputReceiver.output(
+
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
+ row.getBytes(0)));
+ }
+ }))
+ .apply(writeTransform);
+
+ return PCollectionRowTuple.empty(inputRows.getPipeline());
+ }
+ }
+}
diff --git
a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttSchemaTransformProviderTest.java
b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttSchemaTransformProviderTest.java
new file mode 100644
index 00000000000..60bdd1104db
--- /dev/null
+++
b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttSchemaTransformProviderTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mqtt;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.common.NetworkTestHelper;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link MqttReadSchemaTransformProvider} and {@link MqttWriteSchemaTransformProvider},
+ * run against an embedded ActiveMQ broker that exposes an MQTT connector on a free local port.
+ */
+public class MqttSchemaTransformProviderTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MqttSchemaTransformProviderTest.class);
+
+  private BrokerService brokerService;
+
+  private int port;
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  /** Starts a fresh, non-persistent broker with an MQTT connector before every test. */
+  @Before
+  public void startBroker() throws Exception {
+    port = NetworkTestHelper.getAvailableLocalPort();
+    LOG.info("Starting ActiveMQ brokerService on {}", port);
+    brokerService = new BrokerService();
+    brokerService.setDeleteAllMessagesOnStartup(true);
+    // Use memory persistence for the test: it is faster and does not pollute the test folder
+    // with KahaDB files.
+    brokerService.setPersistent(false);
+    brokerService.addConnector("mqtt://localhost:" + port);
+    brokerService.start();
+    brokerService.waitUntilStarted();
+  }
+
+  /** A read bounded only by {@code maxReadTimeSeconds} must terminate even without any data. */
+  @Test(timeout = 30 * 1000)
+  public void testReceiveWithTimeoutAndNoData() {
+    MqttReadSchemaTransformProvider.ReadConfiguration readConfiguration =
+        MqttReadSchemaTransformProvider.ReadConfiguration.builder()
+            .setConnectionConfiguration(
+                MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, "READ_TOPIC")
+                    .withClientId("READ_PIPELINE"))
+            .setMaxReadTimeSeconds(2L)
+            .build();
+
+    PCollectionRowTuple.empty(pipeline)
+        .apply(new MqttReadSchemaTransformProvider().from(readConfiguration));
+
+    // should stop before the test timeout
+    pipeline.run().waitUntilFinish();
+  }
+
+  /** Collects the bytes field of every output row into a shared queue (DirectRunner is in-JVM). */
+  private static final ConcurrentLinkedQueue<String> STREAMING_RECEIVED =
+      new ConcurrentLinkedQueue<>();
+
+  private static class CollectBytesFn extends DoFn<Row, Void> {
+    @ProcessElement
+    public void processElement(@Element Row row) {
+      byte[] bytes = row.getBytes("bytes");
+      if (bytes != null) {
+        STREAMING_RECEIVED.add(new String(bytes, StandardCharsets.UTF_8));
+      }
+    }
+  }
+
+  /**
+   * Reads in streaming mode: when neither {@code maxNumRecords} nor {@code maxReadTimeSeconds} is
+   * set the SchemaTransform produces an unbounded PCollection. Verifies that messages published
+   * after the reader subscribes flow through continuously, then cancels the running pipeline.
+   *
+   * <p>The publisher is stopped, the MQTT connection closed and the pipeline cancelled in a
+   * {@code finally} block so that a failure or timeout does not leave them running.
+   */
+  @Test(timeout = 90 * 1000)
+  public void testReadUnboundedStreaming() throws Exception {
+    STREAMING_RECEIVED.clear();
+    final String topicName = "STREAM_READ_TOPIC";
+
+    // No bound -> unbounded (streaming) read.
+    MqttReadSchemaTransformProvider.ReadConfiguration readConfiguration =
+        MqttReadSchemaTransformProvider.ReadConfiguration.builder()
+            .setConnectionConfiguration(
+                MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, topicName)
+                    .withClientId("STREAM_READ_PIPELINE"))
+            .build();
+
+    // Use a local pipeline so run() does not block (the read never terminates on its own).
+    PipelineOptions options =
+        PipelineOptionsFactory.fromArgs("--blockOnRun=false").withoutStrictParsing().create();
+    Pipeline p = Pipeline.create(options);
+    PCollectionRowTuple.empty(p)
+        .apply(new MqttReadSchemaTransformProvider().from(readConfiguration))
+        .get("output")
+        .apply(ParDo.of(new CollectBytesFn()));
+
+    // Stop flag shared with the publisher thread. An AtomicBoolean guarantees that the write
+    // made by the test thread becomes visible to the publisher; a plain array element does not.
+    // Fully qualified so that the import block of this file does not need to change.
+    final java.util.concurrent.atomic.AtomicBoolean keepPublishing =
+        new java.util.concurrent.atomic.AtomicBoolean(true);
+    MQTT client = new MQTT();
+    client.setHost("tcp://localhost:" + port);
+    final BlockingConnection publishConnection = client.blockingConnection();
+    publishConnection.connect();
+    // Publish a steady stream of messages until the reader has consumed enough.
+    Thread publisher =
+        new Thread(
+            () -> {
+              int i = 0;
+              try {
+                while (keepPublishing.get()) {
+                  publishConnection.publish(
+                      topicName,
+                      ("stream-" + i).getBytes(StandardCharsets.UTF_8),
+                      QoS.AT_LEAST_ONCE,
+                      false);
+                  i++;
+                  Thread.sleep(200);
+                }
+              } catch (Exception e) {
+                // ignore: connection closed on teardown
+              }
+            });
+    // A stuck publisher must never keep the test JVM alive.
+    publisher.setDaemon(true);
+
+    final int expected = 10;
+    PipelineResult result = null;
+    try {
+      result = p.run();
+      publisher.start();
+
+      // Wait until the unbounded read delivers a meaningful number of records.
+      long deadline = System.currentTimeMillis() + 60 * 1000;
+      while (STREAMING_RECEIVED.size() < expected && System.currentTimeMillis() < deadline) {
+        Thread.sleep(500);
+      }
+    } finally {
+      keepPublishing.set(false);
+      try {
+        // Bounded join: the publisher sleeps at most 200 ms between checks of the flag.
+        publisher.join(5000);
+        publishConnection.disconnect();
+      } finally {
+        // Always cancel, otherwise the unbounded pipeline keeps running after the test.
+        if (result != null) {
+          result.cancel();
+        }
+      }
+    }
+
+    assertTrue(
+        "expected at least " + expected + " streamed records, got " + STREAMING_RECEIVED.size(),
+        STREAMING_RECEIVED.size() >= expected);
+    for (String received : STREAMING_RECEIVED) {
+      assertTrue("unexpected payload: " + received, received.startsWith("stream-"));
+    }
+  }
+
+  /**
+   * Writes {@code numberOfTestMessages} rows through the write SchemaTransform and verifies that
+   * a subscriber on the same topic receives every payload.
+   *
+   * <p>The test is time-limited: the subscriber blocks until it has received the full count, so
+   * without a timeout a single lost message would hang the build.
+   */
+  @Test(timeout = 120 * 1000)
+  public void testWrite() throws Exception {
+    final int numberOfTestMessages = 200;
+    MQTT client = new MQTT();
+    client.setHost("tcp://localhost:" + port);
+    final BlockingConnection connection = client.blockingConnection();
+    connection.connect();
+
+    final Set<String> messages = new ConcurrentSkipListSet<>();
+    try {
+      connection.subscribe(new Topic[] {new Topic(Buffer.utf8("WRITE_TOPIC"), QoS.EXACTLY_ONCE)});
+
+      Thread subscriber =
+          new Thread(
+              () -> {
+                try {
+                  for (int i = 0; i < numberOfTestMessages; i++) {
+                    Message message = connection.receive();
+                    String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
+                    messages.add(payload);
+                    message.ack();
+                    LOG.info("message: {}", payload);
+                  }
+                } catch (Exception e) {
+                  LOG.error("Can't receive message", e);
+                }
+              });
+      // If the test times out the subscriber may still be blocked in receive().
+      subscriber.setDaemon(true);
+      subscriber.start();
+
+      ArrayList<byte[]> data = new ArrayList<>();
+      for (int i = 0; i < numberOfTestMessages; i++) {
+        data.add(("Test " + i).getBytes(StandardCharsets.UTF_8));
+      }
+
+      MqttWriteSchemaTransformProvider.WriteConfiguration writeConfiguration =
+          MqttWriteSchemaTransformProvider.WriteConfiguration.builder()
+              .setConnectionConfiguration(
+                  MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, "WRITE_TOPIC"))
+              .build();
+      Schema dataSchema = Schema.builder().addByteArrayField("bytes").build();
+
+      PCollection<Row> inputRows =
+          pipeline
+              .apply(Create.of(data))
+              .apply(
+                  MapElements.into(TypeDescriptors.rows())
+                      .via(d -> Row.withSchema(dataSchema).addValue(d).build()))
+              .setRowSchema(dataSchema);
+      PCollectionRowTuple.of("input", inputRows)
+          .apply(new MqttWriteSchemaTransformProvider().from(writeConfiguration));
+      pipeline.run();
+      subscriber.join();
+    } finally {
+      // Release the broker connection even when the pipeline or the join fails.
+      connection.disconnect();
+    }
+
+    assertEquals(numberOfTestMessages, messages.size());
+    for (int i = 0; i < numberOfTestMessages; i++) {
+      assertTrue(messages.contains("Test " + i));
+    }
+  }
+
+  /** Stops the embedded broker after every test. */
+  @After
+  public void stopBroker() throws Exception {
+    if (brokerService != null) {
+      brokerService.stop();
+      brokerService.waitUntilStopped();
+      brokerService = null;
+    }
+  }
+}
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index b39b12f198e..9e2fe232c42 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -73,6 +73,7 @@ tasks.register("generateExternalTransformsConfig") {
// Need to build all expansion services listed in
sdks/standard_expansion_services.yaml
dependsOn ":sdks:java:io:google-cloud-platform:expansion-service:build"
dependsOn ":sdks:java:io:expansion-service:build"
+ dependsOn ":sdks:java:io:messaging-expansion-service:build"
// Keep this in-sync with pyproject.toml
def PyYaml = "'pyyaml>=3.12,<7.0.0'"
diff --git a/sdks/python/test-suites/xlang/build.gradle
b/sdks/python/test-suites/xlang/build.gradle
index 3065ad8377e..1cbbaa0db53 100644
--- a/sdks/python/test-suites/xlang/build.gradle
+++ b/sdks/python/test-suites/xlang/build.gradle
@@ -25,6 +25,7 @@ project.evaluationDependsOn(":sdks:python")
// relevant fields as done here, then add it to `xlangTasks`.
def gcpExpansionPath =
project.project(':sdks:java:io:google-cloud-platform:expansion-service').getPath()
def ioExpansionPath =
project.project(':sdks:java:io:expansion-service').getPath()
+def messagingExpansionPath =
project.project(':sdks:java:io:messaging-expansion-service').getPath()
// Properties that are common across runners.
// Used to launch the expansion service, collect the right tests, and cleanup
afterwards
def gcpXlang = new CrossLanguageTask().tap {
@@ -42,7 +43,7 @@ def ioXlang = new CrossLanguageTask().tap {
}
// This list should include all expansion service targets in
sdks/standard_expansion_services.yaml
-def servicesToGenerateFrom = [ioExpansionPath, gcpExpansionPath]
+def servicesToGenerateFrom = [ioExpansionPath, messagingExpansionPath,
gcpExpansionPath]
def xlangWrapperValidation = new CrossLanguageTask().tap {
name = "xlangWrapperValidation"
expansionProjectPaths = servicesToGenerateFrom
diff --git a/sdks/standard_expansion_services.yaml
b/sdks/standard_expansion_services.yaml
index 531caca5a37..79c7e06280d 100644
--- a/sdks/standard_expansion_services.yaml
+++ b/sdks/standard_expansion_services.yaml
@@ -53,6 +53,21 @@
- 'beam:schematransform:org.apache.beam:iceberg_read:v1'
- 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1'
+- gradle_target: 'sdks:java:io:messaging-expansion-service:shadowJar'
+ destinations:
+ python: 'apache_beam/io'
+ transforms:
+ 'beam:schematransform:org.apache.beam:mqtt_write:v1':
+ name: 'WriteToMqtt'
+ 'beam:schematransform:org.apache.beam:mqtt_read:v1':
+ name: 'ReadFromMqtt'
+ skip_transforms:
+ # Core SchemaTransforms bundled via :sdks:java:expansion-service; already
+ # generated from the Java IO expansion service above.
+ - 'beam:schematransform:org.apache.beam:generate_sequence:v1'
+ - 'beam:schematransform:org.apache.beam:tfrecord_read:v1'
+ - 'beam:schematransform:org.apache.beam:tfrecord_write:v1'
+
# TODO(ahmedabu98): Enable this service in a future PR
#- gradle_target:
'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
# destinations:
diff --git a/sdks/standard_external_transforms.yaml
b/sdks/standard_external_transforms.yaml
index b50402a64d5..b9802f11b6c 100644
--- a/sdks/standard_external_transforms.yaml
+++ b/sdks/standard_external_transforms.yaml
@@ -19,7 +19,7 @@
# configuration in /sdks/standard_expansion_services.yaml.
# Refer to gen_xlang_wrappers.py for more info.
#
-# Last updated on: 2026-05-06
+# Last updated on: 2026-06-11
- default_service: sdks:java:io:expansion-service:shadowJar
description: ''
@@ -180,3 +180,58 @@
type: str
identifier: beam:schematransform:org.apache.beam:tfrecord_write:v1
name: TfrecordWrite
+- default_service: sdks:java:io:messaging-expansion-service:shadowJar
+ description: 'Reads messages from an MQTT broker and outputs each payload as
a single
+ `bytes` field.
+
+
+ By default the read is unbounded (streaming): it keeps consuming messages
from
+ the subscribed topic until the pipeline is stopped. Setting
`maxNumRecords` and/or
+ `maxReadTimeSeconds` bounds the read, producing a bounded (batch)
PCollection.
+
+
+ Note: streaming reads require a runner that supports portable streaming
(e.g.
+ Prism, Flink, or Dataflow). The legacy local Python DirectRunner does not
execute
+ portable streaming cross-language reads.'
+ destinations:
+ python: apache_beam/io
+ fields:
+ - description: Configuration options to set up the MQTT connection.
+ name: connection_configuration
+ nullable: false
+ type: Row(client_id=typing.Optional[str], password=typing.Optional[str],
server_uri=<class
+ 'str'>, topic=typing.Optional[str], username=typing.Optional[str])
+ - description: The max number of records to receive. Setting this will
result in
+ a bounded PCollection.
+ name: max_num_records
+ nullable: true
+ type: int64
+ - description: The maximum time for this source to read messages. Setting
this will
+ result in a bounded PCollection.
+ name: max_read_time_seconds
+ nullable: true
+ type: int64
+ identifier: beam:schematransform:org.apache.beam:mqtt_read:v1
+ name: ReadFromMqtt
+- default_service: sdks:java:io:messaging-expansion-service:shadowJar
+ description: 'Publishes messages to an MQTT broker. Expects an input
PCollection
+ of rows with a single `bytes` field, each of which is published as one
MQTT message.
+
+
+ Works with both bounded (batch) and unbounded (streaming) input
PCollections.'
+ destinations:
+ python: apache_beam/io
+ fields:
+ - description: Configuration options to set up the MQTT connection.
+ name: connection_configuration
+ nullable: false
+ type: Row(client_id=typing.Optional[str], password=typing.Optional[str],
server_uri=<class
+ 'str'>, topic=typing.Optional[str], username=typing.Optional[str])
+ - description: Whether or not the publish message should be retained by the
messaging
+ engine. When a subscriber connects, it gets the latest retained message.
Defaults
+ to `False`, which will clear the retained message from the server.
+ name: retained
+ nullable: true
+ type: boolean
+ identifier: beam:schematransform:org.apache.beam:mqtt_write:v1
+ name: WriteToMqtt
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 443d9c56775..3d4346661a4 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -236,6 +236,7 @@
include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-8")
include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-9")
include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common")
include(":sdks:java:io:expansion-service")
+include(":sdks:java:io:messaging-expansion-service")
include(":sdks:java:io:file-based-io-tests")
include(":sdks:java:io:bigquery-io-perf-tests")
include(":sdks:java:io:cdap")