mxm commented on a change in pull request #12385:
URL: https://github.com/apache/beam/pull/12385#discussion_r463133822



##########
File path: sdks/python/apache_beam/runners/portability/spark_runner_test.py
##########
@@ -21,141 +21,158 @@
 
 import argparse
 import logging
-import sys
 import unittest
 from shutil import rmtree
 from tempfile import mkdtemp
 
-from apache_beam.options.pipeline_options import DebugOptions
+import pytest
+
 from apache_beam.options.pipeline_options import PortableOptions
 from apache_beam.runners.portability import job_server
 from apache_beam.runners.portability import portable_runner
 from apache_beam.runners.portability import portable_runner_test
 
-if __name__ == '__main__':
-  # Run as
-  #
-  # python -m apache_beam.runners.portability.spark_runner_test \
-  #     --spark_job_server_jar=/path/to/job_server.jar \
-  #     [SparkRunnerTest.test_method, ...]
-
-  parser = argparse.ArgumentParser(add_help=True)
-  parser.add_argument(
-      '--spark_job_server_jar', help='Job server jar to submit jobs.')
-  parser.add_argument(
-      '--environment_type',
-      default='loopback',
-      help='Environment type. docker, process, or loopback')
-  parser.add_argument('--environment_config', help='Environment config.')
-  parser.add_argument(
-      '--environment_cache_millis',
-      help='Environment cache TTL in milliseconds.')
-  parser.add_argument(
-      '--extra_experiments',
-      default=[],
-      action='append',
-      help='Beam experiments config.')
-  known_args, args = parser.parse_known_args(sys.argv)
-  sys.argv = args
-
-  spark_job_server_jar = (
-      known_args.spark_job_server_jar or
-      job_server.JavaJarJobServer.path_to_beam_jar(
-          'runners:spark:job-server:shadowJar'))
-  environment_type = known_args.environment_type.lower()
-  environment_config = (
-      known_args.environment_config if known_args.environment_config else None)
-  environment_cache_millis = known_args.environment_cache_millis
-  extra_experiments = known_args.extra_experiments
-
-  # This is defined here to only be run when we invoke this file explicitly.
-  class SparkRunnerTest(portable_runner_test.PortableRunnerTest):
-    _use_grpc = True
-    _use_subprocesses = True
-
-    @classmethod
-    def _subprocess_command(cls, job_port, expansion_port):
-      # will be cleaned up at the end of this method, and recreated and used by
-      # the job server
-      tmp_dir = mkdtemp(prefix='sparktest')
-
-      try:
-        return [
-            'java',
-            '-Dbeam.spark.test.reuseSparkContext=true',
-            '-jar',
-            spark_job_server_jar,
-            '--spark-master-url',
-            'local',
-            '--artifacts-dir',
-            tmp_dir,
-            '--job-port',
-            str(job_port),
-            '--artifact-port',
-            '0',
-            '--expansion-port',
-            str(expansion_port),
-        ]
-      finally:
-        rmtree(tmp_dir)
-
-    @classmethod
-    def get_runner(cls):
-      return portable_runner.PortableRunner()
-
-    def create_options(self):
-      options = super(SparkRunnerTest, self).create_options()
-      options.view_as(
-          DebugOptions).experiments = ['beam_fn_api'] + extra_experiments
-      portable_options = options.view_as(PortableOptions)
-      portable_options.environment_type = environment_type.upper()
-      if environment_config:
-        portable_options.environment_config = environment_config
-      if environment_cache_millis:
-        portable_options.environment_cache_millis = environment_cache_millis
-
-      return options
-
-    def test_metrics(self):
-      # Skip until Spark runner supports metrics.
-      raise unittest.SkipTest("BEAM-7219")
-
-    def test_sdf(self):
-      # Skip until Spark runner supports SDF.
-      raise unittest.SkipTest("BEAM-7222")
-
-    def test_sdf_with_watermark_tracking(self):
-      # Skip until Spark runner supports SDF.
-      raise unittest.SkipTest("BEAM-7222")
-
-    def test_sdf_with_sdf_initiated_checkpointing(self):
-      # Skip until Spark runner supports SDF.
-      raise unittest.SkipTest("BEAM-7222")
-
-    def test_sdf_synthetic_source(self):
-      # Skip until Spark runner supports SDF.
-      raise unittest.SkipTest("BEAM-7222")
-
-    def test_external_transforms(self):
-      # Skip until Spark runner supports external transforms.
-      raise unittest.SkipTest("BEAM-7232")
-
-    def test_callbacks_with_exception(self):
-      # Skip until Spark runner supports bundle finalization.
-      raise unittest.SkipTest("BEAM-7233")
-
-    def test_register_finalizations(self):
-      # Skip until Spark runner supports bundle finalization.
-      raise unittest.SkipTest("BEAM-7233")
-
-    def test_flattened_side_input(self):
-      # Blocked on support for transcoding
-      # https://jira.apache.org/jira/browse/BEAM-7236
-      super(SparkRunnerTest,
-            self).test_flattened_side_input(with_transcoding=False)
-
-    # Inherits all other tests from PortableRunnerTest.
+# Run as
+#
+# pytest spark_runner_test.py \
+#     [--test_pipeline_options "--spark_job_server_jar=/path/to/job_server.jar \
+#                               --environment_type=DOCKER"] \
+#     [SparkRunnerTest.test_method, ...]
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class SparkRunnerTest(portable_runner_test.PortableRunnerTest):
+  _use_grpc = True
+  _use_subprocesses = True
+
+  expansion_port = None
+  spark_job_server_jar = None
+
+  @pytest.fixture(autouse=True)
+  def parse_options(self, request):
+    test_pipeline_options = (
+        request.config.option.test_pipeline_options
+        if request.config.option.test_pipeline_options else '')
+    parser = argparse.ArgumentParser(add_help=True)
+    parser.add_argument(
+        '--spark_job_server_jar',
+        help='Job server jar to submit jobs.',
+        action='store')
+    parser.add_argument(
+        '--environment_type',
+        default='LOOPBACK',
+        choices=['DOCKER', 'PROCESS', 'LOOPBACK'],
+        help='Set the environment type for running user code. DOCKER runs '
+        'user code in a container. PROCESS runs user code in '
+        'automatically started processes. LOOPBACK runs user code on '
+        'the same process that originally submitted the job.')
+    parser.add_argument(
+        '--environment_config',
+        help='Set environment configuration for running the user code.\n For '
+        'DOCKER: Url for the docker image.\n For PROCESS: json of the '
+        'form {"os": "<OS>", "arch": "<ARCHITECTURE>", "command": '
+        '"<process to execute>", "env":{"<Environment variables 1>": '
+        '"<ENV_VAL>"} }. All fields in the json are optional except '
+        'command.')
+    known_args, unknown_args = parser.parse_known_args(
+        test_pipeline_options.split())

Review comment:
       Same here, what about whitespace in the values?
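       For illustration, here is what a plain `.split()` does to a value that itself contains a space (the option names are the ones from this parser, the command path is just made up for the example):
   ```python
   opts = ('--environment_type=PROCESS '
           '--environment_config={"command": "/opt/sdk worker.sh"}')
   # str.split() breaks on every run of whitespace, so the JSON value is torn
   # into several argv entries and argparse sees mangled options.
   print(opts.split())
   # ['--environment_type=PROCESS', '--environment_config={"command":',
   #  '"/opt/sdk', 'worker.sh"}']
   ```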

##########
File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py
##########
@@ -53,361 +53,380 @@
 from apache_beam.transforms import userstate
 from apache_beam.transforms.sql import SqlTransform
 
+# Run as
+#
+# pytest flink_runner_test.py \
+#     [--test_pipeline_options "--flink_job_server_jar=/path/to/job_server.jar \
+#                               --environment_type=DOCKER"] \
+#     [FlinkRunnerTest.test_method, ...]
+
 _LOGGER = logging.getLogger(__name__)
 
 Row = typing.NamedTuple("Row", [("col1", int), ("col2", unicode)])
 beam.coders.registry.register_coder(Row, beam.coders.RowCoder)
 
-if __name__ == '__main__':
-  # Run as
-  #
-  # python -m apache_beam.runners.portability.flink_runner_test \
-  #     --flink_job_server_jar=/path/to/job_server.jar \
-  #     --environment_type=docker \
-  #     --extra_experiments=beam_experiments \
-  #     [FlinkRunnerTest.test_method, ...]
-
-  parser = argparse.ArgumentParser(add_help=True)
-  parser.add_argument(
-      '--flink_job_server_jar', help='Job server jar to submit jobs.')
-  parser.add_argument(
-      '--streaming',
-      default=False,
-      action='store_true',
-      help='Job type. batch or streaming')
-  parser.add_argument(
-      '--environment_type',
-      default='loopback',
-      help='Environment type. docker, process, or loopback.')
-  parser.add_argument('--environment_config', help='Environment config.')
-  parser.add_argument(
-      '--extra_experiments',
-      default=[],
-      action='append',
-      help='Beam experiments config.')
-  known_args, args = parser.parse_known_args(sys.argv)
-  sys.argv = args
-
-  flink_job_server_jar = (
-      known_args.flink_job_server_jar or
-      job_server.JavaJarJobServer.path_to_beam_jar(
-          'runners:flink:%s:job-server:shadowJar' %
-          FlinkRunnerOptions.PUBLISHED_FLINK_VERSIONS[-1]))
-  streaming = known_args.streaming
-  environment_type = known_args.environment_type.lower()
-  environment_config = (
-      known_args.environment_config if known_args.environment_config else None)
-  extra_experiments = known_args.extra_experiments
-
-  # This is defined here to only be run when we invoke this file explicitly.
-  class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
-    _use_grpc = True
-    _use_subprocesses = True
-
-    conf_dir = None
-    expansion_port = None
-
-    @classmethod
-    def tearDownClass(cls):
-      if cls.conf_dir and exists(cls.conf_dir):
-        _LOGGER.info("removing conf dir: %s" % cls.conf_dir)
-        rmtree(cls.conf_dir)
-      super(FlinkRunnerTest, cls).tearDownClass()
-
-    @classmethod
-    def _create_conf_dir(cls):
-      """Create (and save a static reference to) a "conf dir", used to provide
-       metrics configs and verify metrics output
-
-       It gets cleaned up when the suite is done executing"""
-
-      if hasattr(cls, 'conf_dir'):
-        cls.conf_dir = mkdtemp(prefix='flinktest-conf')
-
-        # path for a FileReporter to write metrics to
-        cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
-
-        # path to write Flink configuration to
-        conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
-        file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter'
-        with open(conf_path, 'w') as f:
-          f.write(
-              linesep.join([
-                  'metrics.reporters: file',
-                  'metrics.reporter.file.class: %s' % file_reporter,
-                  'metrics.reporter.file.path: %s' % cls.test_metrics_path,
-                  'metrics.scope.operator: <operator_name>',
-              ]))
-
-    @classmethod
-    def _subprocess_command(cls, job_port, expansion_port):
-      # will be cleaned up at the end of this method, and recreated and used by
-      # the job server
-      tmp_dir = mkdtemp(prefix='flinktest')
-
-      cls._create_conf_dir()
-      cls.expansion_port = expansion_port
-
-      try:
-        return [
-            'java',
-            '-Dorg.slf4j.simpleLogger.defaultLogLevel=warn',
-            '-jar',
-            flink_job_server_jar,
-            '--flink-master',
-            '[local]',
-            '--flink-conf-dir',
-            cls.conf_dir,
-            '--artifacts-dir',
-            tmp_dir,
-            '--job-port',
-            str(job_port),
-            '--artifact-port',
-            '0',
-            '--expansion-port',
-            str(expansion_port),
-        ]
-      finally:
-        rmtree(tmp_dir)
-
-    @classmethod
-    def get_runner(cls):
-      return portable_runner.PortableRunner()
-
-    @classmethod
-    def get_expansion_service(cls):
-      # TODO Move expansion address resides into PipelineOptions
-      return 'localhost:%s' % cls.expansion_port
-
-    def create_options(self):
-      options = super(FlinkRunnerTest, self).create_options()
-      options.view_as(
-          DebugOptions).experiments = ['beam_fn_api'] + extra_experiments
-      options._all_options['parallelism'] = 2
-      options.view_as(PortableOptions).environment_type = (
-          environment_type.upper())
-      if environment_config:
-        options.view_as(PortableOptions).environment_config = environment_config
-
-      if streaming:
-        options.view_as(StandardOptions).streaming = True
-      return options
-
-    # Can't read host files from within docker, read a "local" file there.
-    def test_read(self):
-      with self.create_pipeline() as p:
-        lines = p | beam.io.ReadFromText('/etc/profile')
-        assert_that(lines, lambda lines: len(lines) > 0)
-
-    def test_no_subtransform_composite(self):
-      raise unittest.SkipTest("BEAM-4781")
 
-    def test_external_transform(self):
+class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
+  _use_grpc = True
+  _use_subprocesses = True
+
+  conf_dir = None
+  expansion_port = None
+  flink_job_server_jar = None
+
+  def __init__(self, *args, **kwargs):
+    super(FlinkRunnerTest, self).__init__(*args, **kwargs)
+    self.environment_type = None
+    self.environment_config = None
+
+  @pytest.fixture(autouse=True)
+  def parse_options(self, request):
+    test_pipeline_options = (
+        request.config.option.test_pipeline_options
+        if request.config.option.test_pipeline_options else '')

Review comment:
       nit:
   ```suggestion
           request.config.option.test_pipeline_options or '')
   ```

##########
File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py
##########
@@ -53,361 +53,380 @@
 from apache_beam.transforms import userstate
 from apache_beam.transforms.sql import SqlTransform
 
+# Run as
+#
+# pytest flink_runner_test.py \
+#     [--test_pipeline_options "--flink_job_server_jar=/path/to/job_server.jar \
+#                               --environment_type=DOCKER"] \
+#     [FlinkRunnerTest.test_method, ...]
+
 _LOGGER = logging.getLogger(__name__)
 
 Row = typing.NamedTuple("Row", [("col1", int), ("col2", unicode)])
 beam.coders.registry.register_coder(Row, beam.coders.RowCoder)
 
-if __name__ == '__main__':
-  # Run as
-  #
-  # python -m apache_beam.runners.portability.flink_runner_test \
-  #     --flink_job_server_jar=/path/to/job_server.jar \
-  #     --environment_type=docker \
-  #     --extra_experiments=beam_experiments \
-  #     [FlinkRunnerTest.test_method, ...]
-
-  parser = argparse.ArgumentParser(add_help=True)
-  parser.add_argument(
-      '--flink_job_server_jar', help='Job server jar to submit jobs.')
-  parser.add_argument(
-      '--streaming',
-      default=False,
-      action='store_true',
-      help='Job type. batch or streaming')
-  parser.add_argument(
-      '--environment_type',
-      default='loopback',
-      help='Environment type. docker, process, or loopback.')
-  parser.add_argument('--environment_config', help='Environment config.')
-  parser.add_argument(
-      '--extra_experiments',
-      default=[],
-      action='append',
-      help='Beam experiments config.')
-  known_args, args = parser.parse_known_args(sys.argv)
-  sys.argv = args
-
-  flink_job_server_jar = (
-      known_args.flink_job_server_jar or
-      job_server.JavaJarJobServer.path_to_beam_jar(
-          'runners:flink:%s:job-server:shadowJar' %
-          FlinkRunnerOptions.PUBLISHED_FLINK_VERSIONS[-1]))
-  streaming = known_args.streaming
-  environment_type = known_args.environment_type.lower()
-  environment_config = (
-      known_args.environment_config if known_args.environment_config else None)
-  extra_experiments = known_args.extra_experiments
-
-  # This is defined here to only be run when we invoke this file explicitly.
-  class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
-    _use_grpc = True
-    _use_subprocesses = True
-
-    conf_dir = None
-    expansion_port = None
-
-    @classmethod
-    def tearDownClass(cls):
-      if cls.conf_dir and exists(cls.conf_dir):
-        _LOGGER.info("removing conf dir: %s" % cls.conf_dir)
-        rmtree(cls.conf_dir)
-      super(FlinkRunnerTest, cls).tearDownClass()
-
-    @classmethod
-    def _create_conf_dir(cls):
-      """Create (and save a static reference to) a "conf dir", used to provide
-       metrics configs and verify metrics output
-
-       It gets cleaned up when the suite is done executing"""
-
-      if hasattr(cls, 'conf_dir'):
-        cls.conf_dir = mkdtemp(prefix='flinktest-conf')
-
-        # path for a FileReporter to write metrics to
-        cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
-
-        # path to write Flink configuration to
-        conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
-        file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter'
-        with open(conf_path, 'w') as f:
-          f.write(
-              linesep.join([
-                  'metrics.reporters: file',
-                  'metrics.reporter.file.class: %s' % file_reporter,
-                  'metrics.reporter.file.path: %s' % cls.test_metrics_path,
-                  'metrics.scope.operator: <operator_name>',
-              ]))
-
-    @classmethod
-    def _subprocess_command(cls, job_port, expansion_port):
-      # will be cleaned up at the end of this method, and recreated and used by
-      # the job server
-      tmp_dir = mkdtemp(prefix='flinktest')
-
-      cls._create_conf_dir()
-      cls.expansion_port = expansion_port
-
-      try:
-        return [
-            'java',
-            '-Dorg.slf4j.simpleLogger.defaultLogLevel=warn',
-            '-jar',
-            flink_job_server_jar,
-            '--flink-master',
-            '[local]',
-            '--flink-conf-dir',
-            cls.conf_dir,
-            '--artifacts-dir',
-            tmp_dir,
-            '--job-port',
-            str(job_port),
-            '--artifact-port',
-            '0',
-            '--expansion-port',
-            str(expansion_port),
-        ]
-      finally:
-        rmtree(tmp_dir)
-
-    @classmethod
-    def get_runner(cls):
-      return portable_runner.PortableRunner()
-
-    @classmethod
-    def get_expansion_service(cls):
-      # TODO Move expansion address resides into PipelineOptions
-      return 'localhost:%s' % cls.expansion_port
-
-    def create_options(self):
-      options = super(FlinkRunnerTest, self).create_options()
-      options.view_as(
-          DebugOptions).experiments = ['beam_fn_api'] + extra_experiments
-      options._all_options['parallelism'] = 2
-      options.view_as(PortableOptions).environment_type = (
-          environment_type.upper())
-      if environment_config:
-        options.view_as(PortableOptions).environment_config = environment_config
-
-      if streaming:
-        options.view_as(StandardOptions).streaming = True
-      return options
-
-    # Can't read host files from within docker, read a "local" file there.
-    def test_read(self):
-      with self.create_pipeline() as p:
-        lines = p | beam.io.ReadFromText('/etc/profile')
-        assert_that(lines, lambda lines: len(lines) > 0)
-
-    def test_no_subtransform_composite(self):
-      raise unittest.SkipTest("BEAM-4781")
 
-    def test_external_transform(self):
+class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
+  _use_grpc = True
+  _use_subprocesses = True
+
+  conf_dir = None
+  expansion_port = None
+  flink_job_server_jar = None
+
+  def __init__(self, *args, **kwargs):
+    super(FlinkRunnerTest, self).__init__(*args, **kwargs)
+    self.environment_type = None
+    self.environment_config = None
+
+  @pytest.fixture(autouse=True)
+  def parse_options(self, request):
+    test_pipeline_options = (
+        request.config.option.test_pipeline_options
+        if request.config.option.test_pipeline_options else '')
+    parser = argparse.ArgumentParser(add_help=True)
+    parser.add_argument(
+        '--flink_job_server_jar',
+        help='Job server jar to submit jobs.',
+        action='store')
+    parser.add_argument(
+        '--environment_type',
+        default='LOOPBACK',
+        choices=['DOCKER', 'PROCESS', 'LOOPBACK'],
+        help='Set the environment type for running user code. DOCKER runs '
+        'user code in a container. PROCESS runs user code in '
+        'automatically started processes. LOOPBACK runs user code on '
+        'the same process that originally submitted the job.')
+    parser.add_argument(
+        '--environment_config',
+        help='Set environment configuration for running the user code.\n For '
+        'DOCKER: Url for the docker image.\n For PROCESS: json of the '
+        'form {"os": "<OS>", "arch": "<ARCHITECTURE>", "command": '
+        '"<process to execute>", "env":{"<Environment variables 1>": '
+        '"<ENV_VAL>"} }. All fields in the json are optional except '
+        'command.')
+    known_args, unknown_args = parser.parse_known_args(
+        test_pipeline_options.split())

Review comment:
       Could this be problematic if one of the option values contains a space, e.g. as part of a JSON string for the environment config?
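       One possible way around this (just a sketch, reusing the `parser` and `test_pipeline_options` from the fixture above, not something I've tested end to end) would be to tokenize with `shlex.split`, which respects shell-style quoting:
   ```python
   import shlex

   # shlex.split honors quoting, so an environment_config value that is quoted
   # inside the --test_pipeline_options string is kept as a single token
   # instead of being split on its internal spaces.
   known_args, unknown_args = parser.parse_known_args(
       shlex.split(test_pipeline_options))
   ```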




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
