y1chi commented on a change in pull request #12727:
URL: https://github.com/apache/beam/pull/12727#discussion_r491077972
##########
File path: sdks/python/container/boot.go
##########
@@ -283,3 +306,51 @@ func multiProcessExactlyOnce(actionFunc func(),
completeFileName string) {
os.OpenFile(installCompleteFile, os.O_RDONLY|os.O_CREATE, 0666)
}
+
+func processArtifactsInSetupOnlyMode() error {
+ if *artifacts == "" {
+ log.Fatal("No --artifacts provided along with --setup_only
flag.")
+ }
+ workDir := filepath.Dir(*artifacts)
+ metadata, err := ioutil.ReadFile(*artifacts)
+ if err != nil {
+ log.Fatalf("Unable to open artifacts metadata file %v with
error %v", *artifacts, err)
+ }
+ var infoJsons []string
+ if err := json.Unmarshal(metadata, &infoJsons); err != nil {
+ log.Fatalf("Unable to parse metadata, error: %v", err)
+ }
+
+ files := make([]string, len(infoJsons))
+ for i, info := range infoJsons {
+ var artifactInformation pipepb.ArtifactInformation
+ if err := jsonpb.UnmarshalString(info, &artifactInformation);
err != nil {
+ log.Fatalf("Unable to unmarshal artifact information
from json string %v", info)
+ }
+
+ // For now we only expect artifacts in file type. The condition
should be revisited if the assumption is not valid any more.
+ if artifactInformation.GetTypeUrn() !=
standardArtifactFileTypeUrn {
+ log.Fatalf("Expect file artifact type in setup only
mode, found %v.", artifactInformation.GetTypeUrn())
+ }
+ filePayload := &pipepb.ArtifactFilePayload{}
+ if err := proto.Unmarshal(artifactInformation.GetTypePayload(),
filePayload); err != nil {
+ log.Fatal("Unable to unmarshal artifact information
type payload.")
+ }
+ if dir := filepath.Dir(filePayload.GetPath()); dir != workDir {
+ log.Fatalf("Artifact %v not stored in the same work
directory %v of metadata file", filePayload.GetPath(), workDir)
Review comment:
Removed it. The installation script has an implicit requirement that
all the artifacts are in the same workdir; we can guarantee this while copying
the artifacts.
##########
File path: sdks/python/apache_beam/runners/portability/sdk_container_builder.py
##########
@@ -0,0 +1,138 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerBuilder builds the portable SDK container with dependencies.
+
+It copies the right boot dependencies, namely: apache beam sdk, python packages
+from requirements.txt, python packages from extra_packages.txt, workflow
+tarball, into the latest public python sdk container image, and run the
+dependencies installation in advance with the boot program in setup only mode
+to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+import uuid
+
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+DOCKERFILE_TEMPLATE = (
+ """FROM apache/beam_python{major}.{minor}_sdk:latest
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN /opt/apache/beam/boot --setup_only --artifacts {workdir}/{manifest_file}
Review comment:
done.
##########
File path: sdks/python/container/boot.go
##########
@@ -61,11 +66,22 @@ const (
sdkSrcFile = "dataflow_python_sdk.tar"
extraPackagesFile = "extra_packages.txt"
workerPoolIdEnv = "BEAM_PYTHON_WORKER_POOL_ID"
+
+ // Setup result for the setup only mode.
+ setupResultFile = "/opt/apache/beam/setup_result.json"
Review comment:
done.
##########
File path: sdks/python/container/boot.go
##########
@@ -203,13 +225,14 @@ func setupAcceptableWheelSpecs() error {
}
// installSetupPackages installs Beam SDK and user dependencies.
-func installSetupPackages(mds []*jobpb.ArtifactMetadata, workDir string) error
{
+func installSetupPackages(files []string, workDir string) error {
log.Printf("Installing setup packages ...")
- files := make([]string, len(mds))
- for i, v := range mds {
- log.Printf("Found artifact: %s", v.Name)
- files[i] = v.Name
+ // Check if setupResultFile exists, if so we can skip the dependency
installation since
Review comment:
done.
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -151,7 +151,8 @@ def create_job_resources(options, # type: PipelineOptions
setup_options = options.view_as(SetupOptions)
# Stage a requirements file if present.
- if setup_options.requirements_file is not None:
+ if (setup_options.requirements_file is not None and
Review comment:
The artifact infos are generated from the result of the stager's
create_job_resources, so it is easiest to reduce them in the stager. This also
means we don't re-create or re-download unnecessary resources.
##########
File path: sdks/python/apache_beam/runners/portability/sdk_container_builder.py
##########
@@ -0,0 +1,138 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerBuilder builds the portable SDK container with dependencies.
+
+It copies the right boot dependencies, namely: apache beam sdk, python packages
+from requirements.txt, python packages from extra_packages.txt, workflow
+tarball, into the latest public python sdk container image, and run the
+dependencies installation in advance with the boot program in setup only mode
+to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+import uuid
+
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+DOCKERFILE_TEMPLATE = (
+ """FROM apache/beam_python{major}.{minor}_sdk:latest
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN /opt/apache/beam/boot --setup_only --artifacts {workdir}/{manifest_file}
+""")
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class SdkContainerBuilder(object):
Review comment:
done.
##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -1070,7 +1075,28 @@ def get_container_image_from_options(pipeline_options):
Returns:
str: Container image for remote execution.
"""
+ debug_options = pipeline_options.view_as(DebugOptions)
worker_options = pipeline_options.view_as(WorkerOptions)
+ container_build_engine = debug_options.lookup_experiment(
Review comment:
done.
##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -1070,7 +1075,28 @@ def get_container_image_from_options(pipeline_options):
Returns:
str: Container image for remote execution.
"""
+ debug_options = pipeline_options.view_as(DebugOptions)
worker_options = pipeline_options.view_as(WorkerOptions)
+ container_build_engine = debug_options.lookup_experiment(
+ 'prebuild_sdk_container')
+ if (debug_options.lookup_experiment('beam_fn_api') and
Review comment:
done.
##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -51,13 +51,18 @@
from apache_beam.io.gcp.internal.clients import storage
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.dataflow.internal.clients import dataflow
from apache_beam.runners.dataflow.internal.names import PropertyNames
+from apache_beam.runners.dataflow.internal.sdk_container_cloud_builder import \
Review comment:
done.
##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -994,6 +994,23 @@ def _add_argparse_args(cls, parser):
'staged in the staging area (--staging_location option) and the '
'workers will install them in same order they were specified on '
'the command line.'))
+ parser.add_argument(
+ '--docker_registry_url',
+ default=None,
+ help=(
+ 'The default docker registry to for pushing additional docker '
Review comment:
done.
##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -994,6 +994,23 @@ def _add_argparse_args(cls, parser):
'staged in the staging area (--staging_location option) and the '
'workers will install them in same order they were specified on '
'the command line.'))
+ parser.add_argument(
+ '--docker_registry_url',
Review comment:
done.
##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -994,6 +994,23 @@ def _add_argparse_args(cls, parser):
'staged in the staging area (--staging_location option) and the '
'workers will install them in same order they were specified on '
'the command line.'))
+ parser.add_argument(
+ '--docker_registry_url',
+ default=None,
+ help=(
+ 'The default docker registry to for pushing additional docker '
+ 'images for the setup.'))
+ parser.add_argument(
Review comment:
done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]