[
https://issues.apache.org/jira/browse/BEAM-5939?focusedWorklogId=166656&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166656
]
ASF GitHub Bot logged work on BEAM-5939:
----------------------------------------
Author: ASF GitHub Bot
Created on: 15/Nov/18 21:45
Start Date: 15/Nov/18 21:45
Worklog Time Spent: 10m
Work Description: aaltay closed pull request #6976: [BEAM-5939] Dedupe
runner constants
URL: https://github.com/apache/beam/pull/6976
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index b833aaa61ae..a6756918704 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -51,6 +51,7 @@
from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.dataflow.internal.clients import dataflow
from apache_beam.runners.dataflow.internal.names import PropertyNames
+from apache_beam.runners.internal import names as shared_names
from apache_beam.runners.portability.stager import Stager
from apache_beam.transforms import cy_combiners
from apache_beam.transforms import DataflowDistributionCounter
@@ -152,7 +153,7 @@ def __init__(self, packages, options, environment_version,
pipeline_url):
self.proto.userAgent.additionalProperties.extend([
dataflow.Environment.UserAgentValue.AdditionalProperty(
key='name',
- value=to_json_value(names.BEAM_SDK_NAME)),
+ value=to_json_value(shared_names.BEAM_SDK_NAME)),
dataflow.Environment.UserAgentValue.AdditionalProperty(
key='version', value=to_json_value(beam_version.__version__))])
# Version information.
@@ -521,7 +522,7 @@ def create_job_description(self, job):
# Stage the pipeline for the runner harness
self.stage_file(job.google_cloud_options.staging_location,
- names.STAGED_PIPELINE_FILENAME,
+ shared_names.STAGED_PIPELINE_FILENAME,
io.BytesIO(job.proto_pipeline.SerializeToString()))
# Stage other resources for the SDK harness
@@ -529,7 +530,7 @@ def create_job_description(self, job):
job.proto.environment = Environment(
pipeline_url=FileSystems.join(job.google_cloud_options.staging_location,
- names.STAGED_PIPELINE_FILENAME),
+ shared_names.STAGED_PIPELINE_FILENAME),
packages=resources, options=job.options,
environment_version=self.environment_version).proto
logging.debug('JOB: %s', job)
@@ -794,7 +795,7 @@ def get_sdk_package_name():
Returns the PyPI package name to be staged to Google Cloud Dataflow.
"""
- return names.BEAM_PACKAGE_NAME
+ return shared_names.BEAM_PACKAGE_NAME
def to_split_int(n):
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py
b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 781ca2a049f..9f926e41b59 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -22,14 +22,10 @@
from __future__ import absolute_import
-# TODO (altay): Move shared names to a common location.
# Standard file names used for staging files.
from builtins import object
-PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
-STAGED_PIPELINE_FILENAME = "pipeline.pb"
-STAGED_PIPELINE_URL_METADATA_FIELD = "pipeline_url"
# String constants related to sources framework
SOURCE_FORMAT = 'custom_source'
@@ -47,12 +43,17 @@
# requires changes to SDK harness container or SDK harness launcher.
BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20181106'
+# TODO(BEAM-5939): Remove these shared names once Dataflow worker is updated.
+PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
+STAGED_PIPELINE_FILENAME = "pipeline.pb"
+STAGED_PIPELINE_URL_METADATA_FIELD = "pipeline_url"
+
# Package names for different distributions
-# TODO(BEAM-5939): Deduplicate with apache_beam/runners/portability/stager.py
BEAM_PACKAGE_NAME = 'apache-beam'
# SDK identifiers for different distributions
BEAM_SDK_NAME = 'Apache Beam SDK for Python'
+# TODO(BEAM-5393): End duplicated constants (see above).
DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'dataflow.gcr.io/v1beta3'
diff --git a/sdks/python/apache_beam/runners/internal/__init__.py
b/sdks/python/apache_beam/runners/internal/__init__.py
new file mode 100644
index 00000000000..cce3acad34a
--- /dev/null
+++ b/sdks/python/apache_beam/runners/internal/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/sdks/python/apache_beam/runners/internal/names.py
b/sdks/python/apache_beam/runners/internal/names.py
new file mode 100644
index 00000000000..c7d1b845f18
--- /dev/null
+++ b/sdks/python/apache_beam/runners/internal/names.py
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Various names shared by more than one runner."""
+
+# All constants are for internal use only; no backwards-compatibility
+# guarantees.
+
+PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
+STAGED_PIPELINE_FILENAME = "pipeline.pb"
+
+# Package names for different distributions
+BEAM_PACKAGE_NAME = 'apache-beam'
+
+# SDK identifiers for different distributions
+BEAM_SDK_NAME = 'Apache Beam SDK for Python'
diff --git a/sdks/python/apache_beam/runners/portability/stager.py
b/sdks/python/apache_beam/runners/portability/stager.py
index d5b7e005dff..fe159666803 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -61,7 +61,8 @@
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
# TODO(angoenka): Remove reference to dataflow internal names
-from apache_beam.runners.dataflow.internal import names
+from apache_beam.runners.dataflow.internal.names import DATAFLOW_SDK_TARBALL_FILE
+from apache_beam.runners.internal import names
from apache_beam.utils import processes
# All constants are for internal use only; no backwards-compatibility
@@ -72,9 +73,6 @@
REQUIREMENTS_FILE = 'requirements.txt'
EXTRA_PACKAGES_FILE = 'extra_packages.txt'
-# Package names for distributions
-BEAM_PACKAGE_NAME = 'apache-beam'
-
class Stager(object):
"""Abstract Stager identifies and copies the appropriate artifacts to the
@@ -96,7 +94,7 @@ def commit_manifest(self):
def get_sdk_package_name():
"""For internal use only; no backwards-compatibility guarantees.
Returns the PyPI package name to be staged."""
- return BEAM_PACKAGE_NAME
+ return names.BEAM_PACKAGE_NAME
def stage_job_resources(self,
options,
@@ -231,7 +229,7 @@ def stage_job_resources(self,
if os.path.isdir(setup_options.sdk_location):
# TODO(angoenka): remove reference to Dataflow
sdk_path = os.path.join(setup_options.sdk_location,
- names.DATAFLOW_SDK_TARBALL_FILE)
+ DATAFLOW_SDK_TARBALL_FILE)
else:
sdk_path = setup_options.sdk_location
@@ -451,7 +449,7 @@ def _desired_sdk_filename_in_staging_location(sdk_location):
else:
raise RuntimeError('Unrecognized SDK wheel file: %s' % sdk_location)
else:
- return names.DATAFLOW_SDK_TARBALL_FILE
+ return DATAFLOW_SDK_TARBALL_FILE
def _stage_beam_sdk(self, sdk_remote_location, staging_location, temp_dir):
"""Stages a Beam SDK file with the appropriate version.
diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py
b/sdks/python/apache_beam/runners/portability/stager_test.py
index e700c0f9b94..d73366d807d 100644
--- a/sdks/python/apache_beam/runners/portability/stager_test.py
+++ b/sdks/python/apache_beam/runners/portability/stager_test.py
@@ -31,6 +31,7 @@
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.runners.dataflow.internal import names
+from apache_beam.runners.internal import names as shared_names
from apache_beam.runners.portability import stager
@@ -145,12 +146,12 @@ def test_with_main_session(self):
options.view_as(SetupOptions).save_main_session = True
self.update_options(options)
- self.assertEqual([names.PICKLED_MAIN_SESSION_FILE],
+ self.assertEqual([shared_names.PICKLED_MAIN_SESSION_FILE],
self.stager.stage_job_resources(
options, staging_location=staging_dir)[1])
self.assertTrue(
os.path.isfile(
- os.path.join(staging_dir, names.PICKLED_MAIN_SESSION_FILE)))
+ os.path.join(staging_dir, shared_names.PICKLED_MAIN_SESSION_FILE)))
def test_default_resources(self):
staging_dir = self.make_temp_dir()
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index da21418e4cc..65885096fea 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -34,7 +34,7 @@
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability.api import endpoints_pb2
-from apache_beam.runners.dataflow.internal import names
+from apache_beam.runners.internal import names
from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler
from apache_beam.runners.worker.sdk_worker import SdkHarness
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 166656)
Time Spent: 3h 10m (was: 3h)
> Deduplicate constants
> ---------------------
>
> Key: BEAM-5939
> URL: https://issues.apache.org/jira/browse/BEAM-5939
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Ahmet Altay
> Priority: Minor
> Labels: starter
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> apache_beam/runners/dataflow/internal/names.py
> apache_beam/runners/portability/stager.py
> have the same constants defined in both files.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)