[
https://issues.apache.org/jira/browse/BEAM-10844?focusedWorklogId=486019&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-486019
]
ASF GitHub Bot logged work on BEAM-10844:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Sep/20 01:03
Start Date: 18/Sep/20 01:03
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #12727:
URL: https://github.com/apache/beam/pull/12727#discussion_r490632829
##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -994,6 +994,23 @@ def _add_argparse_args(cls, parser):
'staged in the staging area (--staging_location option) and the '
'workers will install them in same order they were specified on '
'the command line.'))
+ parser.add_argument(
+ '--docker_registry_url',
Review comment:
Maybe `docker_registry_push_url` to make it more clear.
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -151,7 +151,8 @@ def create_job_resources(options, # type: PipelineOptions
setup_options = options.view_as(SetupOptions)
# Stage a requirements file if present.
- if setup_options.requirements_file is not None:
+ if (setup_options.requirements_file is not None and
Review comment:
I don't think we should have to do these checks here. We should get the
requirements file (and other artifacts) and populate them in the environment
(which is docker + ArtifactInfo). We should then reduce this to a new (docker +
ArtifactInfo) invoking the docker build routines, followed by staging the
artifacts in ArtifactInfo (if any).
##########
File path: sdks/python/container/boot.go
##########
@@ -283,3 +306,51 @@ func multiProcessExactlyOnce(actionFunc func(),
completeFileName string) {
os.OpenFile(installCompleteFile, os.O_RDONLY|os.O_CREATE, 0666)
}
+
+func processArtifactsInSetupOnlyMode() error {
+ if *artifacts == "" {
+ log.Fatal("No --artifacts provided along with --setup_only
flag.")
+ }
+ workDir := filepath.Dir(*artifacts)
+ metadata, err := ioutil.ReadFile(*artifacts)
+ if err != nil {
+ log.Fatalf("Unable to open artifacts metadata file %v with
error %v", *artifacts, err)
+ }
+ var infoJsons []string
+ if err := json.Unmarshal(metadata, &infoJsons); err != nil {
+ log.Fatalf("Unable to parse metadata, error: %v", err)
+ }
+
+ files := make([]string, len(infoJsons))
+ for i, info := range infoJsons {
+ var artifactInformation pipepb.ArtifactInformation
+ if err := jsonpb.UnmarshalString(info, &artifactInformation);
err != nil {
+ log.Fatalf("Unable to unmarshal artifact information
from json string %v", info)
+ }
+
+ // For now we only expect artifacts in file type. The condition
should be revisited if the assumption is not valid any more.
+ if artifactInformation.GetTypeUrn() !=
standardArtifactFileTypeUrn {
+ log.Fatalf("Expect file artifact type in setup only
mode, found %v.", artifactInformation.GetTypeUrn())
+ }
+ filePayload := &pipepb.ArtifactFilePayload{}
+ if err := proto.Unmarshal(artifactInformation.GetTypePayload(),
filePayload); err != nil {
+ log.Fatal("Unable to unmarshal artifact information
type payload.")
+ }
+ if dir := filepath.Dir(filePayload.GetPath()); dir != workDir {
+ log.Fatalf("Artifact %v not stored in the same work
directory %v of metadata file", filePayload.GetPath(), workDir)
Review comment:
Why is this a problem?
##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -994,6 +994,23 @@ def _add_argparse_args(cls, parser):
'staged in the staging area (--staging_location option) and the '
'workers will install them in same order they were specified on '
'the command line.'))
+ parser.add_argument(
+ '--docker_registry_url',
+ default=None,
+ help=(
+ 'The default docker registry to for pushing additional docker '
Review comment:
For pushing pre-prepared worker images?
##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -1070,7 +1075,28 @@ def get_container_image_from_options(pipeline_options):
Returns:
str: Container image for remote execution.
"""
+ debug_options = pipeline_options.view_as(DebugOptions)
worker_options = pipeline_options.view_as(WorkerOptions)
+ container_build_engine = debug_options.lookup_experiment(
+ 'prebuild_sdk_container')
+ if (debug_options.lookup_experiment('beam_fn_api') and
Review comment:
Omit the check on beam_fn_api, as we hope to remove that soon.
##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -1070,7 +1075,28 @@ def get_container_image_from_options(pipeline_options):
Returns:
str: Container image for remote execution.
"""
+ debug_options = pipeline_options.view_as(DebugOptions)
worker_options = pipeline_options.view_as(WorkerOptions)
+ container_build_engine = debug_options.lookup_experiment(
Review comment:
Why is this in apiclient? It does not make sense to limit this to
Dataflow; perhaps this should instead be put where we get the default
environment?
##########
File path: sdks/python/apache_beam/runners/portability/sdk_container_builder.py
##########
@@ -0,0 +1,138 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerBuilder builds the portable SDK container with dependencies.
+
+It copies the right boot dependencies, namely: apache beam sdk, python packages
+from requirements.txt, python packages from extra_packages.txt, workflow
+tarball, into the latest public python sdk container image, and run the
+dependencies installation in advance with the boot program in setup only mode
+to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+import uuid
+
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+DOCKERFILE_TEMPLATE = (
+ """FROM apache/beam_python{major}.{minor}_sdk:latest
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN /opt/apache/beam/boot --setup_only --artifacts {workdir}/{manifest_file}
+""")
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class SdkContainerBuilder(object):
Review comment:
Maybe make a local and cloud builder with a common parent, rather than
the cloud extending from the local?
##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -51,13 +51,18 @@
from apache_beam.io.gcp.internal.clients import storage
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.dataflow.internal.clients import dataflow
from apache_beam.runners.dataflow.internal.names import PropertyNames
+from apache_beam.runners.dataflow.internal.sdk_container_cloud_builder import \
Review comment:
Just let imports flow long. (Also, if you need to wrap, do so with ()'s.)
##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -994,6 +994,23 @@ def _add_argparse_args(cls, parser):
'staged in the staging area (--staging_location option) and the '
'workers will install them in same order they were specified on '
'the command line.'))
+ parser.add_argument(
+ '--docker_registry_url',
+ default=None,
+ help=(
+ 'The default docker registry to for pushing additional docker '
+ 'images for the setup.'))
+ parser.add_argument(
Review comment:
We should simply omit such dependencies, not have an extra flag to skip
them. (Or is that not possible?)
##########
File path: sdks/python/container/boot.go
##########
@@ -203,13 +225,14 @@ func setupAcceptableWheelSpecs() error {
}
// installSetupPackages installs Beam SDK and user dependencies.
-func installSetupPackages(mds []*jobpb.ArtifactMetadata, workDir string) error
{
+func installSetupPackages(files []string, workDir string) error {
log.Printf("Installing setup packages ...")
- files := make([]string, len(mds))
- for i, v := range mds {
- log.Printf("Found artifact: %s", v.Name)
- files[i] = v.Name
+ // Check if setupResultFile exists, if so we can skip the dependency
installation since
Review comment:
We should install anything offered here, and ensure this list is empty
if we don't have anything to install, rather than relying on the details of the
install script to leave certain files around.
##########
File path: sdks/python/container/boot.go
##########
@@ -61,11 +66,22 @@ const (
sdkSrcFile = "dataflow_python_sdk.tar"
extraPackagesFile = "extra_packages.txt"
workerPoolIdEnv = "BEAM_PYTHON_WORKER_POOL_ID"
+
+ // Setup result for the setup only mode.
+ setupResultFile = "/opt/apache/beam/setup_result.json"
Review comment:
We probably don't need to hard-code this anywhere.
##########
File path: sdks/python/container/boot.go
##########
@@ -61,11 +66,22 @@ const (
sdkSrcFile = "dataflow_python_sdk.tar"
extraPackagesFile = "extra_packages.txt"
workerPoolIdEnv = "BEAM_PYTHON_WORKER_POOL_ID"
+
+ // Setup result for the setup only mode.
+ setupResultFile = "/opt/apache/beam/setup_result.json"
+ standardArtifactFileTypeUrn = "beam:artifact:type:file:v1"
Review comment:
Isn't this a constant elsewhere?
##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -1070,7 +1075,28 @@ def get_container_image_from_options(pipeline_options):
Returns:
str: Container image for remote execution.
"""
+ debug_options = pipeline_options.view_as(DebugOptions)
worker_options = pipeline_options.view_as(WorkerOptions)
+ container_build_engine = debug_options.lookup_experiment(
Review comment:
This would mean we could more easily test it as well.
##########
File path: sdks/python/apache_beam/runners/portability/sdk_container_builder.py
##########
@@ -0,0 +1,138 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerBuilder builds the portable SDK container with dependencies.
+
+It copies the right boot dependencies, namely: apache beam sdk, python packages
+from requirements.txt, python packages from extra_packages.txt, workflow
+tarball, into the latest public python sdk container image, and run the
+dependencies installation in advance with the boot program in setup only mode
+to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+import uuid
+
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+DOCKERFILE_TEMPLATE = (
+ """FROM apache/beam_python{major}.{minor}_sdk:latest
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN /opt/apache/beam/boot --setup_only --artifacts {workdir}/{manifest_file}
Review comment:
Make `/opt/apache/beam/boot` a constant as well.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 486019)
Time Spent: 3h 10m (was: 3h)
> Add a way to prebuild python sdk container with dependencies
> ------------------------------------------------------------
>
> Key: BEAM-10844
> URL: https://issues.apache.org/jira/browse/BEAM-10844
> Project: Beam
> Issue Type: New Feature
> Components: runner-dataflow
> Reporter: Yichi Zhang
> Assignee: Yichi Zhang
> Priority: P2
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> We should add a way to prebuild python sdk container on top of latest public
> sdk image, and have all the dependencies installed, so that the setup steps
> won't need to be executed again every time a new worker vm is launched, on
> dataflow.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)