This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ec307a52496 Emit warning when Mqtt waiting for connection for extended 
period of time (#32500)
ec307a52496 is described below

commit ec307a52496087f34249c928ce037aff7f32e20b
Author: Yi Hu <[email protected]>
AuthorDate: Thu Sep 19 10:11:44 2024 -0400

    Emit warning when Mqtt waiting for connection for extended period of time 
(#32500)
    
    * Emit warning when Matt waiting for connection for extended period of time
    
    * address comment; adjust timeout
---
 .../java/org/apache/beam/sdk/io/mqtt/MqttIO.java   | 24 ++++++++++++++++++----
 .../org/apache/beam/sdk/io/mqtt/MqttIOTest.java    |  7 +++----
 2 files changed, 23 insertions(+), 8 deletions(-)

diff --git 
a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java 
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index 8b7f0991c2d..0e584d564b5 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -30,6 +30,7 @@ import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
@@ -45,6 +46,7 @@ import org.apache.beam.sdk.values.PDone;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.FutureConnection;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.Message;
 import org.fusesource.mqtt.client.QoS;
@@ -431,8 +433,7 @@ public class MqttIO {
         client = spec.connectionConfiguration().createClient();
         LOG.debug("Reader client ID is {}", client.getClientId());
         checkpointMark.clientId = client.getClientId().toString();
-        connection = client.blockingConnection();
-        connection.connect();
+        connection = createConnection(client);
         connection.subscribe(
             new Topic[] {new Topic(spec.connectionConfiguration().getTopic(), 
QoS.AT_LEAST_ONCE)});
         return advance();
@@ -569,8 +570,7 @@ public class MqttIO {
         LOG.debug("Starting MQTT writer");
         client = spec.connectionConfiguration().createClient();
         LOG.debug("MQTT writer client ID is {}", client.getClientId());
-        connection = client.blockingConnection();
-        connection.connect();
+        connection = createConnection(client);
       }
 
       @ProcessElement
@@ -590,4 +590,20 @@ public class MqttIO {
       }
     }
   }
+
+  /** Create a connected MQTT BlockingConnection from given client, aware of 
connection timeout. */
+  static BlockingConnection createConnection(MQTT client) throws Exception {
+    FutureConnection futureConnection = client.futureConnection();
+    org.fusesource.mqtt.client.Future<Void> connecting = 
futureConnection.connect();
+    while (true) {
+      try {
+        connecting.await(1, TimeUnit.MINUTES);
+      } catch (TimeoutException e) {
+        LOG.warn("Connection to {} pending after waiting for 1 minute", 
client.getHost());
+        continue;
+      }
+      break;
+    }
+    return new BlockingConnection(futureConnection);
+  }
 }
diff --git 
a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java 
b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index 30adad708f8..7d60d6d6578 100644
--- 
a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ 
b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -142,8 +142,7 @@ public class MqttIOTest {
     publisherThread.join();
   }
 
-  @Test(timeout = 30 * 1000)
-  @Ignore("https://github.com/apache/beam/issues/19092 Flake Non-deterministic 
output.")
+  @Test(timeout = 40 * 1000)
   public void testRead() throws Exception {
     PCollection<byte[]> output =
         pipeline.apply(
@@ -151,7 +150,7 @@ public class MqttIOTest {
                 .withConnectionConfiguration(
                     MqttIO.ConnectionConfiguration.create("tcp://localhost:" + 
port, "READ_TOPIC")
                         .withClientId("READ_PIPELINE"))
-                .withMaxReadTime(Duration.standardSeconds(3)));
+                .withMaxReadTime(Duration.standardSeconds(5)));
     PAssert.that(output)
         .containsInAnyOrder(
             "This is test 0".getBytes(StandardCharsets.UTF_8),
@@ -180,12 +179,12 @@ public class MqttIOTest {
                         + "messages ...");
                 boolean pipelineConnected = false;
                 while (!pipelineConnected) {
-                  Thread.sleep(1000);
                   for (Connection connection : 
brokerService.getBroker().getClients()) {
                     if 
(connection.getConnectionId().startsWith("READ_PIPELINE")) {
                       pipelineConnected = true;
                     }
                   }
+                  Thread.sleep(1000);
                 }
                 for (int i = 0; i < 10; i++) {
                   publishConnection.publish(

Reply via email to