gergely-g opened a new issue, #31227:
URL: https://github.com/apache/beam/issues/31227

   ### What happened?
   
   When building a Pipeline with multiple SqlTransforms from Beam Python, the expansion performed by SqlTransform is currently (Beam 2.55.0) extremely inefficient.
   
   This inefficiency has multiple sources.
   1. By default, a new BeamJarExpansionService() is started for each SqlTransform.
   2. The ResolveArtifacts call will unconditionally download the 300MB beam-sdks-java-extensions-sql-expansion-service-2.55.0.jar to a temporary directory for each step (!).
   
   The latter dominates execution time.
   For example, running a pipeline with 31 trivial SQL transforms on a 4 vCPU, 2 core, 16 GB memory machine (a standard Dataflow workbench setup) takes 200 seconds to execute.
   (See example below.)
   
   We found a somewhat dirty workaround that speeds things up by skipping the `SqlTransform._resolve_artifacts()` call altogether when working from inside Jupyter.
   
   This brings execution time down from 200s to 22s.
   
   I suspect these inefficiencies also contribute to [beam_sql being extremely slow even for trivial queries](https://stackoverflow.com/questions/78221123/is-there-a-ways-to-speed-up-beam-sql-magic-execution).
   
   
   [apache_beam/runners/portability/artifact_service.py](https://github.com/apache/beam/blob/bb51380f1b29a2b69ab82ef795a8895ebd89f87e/sdks/python/apache_beam/runners/portability/artifact_service.py#L294) contains this code snippet, which might be the main culprit for this inefficiency (note the `and False`):
   
   ```python
       if os.path.exists(
           payload.path) and payload.sha256 and payload.sha256 == sha256(
               payload.path) and False:
         return artifact
       else:
         return store_artifact(artifact, service, dest_dir)
   ```
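   If the trailing `and False` is leftover debug code, dropping it would let the sha256 check short-circuit the re-download whenever a matching local copy already exists. Here is a standalone sketch of that intended caching logic; the names `sha256_of`, `resolve_artifact`, and `download` are illustrative stand-ins, not Beam APIs:

    ```python
    import hashlib
    import os


    def sha256_of(path):
        """Hex digest of a file's contents, mirroring the sha256() check above."""
        h = hashlib.sha256()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(1 << 20), b''):
                h.update(chunk)
        return h.hexdigest()


    def resolve_artifact(path, expected_sha256, download):
        """Reuse a cached file when its sha256 matches; otherwise download it.

        `download` stands in for store_artifact() in artifact_service.py.
        """
        if os.path.exists(path) and expected_sha256 and (
            expected_sha256 == sha256_of(path)):
            # Cache hit: skip the re-download entirely.
            return path
        return download(path)
    ```

   With the `and False` removed, the second and later SqlTransforms in a pipeline would hit the cached jar instead of fetching 300MB each time.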
   
   
   Code to repro and demonstrate the workaround.
   
    ```python
    import time

    import apache_beam as beam
    from apache_beam.transforms.external import BeamJarExpansionService
    from apache_beam.transforms.sql import SqlTransform

    # By default this pipeline of 31 SQL transforms takes 200s to execute.
    # With the short-circuiting below, execution time is reduced to 22s.


    class FasterSqlTransform(SqlTransform):
        def _resolve_artifacts(self, components, service, dest):
            # Short-circuit the unnecessary call that downloads the 300MB
            # beam-sdks-java-extensions-sql-expansion-service-2.55.0.jar
            # for each transform.
            return components


    start_time = time.time()

    # FasterSqlTransform with a naive but shared BeamJarExpansionService
    with beam.Pipeline() as p:
        with BeamJarExpansionService(
                ':sdks:java:extensions:sql:expansion-service:shadowJar'
        ) as shared_expansion_service:
            sql_result = (
                p
                | "Begin SQL" >> FasterSqlTransform(
                    "SELECT 'hello' AS message, 1 AS counter",
                    expansion_service=shared_expansion_service))
            for i in range(30):
                sql_result = (
                    sql_result
                    | f"SQL {i}" >> FasterSqlTransform(
                        "SELECT message, counter + 1 AS counter FROM PCOLLECTION",
                        expansion_service=shared_expansion_service))
            sql_result | beam.LogElements()

    print(f"Pipeline took {time.time() - start_time} s to execute")
    ```
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

