This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b28373b  [BEAM-12141] Print sha256 and size when downloading artifacts via artifact retrieval service
     new 974c2de  Merge pull request #14492 from ihji/BEAM-12141
b28373b is described below

commit b28373b6ca9e251a6b3b0ca234f8e12720301838
Author: Heejong Lee <[email protected]>
AuthorDate: Thu Apr 8 21:15:40 2021 -0700

    [BEAM-12141] Print sha256 and size when downloading artifacts via artifact retrieval service
---
 sdks/go/pkg/beam/artifact/materialize.go | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go
index 7f975a2..c730f51 100644
--- a/sdks/go/pkg/beam/artifact/materialize.go
+++ b/sdks/go/pkg/beam/artifact/materialize.go
@@ -204,7 +204,7 @@ func (a artifact) retrieve(ctx context.Context, dest string) error {
        }
        w := bufio.NewWriter(fd)
 
-       err = writeChunks(stream, w)
+       sha256Hash, err := writeChunks(stream, w)
        if err != nil {
                fd.Close() // drop any buffered content
                return errors.Wrapf(err, "failed to retrieve chunk for %v", filename)
@@ -213,24 +213,30 @@ func (a artifact) retrieve(ctx context.Context, dest string) error {
                fd.Close()
                return errors.Wrapf(err, "failed to flush chunks for %v", filename)
        }
+       stat, _ := fd.Stat()
+       log.Printf("Downloaded: %v (sha256: %v, size: %v)", filename, sha256Hash, stat.Size())
+
        return fd.Close()
 }
 
-func writeChunks(stream jobpb.ArtifactRetrievalService_GetArtifactClient, w io.Writer) error {
+func writeChunks(stream jobpb.ArtifactRetrievalService_GetArtifactClient, w io.Writer) (string, error) {
+       sha256W := sha256.New()
        for {
                chunk, err := stream.Recv()
                if err == io.EOF {
                        break
                }
                if err != nil {
-                       return err
+                       return "", err
+               }
+               if _, err := sha256W.Write(chunk.Data); err != nil {
+                       panic(err) // cannot fail
                }
-
                if _, err := w.Write(chunk.Data); err != nil {
-                       return errors.Wrapf(err, "chunk write failed")
+                       return "", errors.Wrapf(err, "chunk write failed")
                }
        }
-       return nil
+       return hex.EncodeToString(sha256W.Sum(nil)), nil
 }
 
 func legacyMaterialize(ctx context.Context, endpoint string, rt string, dest string) ([]*pipepb.ArtifactInformation, error) {

Reply via email to