gemini-code-assist[bot] commented on code in PR #38493:
URL: https://github.com/apache/beam/pull/38493#discussion_r3390702978


##########
sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mqtt;
+
+import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration;
+import static 
org.apache.beam.sdk.io.mqtt.MqttReadSchemaTransformProvider.ReadConfiguration;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.joda.time.Duration;
+
+@AutoService(SchemaTransformProvider.class)
+public class MqttReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<ReadConfiguration> {
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class ReadConfiguration implements Serializable {
+    public static Builder builder() {
+      return new 
AutoValue_MqttReadSchemaTransformProvider_ReadConfiguration.Builder();
+    }
+
+    @SchemaFieldDescription("Configuration options to set up the MQTT 
connection.")
+    public abstract ConnectionConfiguration getConnectionConfiguration();
+
+    @SchemaFieldDescription(
+        "The max number of records to receive. Setting this will result in a 
bounded PCollection.")
+    @Nullable
+    public abstract Long getMaxNumRecords();
+
+    @SchemaFieldDescription(
+        "The maximum time for this source to read messages. Setting this will 
result in a bounded PCollection.")
+    @Nullable
+    public abstract Long getMaxReadTimeSeconds();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setConnectionConfiguration(
+          ConnectionConfiguration connectionConfiguration);
+
+      public abstract Builder setMaxNumRecords(Long maxNumRecords);
+
+      public abstract Builder setMaxReadTimeSeconds(Long maxReadTimeSeconds);
+
+      public abstract ReadConfiguration build();
+    }
+  }
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:mqtt_read:v1";
+  }
+
+  @Override
+  public String description() {
+    return "Reads messages from an MQTT broker and outputs each payload as a 
single `bytes` "
+        + "field.\n"
+        + "\n"
+        + "By default the read is unbounded (streaming): it keeps consuming 
messages from the "
+        + "subscribed topic until the pipeline is stopped. Setting 
`maxNumRecords` and/or "
+        + "`maxReadTimeSeconds` bounds the read, producing a bounded (batch) 
PCollection.\n"
+        + "\n"
+        + "Note: streaming reads require a runner that supports portable 
streaming (e.g. Prism, "
+        + "Flink, or Dataflow). The legacy local Python DirectRunner does not 
execute portable "
+        + "streaming cross-language reads.";
+  }
+
+  @Override
+  protected SchemaTransform from(ReadConfiguration configuration) {
+    return new MqttReadSchemaTransform(configuration);
+  }
+
+  private static class MqttReadSchemaTransform extends SchemaTransform {
+    private final ReadConfiguration config;
+
+    MqttReadSchemaTransform(ReadConfiguration configuration) {
+      this.config = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      Preconditions.checkState(
+          input.getAll().isEmpty(),
+          "Expected zero input PCollections for this source, but found: %",
+          input.getAll().keySet());

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The Guava `Preconditions.checkState` error message template uses `%` instead 
of `%s` as the placeholder. Guava only substitutes `%s`, so the key set will not 
be inserted at that position; it is appended to the end of the message in square 
brackets after a literal `%`. Please use `%s` instead.
   
   ```suggestion
         Preconditions.checkState(
             input.getAll().isEmpty(),
             "Expected zero input PCollections for this source, but found: %s",
             input.getAll().keySet());
   ```



##########
sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mqtt;
+
+import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration;
+import static 
org.apache.beam.sdk.io.mqtt.MqttWriteSchemaTransformProvider.WriteConfiguration;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+
+@AutoService(SchemaTransformProvider.class)
+public class MqttWriteSchemaTransformProvider
+    extends TypedSchemaTransformProvider<WriteConfiguration> {
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class WriteConfiguration implements Serializable {
+    public static Builder builder() {
+      return new 
AutoValue_MqttWriteSchemaTransformProvider_WriteConfiguration.Builder();
+    }
+
+    @SchemaFieldDescription("Configuration options to set up the MQTT 
connection.")
+    public abstract ConnectionConfiguration getConnectionConfiguration();
+
+    @SchemaFieldDescription(
+        "Whether or not the publish message should be retained by the 
messaging engine. "
+            + "When a subscriber connects, it gets the latest retained 
message. "
+            + "Defaults to `False`, which will clear the retained message from 
the server.")
+    @Nullable
+    public abstract Boolean getRetained();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder 
setConnectionConfiguration(ConnectionConfiguration foo);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The parameter name `foo` is generic and non-descriptive. It should be 
renamed to `connectionConfiguration` to match the property name and improve 
readability/maintainability.
   
   ```suggestion
         public abstract Builder 
setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
   ```



##########
sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttSchemaTransformProviderTest.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mqtt;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.common.NetworkTestHelper;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MqttSchemaTransformProviderTest {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MqttSchemaTransformProviderTest.class);
+
+  private BrokerService brokerService;
+
+  private int port;
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @Before
+  public void startBroker() throws Exception {
+    port = NetworkTestHelper.getAvailableLocalPort();
+    LOG.info("Starting ActiveMQ brokerService on {}", port);
+    brokerService = new BrokerService();
+    brokerService.setDeleteAllMessagesOnStartup(true);
+    // use memory persistence for the test: it's faster and don't pollute test 
folder with KahaDB
+    brokerService.setPersistent(false);
+    brokerService.addConnector("mqtt://localhost:" + port);
+    brokerService.start();
+    brokerService.waitUntilStarted();
+  }
+
+  @Test(timeout = 30 * 1000)
+  public void testReceiveWithTimeoutAndNoData() {
+    MqttReadSchemaTransformProvider.ReadConfiguration readConfiguration =
+        MqttReadSchemaTransformProvider.ReadConfiguration.builder()
+            .setConnectionConfiguration(
+                MqttIO.ConnectionConfiguration.create("tcp://localhost:" + 
port, "READ_TOPIC")
+                    .withClientId("READ_PIPELINE"))
+            .setMaxReadTimeSeconds(2L)
+            .build();
+
+    PCollectionRowTuple.empty(pipeline)
+        .apply(new MqttReadSchemaTransformProvider().from(readConfiguration));
+
+    // should stop before the test timeout
+    pipeline.run().waitUntilFinish();
+  }
+
+  /** Collects the bytes field of every output row into a shared queue 
(DirectRunner is in-JVM). */
+  private static final ConcurrentLinkedQueue<String> STREAMING_RECEIVED =
+      new ConcurrentLinkedQueue<>();
+
+  private static class CollectBytesFn extends DoFn<Row, Void> {
+    @ProcessElement
+    public void processElement(@Element Row row) {
+      byte[] bytes = row.getBytes("bytes");
+      if (bytes != null) {
+        STREAMING_RECEIVED.add(new String(bytes, StandardCharsets.UTF_8));
+      }
+    }
+  }
+
+  /**
+   * Reads in streaming mode: when neither {@code maxNumRecords} nor {@code 
maxReadTimeSeconds} is
+   * set the SchemaTransform produces an unbounded PCollection. Verifies that 
messages published
+   * after the reader subscribes flow through continuously, then cancels the 
running pipeline.
+   */
+  @Test(timeout = 90 * 1000)
+  public void testReadUnboundedStreaming() throws Exception {
+    STREAMING_RECEIVED.clear();
+    final String topicName = "STREAM_READ_TOPIC";
+
+    // No bound -> unbounded (streaming) read.
+    MqttReadSchemaTransformProvider.ReadConfiguration readConfiguration =
+        MqttReadSchemaTransformProvider.ReadConfiguration.builder()
+            .setConnectionConfiguration(
+                MqttIO.ConnectionConfiguration.create("tcp://localhost:" + 
port, topicName)
+                    .withClientId("STREAM_READ_PIPELINE"))
+            .build();
+
+    // Use a local pipeline so run() does not block (the read never terminates 
on its own).
+    PipelineOptions options =
+        
PipelineOptionsFactory.fromArgs("--blockOnRun=false").withoutStrictParsing().create();
+    Pipeline p = Pipeline.create(options);
+    PCollectionRowTuple.empty(p)
+        .apply(new MqttReadSchemaTransformProvider().from(readConfiguration))
+        .get("output")
+        .apply(ParDo.of(new CollectBytesFn()));
+
+    // Publish a steady stream of messages until the reader has consumed 
enough.
+    final boolean[] keepPublishing = {true};
+    MQTT client = new MQTT();
+    client.setHost("tcp://localhost:" + port);
+    final BlockingConnection publishConnection = client.blockingConnection();
+    publishConnection.connect();
+    Thread publisher =
+        new Thread(
+            () -> {
+              int i = 0;
+              try {
+                while (keepPublishing[0]) {
+                  publishConnection.publish(
+                      topicName,
+                      ("stream-" + i).getBytes(StandardCharsets.UTF_8),
+                      QoS.AT_LEAST_ONCE,
+                      false);
+                  i++;
+                  Thread.sleep(200);
+                }
+              } catch (Exception e) {
+                // ignore: connection closed on teardown
+              }
+            });
+
+    PipelineResult result = p.run();
+    publisher.start();
+
+    // Wait until the unbounded read delivers a meaningful number of records.
+    int expected = 10;
+    long deadline = System.currentTimeMillis() + 60 * 1000;
+    while (STREAMING_RECEIVED.size() < expected && System.currentTimeMillis() 
< deadline) {
+      Thread.sleep(500);
+    }
+
+    keepPublishing[0] = false;
+    publisher.join();
+    publishConnection.disconnect();
+    result.cancel();
+
+    assertTrue(
+        "expected at least " + expected + " streamed records, got " + 
STREAMING_RECEIVED.size(),
+        STREAMING_RECEIVED.size() >= expected);
+    for (String received : STREAMING_RECEIVED) {
+      assertTrue("unexpected payload: " + received, 
received.startsWith("stream-"));
+    }
+  }
+
+  @Test
+  public void testWrite() throws Exception {
+    final int numberOfTestMessages = 200;
+    MQTT client = new MQTT();
+    client.setHost("tcp://localhost:" + port);
+    final BlockingConnection connection = client.blockingConnection();
+    connection.connect();
+    connection.subscribe(new Topic[] {new Topic(Buffer.utf8("WRITE_TOPIC"), 
QoS.EXACTLY_ONCE)});
+
+    final Set<String> messages = new ConcurrentSkipListSet<>();
+
+    Thread subscriber =
+        new Thread(
+            () -> {
+              try {
+                for (int i = 0; i < numberOfTestMessages; i++) {
+                  Message message = connection.receive();
+                  messages.add(new String(message.getPayload(), 
StandardCharsets.UTF_8));
+                  message.ack();
+                  System.out.println(
+                      "message: " + new String(message.getPayload(), 
StandardCharsets.UTF_8));

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using `System.out.println` in unit tests is discouraged as it pollutes the 
build output. Since a logger (`LOG`) is already defined in this class, please 
use `LOG.info` or `LOG.debug` with SLF4J parameterized formatting instead.
   
   ```suggestion
                     LOG.info(
                         "message: {}", new String(message.getPayload(), 
StandardCharsets.UTF_8));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to