lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present. URL: https://github.com/apache/beam/pull/11305#discussion_r403270941
########## File path: sdks/go/pkg/beam/artifact/materialize_test.go ########## @@ -148,6 +153,156 @@ func stage(ctx context.Context, scl pb.LegacyArtifactStagingServiceClient, t *te return md } +// Test for new artifact retrieval. + +func TestNewRetrieveWithManyFiles(t *testing.T) { + expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"} + + client := fakeRetrievalService{ + artifacts: expected, + } + + dest := makeTempDir(t) + defer os.RemoveAll(dest) + ctx := grpcx.WriteWorkerID(context.Background(), "worker") + + mds, err := newMaterializeWithClient(ctx, client, client.resolvedArtifacts(), dest) + if err != nil { + t.Fatalf("materialize failed: %v", err) + } + + checkStagedFiles(mds, dest, expected, t) +} + +func TestNewRetrieveWithResolution(t *testing.T) { + expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"} + + client := fakeRetrievalService{ + artifacts: expected, + } + + dest := makeTempDir(t) + defer os.RemoveAll(dest) + ctx := grpcx.WriteWorkerID(context.Background(), "worker") + + mds, err := newMaterializeWithClient(ctx, client, client.unresolvedArtifacts(), dest) + if err != nil { + t.Fatalf("materialize failed: %v", err) + } + + checkStagedFiles(mds, dest, expected, t) +} + +func checkStagedFiles(mds []*pb.ArtifactMetadata, dest string, expected map[string]string, t *testing.T) { + if len(mds) != len(expected) { + t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected)) + } + for _, md := range mds { + filename := filepath.Join(dest, filepath.FromSlash(md.Name)) + fd, err := os.Open(filename) + if err != nil { + t.Errorf("error opening file %v", err) + } + defer fd.Close() + + data := make([]byte, 1<<20) + n, err := fd.Read(data) + if err != nil { + t.Errorf("error reading file %v", err) + } + + if string(data[:n]) != expected[md.Name] { + t.Errorf("missmatched contents for %v: '%s' vs '%s'", md.Name, string(data[:n]), expected[md.Name]) + } + } +} + +type fakeRetrievalService struct { + artifacts map[string]string // name -> content +} + +func (fake fakeRetrievalService) resolvedArtifacts() []*pipeline_v1.ArtifactInformation { + var artifacts []*pipeline_v1.ArtifactInformation + for name, contents := range fake.artifacts { + payload, _ := proto.Marshal(&pipeline_v1.ArtifactStagingToRolePayload{ + StagedName: name}) + artifacts = append(artifacts, &pipeline_v1.ArtifactInformation{ + TypeUrn: "resolved", + TypePayload: []byte(contents), + RoleUrn: URNStagingTo, + RolePayload: payload, + }) + } + return artifacts +} + +func (fake fakeRetrievalService) unresolvedArtifacts() []*pipeline_v1.ArtifactInformation { + return []*pipeline_v1.ArtifactInformation{ + &pipeline_v1.ArtifactInformation{ + TypeUrn: "unresolved", + }, + } +} + +func (fake fakeRetrievalService) ResolveArtifact(ctx context.Context, request *pb.ResolveArtifactRequest, opts ...grpc.CallOption) (*pb.ResolveArtifactResponse, error) { + response := pb.ResolveArtifactResponse{} + for _, dep := range request.Artifacts { + if dep.TypeUrn == "unresolved" { + response.Replacements = append(response.Replacements, fake.resolvedArtifacts()...) + } else { + response.Replacements = append(response.Replacements, dep) + } + } + return &response, nil +} + +func (fake fakeRetrievalService) GetArtifact(ctx context.Context, request *pb.GetArtifactRequest, opts ...grpc.CallOption) (pb.ArtifactRetrievalService_GetArtifactClient, error) { + var index int + if request.Artifact.TypeUrn == "resolved" { + return fakeGetArtifactResponse{data: request.Artifact.TypePayload, index: &index}, nil + } else { + return nil, errors.Errorf("Unsupported artifact %v", request.Artifact) + } +} + +func (fake fakeGetArtifactResponse) Recv() (*pb.GetArtifactResponse, error) { + if *fake.index < len(fake.data) { + *fake.index += 1 + return &pb.GetArtifactResponse{Data: fake.data[*fake.index-1 : *fake.index]}, nil + } else { + return nil, io.EOF + } +} + +type fakeGetArtifactResponse struct { + data []byte + index *int Review comment: Why a pointer to the index rather than make the methods on the pointer to the struct? The implementation seems to be trying to avoid the implicit value method copying, instead of just using pointer methods. See https://golang.org/doc/effective_go.html#methods for more information. tldr; replacing all the (fake fakeGetArtifactResponse) with (fake *fakeGetArtifactResponse) puts the method on the pointer, which means that repeated calls have access to the full state, and you don't need the indirections to have index increment as expected. This also applies to the other fakes or interfaces you've written, but given that you're only modifying reference state if at all (eg. maps, or pointers), then it doesn't matter so much that a value method is being used. The general rule is that when in doubt, use a pointer receiver. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services