y1chi commented on a change in pull request #12727:
URL: https://github.com/apache/beam/pull/12727#discussion_r491077972



##########
File path: sdks/python/container/boot.go
##########
@@ -283,3 +306,51 @@ func multiProcessExactlyOnce(actionFunc func(), 
completeFileName string) {
        os.OpenFile(installCompleteFile, os.O_RDONLY|os.O_CREATE, 0666)
 
 }
+
+func processArtifactsInSetupOnlyMode() error {
+       if *artifacts == "" {
+               log.Fatal("No --artifacts provided along with --setup_only 
flag.")
+       }
+       workDir := filepath.Dir(*artifacts)
+       metadata, err := ioutil.ReadFile(*artifacts)
+       if err != nil {
+               log.Fatalf("Unable to open artifacts metadata file %v with 
error %v", *artifacts, err)
+       }
+       var infoJsons []string
+       if err := json.Unmarshal(metadata, &infoJsons); err != nil {
+               log.Fatalf("Unable to parse metadata, error: %v", err)
+       }
+
+       files := make([]string, len(infoJsons))
+       for i, info := range infoJsons {
+               var artifactInformation pipepb.ArtifactInformation
+               if err := jsonpb.UnmarshalString(info, &artifactInformation); 
err != nil {
+                       log.Fatalf("Unable to unmarshal artifact information 
from json string %v", info)
+               }
+
+               // For now we only expect artifacts in file type. The condition 
should be revisited if the assumption is not valid any more.
+               if artifactInformation.GetTypeUrn() != 
standardArtifactFileTypeUrn {
+                       log.Fatalf("Expect file artifact type in setup only 
mode, found %v.", artifactInformation.GetTypeUrn())
+               }
+               filePayload := &pipepb.ArtifactFilePayload{}
+               if err := proto.Unmarshal(artifactInformation.GetTypePayload(), 
filePayload); err != nil {
+                       log.Fatal("Unable to unmarshal artifact information 
type payload.")
+               }
+               if dir := filepath.Dir(filePayload.GetPath()); dir != workDir {
+                       log.Fatalf("Artifact %v not stored in the same work 
directory %v of metadata file", filePayload.GetPath(), workDir)

Review comment:
       Removed it. The installation script has an implicit requirement that
all the artifacts are in the same workdir; we can guarantee this when copying
the artifacts.

##########
File path: sdks/python/apache_beam/runners/portability/sdk_container_builder.py
##########
@@ -0,0 +1,138 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerBuilder builds the portable SDK container with dependencies.
+
+It copies the right boot dependencies, namely: apache beam sdk, python packages
+from requirements.txt, python packages from extra_packages.txt, workflow
+tarball, into the latest public python sdk container image, and run the
+dependencies installation in advance with the boot program in setup only mode
+to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+import uuid
+
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+DOCKERFILE_TEMPLATE = (
+    """FROM apache/beam_python{major}.{minor}_sdk:latest
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN /opt/apache/beam/boot --setup_only --artifacts {workdir}/{manifest_file}

Review comment:
       done.

##########
File path: sdks/python/container/boot.go
##########
@@ -61,11 +66,22 @@ const (
        sdkSrcFile        = "dataflow_python_sdk.tar"
        extraPackagesFile = "extra_packages.txt"
        workerPoolIdEnv   = "BEAM_PYTHON_WORKER_POOL_ID"
+
+       // Setup result for the setup only mode.
+       setupResultFile             = "/opt/apache/beam/setup_result.json"

Review comment:
       done.

##########
File path: sdks/python/container/boot.go
##########
@@ -203,13 +225,14 @@ func setupAcceptableWheelSpecs() error {
 }
 
 // installSetupPackages installs Beam SDK and user dependencies.
-func installSetupPackages(mds []*jobpb.ArtifactMetadata, workDir string) error 
{
+func installSetupPackages(files []string, workDir string) error {
        log.Printf("Installing setup packages ...")
 
-       files := make([]string, len(mds))
-       for i, v := range mds {
-               log.Printf("Found artifact: %s", v.Name)
-               files[i] = v.Name
+       // Check if setupResultFile exists, if so we can skip the dependency 
installation since

Review comment:
       done.

##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -151,7 +151,8 @@ def create_job_resources(options,  # type: PipelineOptions
     setup_options = options.view_as(SetupOptions)
 
     # Stage a requirements file if present.
-    if setup_options.requirements_file is not None:
+    if (setup_options.requirements_file is not None and

Review comment:
       The artifact infos are generated from the result of the stager's
create_job_resources, so it is easiest to reduce them in the stager. This also
means we don't re-create or re-download unnecessary resources.

##########
File path: sdks/python/apache_beam/runners/portability/sdk_container_builder.py
##########
@@ -0,0 +1,138 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerBuilder builds the portable SDK container with dependencies.
+
+It copies the right boot dependencies, namely: apache beam sdk, python packages
+from requirements.txt, python packages from extra_packages.txt, workflow
+tarball, into the latest public python sdk container image, and run the
+dependencies installation in advance with the boot program in setup only mode
+to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+import uuid
+
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+DOCKERFILE_TEMPLATE = (
+    """FROM apache/beam_python{major}.{minor}_sdk:latest
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN /opt/apache/beam/boot --setup_only --artifacts {workdir}/{manifest_file}
+""")
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class SdkContainerBuilder(object):

Review comment:
       done.

##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -1070,7 +1075,28 @@ def get_container_image_from_options(pipeline_options):
     Returns:
       str: Container image for remote execution.
   """
+  debug_options = pipeline_options.view_as(DebugOptions)
   worker_options = pipeline_options.view_as(WorkerOptions)
+  container_build_engine = debug_options.lookup_experiment(

Review comment:
       done.

##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -1070,7 +1075,28 @@ def get_container_image_from_options(pipeline_options):
     Returns:
       str: Container image for remote execution.
   """
+  debug_options = pipeline_options.view_as(DebugOptions)
   worker_options = pipeline_options.view_as(WorkerOptions)
+  container_build_engine = debug_options.lookup_experiment(
+      'prebuild_sdk_container')
+  if (debug_options.lookup_experiment('beam_fn_api') and

Review comment:
       done.

##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -51,13 +51,18 @@
 from apache_beam.io.gcp.internal.clients import storage
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import WorkerOptions
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.dataflow.internal import names
 from apache_beam.runners.dataflow.internal.clients import dataflow
 from apache_beam.runners.dataflow.internal.names import PropertyNames
+from apache_beam.runners.dataflow.internal.sdk_container_cloud_builder import \

Review comment:
       done.

##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -994,6 +994,23 @@ def _add_argparse_args(cls, parser):
             'staged in the staging area (--staging_location option) and the '
             'workers will install them in same order they were specified on '
             'the command line.'))
+    parser.add_argument(
+        '--docker_registry_url',
+        default=None,
+        help=(
+            'The default docker registry to for pushing additional docker '

Review comment:
       done.

##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -994,6 +994,23 @@ def _add_argparse_args(cls, parser):
             'staged in the staging area (--staging_location option) and the '
             'workers will install them in same order they were specified on '
             'the command line.'))
+    parser.add_argument(
+        '--docker_registry_url',

Review comment:
       done.

##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -994,6 +994,23 @@ def _add_argparse_args(cls, parser):
             'staged in the staging area (--staging_location option) and the '
             'workers will install them in same order they were specified on '
             'the command line.'))
+    parser.add_argument(
+        '--docker_registry_url',
+        default=None,
+        help=(
+            'The default docker registry to for pushing additional docker '
+            'images for the setup.'))
+    parser.add_argument(

Review comment:
       done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to