[
https://issues.apache.org/jira/browse/BEAM-12792?focusedWorklogId=754850&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-754850
]
ASF GitHub Bot logged work on BEAM-12792:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 08/Apr/22 22:54
Start Date: 08/Apr/22 22:54
Worklog Time Spent: 10m
Work Description: tvalentyn commented on code in PR #16658:
URL: https://github.com/apache/beam/pull/16658#discussion_r846529441
##########
sdks/python/container/boot.go:
##########
@@ -137,46 +145,49 @@ func main() {
 	options, err := provision.ProtoToJSON(info.GetPipelineOptions())
 	if err != nil {
-		log.Fatalf("Failed to convert pipeline options: %v", err)
+		return fmt.Errorf("Failed to convert pipeline options: %v", err)
 	}
 	// (2) Retrieve and install the staged packages.
 	//
-	// Guard from concurrent artifact retrieval and installation,
-	// when called by child processes in a worker pool.
+	// No log.Fatalf() from here on, otherwise deferred cleanups will not
+	// be called!
-	materializeArtifactsFunc := func() {
-		dir := filepath.Join(*semiPersistDir, "staged")
+	// Trap signals, so we can clean up properly.
+	signalChannel := make(chan os.Signal, 1)
+	signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
-		files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
-		if err != nil {
-			log.Fatalf("Failed to retrieve staged files: %v", err)
-		}
+	venvDir, err := setupVenv(filepath.Join(*semiPersistDir, "beam-venv"), *id)
+	if err != nil {
+		return fmt.Errorf("Failed to initialize Python venv.")
+	}
+	cleanupFunc := func() {
+		log.Printf("Cleaning up temporary venv ...")
Review Comment:
nit: also, for messages ending with `...`, such as `doing <action> ...`, there
may be an expectation of a matching `<action> completed` message. Let's either
remove the ellipsis or add a 'completed' counterpart, per your choice.
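To make the pattern under review concrete, here is a minimal, standalone Go sketch, not Beam's actual boot.go: `launchSDKProcess` and the directory name are hypothetical stand-ins. It shows why the diff swaps `log.Fatalf` for returned errors and traps signals (`log.Fatalf` calls `os.Exit`, which skips deferred functions, so a venv cleanup registered with `defer` would never run), and it pairs the `...` message with a "completed" line as suggested in the review comment above.

```go
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
)

// launchSDKProcess is a hypothetical stand-in for the boot sequence.
// It returns errors instead of calling log.Fatalf so that the deferred
// cleanup below always runs, whatever exit path is taken.
func launchSDKProcess() error {
	venvDir := filepath.Join(os.TempDir(), "example-beam-venv")
	if err := os.MkdirAll(venvDir, 0750); err != nil {
		return fmt.Errorf("failed to initialize venv dir: %v", err)
	}

	// Pair the "doing <action> ..." message with a "completed" message,
	// as suggested in the review comment.
	cleanupFunc := func() {
		log.Printf("Cleaning up temporary venv ...")
		os.RemoveAll(venvDir)
		log.Printf("Cleanup of temporary venv completed.")
	}
	defer cleanupFunc()

	// Trap signals so the same cleanup also runs when the process is
	// interrupted rather than exiting normally.
	signalChannel := make(chan os.Signal, 1)
	signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		sig := <-signalChannel
		log.Printf("Received signal %v, cleaning up before exit.", sig)
		cleanupFunc()
		os.Exit(1)
	}()

	// Real work would go here; any failure is returned, never Fatalf'd,
	// so the deferred cleanup still executes.
	return nil
}

func main() {
	if err := launchSDKProcess(); err != nil {
		// Only the top level exits; all defers inside launchSDKProcess
		// have already run by this point.
		log.Printf("boot failed: %v", err)
		os.Exit(1)
	}
}
```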
Issue Time Tracking
-------------------
Worklog Id: (was: 754850)
Time Spent: 17.5h (was: 17h 20m)
> Multiple jobs running on Flink session cluster reuse the persistent Python
> environment.
> ---------------------------------------------------------------------------------------
>
> Key: BEAM-12792
> URL: https://issues.apache.org/jira/browse/BEAM-12792
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Affects Versions: 2.27.0, 2.28.0, 2.29.0, 2.30.0, 2.31.0
> Environment: Kubernetes 1.20 on Ubuntu 18.04.
> Reporter: Jens Wiren
> Priority: P1
> Labels: FlinkRunner, beam
> Time Spent: 17.5h
> Remaining Estimate: 0h
>
> I'm running TFX pipelines on a Flink cluster using Beam in k8s. However,
> extra Python packages passed to the Flink runner (or rather the beam worker
> sidecar) are only installed once per deployment cycle. Example:
> # Flink is deployed and is up and running.
> # A TFX pipeline starts and submits a job to Flink along with a Python whl of
> custom code and beam ops.
> # The beam worker installs the package and the pipeline finishes successfully.
> # A new TFX pipeline is built in which a new beam fn is introduced; the
> pipeline is started and the new whl is submitted as in step 2.
> # This time, the new package is not installed in the beam worker, so the job
> fails on a reference that does not exist in the beam worker's environment,
> since the new package was never installed.
>
> I started using Flink with Beam version 2.27 and this has been an issue the
> whole time.
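For readers landing here from the worklog: the PR diff quoted above addresses this by giving each worker its own venv keyed on the worker id instead of a shared persistent environment. The following is an illustrative sketch of that idea only; the `setupVenv` name and the `beam-venv` path come from the diff, but the body below is an assumption, not the actual Beam implementation.

```go
package main

import (
	"fmt"
	"log"
	"os"
	"os/exec"
	"path/filepath"
)

// setupVenv sketches the idea behind the fix: create a virtual environment
// in a directory unique to this worker id, so packages staged for one job
// are not silently reused by the next job on the same long-lived container.
func setupVenv(baseDir, workerId string) (string, error) {
	dir := filepath.Join(baseDir, "beam-venv-worker-"+workerId)
	// Remove any leftover venv for the same id so staged packages are
	// installed fresh for this job.
	if err := os.RemoveAll(dir); err != nil {
		return "", fmt.Errorf("failed to remove stale venv %s: %v", dir, err)
	}
	if err := os.MkdirAll(dir, 0750); err != nil {
		return "", fmt.Errorf("failed to create venv dir %s: %v", dir, err)
	}
	// Create the venv; staged artifacts would then be pip-installed into it.
	cmd := exec.Command("python3", "-m", "venv", "--system-site-packages", dir)
	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
	if err := cmd.Run(); err != nil {
		return "", fmt.Errorf("failed to create venv: %v", err)
	}
	return dir, nil
}

func main() {
	venvDir, err := setupVenv(filepath.Join(os.TempDir(), "beam-venv"), "worker-1")
	if err != nil {
		log.Printf("venv setup failed: %v", err)
		os.Exit(1)
	}
	log.Printf("Using per-job venv at %s", venvDir)
}
```

With the environment scoped to the worker id, a resubmitted job carrying a new wheel gets a fresh install instead of hitting the stale environment described in the reproduction steps above.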
--
This message was sent by Atlassian Jira
(v8.20.1#820001)