gergely-g opened a new issue, #31227: URL: https://github.com/apache/beam/issues/31227
### What happened?

When building a pipeline with multiple `SqlTransform`s from Beam Python, the expansion that happens in `SqlTransform` is currently (Beam 2.55.0) extremely inefficient. This inefficiency has multiple sources:

1. By default, a new `BeamJarExpansionService()` is started for each `SqlTransform`.
2. The `ResolveArtifacts` call will unconditionally download the 300 MB `beam-sdks-java-extensions-sql-expansion-service-2.55.0.jar` to a temporary directory for each step (!).

The latter dominates execution time. For example, running from a 4 vCPU, 2 core, 16 GB memory machine (a standard Dataflow Workbench setup), a pipeline with 31 trivial SQL transforms takes 200 seconds to execute (see the example below).

We found a somewhat dirty workaround that speeds things up by skipping `SqlTransform._resolve_artifacts()` altogether when working from inside Jupyter. This brings the execution time down from 200 s to 22 s.

I suspect these inefficiencies also contribute to [beam_sql being extremely slow even for trivial queries](https://stackoverflow.com/questions/78221123/is-there-a-ways-to-speed-up-beam-sql-magic-execution).

[apache_beam/runners/portability/artifact_service.py](https://github.com/apache/beam/blob/bb51380f1b29a2b69ab82ef795a8895ebd89f87e/sdks/python/apache_beam/runners/portability/artifact_service.py#L294) contains this code snippet, which might be the main culprit for the inefficiency (note the trailing `and False`, which defeats the cache check):

```python
if os.path.exists(
    payload.path) and payload.sha256 and payload.sha256 == sha256(
        payload.path) and False:
  return artifact
else:
  return store_artifact(artifact, service, dest_dir)
```

Code to reproduce the issue and demonstrate the workaround:

```python
import time

import apache_beam as beam
from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.sql import SqlTransform

# By default this pipeline of 31 SQL transforms takes 200s to execute.
# With the short-circuiting below, execution time is reduced to 22s.
class FasterSqlTransform(SqlTransform):
  def _resolve_artifacts(self, components, service, dest):
    # Short-circuit the unnecessary call that results in the downloading of the
    # 300MB beam-sdks-java-extensions-sql-expansion-service-2.55.0.jar for each
    # transform.
    return components


start_time = time.time()

# FasterSqlTransform with a naive but shared BeamJarExpansionService.
with beam.Pipeline() as p:
  with BeamJarExpansionService(
      ':sdks:java:extensions:sql:expansion-service:shadowJar'
  ) as shared_expansion_service:
    sql_result = (
        p
        | "Begin SQL" >> FasterSqlTransform(
            "SELECT 'hello' AS message, 1 AS counter",
            expansion_service=shared_expansion_service))
    for i in range(30):
      sql_result = (
          sql_result
          | f"SQL {i}" >> FasterSqlTransform(
              "SELECT message, counter + 1 AS counter FROM PCOLLECTION",
              expansion_service=shared_expansion_service))
    sql_result | beam.LogElements()

print(f"Pipeline took {time.time() - start_time} s to execute")
```

### Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

### Issue Components

- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
