zeroshade commented on a change in pull request #12518:
URL: https://github.com/apache/arrow/pull/12518#discussion_r822107374



##########
File path: go/arrow/ipc/writer.go
##########
@@ -225,6 +225,7 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload) 
error {
        defer cancel()
 
        for i := 0; i < w.compressNP; i++ {
+               wg.Add(1)

Review comment:
       If you apply this patch with `git apply`, it'll add the updated unit 
tests, since I don't have permission to push to your branch :smile:.
   
   Just tag me when you update this PR with the patch and I'll take a look.
   
   ```
   diff --git a/go/arrow/internal/arrdata/ioutil.go 
b/go/arrow/internal/arrdata/ioutil.go
   index a5b00552a..8c3ffb1d9 100644
   --- a/go/arrow/internal/arrdata/ioutil.go
   +++ b/go/arrow/internal/arrdata/ioutil.go
   @@ -178,10 +178,10 @@ func WriteFile(t *testing.T, f *os.File, mem 
memory.Allocator, schema *arrow.Sch
    }
   
    // WriteFile writes a list of records to the given file descriptor, as an 
ARROW file.
   -func WriteFileCompressed(t *testing.T, f *os.File, mem memory.Allocator, 
schema *arrow.Schema, recs []arrow.Record, codec flatbuf.CompressionType) {
   +func WriteFileCompressed(t *testing.T, f *os.File, mem memory.Allocator, 
schema *arrow.Schema, recs []arrow.Record, codec flatbuf.CompressionType, 
concurrency int) {
           t.Helper()
   
   -       opts := []ipc.Option{ipc.WithSchema(schema), ipc.WithAllocator(mem)}
   +       opts := []ipc.Option{ipc.WithSchema(schema), ipc.WithAllocator(mem), 
ipc.WithCompressConcurrency(concurrency)}
           switch codec {
           case flatbuf.CompressionTypeLZ4_FRAME:
                   opts = append(opts, ipc.WithLZ4())
   diff --git a/go/arrow/ipc/file_test.go b/go/arrow/ipc/file_test.go
   index 3d7e71e6e..ed3940993 100644
   --- a/go/arrow/ipc/file_test.go
   +++ b/go/arrow/ipc/file_test.go
   @@ -17,6 +17,7 @@
    package ipc_test
   
    import (
   +       "fmt"
           "io/ioutil"
           "os"
           "testing"
   @@ -64,20 +65,22 @@ func TestFileCompressed(t *testing.T) {
   
           for _, codec := range compressTypes {
                   for name, recs := range arrdata.Records {
   -                       t.Run(name, func(t *testing.T) {
   -                               mem := 
memory.NewCheckedAllocator(memory.NewGoAllocator())
   -                               defer mem.AssertSize(t, 0)
   +                       for _, n := range []int{0, 1, 2, 3} {
   +                               t.Run(fmt.Sprintf("%s compress concurrency 
%d", name, n), func(t *testing.T) {
   +                                       mem := 
memory.NewCheckedAllocator(memory.NewGoAllocator())
   +                                       defer mem.AssertSize(t, 0)
   
   -                               f, err := ioutil.TempFile(tempDir, 
"go-arrow-file-")
   -                               if err != nil {
   -                                       t.Fatal(err)
   -                               }
   -                               defer f.Close()
   +                                       f, err := ioutil.TempFile(tempDir, 
"go-arrow-file-")
   +                                       if err != nil {
   +                                               t.Fatal(err)
   +                                       }
   +                                       defer f.Close()
   
   -                               arrdata.WriteFileCompressed(t, f, mem, 
recs[0].Schema(), recs, codec)
   -                               arrdata.CheckArrowFile(t, f, mem, 
recs[0].Schema(), recs)
   -                               arrdata.CheckArrowConcurrentFile(t, f, mem, 
recs[0].Schema(), recs)
   -                       })
   +                                       arrdata.WriteFileCompressed(t, f, 
mem, recs[0].Schema(), recs, codec, n)
   +                                       arrdata.CheckArrowFile(t, f, mem, 
recs[0].Schema(), recs)
   +                                       arrdata.CheckArrowConcurrentFile(t, 
f, mem, recs[0].Schema(), recs)
   +                               })
   +                       }
                   }
           }
    }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to