y1chi commented on a change in pull request #12727:
URL: https://github.com/apache/beam/pull/12727#discussion_r493104239



##########
File path: sdks/python/apache_beam/runners/portability/sdk_container_builder.py
##########
@@ -0,0 +1,279 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerBuilder builds the portable SDK container with dependencies.
+
+It copies the right boot dependencies, namely: apache beam sdk, python packages
+from requirements.txt, python packages from extra_packages.txt, workflow
+tarball, into the latest public python sdk container image, and run the
+dependencies installation in advance with the boot program in setup only mode
+to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tarfile
+import tempfile
+import time
+import uuid
+
+from google.protobuf.duration_pb2 import Duration
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam.internal.gcp.auth import get_service_credentials
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp.internal.clients import storage
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions  # pylint: disable=unused-import
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+SDK_CONTAINER_ENTRYPOINT = '/opt/apache/beam/boot'
+DOCKERFILE_TEMPLATE = (
+    """FROM {base_image}
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN {entrypoint} --setup_only --artifacts {workdir}/{manifest_file}
+""")
+
+SOURCE_FOLDER = 'source'
+_LOGGER = logging.getLogger(__name__)
+
+
+class SdkContainerBuilder(object):
+  def __init__(self, options):
+    self._options = options
+    self._docker_registry_push_url = self._options.view_as(
+        SetupOptions).docker_registry_push_url
+    self._base_image = (
+        self._options.view_as(SetupOptions).prebuild_sdk_container_base_image or
+        'apache/beam_python%s.%s_sdk:latest' %
+        (sys.version_info[0], sys.version_info[1]))
+    self._temp_src_dir = None
+
+  def build(self):
+    container_id = str(uuid.uuid4())
+    container_tag = os.path.join(
+        self._docker_registry_push_url or '',
+        'beam_python_prebuilt_sdk:%s' % container_id)
+    with tempfile.TemporaryDirectory() as temp_folder:
+      self._temp_src_dir = temp_folder
+      self.prepare_dependencies()
+      self.invoke_docker_build_and_push(container_id, container_tag)
+
+    return container_tag
+
+  def prepare_dependencies(self):
+    with tempfile.TemporaryDirectory() as tmp:
+      resources = Stager.create_job_resources(self._options, tmp)
+      # make a copy of the staged artifacts into the temp source folder.
+      for path, name in resources:
+        shutil.copyfile(path, os.path.join(self._temp_src_dir, name))
+      with open(os.path.join(self._temp_src_dir, 'Dockerfile'), 'w') as file:
+        file.write(
+            DOCKERFILE_TEMPLATE.format(
+                base_image=self._base_image,
+                workdir=ARTIFACTS_CONTAINER_DIR,
+                manifest_file=ARTIFACTS_MANIFEST_FILE,
+                entrypoint=SDK_CONTAINER_ENTRYPOINT))
+      self.generate_artifacts_manifests_json_file(resources, self._temp_src_dir)
+
+  def invoke_docker_build_and_push(self, container_id, container_tag):
+    raise NotImplementedError
+
+  @staticmethod
+  def generate_artifacts_manifests_json_file(resources, temp_dir):
+    infos = []
+    for _, name in resources:
+      info = beam_runner_api_pb2.ArtifactInformation(
+          type_urn=common_urns.StandardArtifacts.Types.FILE.urn,
+          type_payload=beam_runner_api_pb2.ArtifactFilePayload(
+              path=name).SerializeToString(),
+      )
+      infos.append(json.dumps(MessageToJson(info)))
+    with open(os.path.join(temp_dir, ARTIFACTS_MANIFEST_FILE), 'w') as file:
+      file.write('[\n' + ',\n'.join(infos) + '\n]')
+
+  @classmethod
+  def build_container_image(cls, pipeline_options):
+    # type: (PipelineOptions) -> str

Review comment:
       done.

##########
File path: sdks/python/apache_beam/examples/wordcount_it_test.py
##########
@@ -56,6 +57,17 @@ def test_wordcount_it(self):
   def test_wordcount_fnapi_it(self):
     self._run_wordcount_it(wordcount.run, experiment='beam_fn_api')
 
+  @attr('ValidatesContainer')
+  def test_wordcount_it_with_prebuilt_sdk_container(self):
+    if sys.version_info[0] < 3:

Review comment:
       done.

##########
File path: sdks/python/apache_beam/runners/portability/sdk_container_builder.py
##########
@@ -0,0 +1,274 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerBuilder builds the portable SDK container with dependencies.
+
+It copies the right boot dependencies, namely: apache beam sdk, python packages
+from requirements.txt, python packages from extra_packages.txt, workflow
+tarball, into the latest public python sdk container image, and run the
+dependencies installation in advance with the boot program in setup only mode
+to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tarfile
+import tempfile
+import time
+import uuid
+
+from google.protobuf.duration_pb2 import Duration
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam.internal.gcp.auth import get_service_credentials
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp.internal.clients import storage
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions  # pylint: disable=unused-import
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+SDK_CONTAINER_ENTRYPOINT = '/opt/apache/beam/boot'
+DOCKERFILE_TEMPLATE = (
+    """FROM apache/beam_python{major}.{minor}_sdk:latest
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN {entrypoint} --setup_only --artifacts {workdir}/{manifest_file}
+""")
+
+SOURCE_FOLDER = 'source'
+_LOGGER = logging.getLogger(__name__)
+
+
+class SdkContainerBuilder(object):
+  def __init__(self, options):
+    self._options = options
+    self._temp_src_dir = tempfile.mkdtemp()
+    self._docker_registry_push_url = self._options.view_as(
+        SetupOptions).docker_registry_push_url
+
+  def build(self):

Review comment:
       done.

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -473,10 +473,16 @@ def run_pipeline(self, pipeline, options):
 
     use_fnapi = apiclient._use_fnapi(options)
     from apache_beam.transforms import environments
-    self._default_environment = (
-        environments.DockerEnvironment.from_container_image(
-            apiclient.get_container_image_from_options(options),
-            artifacts=environments.python_sdk_dependencies(options)))
+    if options.view_as(SetupOptions).prebuild_sdk_container_engine:

Review comment:
       done.

##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -994,6 +994,21 @@ def _add_argparse_args(cls, parser):
             'staged in the staging area (--staging_location option) and the '
             'workers will install them in same order they were specified on '
             'the command line.'))
+    parser.add_argument(
+        '--prebuild_sdk_container_engine',
+        choices=['local_docker', 'cloud_build'],
+        help=(
+            'Pre-builds the sdk worker container image with boot dependencies '

Review comment:
       done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to