y1chi commented on a change in pull request #12727:
URL: https://github.com/apache/beam/pull/12727#discussion_r492220801
##########
File path: sdks/python/apache_beam/transforms/environments.py
##########
@@ -252,6 +254,14 @@ def from_runner_api_parameter(payload, capabilities,
artifacts, context):
@classmethod
def from_options(cls, options):
# type: (PipelineOptions) -> DockerEnvironment
+ if options.view_as(DebugOptions).lookup_experiment(
+ 'prebuild_sdk_container'):
+ prebuilt_container_image = SdkContainerBuilder.build_container_imge(
Review comment:
Changed to invoke from_options in the Dataflow runner.
##########
File path: sdks/python/apache_beam/runners/portability/sdk_container_builder.py
##########
@@ -0,0 +1,275 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerBuilder builds the portable SDK container with dependencies.
+
+It copies the right boot dependencies, namely: apache beam sdk, python packages
+from requirements.txt, python packages from extra_packages.txt, workflow
+tarball, into the latest public python sdk container image, and run the
+dependencies installation in advance with the boot program in setup only mode
+to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tarfile
+import tempfile
+import time
+import uuid
+
+from google.protobuf.duration_pb2 import Duration
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam.internal.gcp.auth import get_service_credentials
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp.internal.clients import storage
+from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions # pylint:
disable=unused-import
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+SDK_CONTAINER_ENTRYPOINT = '/opt/apache/beam/boot'
+DOCKERFILE_TEMPLATE = (
+ """FROM apache/beam_python{major}.{minor}_sdk:latest
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN {entrypoint} --setup_only --artifacts {workdir}/{manifest_file}
+""")
+
+SOURCE_FOLDER = 'source'
+_LOGGER = logging.getLogger(__name__)
+
+
+class SdkContainerBuilder(object):
+ def __init__(self, options):
+ self._options = options
+ self._temp_src_dir = tempfile.mkdtemp()
+ self._docker_registry_push_url = self._options.view_as(
+ DebugOptions).lookup_experiment('docker_registry_push_url')
+
+ def build(self):
+ container_id = str(uuid.uuid4())
+ container_tag = os.path.join(
+ self._docker_registry_push_url or '',
+ 'beam_python_prebuilt_sdk:%s' % container_id)
+ self.prepare_dependencies()
+ self.invoke_docker_build_and_push(container_id, container_tag)
+
+ return container_tag
+
+ def prepare_dependencies(self):
+ tmp = tempfile.mkdtemp()
+ resources = Stager.create_job_resources(self._options, tmp)
+ # make a copy of the staged artifacts into the temp source folder.
+ for path, name in resources:
+ shutil.copyfile(path, os.path.join(self._temp_src_dir, name))
+ with open(os.path.join(self._temp_src_dir, 'Dockerfile'), 'w') as file:
+ file.write(
+ DOCKERFILE_TEMPLATE.format(
+ major=sys.version_info[0],
+ minor=sys.version_info[1],
+ workdir=ARTIFACTS_CONTAINER_DIR,
+ manifest_file=ARTIFACTS_MANIFEST_FILE,
+ entrypoint=SDK_CONTAINER_ENTRYPOINT))
+ self.generate_artifacts_manifests_json_file(resources, self._temp_src_dir)
+
+ def invoke_docker_build_and_push(self, container_id, container_tag):
+ raise NotImplementedError
+
+ @staticmethod
+ def generate_artifacts_manifests_json_file(resources, temp_dir):
+ infos = []
+ for _, name in resources:
+ info = beam_runner_api_pb2.ArtifactInformation(
+ type_urn=common_urns.StandardArtifacts.Types.FILE.urn,
+ type_payload=beam_runner_api_pb2.ArtifactFilePayload(
+ path=name).SerializeToString(),
+ )
+ infos.append(json.dumps(MessageToJson(info)))
+ with open(os.path.join(temp_dir, ARTIFACTS_MANIFEST_FILE), 'w') as file:
+ file.write('[\n' + ',\n'.join(infos) + '\n]')
+
+ @classmethod
+ def build_container_imge(cls, pipeline_options):
Review comment:
done.
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -119,6 +119,7 @@ def create_job_resources(options, # type: PipelineOptions
temp_dir, # type: str
build_setup_args=None, # type: Optional[List[str]]
populate_requirements_cache=None, # type:
Optional[str]
+ skip_boot_dependencies=False, # type: Optional[bool]
Review comment:
There are other dependencies that don't seem to be handled explicitly
by the boot sequence, for example the pickled main session and Java jars for x-lang.
##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -473,10 +474,19 @@ def run_pipeline(self, pipeline, options):
use_fnapi = apiclient._use_fnapi(options)
from apache_beam.transforms import environments
- self._default_environment = (
- environments.DockerEnvironment.from_container_image(
- apiclient.get_container_image_from_options(options),
- artifacts=environments.python_sdk_dependencies(options)))
+ if options.view_as(DebugOptions).lookup_experiment(
+ 'prebuild_sdk_container'):
Review comment:
done.
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -119,6 +119,7 @@ def create_job_resources(options, # type: PipelineOptions
temp_dir, # type: str
build_setup_args=None, # type: Optional[List[str]]
populate_requirements_cache=None, # type:
Optional[str]
+ skip_boot_dependencies=False, # type: Optional[bool]
Review comment:
Sounds good to me. The pickled main session is expected to be stored in
the semi-persistent directory, and I believe that is re-mounted at container
launch time right now. Should we add another flag to boot.go to specify a
separate path for the prestaged pickled main session directory?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]