y1chi commented on a change in pull request #12727:
URL: https://github.com/apache/beam/pull/12727#discussion_r492888461



##########
File path: sdks/python/apache_beam/runners/portability/sdk_container_builder.py
##########
@@ -0,0 +1,274 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerBuilder builds the portable SDK container with dependencies.
+
+It copies the right boot dependencies, namely: apache beam sdk, python packages
+from requirements.txt, python packages from extra_packages.txt, workflow
+tarball, into the latest public python sdk container image, and run the
+dependencies installation in advance with the boot program in setup only mode
+to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tarfile
+import tempfile
+import time
+import uuid
+
+from google.protobuf.duration_pb2 import Duration
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam.internal.gcp.auth import get_service_credentials
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp.internal.clients import storage
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions  # pylint: 
disable=unused-import
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+SDK_CONTAINER_ENTRYPOINT = '/opt/apache/beam/boot'
+DOCKERFILE_TEMPLATE = (
+    """FROM apache/beam_python{major}.{minor}_sdk:latest
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN {entrypoint} --setup_only --artifacts {workdir}/{manifest_file}
+""")
+
+SOURCE_FOLDER = 'source'
+_LOGGER = logging.getLogger(__name__)
+
+
+class SdkContainerBuilder(object):
+  def __init__(self, options):
+    self._options = options
+    self._temp_src_dir = tempfile.mkdtemp()

Review comment:
       done.

##########
File path: sdks/python/apache_beam/runners/portability/sdk_container_builder.py
##########
@@ -0,0 +1,274 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerBuilder builds the portable SDK container with dependencies.
+
+It copies the right boot dependencies, namely: apache beam sdk, python packages
+from requirements.txt, python packages from extra_packages.txt, workflow
+tarball, into the latest public python sdk container image, and run the
+dependencies installation in advance with the boot program in setup only mode
+to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tarfile
+import tempfile
+import time
+import uuid
+
+from google.protobuf.duration_pb2 import Duration
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam.internal.gcp.auth import get_service_credentials
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp.internal.clients import storage
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions  # pylint: 
disable=unused-import
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+SDK_CONTAINER_ENTRYPOINT = '/opt/apache/beam/boot'
+DOCKERFILE_TEMPLATE = (
+    """FROM apache/beam_python{major}.{minor}_sdk:latest
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN {entrypoint} --setup_only --artifacts {workdir}/{manifest_file}
+""")
+
+SOURCE_FOLDER = 'source'
+_LOGGER = logging.getLogger(__name__)
+
+
+class SdkContainerBuilder(object):
+  def __init__(self, options):
+    self._options = options
+    self._temp_src_dir = tempfile.mkdtemp()
+    self._docker_registry_push_url = self._options.view_as(
+        SetupOptions).docker_registry_push_url
+
+  def build(self):
+    container_id = str(uuid.uuid4())
+    container_tag = os.path.join(
+        self._docker_registry_push_url or '',
+        'beam_python_prebuilt_sdk:%s' % container_id)
+    self.prepare_dependencies()
+    self.invoke_docker_build_and_push(container_id, container_tag)
+
+    return container_tag
+
+  def prepare_dependencies(self):
+    tmp = tempfile.mkdtemp()

Review comment:
       done.

##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -136,6 +137,8 @@ def create_job_resources(options,  # type: PipelineOptions
             only for testing.
           populate_requirements_cache: Callable for populating the requirements
             cache. Used only for testing.
+          skip_boot_dependencies: Skip apache beam sdk, requirements, extra
+            packages, workflow tarball installs by sdk boot program.

Review comment:
       sounds much better, thanks.

##########
File path: sdks/python/apache_beam/transforms/environments.py
##########
@@ -252,6 +254,13 @@ def from_runner_api_parameter(payload, capabilities, 
artifacts, context):
   @classmethod
   def from_options(cls, options):
     # type: (PipelineOptions) -> DockerEnvironment
+    if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+      prebuilt_container_image = SdkContainerBuilder.build_container_image(
+          options)
+      return cls.from_container_image(
+          container_image=prebuilt_container_image,
+          artifacts=python_sdk_dependencies(
+              options, skip_boot_dependencies=True))

Review comment:
       I think we still need it for python_sdk_dependencies to produce two 
sets (the complete set or the reduced set) of artifacts.

##########
File path: sdks/python/container/boot.go
##########
@@ -203,15 +223,9 @@ func setupAcceptableWheelSpecs() error {
 }
 
 // installSetupPackages installs Beam SDK and user dependencies.
-func installSetupPackages(mds []*jobpb.ArtifactMetadata, workDir string) error 
{
+func installSetupPackages(files []string, workDir string) error {

Review comment:
       done.

##########
File path: sdks/python/container/boot.go
##########
@@ -30,18 +32,21 @@ import (
        "time"
 
        "github.com/apache/beam/sdks/go/pkg/beam/artifact"
-       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/go/pkg/beam/provision"
        "github.com/apache/beam/sdks/go/pkg/beam/util/execx"
        "github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+       "github.com/golang/protobuf/jsonpb"
        "github.com/golang/protobuf/proto"
        "github.com/nightlyone/lockfile"
 )
 
 var (
        acceptableWhlSpecs []string
 
+       setupOnly = flag.Bool("setup_only", false, "Execute boot program in 
setup only mode (optional).")

Review comment:
       done.

##########
File path: sdks/python/apache_beam/runners/portability/sdk_container_builder.py
##########
@@ -0,0 +1,279 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerBuilder builds the portable SDK container with dependencies.
+
+It copies the right boot dependencies, namely: apache beam sdk, python packages
+from requirements.txt, python packages from extra_packages.txt, workflow
+tarball, into the latest public python sdk container image, and run the
+dependencies installation in advance with the boot program in setup only mode
+to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tarfile
+import tempfile
+import time
+import uuid
+
+from google.protobuf.duration_pb2 import Duration
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam.internal.gcp.auth import get_service_credentials
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp.internal.clients import storage
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions  # pylint: 
disable=unused-import
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+SDK_CONTAINER_ENTRYPOINT = '/opt/apache/beam/boot'
+DOCKERFILE_TEMPLATE = (
+    """FROM {base_image}
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN {entrypoint} --setup_only --artifacts {workdir}/{manifest_file}
+""")
+
+SOURCE_FOLDER = 'source'
+_LOGGER = logging.getLogger(__name__)
+
+
+class SdkContainerBuilder(object):
+  def __init__(self, options):
+    self._options = options
+    self._docker_registry_push_url = self._options.view_as(
+        SetupOptions).docker_registry_push_url
+    self._base_image = (
+        self._options.view_as(SetupOptions).prebuild_sdk_container_base_image 
or
+        'apache/beam_python%s.%s_sdk:latest' %
+        (sys.version_info[0], sys.version_info[1]))
+    self._temp_src_dir = None
+
+  def build(self):
+    container_id = str(uuid.uuid4())
+    container_tag = os.path.join(
+        self._docker_registry_push_url or '',
+        'beam_python_prebuilt_sdk:%s' % container_id)
+    with tempfile.TemporaryDirectory() as temp_folder:
+      self._temp_src_dir = temp_folder
+      self.prepare_dependencies()
+      self.invoke_docker_build_and_push(container_id, container_tag)
+
+    return container_tag
+
+  def prepare_dependencies(self):
+    with tempfile.TemporaryDirectory() as tmp:
+      resources = Stager.create_job_resources(self._options, tmp)
+      # make a copy of the staged artifacts into the temp source folder.
+      for path, name in resources:
+        shutil.copyfile(path, os.path.join(self._temp_src_dir, name))
+      with open(os.path.join(self._temp_src_dir, 'Dockerfile'), 'w') as file:
+        file.write(
+            DOCKERFILE_TEMPLATE.format(
+                base_image=self._base_image,
+                workdir=ARTIFACTS_CONTAINER_DIR,
+                manifest_file=ARTIFACTS_MANIFEST_FILE,
+                entrypoint=SDK_CONTAINER_ENTRYPOINT))
+      self.generate_artifacts_manifests_json_file(resources, 
self._temp_src_dir)
+
+  def invoke_docker_build_and_push(self, container_id, container_tag):
+    raise NotImplementedError
+
+  @staticmethod
+  def generate_artifacts_manifests_json_file(resources, temp_dir):
+    infos = []
+    for _, name in resources:
+      info = beam_runner_api_pb2.ArtifactInformation(
+          type_urn=common_urns.StandardArtifacts.Types.FILE.urn,
+          type_payload=beam_runner_api_pb2.ArtifactFilePayload(
+              path=name).SerializeToString(),
+      )
+      infos.append(json.dumps(MessageToJson(info)))
+    with open(os.path.join(temp_dir, ARTIFACTS_MANIFEST_FILE), 'w') as file:
+      file.write('[\n' + ',\n'.join(infos) + '\n]')
+
+  @classmethod
+  def build_container_image(cls, pipeline_options):
+    # type: (PipelineOptions) -> str

Review comment:
       done.

##########
File path: sdks/python/apache_beam/examples/wordcount_it_test.py
##########
@@ -56,6 +57,17 @@ def test_wordcount_it(self):
   def test_wordcount_fnapi_it(self):
     self._run_wordcount_it(wordcount.run, experiment='beam_fn_api')
 
+  @attr('ValidatesContainer')
+  def test_wordcount_it_with_prebuilt_sdk_container(self):
+    if sys.version_info[0] < 3:

Review comment:
       done.

##########
File path: sdks/python/apache_beam/runners/portability/sdk_container_builder.py
##########
@@ -0,0 +1,274 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerBuilder builds the portable SDK container with dependencies.
+
+It copies the right boot dependencies, namely: apache beam sdk, python packages
+from requirements.txt, python packages from extra_packages.txt, workflow
+tarball, into the latest public python sdk container image, and run the
+dependencies installation in advance with the boot program in setup only mode
+to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tarfile
+import tempfile
+import time
+import uuid
+
+from google.protobuf.duration_pb2 import Duration
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam.internal.gcp.auth import get_service_credentials
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp.internal.clients import storage
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions  # pylint: 
disable=unused-import
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+SDK_CONTAINER_ENTRYPOINT = '/opt/apache/beam/boot'
+DOCKERFILE_TEMPLATE = (
+    """FROM apache/beam_python{major}.{minor}_sdk:latest
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN {entrypoint} --setup_only --artifacts {workdir}/{manifest_file}
+""")
+
+SOURCE_FOLDER = 'source'
+_LOGGER = logging.getLogger(__name__)
+
+
+class SdkContainerBuilder(object):
+  def __init__(self, options):
+    self._options = options
+    self._temp_src_dir = tempfile.mkdtemp()
+    self._docker_registry_push_url = self._options.view_as(
+        SetupOptions).docker_registry_push_url
+
+  def build(self):

Review comment:
       done.

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -473,10 +473,16 @@ def run_pipeline(self, pipeline, options):
 
     use_fnapi = apiclient._use_fnapi(options)
     from apache_beam.transforms import environments
-    self._default_environment = (
-        environments.DockerEnvironment.from_container_image(
-            apiclient.get_container_image_from_options(options),
-            artifacts=environments.python_sdk_dependencies(options)))
+    if options.view_as(SetupOptions).prebuild_sdk_container_engine:

Review comment:
       done.

##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -994,6 +994,21 @@ def _add_argparse_args(cls, parser):
             'staged in the staging area (--staging_location option) and the '
             'workers will install them in same order they were specified on '
             'the command line.'))
+    parser.add_argument(
+        '--prebuild_sdk_container_engine',
+        choices=['local_docker', 'cloud_build'],
+        help=(
+            'Pre-builds the sdk worker container image with boot dependencies '

Review comment:
       done.

##########
File path: sdks/python/apache_beam/runners/portability/sdk_container_builder.py
##########
@@ -0,0 +1,279 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerBuilder builds the portable SDK container with dependencies.
+
+It copies the right boot dependencies, namely: apache beam sdk, python packages
+from requirements.txt, python packages from extra_packages.txt, workflow
+tarball, into the latest public python sdk container image, and run the
+dependencies installation in advance with the boot program in setup only mode
+to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tarfile
+import tempfile
+import time
+import uuid
+
+from google.protobuf.duration_pb2 import Duration
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam.internal.gcp.auth import get_service_credentials
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp.internal.clients import storage
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions  # pylint: 
disable=unused-import
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+SDK_CONTAINER_ENTRYPOINT = '/opt/apache/beam/boot'
+DOCKERFILE_TEMPLATE = (
+    """FROM {base_image}
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN {entrypoint} --setup_only --artifacts {workdir}/{manifest_file}
+""")
+
+SOURCE_FOLDER = 'source'
+_LOGGER = logging.getLogger(__name__)
+
+
+class SdkContainerBuilder(object):
+  def __init__(self, options):
+    self._options = options
+    self._docker_registry_push_url = self._options.view_as(
+        SetupOptions).docker_registry_push_url
+    self._base_image = (
+        self._options.view_as(SetupOptions).prebuild_sdk_container_base_image 
or
+        'apache/beam_python%s.%s_sdk:latest' %
+        (sys.version_info[0], sys.version_info[1]))
+    self._temp_src_dir = None
+
+  def build(self):
+    container_id = str(uuid.uuid4())
+    container_tag = os.path.join(
+        self._docker_registry_push_url or '',
+        'beam_python_prebuilt_sdk:%s' % container_id)
+    with tempfile.TemporaryDirectory() as temp_folder:
+      self._temp_src_dir = temp_folder
+      self.prepare_dependencies()
+      self.invoke_docker_build_and_push(container_id, container_tag)
+
+    return container_tag
+
+  def prepare_dependencies(self):
+    with tempfile.TemporaryDirectory() as tmp:
+      resources = Stager.create_job_resources(self._options, tmp)
+      # make a copy of the staged artifacts into the temp source folder.
+      for path, name in resources:
+        shutil.copyfile(path, os.path.join(self._temp_src_dir, name))
+      with open(os.path.join(self._temp_src_dir, 'Dockerfile'), 'w') as file:
+        file.write(
+            DOCKERFILE_TEMPLATE.format(
+                base_image=self._base_image,
+                workdir=ARTIFACTS_CONTAINER_DIR,
+                manifest_file=ARTIFACTS_MANIFEST_FILE,
+                entrypoint=SDK_CONTAINER_ENTRYPOINT))
+      self.generate_artifacts_manifests_json_file(resources, 
self._temp_src_dir)
+
+  def invoke_docker_build_and_push(self, container_id, container_tag):
+    raise NotImplementedError
+
+  @staticmethod
+  def generate_artifacts_manifests_json_file(resources, temp_dir):
+    infos = []
+    for _, name in resources:
+      info = beam_runner_api_pb2.ArtifactInformation(
+          type_urn=common_urns.StandardArtifacts.Types.FILE.urn,
+          type_payload=beam_runner_api_pb2.ArtifactFilePayload(
+              path=name).SerializeToString(),
+      )
+      infos.append(json.dumps(MessageToJson(info)))
+    with open(os.path.join(temp_dir, ARTIFACTS_MANIFEST_FILE), 'w') as file:
+      file.write('[\n' + ',\n'.join(infos) + '\n]')
+
+  @classmethod
+  def build_container_image(cls, pipeline_options):
+    # type: (PipelineOptions) -> str
+    setup_options = pipeline_options.view_as(SetupOptions)
+    container_build_engine = setup_options.prebuild_sdk_container_engine
+    if container_build_engine:
+      if container_build_engine == 'local_docker':
+        builder = _SdkContainerLocalBuilder(
+            pipeline_options)  # type: SdkContainerBuilder
+      elif container_build_engine == 'cloud_build':
+        builder = _SdkContainerCloudBuilder(pipeline_options)
+      else:
+        raise ValueError(
+            'Only (--prebuild_sdk_container_engine local_docker) and '
+            '(--prebuild_sdk_container_engine cloud_build) are supported')
+    else:
+      raise ValueError('No --prebuild_sdk_container_engine option specified.')
+    return builder.build()
+
+
+class _SdkContainerLocalBuilder(SdkContainerBuilder):
+  """SdkContainerLocalBuilder builds the sdk container image with local
+  docker."""
+  def invoke_docker_build_and_push(self, container_id, container_tag):
+    try:
+      _LOGGER.info("Building sdk container, this may take a few minutes...")
+      now = time.time()
+      subprocess.run(['docker', 'build', '.', '-t', container_tag],
+                     capture_output=True,
+                     check=True,
+                     cwd=self._temp_src_dir)
+    except subprocess.CalledProcessError as err:
+      raise RuntimeError(
+          'Failed to build sdk container with local docker, '
+          'stderr:\n %s.' % err.stderr)
+    else:
+      _LOGGER.info(
+          "Successfully built %s in %.2f seconds" %
+          (container_tag, time.time() - now))
+
+    if self._docker_registry_push_url:
+      _LOGGER.info("Pushing prebuilt sdk container...")
+      try:
+        subprocess.run(['docker', 'push', container_tag],
+                       capture_output=True,
+                       check=True)
+      except subprocess.CalledProcessError as err:
+        raise RuntimeError(
+            'Failed to push prebuilt sdk container %s, stderr: \n%s' %
+            (container_tag, err.stderr))
+      _LOGGER.info(
+          "Successfully pushed %s in %.2f seconds" %
+          (container_tag, time.time() - now))
+    else:
+      _LOGGER.info(
+          "no --docker_registry_push_url option is specified in pipeline "
+          "options, specify it if the new image is intended to be "
+          "pushed to a registry.")
+
+
+class _SdkContainerCloudBuilder(SdkContainerBuilder):
+  """SdkContainerLocalBuilder builds the sdk container image with google cloud
+  build."""
+  def __init__(self, options):
+    super().__init__(options)
+    self._google_cloud_options = options.view_as(GoogleCloudOptions)
+    if self._google_cloud_options.no_auth:
+      credentials = None
+    else:
+      credentials = get_service_credentials()
+    self._storage_client = storage.StorageV1(
+        url='https://www.googleapis.com/storage/v1',
+        credentials=credentials,
+        get_credentials=(not self._google_cloud_options.no_auth),
+        http=get_new_http(),
+        response_encoding='utf8')
+    if not self._docker_registry_push_url:
+      self._docker_registry_push_url = (
+          'gcr.io/%s' % self._google_cloud_options.project)
+
+  def invoke_docker_build_and_push(self, container_id, container_tag):
+    project_id = self._google_cloud_options.project
+    temp_location = self._google_cloud_options.temp_location
+    # google cloud build service expects all the build source file to be
+    # compressed into a tarball.
+    tarball_path = os.path.join(self._temp_src_dir, '%s.tgz' % SOURCE_FOLDER)
+    self._make_tarfile(tarball_path, self._temp_src_dir)
+    _LOGGER.info(
+        "Compressed source files for building sdk container at %s" %
+        tarball_path)
+
+    gcs_location = os.path.join(
+        temp_location, '%s-%s.tgz' % (SOURCE_FOLDER, container_id))
+    self._upload_to_gcs(tarball_path, gcs_location)
+
+    from google.cloud.devtools import cloudbuild_v1
+    client = cloudbuild_v1.CloudBuildClient()
+    build = cloudbuild_v1.Build()
+    build.steps = []
+    step = cloudbuild_v1.BuildStep()
+    step.name = 'gcr.io/cloud-builders/docker'
+    step.args = ['build', '-t', container_tag, '.']
+    step.dir = SOURCE_FOLDER
+
+    build.steps.append(step)
+    build.images = [container_tag]
+
+    source = cloudbuild_v1.Source()
+    source.storage_source = cloudbuild_v1.StorageSource()
+    gcs_bucket, gcs_object = self._get_gcs_bucket_and_name(gcs_location)
+    source.storage_source.bucket = os.path.join(gcs_bucket)
+    source.storage_source.object = gcs_object
+    build.source = source
+    # TODO(zyichi): make timeout configurable
+    build.timeout = Duration().FromSeconds(seconds=1800)
+
+    now = time.time()
+    _LOGGER.info('Building sdk container, this may take a few minutes...')
+    operation = client.create_build(project_id=project_id, build=build)
+    # if build fails exception will be raised and stops the job submission.

Review comment:
       I think the user may need to check the failure log first; the failure could 
be caused by things such as a mistakenly provided wrong base image or a wrong docker push URL.

##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -1003,6 +1003,13 @@ def _add_argparse_args(cls, parser):
             'environment. Choose the docker build engine of local docker '
             'environment or google cloud build by passing the option '
             'local_docker or cloud_build.'))
+    parser.add_argument(
+        '--prebuild_sdk_container_base_image',
+        default=None,
+        help=(
+            'The base image to use when pre-building the sdk container image '
+            'with dependencies, if not specified, the latest public '

Review comment:
       For a dev sdk version, can we use the last released version? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to