[ https://issues.apache.org/jira/browse/BEAM-10527?focusedWorklogId=464816&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-464816 ]

ASF GitHub Bot logged work on BEAM-10527:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Jul/20 00:43
            Start Date: 31/Jul/20 00:43
    Worklog Time Spent: 10m 
      Work Description: ibzib commented on a change in pull request #12385:
URL: https://github.com/apache/beam/pull/12385#discussion_r463346402



##########
File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py
##########
@@ -53,361 +53,380 @@
 from apache_beam.transforms import userstate
 from apache_beam.transforms.sql import SqlTransform
 
+# Run as
+#
+# pytest flink_runner_test.py \
+#     [--test_pipeline_options "--flink_job_server_jar=/path/to/job_server.jar \
+#                               --environment_type=DOCKER"] \
+#     [FlinkRunnerTest.test_method, ...]
+
 _LOGGER = logging.getLogger(__name__)
 
 Row = typing.NamedTuple("Row", [("col1", int), ("col2", unicode)])
 beam.coders.registry.register_coder(Row, beam.coders.RowCoder)
 
-if __name__ == '__main__':
-  # Run as
-  #
-  # python -m apache_beam.runners.portability.flink_runner_test \
-  #     --flink_job_server_jar=/path/to/job_server.jar \
-  #     --environment_type=docker \
-  #     --extra_experiments=beam_experiments \
-  #     [FlinkRunnerTest.test_method, ...]
-
-  parser = argparse.ArgumentParser(add_help=True)
-  parser.add_argument(
-      '--flink_job_server_jar', help='Job server jar to submit jobs.')
-  parser.add_argument(
-      '--streaming',
-      default=False,
-      action='store_true',
-      help='Job type. batch or streaming')
-  parser.add_argument(
-      '--environment_type',
-      default='loopback',
-      help='Environment type. docker, process, or loopback.')
-  parser.add_argument('--environment_config', help='Environment config.')
-  parser.add_argument(
-      '--extra_experiments',
-      default=[],
-      action='append',
-      help='Beam experiments config.')
-  known_args, args = parser.parse_known_args(sys.argv)
-  sys.argv = args
-
-  flink_job_server_jar = (
-      known_args.flink_job_server_jar or
-      job_server.JavaJarJobServer.path_to_beam_jar(
-          'runners:flink:%s:job-server:shadowJar' %
-          FlinkRunnerOptions.PUBLISHED_FLINK_VERSIONS[-1]))
-  streaming = known_args.streaming
-  environment_type = known_args.environment_type.lower()
-  environment_config = (
-      known_args.environment_config if known_args.environment_config else None)
-  extra_experiments = known_args.extra_experiments
-
-  # This is defined here to only be run when we invoke this file explicitly.
-  class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
-    _use_grpc = True
-    _use_subprocesses = True
-
-    conf_dir = None
-    expansion_port = None
-
-    @classmethod
-    def tearDownClass(cls):
-      if cls.conf_dir and exists(cls.conf_dir):
-        _LOGGER.info("removing conf dir: %s" % cls.conf_dir)
-        rmtree(cls.conf_dir)
-      super(FlinkRunnerTest, cls).tearDownClass()
-
-    @classmethod
-    def _create_conf_dir(cls):
-      """Create (and save a static reference to) a "conf dir", used to provide
-       metrics configs and verify metrics output
-
-       It gets cleaned up when the suite is done executing"""
-
-      if hasattr(cls, 'conf_dir'):
-        cls.conf_dir = mkdtemp(prefix='flinktest-conf')
-
-        # path for a FileReporter to write metrics to
-        cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
-
-        # path to write Flink configuration to
-        conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
-        file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter'
-        with open(conf_path, 'w') as f:
-          f.write(
-              linesep.join([
-                  'metrics.reporters: file',
-                  'metrics.reporter.file.class: %s' % file_reporter,
-                  'metrics.reporter.file.path: %s' % cls.test_metrics_path,
-                  'metrics.scope.operator: <operator_name>',
-              ]))
-
-    @classmethod
-    def _subprocess_command(cls, job_port, expansion_port):
-      # will be cleaned up at the end of this method, and recreated and used by
-      # the job server
-      tmp_dir = mkdtemp(prefix='flinktest')
-
-      cls._create_conf_dir()
-      cls.expansion_port = expansion_port
-
-      try:
-        return [
-            'java',
-            '-Dorg.slf4j.simpleLogger.defaultLogLevel=warn',
-            '-jar',
-            flink_job_server_jar,
-            '--flink-master',
-            '[local]',
-            '--flink-conf-dir',
-            cls.conf_dir,
-            '--artifacts-dir',
-            tmp_dir,
-            '--job-port',
-            str(job_port),
-            '--artifact-port',
-            '0',
-            '--expansion-port',
-            str(expansion_port),
-        ]
-      finally:
-        rmtree(tmp_dir)
-
-    @classmethod
-    def get_runner(cls):
-      return portable_runner.PortableRunner()
-
-    @classmethod
-    def get_expansion_service(cls):
-      # TODO Move the expansion service address into PipelineOptions
-      return 'localhost:%s' % cls.expansion_port
-
-    def create_options(self):
-      options = super(FlinkRunnerTest, self).create_options()
-      options.view_as(
-          DebugOptions).experiments = ['beam_fn_api'] + extra_experiments
-      options._all_options['parallelism'] = 2
-      options.view_as(PortableOptions).environment_type = (
-          environment_type.upper())
-      if environment_config:
-        options.view_as(PortableOptions).environment_config = environment_config
-
-      if streaming:
-        options.view_as(StandardOptions).streaming = True
-      return options
-
-    # Can't read host files from within docker, read a "local" file there.
-    def test_read(self):
-      with self.create_pipeline() as p:
-        lines = p | beam.io.ReadFromText('/etc/profile')
-        assert_that(lines, lambda lines: len(lines) > 0)
-
-    def test_no_subtransform_composite(self):
-      raise unittest.SkipTest("BEAM-4781")
 
-    def test_external_transform(self):
+class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
+  _use_grpc = True
+  _use_subprocesses = True
+
+  conf_dir = None
+  expansion_port = None
+  flink_job_server_jar = None
+
+  def __init__(self, *args, **kwargs):
+    super(FlinkRunnerTest, self).__init__(*args, **kwargs)
+    self.environment_type = None
+    self.environment_config = None
+
+  @pytest.fixture(autouse=True)
+  def parse_options(self, request):
+    test_pipeline_options = (
+        request.config.option.test_pipeline_options
+        if request.config.option.test_pipeline_options else '')
+    parser = argparse.ArgumentParser(add_help=True)
+    parser.add_argument(
+        '--flink_job_server_jar',
+        help='Job server jar to submit jobs.',
+        action='store')
+    parser.add_argument(
+        '--environment_type',
+        default='LOOPBACK',
+        choices=['DOCKER', 'PROCESS', 'LOOPBACK'],
+        help='Set the environment type for running user code. DOCKER runs '
+        'user code in a container. PROCESS runs user code in '
+        'automatically started processes. LOOPBACK runs user code on '
+        'the same process that originally submitted the job.')
+    parser.add_argument(
+        '--environment_config',
+        help='Set environment configuration for running the user code.\n For '
+        'DOCKER: Url for the docker image.\n For PROCESS: json of the '
+        'form {"os": "<OS>", "arch": "<ARCHITECTURE>", "command": '
+        '"<process to execute>", "env":{"<Environment variables 1>": '
+        '"<ENV_VAL>"} }. All fields in the json are optional except '
+        'command.')
+    known_args, unknown_args = parser.parse_known_args(
+        test_pipeline_options.split())

Review comment:
       Good catch. I used `shlex.split(...)` instead.
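
For context: the diff's last lines split the `--test_pipeline_options` string with plain `str.split()`, which tears apart option values that contain quoted spaces. A minimal sketch of why `shlex.split(...)` is the better choice (the option string below is hypothetical):

    import shlex

    opts = ('--flink_job_server_jar=/path/to/job_server.jar '
            '--environment_config "/opt/python bin/worker"')

    # Naive whitespace split breaks the quoted value apart:
    opts.split()
    # [..., '--environment_config', '"/opt/python', 'bin/worker"']

    # shlex.split honors shell-style quoting and keeps the value intact:
    shlex.split(opts)
    # [..., '--environment_config', '/opt/python bin/worker']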




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 464816)
    Time Spent: 2.5h  (was: 2h 20m)

> Python2_PVR_Flink precommit should publish test results to Jenkins
> ------------------------------------------------------------------
>
>                 Key: BEAM-10527
>                 URL: https://issues.apache.org/jira/browse/BEAM-10527
>             Project: Beam
>          Issue Type: Improvement
>          Components: testing
>            Reporter: Kyle Weaver
>            Assignee: Kyle Weaver
>            Priority: P2
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Right now we only have the logs, which often require scrolling up to see the 
> failure (which itself often requires curl'ing the logs because they are too 
> large for a browser to load comfortably). This causes frequent 
> misunderstandings. For example, folks often mistake errors printed by 
> pipelines that are meant to fail (e.g. test_error_message_includes_stage) for 
> actual test failures.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
