shunping opened a new pull request, #39159:
URL: https://github.com/apache/beam/pull/39159

   Wraps the subprocess startup and connection sequence in a retry loop that makes at most 3 attempts. This mitigates "Address already in use" errors (see the bind failure in the traceback below) caused by race conditions during dynamic port allocation.
   
   An example of a failed test run:
   https://github.com/apache/beam/actions/runs/28321549831/job/83904243793
   
   Traceback:
   ```
   ________________ CrossLanguageKinesisIOTest.test_kinesis_write 
_________________
   
   self = 
<apache_beam.io.external.xlang_kinesisio_it_test.CrossLanguageKinesisIOTest 
testMethod=test_kinesis_write>
   
       @unittest.skipIf(
           TestPipeline().get_option('aws_kinesis_stream'),
           'Do not test on localstack when pipeline options were provided')
       def test_kinesis_write(self):
         # TODO: remove this test once
         # https://github.com/apache/beam/issues/20416 is resolved
   >     self.run_kinesis_write()
   
   apache_beam/io/external/xlang_kinesisio_it_test.py:97: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   apache_beam/io/external/xlang_kinesisio_it_test.py:105: in run_kinesis_write
       with TestPipeline(options=PipelineOptions(self.pipeline_args)) as p:
   apache_beam/pipeline.py:652: in __exit__
       self.result = self.run()
                     ^^^^^^^^^^
   apache_beam/testing/test_pipeline.py:119: in run
       result = super().run(
   apache_beam/pipeline.py:563: in run
       return self._run_internal(test_runner_api)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   apache_beam/pipeline.py:629: in _run_internal
       return self.runner.run_pipeline(self, self._options)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   apache_beam/runners/runner.py:180: in run_pipeline
       return self.run_portable_pipeline(
   apache_beam/runners/portability/portable_runner.py:375: in 
run_portable_pipeline
       job_service_handle = self.create_job_service(options)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   apache_beam/runners/portability/portable_runner.py:289: in create_job_service
       return self.create_job_service_handle(server.start(), options)
                                             ^^^^^^^^^^^^^^
   apache_beam/runners/portability/job_server.py:88: in start
       self._endpoint = self._job_server.start()
                        ^^^^^^^^^^^^^^^^^^^^^^^^
   apache_beam/runners/portability/job_server.py:131: in start
       return self._server.start()
              ^^^^^^^^^^^^^^^^^^^^
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   
   self = <apache_beam.utils.subprocess_server.SubprocessServer object at 
0x7881fc5f21d0>
   
       def start(self):
         try:
   INFO     FlinkJarJobServer:subprocess_server.py:266  at 
org.apache.beam.runners.flink.FlinkJobServerDriver.main(FlinkJobServerDriver.java:77)
   INFO     FlinkJarJobServer:subprocess_server.py:266 Caused by: 
org.apache.beam.vendor.grpc.v1p69p0.io.netty.channel.unix.Errors$NativeIoException:
 bind(..) failed: Address already in use
   INFO     FlinkJarJobServer:subprocess_server.py:266 [main] INFO 
org.apache.beam.runners.jobsubmission.JobServerDriver - ArtifactStagingServer 
stopped on localhost:36825
   INFO     FlinkJarJobServer:subprocess_server.py:266 [main] INFO 
org.apache.beam.runners.jobsubmission.JobServerDriver - Expansion stopped on 
localhost:44161
   ERROR    apache_beam.utils.subprocess_server:subprocess_server.py:226 Failed 
to start job service with ('java', '-jar', 
'-Dslf4j.provider=org.slf4j.simple.SimpleServiceProvider', 
'-Dorg.slf4j.simpleLogger.log.org.apache.flink.streaming=error', 
'-Dorg.slf4j.simpleLogger.log.org.apache.flink.runtime=error', 
'-Dorg.slf4j.simpleLogger.log.org.apache.beam.runners.flink.FlinkPipelineRunner=warn',
 
'-Dorg.slf4j.simpleLogger.log.org.apache.beam.runners.fnexecution.control=warn',
 
'/runner/_work/beam/beam/runners/flink/1.20/job-server/build/libs/beam-runners-flink-1.20-job-server-2.76.0-SNAPSHOT.jar',
 '--flink-master', '[auto]', '--artifacts-dir', 
'/tmp/beam-temp_3pmqphj/artifactsixp2vgsm', '--job-port', '36825', 
'--artifact-port', '0', '--expansion-port', '0')
   ERROR    apache_beam.utils.subprocess_server:subprocess_server.py:240 Error 
bringing up service
   Traceback (most recent call last):
     File 
"/runner/_work/beam/beam/sdks/python/apache_beam/utils/subprocess_server.py", 
line 227, in start
       raise RuntimeError(
   RuntimeError: Service failed to start up with error 0
   WARNING  apache_beam.utils.subprocess_server:subprocess_server.py:311 Really 
destroying service at localhost:36825 with cmd: ('java', '-jar', 
'-Dslf4j.provider=org.slf4j.simple.SimpleServiceProvider', 
'-Dorg.slf4j.simpleLogger.log.org.apache.flink.streaming=error', 
'-Dorg.slf4j.simpleLogger.log.org.apache.flink.runtime=error', 
'-Dorg.slf4j.simpleLogger.log.org.apache.beam.runners.flink.FlinkPipelineRunner=warn',
 
'-Dorg.slf4j.simpleLogger.log.org.apache.beam.runners.fnexecution.control=warn',
 
'/runner/_work/beam/beam/runners/flink/1.20/job-server/build/libs/beam-runners-flink-1.20-job-server-2.76.0-SNAPSHOT.jar',
 '--flink-master', '[auto]', '--artifacts-dir', 
'/tmp/beam-temp_3pmqphj/artifactsixp2vgsm', '--job-port', '36825', 
'--artifact-port', '0', '--expansion-port', '0')
   Really destroying service at localhost:52571 with cmd: ('java', '-jar', 
'-Dslf4j.provider=org.slf4j.simple.SimpleServiceProvider', 
'-Dorg.slf4j.simpleLogger.log.org.apache.flink.streaming=error', 
'-Dorg.slf4j.simpleLogger.log.org.apache.flink.runtime=error', 
'-Dorg.slf4j.simpleLogger.log.org.apache.beam.runners.flink.FlinkPipelineRunner=warn',
 
'-Dorg.slf4j.simpleLogger.log.org.apache.beam.runners.fnexecution.control=warn',
 
'/runner/_work/beam/beam/runners/flink/1.20/job-server/build/libs/beam-runners-flink-1.20-job-server-2.76.0-SNAPSHOT.jar',
 '--flink-master', '[auto]', '--artifacts-dir', 
'/tmp/beam-tempjp99srkd/artifactse8apq8m9', '--job-port', '52571', 
'--artifact-port', '0', '--expansion-port', '0')
   WARNING  apache_beam.utils.subprocess_server:subprocess_server.py:311 Really 
destroying service at localhost:36579 with cmd: 
['/opt/hostedtoolcache/Java_Temurin-Hotspot_jdk/21.0.11-10.0.LTS/x64/bin/java', 
'-jar', 
'/runner/_work/beam/beam/sdks/java/io/amazon-web-services2/expansion-service/build/libs/beam-sdks-java-io-amazon-web-services2-expansion-service-2.76.0-SNAPSHOT.jar',
 '36579', 
'--filesToStage=/runner/_work/beam/beam/sdks/java/io/amazon-web-services2/expansion-service/build/libs/beam-sdks-java-io-amazon-web-services2-expansion-service-2.76.0-SNAPSHOT.jar',
 '--alsoStartLoopbackWorker']
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to