shunping opened a new pull request, #39159: URL: https://github.com/apache/beam/pull/39159
Wraps the subprocess startup and connection sequence in a retry loop with a limit of 3 attempts. This mitigates "address already in use" errors caused by race conditions during dynamic port allocation. An example failed test: https://github.com/apache/beam/actions/runs/28321549831/job/83904243793 Traceback: ``` ________________ CrossLanguageKinesisIOTest.test_kinesis_write _________________ self = <apache_beam.io.external.xlang_kinesisio_it_test.CrossLanguageKinesisIOTest testMethod=test_kinesis_write> @unittest.skipIf( TestPipeline().get_option('aws_kinesis_stream'), 'Do not test on localstack when pipeline options were provided') def test_kinesis_write(self): # TODO: remove this test once # https://github.com/apache/beam/issues/20416 is resolved > self.run_kinesis_write() apache_beam/io/external/xlang_kinesisio_it_test.py:97: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ apache_beam/io/external/xlang_kinesisio_it_test.py:105: in run_kinesis_write with TestPipeline(options=PipelineOptions(self.pipeline_args)) as p: apache_beam/pipeline.py:652: in __exit__ self.result = self.run() ^^^^^^^^^^ apache_beam/testing/test_pipeline.py:119: in run result = super().run( apache_beam/pipeline.py:563: in run return self._run_internal(test_runner_api) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ apache_beam/pipeline.py:629: in _run_internal return self.runner.run_pipeline(self, self._options) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ apache_beam/runners/runner.py:180: in run_pipeline return self.run_portable_pipeline( apache_beam/runners/portability/portable_runner.py:375: in run_portable_pipeline job_service_handle = self.create_job_service(options) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ apache_beam/runners/portability/portable_runner.py:289: in create_job_service return self.create_job_service_handle(server.start(), options) ^^^^^^^^^^^^^^ apache_beam/runners/portability/job_server.py:88: in start self._endpoint = self._job_server.start() 
^^^^^^^^^^^^^^^^^^^^^^^^ apache_beam/runners/portability/job_server.py:131: in start return self._server.start() ^^^^^^^^^^^^^^^^^^^^ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.utils.subprocess_server.SubprocessServer object at 0x7881fc5f21d0> def start(self): try: INFO FlinkJarJobServer:subprocess_server.py:266 at org.apache.beam.runners.flink.FlinkJobServerDriver.main(FlinkJobServerDriver.java:77) INFO FlinkJarJobServer:subprocess_server.py:266 Caused by: org.apache.beam.vendor.grpc.v1p69p0.io.netty.channel.unix.Errors$NativeIoException: bind(..) failed: Address already in use INFO FlinkJarJobServer:subprocess_server.py:266 [main] INFO org.apache.beam.runners.jobsubmission.JobServerDriver - ArtifactStagingServer stopped on localhost:36825 INFO FlinkJarJobServer:subprocess_server.py:266 [main] INFO org.apache.beam.runners.jobsubmission.JobServerDriver - Expansion stopped on localhost:44161 ERROR apache_beam.utils.subprocess_server:subprocess_server.py:226 Failed to start job service with ('java', '-jar', '-Dslf4j.provider=org.slf4j.simple.SimpleServiceProvider', '-Dorg.slf4j.simpleLogger.log.org.apache.flink.streaming=error', '-Dorg.slf4j.simpleLogger.log.org.apache.flink.runtime=error', '-Dorg.slf4j.simpleLogger.log.org.apache.beam.runners.flink.FlinkPipelineRunner=warn', '-Dorg.slf4j.simpleLogger.log.org.apache.beam.runners.fnexecution.control=warn', '/runner/_work/beam/beam/runners/flink/1.20/job-server/build/libs/beam-runners-flink-1.20-job-server-2.76.0-SNAPSHOT.jar', '--flink-master', '[auto]', '--artifacts-dir', '/tmp/beam-temp_3pmqphj/artifactsixp2vgsm', '--job-port', '36825', '--artifact-port', '0', '--expansion-port', '0') ERROR apache_beam.utils.subprocess_server:subprocess_server.py:240 Error bringing up service Traceback (most recent call last): File "/runner/_work/beam/beam/sdks/python/apache_beam/utils/subprocess_server.py", line 227, in start raise RuntimeError( RuntimeError: Service failed 
to start up with error 0 WARNING apache_beam.utils.subprocess_server:subprocess_server.py:311 Really destroying service at localhost:36825 with cmd: ('java', '-jar', '-Dslf4j.provider=org.slf4j.simple.SimpleServiceProvider', '-Dorg.slf4j.simpleLogger.log.org.apache.flink.streaming=error', '-Dorg.slf4j.simpleLogger.log.org.apache.flink.runtime=error', '-Dorg.slf4j.simpleLogger.log.org.apache.beam.runners.flink.FlinkPipelineRunner=warn', '-Dorg.slf4j.simpleLogger.log.org.apache.beam.runners.fnexecution.control=warn', '/runner/_work/beam/beam/runners/flink/1.20/job-server/build/libs/beam-runners-flink-1.20-job-server-2.76.0-SNAPSHOT.jar', '--flink-master', '[auto]', '--artifacts-dir', '/tmp/beam-temp_3pmqphj/artifactsixp2vgsm', '--job-port', '36825', '--artifact-port', '0', '--expansion-port', '0') Really destroying service at localhost:52571 with cmd: ('java', '-jar', '-Dslf4j.provider=org.slf4j.simple.SimpleServiceProvider', '-Dorg.slf4j.simpleLogger.log.org.apache.flink.streaming=error', '-Dorg.slf4j.simpleLogger.log.org.apache.flink.runtime=error', '-Dorg.slf4j.simpleLogger.log.org.apache.beam.runners.flink.FlinkPipelineRunner=warn', '-Dorg.slf4j.simpleLogger.log.org.apache.beam.runners.fnexecution.control=warn', '/runner/_work/beam/beam/runners/flink/1.20/job-server/build/libs/beam-runners-flink-1.20-job-server-2.76.0-SNAPSHOT.jar', '--flink-master', '[auto]', '--artifacts-dir', '/tmp/beam-tempjp99srkd/artifactse8apq8m9', '--job-port', '52571', '--artifact-port', '0', '--expansion-port', '0') WARNING apache_beam.utils.subprocess_server:subprocess_server.py:311 Really destroying service at localhost:36579 with cmd: ['/opt/hostedtoolcache/Java_Temurin-Hotspot_jdk/21.0.11-10.0.LTS/x64/bin/java', '-jar', '/runner/_work/beam/beam/sdks/java/io/amazon-web-services2/expansion-service/build/libs/beam-sdks-java-io-amazon-web-services2-expansion-service-2.76.0-SNAPSHOT.jar', '36579', 
'--filesToStage=/runner/_work/beam/beam/sdks/java/io/amazon-web-services2/expansion-service/build/libs/beam-sdks-java-io-amazon-web-services2-expansion-service-2.76.0-SNAPSHOT.jar', '--alsoStartLoopbackWorker'] ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@beam.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
