This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ec307a52496 Emit warning when Mqtt waiting for connection for extended
period of time (#32500)
ec307a52496 is described below
commit ec307a52496087f34249c928ce037aff7f32e20b
Author: Yi Hu <[email protected]>
AuthorDate: Thu Sep 19 10:11:44 2024 -0400
Emit warning when Mqtt waiting for connection for extended period of time
(#32500)
* Emit warning when Mqtt waiting for connection for extended period of time
* address comment; adjust timeout
---
.../java/org/apache/beam/sdk/io/mqtt/MqttIO.java | 24 ++++++++++++++++++----
.../org/apache/beam/sdk/io/mqtt/MqttIOTest.java | 7 +++----
2 files changed, 23 insertions(+), 8 deletions(-)
diff --git
a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index 8b7f0991c2d..0e584d564b5 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -30,6 +30,7 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
@@ -45,6 +46,7 @@ import org.apache.beam.sdk.values.PDone;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
@@ -431,8 +433,7 @@ public class MqttIO {
client = spec.connectionConfiguration().createClient();
LOG.debug("Reader client ID is {}", client.getClientId());
checkpointMark.clientId = client.getClientId().toString();
- connection = client.blockingConnection();
- connection.connect();
+ connection = createConnection(client);
connection.subscribe(
new Topic[] {new Topic(spec.connectionConfiguration().getTopic(),
QoS.AT_LEAST_ONCE)});
return advance();
@@ -569,8 +570,7 @@ public class MqttIO {
LOG.debug("Starting MQTT writer");
client = spec.connectionConfiguration().createClient();
LOG.debug("MQTT writer client ID is {}", client.getClientId());
- connection = client.blockingConnection();
- connection.connect();
+ connection = createConnection(client);
}
@ProcessElement
@@ -590,4 +590,20 @@ public class MqttIO {
}
}
}
+
+ /** Create a connected MQTT BlockingConnection from given client, aware of
connection timeout. */
+ static BlockingConnection createConnection(MQTT client) throws Exception {
+ FutureConnection futureConnection = client.futureConnection();
+ org.fusesource.mqtt.client.Future<Void> connecting =
futureConnection.connect();
+ while (true) {
+ try {
+ connecting.await(1, TimeUnit.MINUTES);
+ } catch (TimeoutException e) {
+ LOG.warn("Connection to {} pending after waiting for 1 minute",
client.getHost());
+ continue;
+ }
+ break;
+ }
+ return new BlockingConnection(futureConnection);
+ }
}
diff --git
a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index 30adad708f8..7d60d6d6578 100644
---
a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++
b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -142,8 +142,7 @@ public class MqttIOTest {
publisherThread.join();
}
- @Test(timeout = 30 * 1000)
- @Ignore("https://github.com/apache/beam/issues/19092 Flake Non-deterministic
output.")
+ @Test(timeout = 40 * 1000)
public void testRead() throws Exception {
PCollection<byte[]> output =
pipeline.apply(
@@ -151,7 +150,7 @@ public class MqttIOTest {
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create("tcp://localhost:" +
port, "READ_TOPIC")
.withClientId("READ_PIPELINE"))
- .withMaxReadTime(Duration.standardSeconds(3)));
+ .withMaxReadTime(Duration.standardSeconds(5)));
PAssert.that(output)
.containsInAnyOrder(
"This is test 0".getBytes(StandardCharsets.UTF_8),
@@ -180,12 +179,12 @@ public class MqttIOTest {
+ "messages ...");
boolean pipelineConnected = false;
while (!pipelineConnected) {
- Thread.sleep(1000);
for (Connection connection :
brokerService.getBroker().getClients()) {
if
(connection.getConnectionId().startsWith("READ_PIPELINE")) {
pipelineConnected = true;
}
}
+ Thread.sleep(1000);
}
for (int i = 0; i < 10; i++) {
publishConnection.publish(