tkaymak commented on code in PR #38966:
URL: https://github.com/apache/beam/pull/38966#discussion_r3415613560


##########
sdks/python/apache_beam/io/external/xlang_mqttio_it_test.py:
##########
@@ -0,0 +1,293 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Integration tests for the cross-language MQTT IO transforms
+(ReadFromMqtt / WriteToMqtt), served by the messaging expansion service.
+
+Runs against an MQTT broker (Eclipse Mosquitto) started via testcontainers.
+The DirectRunner tests use reads bounded with max_num_records; unbounded
+(streaming) reads require a portable streaming runner (see the
+MqttReadSchemaTransformProvider description) and are exercised by the
+Prism runner test below.
+"""
+
+import logging
+import threading
+import time
+import unittest
+
+import pytest
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import BeamAssertException
+from apache_beam.testing.util import assert_that
+from apache_beam.typehints.row_type import RowTypeConstraint
+
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  from apache_beam.io import ReadFromMqtt
+  from apache_beam.io import WriteToMqtt
+except ImportError:
+  ReadFromMqtt = None
+  WriteToMqtt = None
+
+try:
+  from testcontainers.core.container import DockerContainer
+  from testcontainers.core.waiting_utils import wait_for_logs
+except ImportError:
+  DockerContainer = None
+
+NUM_RECORDS = 3
+BYTES_ROW = RowTypeConstraint.from_fields([('bytes', bytes)])
+
+
+def _payload_count_and_prefix_matcher(expected_count, expected_prefix):
+  """Matches a bounded read of a continuous publisher: exactly
+  expected_count payloads, each starting with expected_prefix (the absolute
+  sequence numbers depend on when the reader subscribed)."""
+  def _matcher(actual):
+    actual = list(actual)
+    if len(actual) != expected_count:
+      raise BeamAssertException(
+          'Expected %d payloads, got %d: %s' %
+          (expected_count, len(actual), actual))
+    for payload in actual:
+      if not payload.startswith(expected_prefix):
+        raise BeamAssertException('Unexpected payload: %s' % payload)
+
+  return _matcher
+
+
+@pytest.mark.uses_messaging_java_expansion_service
+@unittest.skipIf(
+    DockerContainer is None, 'testcontainers package is not installed')
+@unittest.skipIf(
+    ReadFromMqtt is None or WriteToMqtt is None,
+    'MQTT cross-language wrappers are not generated')
+@unittest.skipIf(
+    TestPipeline().get_pipeline_options().view_as(StandardOptions).runner
+    is None,
+    'Do not run this test on precommit suites.')
+@unittest.skipIf(
+    'Dataflow' in (
+        TestPipeline().get_pipeline_options().view_as(StandardOptions).runner or
+        ''),
+    'The testcontainers broker is not reachable from Dataflow workers; '
+    'a Dataflow variant would need a remotely hosted MQTT broker.')
+class CrossLanguageMqttIOTest(unittest.TestCase):
+  def setUp(self):
+    self.start_mqtt_container(retries=3)
+    host = self.broker.get_container_host_ip()
+    port = self.broker.get_exposed_port(1883)
+    self.server_uri = 'tcp://%s:%s' % (host, port)
+
+  def tearDown(self):
+    # Sometimes stopping the container raises ReadTimeout. We can ignore it
+    # here to avoid the test failure.
+    try:
+      self.broker.stop()
+    except:  # pylint: disable=bare-except

Review Comment:
   Done in `8387d94710b`: changed the bare `except:` to `except Exception:`.



##########
sdks/python/apache_beam/io/external/xlang_mqttio_it_test.py:
##########
@@ -0,0 +1,293 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Integration tests for the cross-language MQTT IO transforms
+(ReadFromMqtt / WriteToMqtt), served by the messaging expansion service.
+
+Runs against an MQTT broker (Eclipse Mosquitto) started via testcontainers.
+The DirectRunner tests use reads bounded with max_num_records; unbounded
+(streaming) reads require a portable streaming runner (see the
+MqttReadSchemaTransformProvider description) and are exercised by the
+Prism runner test below.
+"""
+
+import logging
+import threading
+import time
+import unittest
+
+import pytest
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import BeamAssertException
+from apache_beam.testing.util import assert_that
+from apache_beam.typehints.row_type import RowTypeConstraint
+
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  from apache_beam.io import ReadFromMqtt
+  from apache_beam.io import WriteToMqtt
+except ImportError:
+  ReadFromMqtt = None
+  WriteToMqtt = None
+
+try:
+  from testcontainers.core.container import DockerContainer
+  from testcontainers.core.waiting_utils import wait_for_logs
+except ImportError:
+  DockerContainer = None
+
+NUM_RECORDS = 3
+BYTES_ROW = RowTypeConstraint.from_fields([('bytes', bytes)])
+
+
+def _payload_count_and_prefix_matcher(expected_count, expected_prefix):
+  """Matches a bounded read of a continuous publisher: exactly
+  expected_count payloads, each starting with expected_prefix (the absolute
+  sequence numbers depend on when the reader subscribed)."""
+  def _matcher(actual):
+    actual = list(actual)
+    if len(actual) != expected_count:
+      raise BeamAssertException(
+          'Expected %d payloads, got %d: %s' %
+          (expected_count, len(actual), actual))
+    for payload in actual:
+      if not payload.startswith(expected_prefix):
+        raise BeamAssertException('Unexpected payload: %s' % payload)
+
+  return _matcher
+
+
+@pytest.mark.uses_messaging_java_expansion_service
+@unittest.skipIf(
+    DockerContainer is None, 'testcontainers package is not installed')
+@unittest.skipIf(
+    ReadFromMqtt is None or WriteToMqtt is None,
+    'MQTT cross-language wrappers are not generated')
+@unittest.skipIf(
+    TestPipeline().get_pipeline_options().view_as(StandardOptions).runner
+    is None,
+    'Do not run this test on precommit suites.')
+@unittest.skipIf(
+    'Dataflow' in (
+        TestPipeline().get_pipeline_options().view_as(StandardOptions).runner or
+        ''),
+    'The testcontainers broker is not reachable from Dataflow workers; '
+    'a Dataflow variant would need a remotely hosted MQTT broker.')
+class CrossLanguageMqttIOTest(unittest.TestCase):
+  def setUp(self):
+    self.start_mqtt_container(retries=3)
+    host = self.broker.get_container_host_ip()
+    port = self.broker.get_exposed_port(1883)
+    self.server_uri = 'tcp://%s:%s' % (host, port)
+
+  def tearDown(self):
+    # Sometimes stopping the container raises ReadTimeout. We can ignore it
+    # here to avoid the test failure.
+    try:
+      self.broker.stop()
+    except:  # pylint: disable=bare-except
+      logging.error('Could not stop the MQTT broker container.')
+
+  # Creating a container with testcontainers sometimes raises ReadTimeout
+  # error, so retry a couple of times.
+  def start_mqtt_container(self, retries):
+    for i in range(retries):
+      try:
+        # /mosquitto-no-auth.conf ships with the image and enables an
+        # anonymous listener on port 1883.
+        self.broker = DockerContainer('eclipse-mosquitto:2').with_command(
+            'mosquitto -c /mosquitto-no-auth.conf').with_exposed_ports(1883)
+        self.broker.start()
+        wait_for_logs(self.broker, 'mosquitto version .* running', timeout=30)
+        break
+      except Exception as e:
+        if i == retries - 1:
+          logging.error('Unable to initialize the MQTT broker container.')
+          raise e

Review Comment:
   Good catch — fixed in `8387d94710b`. The retry handler now stops the 
partially started container before retrying or re-raising. That stop call is 
wrapped in its own try/except, so it is harmless if the container never started.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to