This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 141e3e63662 Add script to cache provider artifacts for faster startup.
(#28335)
141e3e63662 is described below
commit 141e3e63662252071f74b95c8df087daa6daf12c
Author: Robert Bradshaw <[email protected]>
AuthorDate: Tue Sep 12 17:05:19 2023 -0700
Add script to cache provider artifacts for faster startup. (#28335)
This should be run during template docker image creation.
---
.../apache_beam/yaml/cache_provider_artifacts.py | 46 +++++++++++++
sdks/python/apache_beam/yaml/yaml_provider.py | 76 ++++++++++++++++++----
2 files changed, 111 insertions(+), 11 deletions(-)
diff --git a/sdks/python/apache_beam/yaml/cache_provider_artifacts.py
b/sdks/python/apache_beam/yaml/cache_provider_artifacts.py
new file mode 100644
index 00000000000..6c96dd3b0fd
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/cache_provider_artifacts.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
import logging
import sys
import time

from apache_beam.version import __version__ as beam_version
from apache_beam.yaml import yaml_provider
+
+
def cache_provider_artifacts():
  """Eagerly materializes the artifacts used by the standard YAML providers.

  Meant to be run ahead of time (e.g. during docker image creation) so that
  later pipeline launches can reuse the cached artifacts instead of building
  or fetching them at startup.

  Each distinct provider returned by ``yaml_provider.standard_providers()`` has
  its ``cache_artifacts()`` called once; whatever it reports is logged together
  with the elapsed time. For non-``.dev`` Beam versions the shared base Python
  venv (the one per-provider venvs are cloned from) is built as well.
  """
  providers_by_id = {}
  for providers in yaml_provider.standard_providers().values():
    for provider in providers:
      # The same provider object may be listed under several transforms;
      # dedup by identity so each one is cached (and logged) only once.
      providers_by_id[id(provider)] = provider
  for provider in providers_by_id.values():
    t = time.time()
    artifacts = provider.cache_artifacts()
    if artifacts:
      logging.info(
          'Cached %s in %0.03f seconds.', ', '.join(artifacts), time.time() - t)
  if '.dev' not in beam_version:
    # Also cache a base python venv for fast cloning. The base venv is keyed
    # on the interpreter used to create it, which must be passed explicitly;
    # use the interpreter running this script (presumably the same default
    # PypiExpansionService uses for base_python -- confirm).
    t = time.time()
    artifacts = yaml_provider.PypiExpansionService._create_venv_to_clone(
        sys.executable)
    logging.info('Cached %s in %0.03f seconds.', artifacts, time.time() - t)
+
+
if __name__ == '__main__':
  # Lower the root logger's threshold so the INFO-level "Cached ..." timing
  # messages produced by cache_provider_artifacts() are not filtered out.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  cache_provider_artifacts()
diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py
b/sdks/python/apache_beam/yaml/yaml_provider.py
index d42d7aaffee..6e035811d4b 100644
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -32,6 +32,7 @@ from typing import Callable
from typing import Dict
from typing import Iterable
from typing import Mapping
+from typing import Optional
import yaml
from yaml.loader import SafeLoader
@@ -57,6 +58,9 @@ class Provider:
"""Returns whether this provider is available to use in this
environment."""
raise NotImplementedError(type(self))
+ def cache_artifacts(self) -> Optional[Iterable[str]]:
+ raise NotImplementedError(type(self))
+
def provided_transforms(self) -> Iterable[str]:
"""Returns a list of transform type names this provider can handle."""
raise NotImplementedError(type(self))
@@ -256,17 +260,24 @@ class RemoteProvider(ExternalProvider):
self._is_available = False
return self._is_available
+ def cache_artifacts(self):
+ pass
+
class ExternalJavaProvider(ExternalProvider):
  """Provider whose transforms are expanded by a Java expansion-service jar.

  Args:
    urns: passed through unchanged to ExternalProvider.
    jar_provider: zero-argument callable returning the jar handed to
      external.JavaJarExpansionService. It is called lazily when the service
      is needed, and also by cache_artifacts() to prepare the jar up front.
  """
  def __init__(self, urns, jar_provider):
    super().__init__(
        urns, lambda: external.JavaJarExpansionService(jar_provider()))
    self._jar_provider = jar_provider

  def available(self):
    """Returns whether a ``java`` executable can be found on the PATH."""
    # shutil.which does the same PATH lookup as the `which` command without
    # depending on that binary being installed. When it is missing (e.g. in
    # minimal container images) subprocess.run(['which', ...]) raises
    # FileNotFoundError rather than reporting Java as unavailable.
    import shutil
    return shutil.which('java') is not None

  def cache_artifacts(self):
    """Resolves the expansion-service jar ahead of time.

    Returns:
      A one-element list holding the result of jar_provider() (presumably the
      local jar path -- confirm against the jar providers in use).
    """
    return [self._jar_provider()]
+
@ExternalProvider.register_provider_type('python')
def python(urns, packages=()):
@@ -289,6 +300,9 @@ class ExternalPythonProvider(ExternalProvider):
def available(self):
return True # If we're running this script, we have Python installed.
+ def cache_artifacts(self):
+ return [self._service._venv()]
+
def create_external_transform(self, urn, args):
# Python transforms are "registered" by fully qualified name.
return external.ExternalTransform(
@@ -351,6 +365,9 @@ class InlineProvider(Provider):
def available(self):
return True
+ def cache_artifacts(self):
+ pass
+
def provided_transforms(self):
return self._transform_factories.keys()
@@ -527,23 +544,60 @@ class PypiExpansionService:
self._packages = packages
self._base_python = base_python
- def _key(self):
- return json.dumps({'binary': self._base_python, 'packages':
self._packages})
+ @classmethod
+ def _key(cls, base_python, packages):
+ return json.dumps({
+ 'binary': base_python, 'packages': sorted(packages)
+ },
+ sort_keys=True)
- def _venv(self):
- venv = os.path.join(
- self.VENV_CACHE,
- hashlib.sha256(self._key().encode('utf-8')).hexdigest())
+ @classmethod
+ def _path(cls, base_python, packages):
+ return os.path.join(
+ cls.VENV_CACHE,
+ hashlib.sha256(cls._key(base_python,
+ packages).encode('utf-8')).hexdigest())
+
+ @classmethod
+ def _create_venv_from_scratch(cls, base_python, packages):
+ venv = cls._path(base_python, packages)
if not os.path.exists(venv):
- python_binary = os.path.join(venv, 'bin', 'python')
- subprocess.run([self._base_python, '-m', 'venv', venv], check=True)
- subprocess.run([python_binary, '-m', 'ensurepip'], check=True)
- subprocess.run([python_binary, '-m', 'pip', 'install'] + self._packages,
+ subprocess.run([base_python, '-m', 'venv', venv], check=True)
+ venv_python = os.path.join(venv, 'bin', 'python')
+ subprocess.run([venv_python, '-m', 'ensurepip'], check=True)
+ subprocess.run([venv_python, '-m', 'pip', 'install'] + packages,
check=True)
with open(venv + '-requirements.txt', 'w') as fout:
- fout.write('\n'.join(self._packages))
+ fout.write('\n'.join(packages))
return venv
+ @classmethod
+ def _create_venv_from_clone(cls, base_python, packages):
+ venv = cls._path(base_python, packages)
+ if not os.path.exists(venv):
+ clonable_venv = cls._create_venv_to_clone(base_python)
+ clonable_python = os.path.join(clonable_venv, 'bin', 'python')
+ subprocess.run(
+ [clonable_python, '-m', 'clonevirtualenv', clonable_venv, venv],
+ check=True)
+ venv_binary = os.path.join(venv, 'bin', 'python')
+ subprocess.run([venv_binary, '-m', 'pip', 'install'] + packages,
+ check=True)
+ with open(venv + '-requirements.txt', 'w') as fout:
+ fout.write('\n'.join(packages))
+ return venv
+
+ @classmethod
+ def _create_venv_to_clone(cls, base_python):
+ return cls._create_venv_from_scratch(
+ base_python, [
+ 'apache_beam[dataframe,gcp,test]==' + beam_version,
+ 'virtualenv-clone'
+ ])
+
+ def _venv(self):
+ return self._create_venv_from_clone(self._base_python, self._packages)
+
def __enter__(self):
venv = self._venv()
self._service_provider = subprocess_server.SubprocessServer(