gemini-code-assist[bot] commented on code in PR #38493: URL: https://github.com/apache/beam/pull/38493#discussion_r3390702978
########## sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mqtt; + +import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration; +import static org.apache.beam.sdk.io.mqtt.MqttReadSchemaTransformProvider.ReadConfiguration; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.joda.time.Duration; + +@AutoService(SchemaTransformProvider.class) +public class MqttReadSchemaTransformProvider + extends TypedSchemaTransformProvider<ReadConfiguration> { + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class ReadConfiguration implements Serializable { + public static Builder builder() { + return new AutoValue_MqttReadSchemaTransformProvider_ReadConfiguration.Builder(); + } + + @SchemaFieldDescription("Configuration options to set up the MQTT connection.") + public abstract ConnectionConfiguration getConnectionConfiguration(); + + @SchemaFieldDescription( + "The max number of records to receive. Setting this will result in a bounded PCollection.") + @Nullable + public abstract Long getMaxNumRecords(); + + @SchemaFieldDescription( + "The maximum time for this source to read messages. Setting this will result in a bounded PCollection.") + @Nullable + public abstract Long getMaxReadTimeSeconds(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setConnectionConfiguration( + ConnectionConfiguration connectionConfiguration); + + public abstract Builder setMaxNumRecords(Long maxNumRecords); + + public abstract Builder setMaxReadTimeSeconds(Long maxReadTimeSeconds); + + public abstract ReadConfiguration build(); + } + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:mqtt_read:v1"; + } + + @Override + public String description() { + return "Reads messages from an MQTT broker and outputs each payload as a single `bytes` " + + "field.\n" + + "\n" + + "By default the read is unbounded (streaming): it keeps consuming messages from the " + + "subscribed topic until the pipeline is stopped. Setting `maxNumRecords` and/or " + + "`maxReadTimeSeconds` bounds the read, producing a bounded (batch) PCollection.\n" + + "\n" + + "Note: streaming reads require a runner that supports portable streaming (e.g. Prism, " + + "Flink, or Dataflow). The legacy local Python DirectRunner does not execute portable " + + "streaming cross-language reads."; + } + + @Override + protected SchemaTransform from(ReadConfiguration configuration) { + return new MqttReadSchemaTransform(configuration); + } + + private static class MqttReadSchemaTransform extends SchemaTransform { + private final ReadConfiguration config; + + MqttReadSchemaTransform(ReadConfiguration configuration) { + this.config = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + Preconditions.checkState( + input.getAll().isEmpty(), + "Expected zero input PCollections for this source, but found: %", + input.getAll().keySet()); Review Comment:  The Guava `Preconditions.checkState` error message template uses `%` instead of `%s` as the placeholder. This will prevent the key set from being formatted correctly in the error message. Please use `%s` instead. ```suggestion Preconditions.checkState( input.getAll().isEmpty(), "Expected zero input PCollections for this source, but found: %s", input.getAll().keySet()); ``` ########## sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mqtt; + +import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration; +import static org.apache.beam.sdk.io.mqtt.MqttWriteSchemaTransformProvider.WriteConfiguration; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; + +@AutoService(SchemaTransformProvider.class) +public class MqttWriteSchemaTransformProvider + extends TypedSchemaTransformProvider<WriteConfiguration> { + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class WriteConfiguration implements Serializable { + public static Builder builder() { + return new AutoValue_MqttWriteSchemaTransformProvider_WriteConfiguration.Builder(); + } + + @SchemaFieldDescription("Configuration options to set up the MQTT connection.") + public abstract ConnectionConfiguration getConnectionConfiguration(); + + @SchemaFieldDescription( + "Whether or not the publish message should be retained by the messaging engine. " + + "When a subscriber connects, it gets the latest retained message. " + + "Defaults to `False`, which will clear the retained message from the server.") + @Nullable + public abstract Boolean getRetained(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setConnectionConfiguration(ConnectionConfiguration foo); Review Comment:  The parameter name `foo` is generic and non-descriptive. It should be renamed to `connectionConfiguration` to match the property name and improve readability/maintainability. ```suggestion public abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration); ``` ########## sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttSchemaTransformProviderTest.java: ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mqtt; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentSkipListSet; +import org.apache.activemq.broker.BrokerService; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.common.NetworkTestHelper; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Message; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MqttSchemaTransformProviderTest { + private static final Logger LOG = LoggerFactory.getLogger(MqttSchemaTransformProviderTest.class); + + private BrokerService brokerService; + + private int port; + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + @Before + public void startBroker() throws Exception { + port = NetworkTestHelper.getAvailableLocalPort(); + LOG.info("Starting ActiveMQ brokerService on {}", port); + brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + // use memory persistence for the test: it's faster and don't pollute test folder with KahaDB + brokerService.setPersistent(false); + brokerService.addConnector("mqtt://localhost:" + port); + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @Test(timeout = 30 * 1000) + public void testReceiveWithTimeoutAndNoData() { + MqttReadSchemaTransformProvider.ReadConfiguration readConfiguration = + MqttReadSchemaTransformProvider.ReadConfiguration.builder() + .setConnectionConfiguration( + MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, "READ_TOPIC") + .withClientId("READ_PIPELINE")) + .setMaxReadTimeSeconds(2L) + .build(); + + PCollectionRowTuple.empty(pipeline) + .apply(new MqttReadSchemaTransformProvider().from(readConfiguration)); + + // should stop before the test timeout + pipeline.run().waitUntilFinish(); + } + + /** Collects the bytes field of every output row into a shared queue (DirectRunner is in-JVM). */ + private static final ConcurrentLinkedQueue<String> STREAMING_RECEIVED = + new ConcurrentLinkedQueue<>(); + + private static class CollectBytesFn extends DoFn<Row, Void> { + @ProcessElement + public void processElement(@Element Row row) { + byte[] bytes = row.getBytes("bytes"); + if (bytes != null) { + STREAMING_RECEIVED.add(new String(bytes, StandardCharsets.UTF_8)); + } + } + } + + /** + * Reads in streaming mode: when neither {@code maxNumRecords} nor {@code maxReadTimeSeconds} is + * set the SchemaTransform produces an unbounded PCollection. Verifies that messages published + * after the reader subscribes flow through continuously, then cancels the running pipeline. + */ + @Test(timeout = 90 * 1000) + public void testReadUnboundedStreaming() throws Exception { + STREAMING_RECEIVED.clear(); + final String topicName = "STREAM_READ_TOPIC"; + + // No bound -> unbounded (streaming) read. + MqttReadSchemaTransformProvider.ReadConfiguration readConfiguration = + MqttReadSchemaTransformProvider.ReadConfiguration.builder() + .setConnectionConfiguration( + MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, topicName) + .withClientId("STREAM_READ_PIPELINE")) + .build(); + + // Use a local pipeline so run() does not block (the read never terminates on its own). + PipelineOptions options = + PipelineOptionsFactory.fromArgs("--blockOnRun=false").withoutStrictParsing().create(); + Pipeline p = Pipeline.create(options); + PCollectionRowTuple.empty(p) + .apply(new MqttReadSchemaTransformProvider().from(readConfiguration)) + .get("output") + .apply(ParDo.of(new CollectBytesFn())); + + // Publish a steady stream of messages until the reader has consumed enough. + final boolean[] keepPublishing = {true}; + MQTT client = new MQTT(); + client.setHost("tcp://localhost:" + port); + final BlockingConnection publishConnection = client.blockingConnection(); + publishConnection.connect(); + Thread publisher = + new Thread( + () -> { + int i = 0; + try { + while (keepPublishing[0]) { + publishConnection.publish( + topicName, + ("stream-" + i).getBytes(StandardCharsets.UTF_8), + QoS.AT_LEAST_ONCE, + false); + i++; + Thread.sleep(200); + } + } catch (Exception e) { + // ignore: connection closed on teardown + } + }); + + PipelineResult result = p.run(); + publisher.start(); + + // Wait until the unbounded read delivers a meaningful number of records. + int expected = 10; + long deadline = System.currentTimeMillis() + 60 * 1000; + while (STREAMING_RECEIVED.size() < expected && System.currentTimeMillis() < deadline) { + Thread.sleep(500); + } + + keepPublishing[0] = false; + publisher.join(); + publishConnection.disconnect(); + result.cancel(); + + assertTrue( + "expected at least " + expected + " streamed records, got " + STREAMING_RECEIVED.size(), + STREAMING_RECEIVED.size() >= expected); + for (String received : STREAMING_RECEIVED) { + assertTrue("unexpected payload: " + received, received.startsWith("stream-")); + } + } + + @Test + public void testWrite() throws Exception { + final int numberOfTestMessages = 200; + MQTT client = new MQTT(); + client.setHost("tcp://localhost:" + port); + final BlockingConnection connection = client.blockingConnection(); + connection.connect(); + connection.subscribe(new Topic[] {new Topic(Buffer.utf8("WRITE_TOPIC"), QoS.EXACTLY_ONCE)}); + + final Set<String> messages = new ConcurrentSkipListSet<>(); + + Thread subscriber = + new Thread( + () -> { + try { + for (int i = 0; i < numberOfTestMessages; i++) { + Message message = connection.receive(); + messages.add(new String(message.getPayload(), StandardCharsets.UTF_8)); + message.ack(); + System.out.println( + "message: " + new String(message.getPayload(), StandardCharsets.UTF_8)); Review Comment:  Using `System.out.println` in unit tests is discouraged as it pollutes the build output. Since a logger (`LOG`) is already defined in this class, please use `LOG.info` or `LOG.debug` with SLF4J parameterized formatting instead. ```suggestion LOG.info( "message: {}", new String(message.getPayload(), StandardCharsets.UTF_8)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
