damccorm commented on code in PR #22132:
URL: https://github.com/apache/beam/pull/22132#discussion_r914780908
##########
sdks/typescript/boot.go:
##########
@@ -87,7 +90,71 @@ func main() {
log.Fatalf("Failed to convert pipeline options: %v", err)
}
- // (2) Invoke the Node entrypoint, passing the Fn API container contract info as flags.
+ // (2) Retrieve and install the staged packages.
+
+ dir := filepath.Join(*semiPersistDir, *id, "staged")
+ artifacts, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
+ if err != nil {
+ log.Fatalf("Failed to retrieve staged files: %v", err)
+ }
+
+ // Create a package.json that names given dependencies as overrides.
+ npmOverrides := make(map[string]string)
+ for _, v := range artifacts {
+ name, _ := artifact.MustExtractFilePayload(v)
+ path := filepath.Join(dir, name)
+ if v.RoleUrn == "beam:artifact:type:npm_dep:v1" {
+ // Npm cannot handle arbitrary suffixes.
+ suffixedPath := path + ".tar"
+ e := os.Rename(path, suffixedPath)
+ if e != nil {
+ log.Fatal(e)
+ }
+ npmOverrides[string(v.RolePayload)] = suffixedPath
+ }
+ }
+ if len(npmOverrides) > 0 {
Review Comment:
I don't quite follow why we're doing this - why can't we just install these
dependencies with `npm install`?
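For reference, the alternative I have in mind would look roughly like the sketch below (hypothetical, not code from this PR; it assumes the `suffixedPath` values computed in the loop above and imports of "os" and "os/exec"):
```
// Hypothetical sketch, not code from this PR: installing each staged
// tarball directly with `npm install` instead of recording it as a
// package.json override.
for _, suffixedPath := range npmOverrides {
	cmd := exec.Command("npm", "install", suffixedPath)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if err := cmd.Run(); err != nil {
		log.Fatalf("npm install %s failed: %v", suffixedPath, err)
	}
}
```
If the answer is that `overrides` is needed so the pinned tarballs also replace transitive occurrences of these packages (which a plain `npm install` of the tarball would not do), it would be worth a comment in the code saying so.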
##########
sdks/python/apache_beam/options/pipeline_options.py:
##########
@@ -348,12 +348,15 @@ def get_all_options(
del result[k]
if overrides:
- _LOGGER.warning("Discarding invalid overrides: %s", overrides)
+ if retain_unknown_options:
+ result.update(overrides)
Review Comment:
Above, we check:
```
if (drop_default and parser.get_default(k) == result[k] and
not isinstance(parser.get_default(k), ValueProvider)):
del result[k]
```
Do we need to do something similar here?
If so (or maybe regardless), it might be cleaner to conditionally iterate with
something like `for k in overrides` instead of `for k in list(result)` above
when `retain_unknown_options` is set. That would also keep any future updates
in one place.
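For illustration, a rough sketch of that restructuring (hypothetical; the surrounding `get_all_options` body is simplified and not the actual code):
```
# Hypothetical sketch, not the actual get_all_options code: one loop
# that handles both the drop_default filtering and the new
# retain_unknown_options behavior, so future filtering rules live in
# one place.
if retain_unknown_options:
    result.update(overrides)
keys_to_check = overrides if retain_unknown_options else list(result)
for k in keys_to_check:
    if (drop_default and parser.get_default(k) == result.get(k) and
        not isinstance(parser.get_default(k), ValueProvider)):
        result.pop(k, None)
```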
##########
sdks/python/apache_beam/runners/dataflow/internal/apiclient.py:
##########
@@ -660,35 +665,42 @@ def _stage_resources(self, pipeline, options):
'Found duplicated artifact sha256: %s (%s)',
type_payload.path,
type_payload.sha256)
- staged_name = staged_hashes[type_payload.sha256]
- dep.role_payload = beam_runner_api_pb2.ArtifactStagingToRolePayload(
- staged_name=staged_name).SerializeToString()
+ remote_name = staged_hashes[type_payload.sha256]
+ if is_staged_role:
+ # We should not be overriding this, as dep.role_payload.staged_name
+ # refers to the desired name on the worker, whereas staged_name
+ # refers to its placement in a distributed filesystem.
Review Comment:
I'm a little confused by this comment - are you saying we're currently doing
the wrong thing here? If so, should this be a TODO?
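To check my understanding, here is a hypothetical sketch (not the actual apiclient code; the `else` branch is my inference) of the behavior I think the comment is describing:
```
# Hypothetical sketch, not the actual apiclient code: remote_name is the
# artifact's location in the distributed filesystem after sha256
# deduplication, while role_payload.staged_name is the file name the
# worker expects and should survive deduplication untouched.
remote_name = staged_hashes[type_payload.sha256]
if not is_staged_role:
    # Only point non-staged roles at the deduplicated remote location.
    dep.role_payload = beam_runner_api_pb2.ArtifactStagingToRolePayload(
        staged_name=remote_name).SerializeToString()
```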