This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 63705b05babb85ce5058013cae5a73daead733dc
Merge: 752bdfd 8798913
Author: Robert Burke <[email protected]>
AuthorDate: Thu Jul 30 20:42:31 2020 -0700

    [BEAM-10610][BEAM-9982] Support Loopback mode with universal runners in the
    Go SDK.
    
    This adds support for loopback mode to the Go SDK, which is useful for
    validating pipelines against real portable runners such as Flink, Spark,
    and the Python portable runner.
    
    This PR also fixes some smaller bugs discovered during testing, mostly
    around logging, and includes a few cleanups:
    
    - Adds missing newlines when printing unsent log messages to the local
      machine.
    - Fixes a small race condition that prevented orderly shutdown of workers
      when the control channel terminates.
    - Cleans up [BEAM-9982] and removes the redundant MustMarshal function from
      graphx.
    - Increases the default gRPC receive limit to its maximum. Testing
      indicates that this only causes excess memory use when the inputs are
      actually large, whereas the old limit unnecessarily restricted pipeline
      data.

    Loopback mode allows for convenient testing of pipelines: there is no need
    to dig into running Docker containers to access logs, and print output
    appears as expected in the normal console. There are also testing benefits
    such as access to the local file system for reading and writing files.
    
    The trade-off for this convenience is that workers can access the local
    state of the program. Package-level (global) state is accessible within
    the process, and thus to the local workers. As a best practice, move DoFn
    state into structural DoFns; otherwise you risk writing pipelines that
    depend on state that doesn't exist on distributed machines.
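
    The difference can be sketched without the SDK. The type addPrefixFn and
    the roundTrip helper below are hypothetical, and encoding/json is used
    only as a stand-in for however the SDK actually ships a DoFn to a remote
    worker. The point is that exported struct fields travel with the
    serialized DoFn, while a package-level variable set in main is visible
    only to workers that share this process, as they do in loopback mode.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// prefix is package-level state. In loopback mode workers run in this same
// process and can see it; a remote worker never ran the code that set it.
var prefix = ""

// addPrefixFn is a hypothetical structural DoFn: its configuration lives in
// exported fields, which survive serialization.
type addPrefixFn struct {
	Prefix string `json:"prefix"`
}

// ProcessElement mirrors the method name used by structural DoFns.
func (f *addPrefixFn) ProcessElement(s string) string {
	return f.Prefix + s
}

// roundTrip stands in (as an assumption) for shipping the DoFn to a remote
// worker: only what survives serialization is available there.
func roundTrip(f *addPrefixFn) (*addPrefixFn, error) {
	b, err := json.Marshal(f)
	if err != nil {
		return nil, err
	}
	var out addPrefixFn
	if err := json.Unmarshal(b, &out); err != nil {
		return nil, err
	}
	return &out, nil
}

func main() {
	prefix = "set-in-main:" // visible only to workers inside this process
	fn, err := roundTrip(&addPrefixFn{Prefix: "key:"})
	if err != nil {
		panic(err)
	}
	fmt.Println(fn.ProcessElement("a"))
	fmt.Println("global prefix in this process:", prefix)
}
```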
    
    To use loopback mode, ensure that the runner package you're using is based
    on the universal runner (such as Spark or Flink), or use the universal
    runner directly. You can register the universal runner for use by
    beamx.Run by underscore importing it:
    
        import (
            "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
            _ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
        )

    Then add the following command-line flags when running your job:

        --runner=universal --endpoint=<local job management server address> --environment_type=LOOPBACK
    
    The framework will then start a BeamFnExternalWorkerPoolServer, which
    spins up new workers within the main program process, while the runner
    orchestrates those workers as necessary.

 sdks/go/pkg/beam/core/runtime/graphx/translate.go  |  47 +++----
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |   7 +-
 sdks/go/pkg/beam/core/runtime/harness/logging.go   |   2 +-
 sdks/go/pkg/beam/options/jobopts/options.go        |   9 +-
 .../beam/runners/universal/extworker/extworker.go  | 120 ++++++++++++++++++
 .../runners/universal/extworker/extworker_test.go  | 141 +++++++++++++++++++++
 sdks/go/pkg/beam/runners/universal/universal.go    |  17 ++-
 sdks/go/pkg/beam/util/grpcx/dial.go                |   3 +-
 8 files changed, 314 insertions(+), 32 deletions(-)
