This is an automated email from the ASF dual-hosted git repository.
joellubi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new e9f6667291 GH-43443: [Go] [IPC] Infer schema from first record if not specified (#43484)
e9f6667291 is described below
commit e9f6667291b68ef5d82b4a193fdd84c8ef06a2cf
Author: Joel Lubinitsky <[email protected]>
AuthorDate: Wed Jul 31 15:07:15 2024 -0500
GH-43443: [Go] [IPC] Infer schema from first record if not specified (#43484)
### Rationale for this change
Fixes: #43443
Makes using the IPC writer, and any writers built on it such as the Flight
writer, simpler.
### What changes are included in this PR?
- Infer schema from first record if schema is not specified
- IPC and Flight tests
### Are these changes tested?
Yes
### Are there any user-facing changes?
Any `ipc.Writer` that does not specify the optional argument
`ipc.WithSchema` will no longer return an error as long as the incoming stream
of records has a consistent schema.
* GitHub Issue: #43443
Authored-by: Joel Lubinitsky <[email protected]>
Signed-off-by: Joel Lubinitsky <[email protected]>
---
go/arrow/flight/flight_test.go | 35 +++++++++++++++++++++++++++++++++++
go/arrow/ipc/writer.go | 8 ++++++--
go/arrow/ipc/writer_test.go | 19 +++++++++++++++++++
3 files changed, 60 insertions(+), 2 deletions(-)
diff --git a/go/arrow/flight/flight_test.go b/go/arrow/flight/flight_test.go
index fe896f39a2..a03d839e94 100755
--- a/go/arrow/flight/flight_test.go
+++ b/go/arrow/flight/flight_test.go
@@ -23,11 +23,13 @@ import (
"io"
"testing"
+ "github.com/apache/arrow/go/v18/arrow"
"github.com/apache/arrow/go/v18/arrow/array"
"github.com/apache/arrow/go/v18/arrow/flight"
"github.com/apache/arrow/go/v18/arrow/internal/arrdata"
"github.com/apache/arrow/go/v18/arrow/ipc"
"github.com/apache/arrow/go/v18/arrow/memory"
+ "github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
@@ -449,3 +451,36 @@ func TestReaderError(t *testing.T) {
t.Fatal("should have errored")
}
}
+
+func TestWriterInferSchema(t *testing.T) {
+ recs, ok := arrdata.Records["primitives"]
+ require.True(t, ok)
+
+ fs := flightStreamWriter{}
+ w := flight.NewRecordWriter(&fs)
+
+ for _, rec := range recs {
+ require.NoError(t, w.Write(rec))
+ }
+
+ require.NoError(t, w.Close())
+}
+
+func TestWriterInconsistentSchema(t *testing.T) {
+ recs, ok := arrdata.Records["primitives"]
+ require.True(t, ok)
+
+ schema := arrow.NewSchema([]arrow.Field{{Name: "unknown", Type: arrow.PrimitiveTypes.Int8}}, nil)
+ fs := flightStreamWriter{}
+ w := flight.NewRecordWriter(&fs, ipc.WithSchema(schema))
+
+ require.ErrorContains(t, w.Write(recs[0]), "arrow/ipc: tried to write record batch with different schema")
+ require.NoError(t, w.Close())
+}
+
+type flightStreamWriter struct{}
+
+// Send implements flight.DataStreamWriter.
+func (f *flightStreamWriter) Send(data *flight.FlightData) error { return nil }
+
+var _ flight.DataStreamWriter = (*flightStreamWriter)(nil)
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index ca4f77d35e..02c67635bb 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -159,15 +159,19 @@ func (w *Writer) Write(rec arrow.Record) (err error) {
}
}()
+ incomingSchema := rec.Schema()
+
if !w.started {
+ if w.schema == nil {
+ w.schema = incomingSchema
+ }
err := w.start()
if err != nil {
return err
}
}
- schema := rec.Schema()
- if schema == nil || !schema.Equal(w.schema) {
+ if incomingSchema == nil || !incomingSchema.Equal(w.schema) {
return errInconsistentSchema
}
diff --git a/go/arrow/ipc/writer_test.go b/go/arrow/ipc/writer_test.go
index e5683243e4..60d811e68e 100644
--- a/go/arrow/ipc/writer_test.go
+++ b/go/arrow/ipc/writer_test.go
@@ -235,3 +235,22 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
}
}
}
+
+func TestWriterInferSchema(t *testing.T) {
+ bldr := array.NewRecordBuilder(memory.DefaultAllocator, arrow.NewSchema([]arrow.Field{{Name: "col", Type: arrow.PrimitiveTypes.Int8}}, nil))
+ bldr.Field(0).(*array.Int8Builder).AppendValues([]int8{1, 2, 3, 4, 5}, nil)
+ rec := bldr.NewRecord()
+ defer rec.Release()
+
+ var buf bytes.Buffer
+ w := NewWriter(&buf)
+
+ require.NoError(t, w.Write(rec))
+ require.NoError(t, w.Close())
+
+ r, err := NewReader(&buf)
+ require.NoError(t, err)
+ defer r.Release()
+
+ require.True(t, r.Schema().Equal(rec.Schema()))
+}