This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
     new 0606cfb4  refactor(arrow): fourth increment of the Record -> RecordBatch migration (#486)
0606cfb4 is described below

commit 0606cfb49ee29c3badfa7cd12082133eda4062a1
Author:     Mandukhai Alimaa <114253933+mandukhai-ali...@users.noreply.github.com>
AuthorDate: Fri Aug 29 05:49:29 2025 +0800

    refactor(arrow): fourth increment of the Record -> RecordBatch migration (#486)

    ### Rationale for this change

    Rename the Record interface to RecordBatch for clarity, since Record
    commonly means a single row but this type represents a batch of rows.
    In addition, some method names such as NewRecord have been changed to
    NewRecordBatch, but the old methods still exist as wrappers around the
    newly named methods to maintain backward compatibility.

    ### What changes are included in this PR?

    The following packages now use RecordBatch instead of Record:

    arrow/ipc
    arrow/flight
    arrow/flight/flightsql
    arrow/internal/flight_integration
    arrow/array

    In addition:
    - Fixed interface compatibility issues in dependent packages
      (arrow/avro, arrow/csv, arrow/cdata, parquet/pqarrow)
    - Updated all Record() methods to call RecordBatch() for consistency

    ### Are these changes tested?

    All affected packages build successfully and core tests pass. Some
    tests requiring external dependencies were skipped.

    This is the fourth increment of the Record → RecordBatch migration.

    ---------

    Co-authored-by: MANDY Alimaa <ali...@wisc.edu>
---
 arrow/array/json_reader.go                         | 12 +++-
 arrow/array/record.go                              | 74 ++++++++++++++--------
 arrow/array/table.go                               |  7 +-
 arrow/array/util.go                                |  2 +-
 arrow/avro/reader.go                               |  9 ++-
 arrow/cdata/cdata.go                               |  7 +-
 arrow/cdata/cdata_test.go                          |  5 +-
 arrow/csv/reader.go                                |  9 ++-
 arrow/flight/flightsql/client.go                   |  4 +-
 arrow/flight/flightsql/example/sql_batch_reader.go |  7 +-
 arrow/flight/flightsql/example/sqlite_server.go    |  2 +-
 .../example/sqlite_tables_schema_batch_reader.go   |  7 +-
 arrow/flight/flightsql/example/type_info.go        |  4 +-
 arrow/flight/record_batch_reader.go                |  6 +-
 arrow/flight/record_batch_writer.go                |  6 +-
 arrow/internal/flight_integration/scenario.go      | 14 ++--
 arrow/ipc/file_reader.go                           | 56 ++++++++++------
 arrow/ipc/file_writer.go                           |  2 +-
 arrow/ipc/reader.go                                | 21 ++++--
 arrow/ipc/writer.go                                | 10 +--
 parquet/pqarrow/file_reader.go                     |  5 +-
 21 files changed, 176 insertions(+), 93 deletions(-)
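To make the migration concrete before the diff: a minimal sketch of how calling code moves from the deprecated names to the new ones. It assumes the arrow-go v18 module path, and the one-column schema and values are invented purely for illustration:

package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
	}, nil)

	bldr := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
	defer bldr.Release()
	bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)

	// Previously: rec := bldr.NewRecord(); that still compiles, but is now a
	// deprecated wrapper around NewRecordBatch().
	var batch arrow.RecordBatch = bldr.NewRecordBatch()
	defer batch.Release()

	fmt.Println(batch.NumRows()) // 3
}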
diff --git a/arrow/array/json_reader.go b/arrow/array/json_reader.go
index b0698b3a..a5bc1ba8 100644
--- a/arrow/array/json_reader.go
+++ b/arrow/array/json_reader.go
@@ -75,7 +75,7 @@ type JSONReader struct {
 	bldr *RecordBuilder
 
 	refs atomic.Int64
-	cur  arrow.Record
+	cur  arrow.RecordBatch
 	err  error
 
 	chunk int
@@ -124,9 +124,15 @@ func (r *JSONReader) Err() error { return r.err }
 
 func (r *JSONReader) Schema() *arrow.Schema { return r.schema }
 
+// RecordBatch returns the last read in record batch. The returned record batch is only valid
+// until the next call to Next unless Retain is called on the record batch itself.
+func (r *JSONReader) RecordBatch() arrow.RecordBatch { return r.cur }
+
 // Record returns the last read in record. The returned record is only valid
 // until the next call to Next unless Retain is called on the record itself.
-func (r *JSONReader) Record() arrow.Record { return r.cur }
+//
+// Deprecated: Use [RecordBatch] instead.
+func (r *JSONReader) Record() arrow.Record { return r.RecordBatch() }
 
 func (r *JSONReader) Retain() {
 	r.refs.Add(1)
@@ -144,7 +150,7 @@ func (r *JSONReader) Release() {
 	}
 }
 
-// Next returns true if it read in a record, which will be available via Record
+// Next returns true if it read in a record, which will be available via RecordBatch
 // and false if there is either an error or the end of the reader.
 func (r *JSONReader) Next() bool {
 	if r.cur != nil {
diff --git a/arrow/array/record.go b/arrow/array/record.go
index 69f4cbaf..e71e46e3 100644
--- a/arrow/array/record.go
+++ b/arrow/array/record.go
@@ -37,6 +37,8 @@ type RecordReader interface {
 	Schema() *arrow.Schema
 
 	Next() bool
+	RecordBatch() arrow.RecordBatch
+	// Deprecated: Use [RecordBatch] instead.
 	Record() arrow.Record
 	Err() error
 }
@@ -46,12 +48,12 @@ type simpleRecords struct {
 	refCount atomic.Int64
 
 	schema *arrow.Schema
-	recs   []arrow.Record
-	cur    arrow.Record
+	recs   []arrow.RecordBatch
+	cur    arrow.RecordBatch
 }
 
 // NewRecordReader returns a simple iterator over the given slice of records.
-func NewRecordReader(schema *arrow.Schema, recs []arrow.Record) (RecordReader, error) {
+func NewRecordReader(schema *arrow.Schema, recs []arrow.RecordBatch) (RecordReader, error) {
 	rs := &simpleRecords{
 		schema: schema,
 		recs:   recs,
@@ -96,8 +98,11 @@ func (rs *simpleRecords) Release() {
 	}
 }
 
-func (rs *simpleRecords) Schema() *arrow.Schema { return rs.schema }
-func (rs *simpleRecords) Record() arrow.Record { return rs.cur }
+func (rs *simpleRecords) Schema() *arrow.Schema { return rs.schema }
+func (rs *simpleRecords) RecordBatch() arrow.RecordBatch { return rs.cur }
+
+// Deprecated: Use [RecordBatch] instead.
+func (rs *simpleRecords) Record() arrow.Record { return rs.RecordBatch() }
 func (rs *simpleRecords) Next() bool {
 	if len(rs.recs) == 0 {
 		return false
@@ -121,11 +126,11 @@ type simpleRecord struct {
 	arrs []arrow.Array
 }
 
-// NewRecord returns a basic, non-lazy in-memory record batch.
+// NewRecordBatch returns a basic, non-lazy in-memory record batch.
 //
-// NewRecord panics if the columns and schema are inconsistent.
-// NewRecord panics if rows is larger than the height of the columns.
-func NewRecord(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.Record {
+// NewRecordBatch panics if the columns and schema are inconsistent.
+// NewRecordBatch panics if rows is larger than the height of the columns.
+func NewRecordBatch(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.RecordBatch {
 	rec := &simpleRecord{
 		schema: schema,
 		rows:   nrows,
@@ -156,7 +161,12 @@ func NewRecord(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.Reco
 	return rec
 }
 
-func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.Record, error) {
+// Deprecated: Use [NewRecordBatch] instead.
+func NewRecord(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.Record {
+	return NewRecordBatch(schema, cols, nrows)
+}
+
+func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.RecordBatch, error) {
 	if i < 0 || i >= len(rec.arrs) {
 		return nil, fmt.Errorf("arrow/array: column index out of range [0, %d): got=%d", len(rec.arrs), i)
 	}
@@ -179,7 +189,7 @@ func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.Record, error)
 	copy(arrs, rec.arrs)
 	arrs[i] = arr
 
-	return NewRecord(rec.schema, arrs, rec.rows), nil
+	return NewRecordBatch(rec.schema, arrs, rec.rows), nil
 }
 
 func (rec *simpleRecord) validate() error {
@@ -242,7 +252,7 @@ func (rec *simpleRecord) ColumnName(i int) string { return rec.schema.Field(i).
 //
 // NewSlice panics if the slice is outside the valid range of the record array.
 // NewSlice panics if j < i.
-func (rec *simpleRecord) NewSlice(i, j int64) arrow.Record {
+func (rec *simpleRecord) NewSlice(i, j int64) arrow.RecordBatch {
 	arrs := make([]arrow.Array, len(rec.arrs))
 	for ii, arr := range rec.arrs {
 		arrs[ii] = NewSlice(arr, i, j)
@@ -252,7 +262,7 @@ func (rec *simpleRecord) NewSlice(i, j int64) arrow.Record {
 			arr.Release()
 		}
 	}()
-	return NewRecord(rec.schema, arrs, j-i)
+	return NewRecordBatch(rec.schema, arrs, j-i)
 }
 
 func (rec *simpleRecord) String() string {
@@ -325,13 +335,13 @@ func (b *RecordBuilder) Reserve(size int) {
 	}
 }
 
-// NewRecord creates a new record from the memory buffers and resets the
-// RecordBuilder so it can be used to build a new record.
+// NewRecordBatch creates a new record batch from the memory buffers and resets the
+// RecordBuilder so it can be used to build a new record batch.
 //
-// The returned Record must be Release()'d after use.
+// The returned RecordBatch must be Release()'d after use.
 //
-// NewRecord panics if the fields' builder do not have the same length.
-func (b *RecordBuilder) NewRecord() arrow.Record {
+// NewRecordBatch panics if the fields' builder do not have the same length.
+func (b *RecordBuilder) NewRecordBatch() arrow.RecordBatch {
 	cols := make([]arrow.Array, len(b.fields))
 	rows := int64(0)
@@ -353,7 +363,12 @@ func (b *RecordBuilder) NewRecordBatch() arrow.RecordBatch {
 		rows = irow
 	}
 
-	return NewRecord(b.schema, cols, rows)
+	return NewRecordBatch(b.schema, cols, rows)
+}
+
+// Deprecated: Use [NewRecordBatch] instead.
+func (b *RecordBuilder) NewRecord() arrow.Record {
+	return b.NewRecordBatch()
 }
 
 // UnmarshalJSON for record builder will read in a single object and add the values
@@ -411,9 +426,9 @@ type iterReader struct {
 	refCount atomic.Int64
 
 	schema *arrow.Schema
-	cur    arrow.Record
+	cur    arrow.RecordBatch
 
-	next func() (arrow.Record, error, bool)
+	next func() (arrow.RecordBatch, error, bool)
 	stop func()
 	err  error
@@ -434,7 +449,10 @@ func (ir *iterReader) Release() {
 	}
 }
 
-func (ir *iterReader) Record() arrow.Record { return ir.cur }
+func (ir *iterReader) RecordBatch() arrow.RecordBatch { return ir.cur }
+
+// Deprecated: Use [RecordBatch] instead.
+func (ir *iterReader) Record() arrow.Record { return ir.RecordBatch() }
 func (ir *iterReader) Err() error { return ir.err }
 
 func (ir *iterReader) Next() bool {
@@ -452,9 +470,9 @@ func (ir *iterReader) Next() bool {
 	return ok
 }
 
-// ReaderFromIter wraps a go iterator for arrow.Record + error into a RecordReader
+// ReaderFromIter wraps a go iterator for arrow.RecordBatch + error into a RecordReader
 // interface object for ease of use.
-func ReaderFromIter(schema *arrow.Schema, itr iter.Seq2[arrow.Record, error]) RecordReader {
+func ReaderFromIter(schema *arrow.Schema, itr iter.Seq2[arrow.RecordBatch, error]) RecordReader {
 	next, stop := iter.Pull2(itr)
 	rdr := &iterReader{
 		schema: schema,
@@ -469,12 +487,12 @@ func ReaderFromIter(schema *arrow.Schema, itr iter.Seq2[arrow.Record, error]) Re
 // you can use range on. The semantics are still important, if a record
 // that is returned is desired to be utilized beyond the scope of an iteration
 // then Retain must be called on it.
-func IterFromReader(rdr RecordReader) iter.Seq2[arrow.Record, error] {
+func IterFromReader(rdr RecordReader) iter.Seq2[arrow.RecordBatch, error] {
 	rdr.Retain()
 	return func(yield func(arrow.RecordBatch, error) bool) {
 		defer rdr.Release()
 		for rdr.Next() {
-			if !yield(rdr.Record(), nil) {
+			if !yield(rdr.RecordBatch(), nil) {
 				return
 			}
 		}
@@ -486,6 +504,6 @@ func IterFromReader(rdr RecordReader) iter.Seq2[arrow.Record, error] {
 }
 
 var (
-	_ arrow.Record = (*simpleRecord)(nil)
-	_ RecordReader = (*simpleRecords)(nil)
+	_ arrow.RecordBatch = (*simpleRecord)(nil)
+	_ RecordReader      = (*simpleRecords)(nil)
 )
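Since the iterator helpers above changed their element type to arrow.RecordBatch, a short sketch of consuming them with Go 1.23 range-over-func; the consume function and its error handling are illustrative, not part of this commit:

package example

import "github.com/apache/arrow-go/v18/arrow/array"

// consume drains a RecordReader through the IterFromReader adaptor shown above.
// Requires Go 1.23+ for range-over-func.
func consume(rdr array.RecordReader) error {
	for batch, err := range array.IterFromReader(rdr) {
		if err != nil {
			return err
		}
		// batch is an arrow.RecordBatch; call Retain to keep it past this iteration.
		_ = batch.NumRows()
	}
	return nil
}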
diff --git a/arrow/array/table.go b/arrow/array/table.go
index 6c2f3365..9ba65bf2 100644
--- a/arrow/array/table.go
+++ b/arrow/array/table.go
@@ -320,8 +320,11 @@ func NewTableReader(tbl arrow.Table, chunkSize int64) *TableReader {
 	return tr
 }
 
-func (tr *TableReader) Schema() *arrow.Schema { return tr.tbl.Schema() }
-func (tr *TableReader) Record() arrow.RecordBatch { return tr.rec }
+func (tr *TableReader) Schema() *arrow.Schema { return tr.tbl.Schema() }
+func (tr *TableReader) RecordBatch() arrow.RecordBatch { return tr.rec }
+
+// Deprecated: Use [RecordBatch] instead.
+func (tr *TableReader) Record() arrow.Record { return tr.RecordBatch() }
 
 func (tr *TableReader) Next() bool {
 	if tr.cur >= tr.max {
diff --git a/arrow/array/util.go b/arrow/array/util.go
index 9305e4c2..11430d86 100644
--- a/arrow/array/util.go
+++ b/arrow/array/util.go
@@ -180,7 +180,7 @@ func FromJSON(mem memory.Allocator, dt arrow.DataType, r io.Reader, opts ...From
 
 // RecordToStructArray constructs a struct array from the columns of the record batch
 // by referencing them, zero-copy.
-func RecordToStructArray(rec arrow.Record) *Struct {
+func RecordToStructArray(rec arrow.RecordBatch) *Struct {
 	cols := make([]arrow.ArrayData, rec.NumCols())
 	for i, c := range rec.Columns() {
 		cols[i] = c.Data()
diff --git a/arrow/avro/reader.go b/arrow/avro/reader.go
index bbab9975..db6de627 100644
--- a/arrow/avro/reader.go
+++ b/arrow/avro/reader.go
@@ -189,10 +189,17 @@ func (r *OCFReader) AvroSchema() string { return r.avroSchema }
 
 // Schema returns the converted Arrow schema of the Avro OCF
 func (r *OCFReader) Schema() *arrow.Schema { return r.schema }
 
+// RecordBatch returns the current record batch that has been extracted from the
+// underlying Avro OCF file.
+// It is valid until the next call to Next.
+func (r *OCFReader) RecordBatch() arrow.RecordBatch { return r.cur }
+
 // Record returns the current record that has been extracted from the
 // underlying Avro OCF file.
 // It is valid until the next call to Next.
-func (r *OCFReader) Record() arrow.RecordBatch { return r.cur }
+//
+// Deprecated: Use [RecordBatch] instead.
+func (r *OCFReader) Record() arrow.Record { return r.RecordBatch() }
 
 // Metrics returns the maximum queue depth of the Avro record read cache and of the
 // converted Arrow record cache.
diff --git a/arrow/cdata/cdata.go b/arrow/cdata/cdata.go
index 3e9e5e79..b38ceb5f 100644
--- a/arrow/cdata/cdata.go
+++ b/arrow/cdata/cdata.go
@@ -959,8 +959,11 @@ type nativeCRecordBatchReader struct {
 func (n *nativeCRecordBatchReader) Retain()  {}
 func (n *nativeCRecordBatchReader) Release() {}
 
-func (n *nativeCRecordBatchReader) Err() error { return n.err }
-func (n *nativeCRecordBatchReader) Record() arrow.RecordBatch { return n.cur }
+func (n *nativeCRecordBatchReader) Err() error { return n.err }
+func (n *nativeCRecordBatchReader) RecordBatch() arrow.RecordBatch { return n.cur }
+
+// Deprecated: Use [RecordBatch] instead.
+func (n *nativeCRecordBatchReader) Record() arrow.Record { return n.RecordBatch() }
 
 func (n *nativeCRecordBatchReader) Next() bool {
 	err := n.next()
diff --git a/arrow/cdata/cdata_test.go b/arrow/cdata/cdata_test.go
index f06fc743..f504ce99 100644
--- a/arrow/cdata/cdata_test.go
+++ b/arrow/cdata/cdata_test.go
@@ -980,10 +980,13 @@ func (r *failingReader) Next() bool {
 	r.opCount -= 1
 	return r.opCount > 0
 }
-func (r *failingReader) Record() arrow.RecordBatch {
+func (r *failingReader) RecordBatch() arrow.RecordBatch {
 	arrdata.Records["primitives"][0].Retain()
 	return arrdata.Records["primitives"][0]
 }
+func (r *failingReader) Record() arrow.Record {
+	return r.RecordBatch()
+}
 func (r *failingReader) Err() error {
 	if r.opCount == 0 {
 		return fmt.Errorf("Expected error message")
diff --git a/arrow/csv/reader.go b/arrow/csv/reader.go
index 98b4a971..d83842b9 100644
--- a/arrow/csv/reader.go
+++ b/arrow/csv/reader.go
@@ -222,10 +222,17 @@ func (r *Reader) Err() error { return r.err }
 
 func (r *Reader) Schema() *arrow.Schema { return r.schema }
 
+// RecordBatch returns the current record batch that has been extracted from the
+// underlying CSV file.
+// It is valid until the next call to Next.
+func (r *Reader) RecordBatch() arrow.RecordBatch { return r.cur }
+
 // Record returns the current record that has been extracted from the
 // underlying CSV file.
 // It is valid until the next call to Next.
-func (r *Reader) Record() arrow.RecordBatch { return r.cur }
+//
+// Deprecated: Use [RecordBatch] instead.
+func (r *Reader) Record() arrow.Record { return r.RecordBatch() }
 
 // Next returns whether a Record could be extracted from the underlying CSV file.
 //
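The CSV (and Avro) readers gain the same accessor, so a read loop looks like the following sketch; readCSV and the chunk size are invented for illustration:

package example

import (
	"io"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/csv"
)

// readCSV drains a CSV source using the renamed accessor from this diff.
func readCSV(f io.Reader, schema *arrow.Schema) error {
	rdr := csv.NewReader(f, schema, csv.WithChunk(1024))
	defer rdr.Release()
	for rdr.Next() {
		batch := rdr.RecordBatch() // valid until the next call to Next
		_ = batch.NumRows()
	}
	return rdr.Err()
}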
diff --git a/arrow/flight/flightsql/client.go b/arrow/flight/flightsql/client.go
index f7660e31..cdba3620 100644
--- a/arrow/flight/flightsql/client.go
+++ b/arrow/flight/flightsql/client.go
@@ -1097,7 +1097,7 @@ type PreparedStatement struct {
 	handle        []byte
 	datasetSchema *arrow.Schema
 	paramSchema   *arrow.Schema
-	paramBinding  arrow.Record
+	paramBinding  arrow.RecordBatch
 	streamBinding array.RecordReader
 	closed        bool
 }
@@ -1373,7 +1373,7 @@ func (p *PreparedStatement) clearParameters() {
 // from under the statement. Release will be called on a previous binding
 // record or reader if it existed, and will be called upon calling Close on the
 // PreparedStatement.
-func (p *PreparedStatement) SetParameters(binding arrow.Record) {
+func (p *PreparedStatement) SetParameters(binding arrow.RecordBatch) {
 	p.clearParameters()
 	p.paramBinding = binding
 	if p.paramBinding != nil {
diff --git a/arrow/flight/flightsql/example/sql_batch_reader.go b/arrow/flight/flightsql/example/sql_batch_reader.go
index 74444db3..f3dce1df 100644
--- a/arrow/flight/flightsql/example/sql_batch_reader.go
+++ b/arrow/flight/flightsql/example/sql_batch_reader.go
@@ -104,7 +104,7 @@ type SqlBatchReader struct {
 	schema *arrow.Schema
 	rows   *sql.Rows
 
-	record arrow.Record
+	record arrow.RecordBatch
 	bldr   *array.RecordBuilder
 
 	err error
@@ -253,7 +253,10 @@ func (r *SqlBatchReader) Release() {
 }
 
 func (r *SqlBatchReader) Schema() *arrow.Schema { return r.schema }
-func (r *SqlBatchReader) Record() arrow.Record { return r.record }
+func (r *SqlBatchReader) RecordBatch() arrow.RecordBatch { return r.record }
+
+// Deprecated: Use [RecordBatch] instead.
+func (r *SqlBatchReader) Record() arrow.Record { return r.RecordBatch() }
 
 func (r *SqlBatchReader) Err() error { return r.err }
 
diff --git a/arrow/flight/flightsql/example/sqlite_server.go b/arrow/flight/flightsql/example/sqlite_server.go
index e4b501b8..d5f61980 100644
--- a/arrow/flight/flightsql/example/sqlite_server.go
+++ b/arrow/flight/flightsql/example/sqlite_server.go
@@ -363,7 +363,7 @@ func (s *SQLiteFlightSQLServer) GetFlightInfoXdbcTypeInfo(_ context.Context, _ f
 }
 
 func (s *SQLiteFlightSQLServer) DoGetXdbcTypeInfo(_ context.Context, cmd flightsql.GetXdbcTypeInfo) (*arrow.Schema, <-chan flight.StreamChunk, error) {
-	var batch arrow.Record
+	var batch arrow.RecordBatch
 	if cmd.GetDataType() == nil {
 		batch = GetTypeInfoResult(s.Alloc)
 	} else {
diff --git a/arrow/flight/flightsql/example/sqlite_tables_schema_batch_reader.go b/arrow/flight/flightsql/example/sqlite_tables_schema_batch_reader.go
index 17ca3c5f..99840b5d 100644
--- a/arrow/flight/flightsql/example/sqlite_tables_schema_batch_reader.go
+++ b/arrow/flight/flightsql/example/sqlite_tables_schema_batch_reader.go
@@ -44,7 +44,7 @@ type SqliteTablesSchemaBatchReader struct {
 	rdr        array.RecordReader
 	stmt       *sql.Stmt
 	schemaBldr *array.BinaryBuilder
-	record     arrow.Record
+	record     arrow.RecordBatch
 	err        error
 }
@@ -94,7 +94,10 @@ func (s *SqliteTablesSchemaBatchReader) Schema() *arrow.Schema {
 	return arrow.NewSchema(fields, nil)
 }
 
-func (s *SqliteTablesSchemaBatchReader) Record() arrow.Record { return s.record }
+func (s *SqliteTablesSchemaBatchReader) RecordBatch() arrow.RecordBatch { return s.record }
+
+// Deprecated: Use [RecordBatch] instead.
+func (s *SqliteTablesSchemaBatchReader) Record() arrow.Record { return s.RecordBatch() }
 
 func getSqlTypeFromTypeName(sqltype string) int {
 	if sqltype == "" {
diff --git a/arrow/flight/flightsql/example/type_info.go b/arrow/flight/flightsql/example/type_info.go
index 57734f0c..35ecef65 100644
--- a/arrow/flight/flightsql/example/type_info.go
+++ b/arrow/flight/flightsql/example/type_info.go
@@ -28,7 +28,7 @@ import (
 	"github.com/apache/arrow-go/v18/arrow/memory"
 )
 
-func GetTypeInfoResult(mem memory.Allocator) arrow.Record {
+func GetTypeInfoResult(mem memory.Allocator) arrow.RecordBatch {
 	typeNames, _, _ := array.FromJSON(mem, arrow.BinaryTypes.String,
 		strings.NewReader(`["bit", "tinyint", "bigint", "longvarbinary",
 						"varbinary", "text", "longvarchar", "char",
@@ -97,7 +97,7 @@ func GetTypeInfoResult(mem memory.Allocator) arrow.Record {
 		sqlDataType, sqlDateTimeSub, numPrecRadix, intervalPrecision}, 17)
 }
 
-func GetFilteredTypeInfoResult(mem memory.Allocator, filter int32) arrow.Record {
+func GetFilteredTypeInfoResult(mem memory.Allocator, filter int32) arrow.RecordBatch {
 	batch := GetTypeInfoResult(mem)
 	defer batch.Release()
 
diff --git a/arrow/flight/record_batch_reader.go b/arrow/flight/record_batch_reader.go
index 30a5bcb4..5f113682 100644
--- a/arrow/flight/record_batch_reader.go
+++ b/arrow/flight/record_batch_reader.go
@@ -121,7 +121,7 @@ func (r *Reader) Release() {
 // LatestAppMetadata returns the bytes from the AppMetadata field of the
 // most recently read FlightData message that was processed by calling
 // the Next function. The metadata returned would correspond to the record
-// retrieved by calling Record().
+// retrieved by calling RecordBatch().
 func (r *Reader) LatestAppMetadata() []byte {
 	return r.dmr.lastAppMetadata
 }
@@ -129,7 +129,7 @@ func (r *Reader) LatestAppMetadata() []byte {
 // LatestFlightDescriptor returns a pointer to the last FlightDescriptor object
 // that was received in the most recently read FlightData message that was
 // processed by calling the Next function. The descriptor returned would correspond
-// to the record retrieved by calling Record().
+// to the record batch retrieved by calling RecordBatch().
 func (r *Reader) LatestFlightDescriptor() *FlightDescriptor {
 	return r.dmr.descr
 }
@@ -188,7 +188,7 @@ func DeserializeSchema(info []byte, mem memory.Allocator) (*arrow.Schema, error)
 
 // StreamChunk represents a single chunk of a FlightData stream
 type StreamChunk struct {
-	Data        arrow.Record
+	Data        arrow.RecordBatch
 	Desc        *FlightDescriptor
 	AppMetadata []byte
 	Err         error
diff --git a/arrow/flight/record_batch_writer.go b/arrow/flight/record_batch_writer.go
index 3d291788..9cfb6bd8 100644
--- a/arrow/flight/record_batch_writer.go
+++ b/arrow/flight/record_batch_writer.go
@@ -74,7 +74,7 @@ func (w *Writer) SetFlightDescriptor(descr *FlightDescriptor) {
 }
 
 // Write writes a recordbatch payload and returns any error, implementing the arrio.Writer interface
-func (w *Writer) Write(rec arrow.Record) error {
+func (w *Writer) Write(rec arrow.RecordBatch) error {
 	if w.pw.fd.FlightDescriptor != nil {
 		defer func() {
 			w.pw.fd.FlightDescriptor = nil
@@ -83,9 +83,9 @@ func (w *Writer) Write(rec arrow.Record) error {
 	return w.Writer.Write(rec)
 }
 
-// WriteWithAppMetadata will write this record with the supplied application
+// WriteWithAppMetadata will write this record batch with the supplied application
 // metadata attached in the flightData message.
-func (w *Writer) WriteWithAppMetadata(rec arrow.Record, appMeta []byte) error {
+func (w *Writer) WriteWithAppMetadata(rec arrow.RecordBatch, appMeta []byte) error {
 	w.pw.fd.AppMetadata = appMeta
 	defer func() {
 		w.pw.fd.AppMetadata = nil
diff --git a/arrow/internal/flight_integration/scenario.go b/arrow/internal/flight_integration/scenario.go
index a640d06b..106669e9 100644
--- a/arrow/internal/flight_integration/scenario.go
+++ b/arrow/internal/flight_integration/scenario.go
@@ -101,10 +101,10 @@ func initServer(port int, srv flight.Server) int {
 
 type integrationDataSet struct {
 	schema *arrow.Schema
-	chunks []arrow.Record
+	chunks []arrow.RecordBatch
 }
 
-func consumeFlightLocation(ctx context.Context, loc *flight.Location, tkt *flight.Ticket, orig []arrow.Record, opts ...grpc.DialOption) error {
+func consumeFlightLocation(ctx context.Context, loc *flight.Location, tkt *flight.Ticket, orig []arrow.RecordBatch, opts ...grpc.DialOption) error {
 	client, err := flight.NewClientWithMiddleware(loc.GetUri(), nil, nil, opts...)
 	if err != nil {
 		return err
@@ -177,7 +177,7 @@ func (s *defaultIntegrationTester) RunClient(addr string, opts ...grpc.DialOptio
 	}
 
 	dataSet := integrationDataSet{
-		chunks: make([]arrow.Record, 0),
+		chunks: make([]arrow.RecordBatch, 0),
 		schema: rdr.Schema(),
 	}
 
@@ -332,7 +332,7 @@ func (s *defaultIntegrationTester) DoPut(stream flight.FlightService_DoPutServer
 		key = desc.Path[0]
 
 	dataset.schema = rdr.Schema()
-	dataset.chunks = make([]arrow.Record, 0)
+	dataset.chunks = make([]arrow.RecordBatch, 0)
 	for rdr.Next() {
 		rec := rdr.Record()
 		rec.Retain()
@@ -568,7 +568,7 @@ func (o *orderedScenarioTester) RunClient(addr string, opts ...grpc.DialOption)
 		return fmt.Errorf("expected to server return FlightInfo.ordered = true")
 	}
 
-	var recs []arrow.Record
+	var recs []arrow.RecordBatch
 	for _, ep := range info.Endpoint {
 		if len(ep.Location) != 0 {
 			return fmt.Errorf("expected to receive empty locations to use the original service: %s",
@@ -931,7 +931,7 @@ func (tester *expirationTimeDoGetScenarioTester) RunClient(addr string, opts ...
 		return err
 	}
 
-	var recs []arrow.Record
+	var recs []arrow.RecordBatch
 	for _, ep := range info.Endpoint {
 		if len(recs) == 0 {
 			if ep.ExpirationTime != nil {
@@ -3078,7 +3078,7 @@ func getIngestRecords() array.RecordReader {
 	rec := array.NewRecord(schema, []arrow.Array{arr}, ingestStatementExpectedRows)
 	defer rec.Release()
 
-	rdr, _ := array.NewRecordReader(schema, []arrow.Record{rec})
+	rdr, _ := array.NewRecordReader(schema, []arrow.RecordBatch{rec})
 
 	return rdr
 }
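On the Flight side, the writer now accepts arrow.RecordBatch directly. A hedged sketch of a DoPut upload using the re-typed WriteWithAppMetadata from this diff; sendAll and the metadata scheme are invented for illustration:

package example

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/flight"
	"github.com/apache/arrow-go/v18/arrow/ipc"
)

// sendAll streams record batches on a DoPut call, attaching per-batch
// application metadata alongside each payload.
func sendAll(stream flight.FlightService_DoPutClient, schema *arrow.Schema, batches []arrow.RecordBatch) error {
	wr := flight.NewRecordWriter(stream, ipc.WithSchema(schema))
	defer wr.Close()
	for i, b := range batches {
		if err := wr.WriteWithAppMetadata(b, []byte(fmt.Sprintf("batch-%d", i))); err != nil {
			return err
		}
	}
	return nil
}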
diff --git a/arrow/ipc/file_reader.go b/arrow/ipc/file_reader.go
index 9135529d..2d83fcc7 100644
--- a/arrow/ipc/file_reader.go
+++ b/arrow/ipc/file_reader.go
@@ -159,7 +159,7 @@ type FileReader struct {
 	memo dictutils.Memo
 
 	schema *arrow.Schema
-	record arrow.Record
+	record arrow.RecordBatch
 
 	irec int   // current record index. used for the arrio.Reader interface
 	err  error // last error
@@ -355,11 +355,11 @@ func (f *FileReader) Close() error {
 	return nil
 }
 
-// Record returns the i-th record from the file.
-// The returned value is valid until the next call to Record.
-// Users need to call Retain on that Record to keep it valid for longer.
-func (f *FileReader) Record(i int) (arrow.Record, error) {
-	record, err := f.RecordAt(i)
+// RecordBatch returns the i-th record batch from the file.
+// The returned value is valid until the next call to RecordBatch.
+// Users need to call Retain on that RecordBatch to keep it valid for longer.
+func (f *FileReader) RecordBatch(i int) (arrow.RecordBatch, error) {
+	record, err := f.RecordBatchAt(i)
 	if err != nil {
 		return nil, err
 	}
@@ -372,10 +372,19 @@
 	return record, nil
 }
 
-// Record returns the i-th record from the file. Ownership is transferred to the
+// Record returns the i-th record from the file.
+// The returned value is valid until the next call to Record.
+// Users need to call Retain on that Record to keep it valid for longer.
+//
+// Deprecated: Use [RecordBatch] instead.
+func (f *FileReader) Record(i int) (arrow.Record, error) {
+	return f.RecordBatch(i)
+}
+
+// RecordBatchAt returns the i-th record batch from the file. Ownership is transferred to the
 // caller and must call Release() to free the memory. This method is safe to
 // call concurrently.
-func (f *FileReader) RecordAt(i int) (arrow.Record, error) {
+func (f *FileReader) RecordBatchAt(i int) (arrow.RecordBatch, error) {
 	if i < 0 || i > f.NumRecords() {
 		panic("arrow/ipc: record index out of bounds")
 	}
@@ -400,32 +409,41 @@
 	defer msg.Release()
 
 	if msg.Type() != MessageRecordBatch {
-		return nil, fmt.Errorf("arrow/ipc: message %d is not a Record", i)
+		return nil, fmt.Errorf("arrow/ipc: message %d is not a RecordBatch", i)
 	}
 
-	return newRecord(f.schema, &f.memo, msg.meta, msg.body, f.swapEndianness, f.mem), nil
+	return newRecordBatch(f.schema, &f.memo, msg.meta, msg.body, f.swapEndianness, f.mem), nil
 }
 
-// Read reads the current record from the underlying stream and an error, if any.
+// RecordAt returns the i-th record from the file. Ownership is transferred to the
+// caller and must call Release() to free the memory. This method is safe to
+// call concurrently.
+//
+// Deprecated: Use [RecordBatchAt] instead.
+func (f *FileReader) RecordAt(i int) (arrow.Record, error) {
+	return f.RecordBatchAt(i)
+}
+
+// Read reads the current record batch from the underlying stream and an error, if any.
 // When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF).
 //
-// The returned record value is valid until the next call to Read.
-// Users need to call Retain on that Record to keep it valid for longer.
-func (f *FileReader) Read() (rec arrow.Record, err error) {
+// The returned record batch value is valid until the next call to Read.
+// Users need to call Retain on that RecordBatch to keep it valid for longer.
+func (f *FileReader) Read() (rec arrow.RecordBatch, err error) {
 	if f.irec == f.NumRecords() {
 		return nil, io.EOF
 	}
-	rec, f.err = f.Record(f.irec)
+	rec, f.err = f.RecordBatch(f.irec)
 	f.irec++
 
 	return rec, f.err
 }
 
-// ReadAt reads the i-th record from the underlying stream and an error, if any.
-func (f *FileReader) ReadAt(i int64) (arrow.Record, error) {
-	return f.Record(int(i))
+// ReadAt reads the i-th record batch from the underlying stream and an error, if any.
+func (f *FileReader) ReadAt(i int64) (arrow.RecordBatch, error) {
+	return f.RecordBatch(int(i))
 }
 
-func newRecord(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buffer, body *memory.Buffer, swapEndianness bool, mem memory.Allocator) arrow.Record {
+func newRecordBatch(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buffer, body *memory.Buffer, swapEndianness bool, mem memory.Allocator) arrow.RecordBatch {
 	var (
 		msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0)
 		md  flatbuf.RecordBatch
diff --git a/arrow/ipc/file_writer.go b/arrow/ipc/file_writer.go
index 2aa0c9b4..ff043ec9 100644
--- a/arrow/ipc/file_writer.go
+++ b/arrow/ipc/file_writer.go
@@ -296,7 +296,7 @@ func (f *FileWriter) Close() error {
 	return nil
 }
 
-func (f *FileWriter) Write(rec arrow.Record) error {
+func (f *FileWriter) Write(rec arrow.RecordBatch) error {
 	schema := rec.Schema()
 	if schema == nil || !schema.Equal(f.schema) {
 		return errInconsistentSchema
diff --git a/arrow/ipc/reader.go b/arrow/ipc/reader.go
index 1934c719..9d7096c0 100644
--- a/arrow/ipc/reader.go
+++ b/arrow/ipc/reader.go
@@ -40,7 +40,7 @@ type Reader struct {
 	schema *arrow.Schema
 
 	refCount atomic.Int64
-	rec      arrow.Record
+	rec      arrow.RecordBatch
 	err      error
 
 	// types dictTypeMap
@@ -164,7 +164,7 @@ func (r *Reader) Release() {
 	}
 }
 
-// Next returns whether a Record could be extracted from the underlying stream.
+// Next returns whether a RecordBatch could be extracted from the underlying stream.
 func (r *Reader) Next() bool {
 	if r.rec != nil {
 		r.rec.Release()
@@ -252,20 +252,29 @@ func (r *Reader) next() bool {
 		return false
 	}
 
-	r.rec = newRecord(r.schema, &r.memo, msg.meta, msg.body, r.swapEndianness, r.mem)
+	r.rec = newRecordBatch(r.schema, &r.memo, msg.meta, msg.body, r.swapEndianness, r.mem)
 	return true
 }
 
+// RecordBatch returns the current record batch that has been extracted from the
+// underlying stream.
+// It is valid until the next call to Next.
+func (r *Reader) RecordBatch() arrow.RecordBatch {
+	return r.rec
+}
+
 // Record returns the current record that has been extracted from the
 // underlying stream.
 // It is valid until the next call to Next.
+//
+// Deprecated: Use [RecordBatch] instead.
 func (r *Reader) Record() arrow.Record {
-	return r.rec
+	return r.RecordBatch()
 }
 
-// Read reads the current record from the underlying stream and an error, if any.
+// Read reads the current record batch from the underlying stream and an error, if any.
 // When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF).
-func (r *Reader) Read() (arrow.Record, error) {
+func (r *Reader) Read() (arrow.RecordBatch, error) {
 	if r.rec != nil {
 		r.rec.Release()
 		r.rec = nil
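For stream consumers, the renamed accessor slots into the usual IPC read loop. This sketch (readStream is an invented name) shows the pattern:

package example

import (
	"io"

	"github.com/apache/arrow-go/v18/arrow/ipc"
)

// readStream consumes an Arrow IPC stream with the renamed accessor;
// Record() would still compile but is now deprecated.
func readStream(r io.Reader) error {
	rdr, err := ipc.NewReader(r)
	if err != nil {
		return err
	}
	defer rdr.Release()
	for rdr.Next() {
		batch := rdr.RecordBatch() // valid until the next call to Next
		_ = batch.NumRows()
	}
	return rdr.Err()
}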
diff --git a/arrow/ipc/writer.go b/arrow/ipc/writer.go
index 96f082fb..ab068511 100644
--- a/arrow/ipc/writer.go
+++ b/arrow/ipc/writer.go
@@ -152,7 +152,7 @@ func (w *Writer) Close() error {
 	return nil
 }
 
-func (w *Writer) Write(rec arrow.Record) (err error) {
+func (w *Writer) Write(rec arrow.RecordBatch) (err error) {
 	defer func() {
 		if pErr := recover(); pErr != nil {
 			err = utils.FormatRecoveredError("arrow/ipc: unknown error while writing", pErr)
@@ -204,7 +204,7 @@ func (w *Writer) Write(rec arrow.RecordBatch) (err error) {
 	return w.pw.WritePayload(data)
 }
 
-func writeDictionaryPayloads(mem memory.Allocator, batch arrow.Record, isFileFormat bool, emitDictDeltas bool, mapper *dictutils.Mapper, lastWrittenDicts map[int64]arrow.Array, pw PayloadWriter, encoder *recordEncoder) error {
+func writeDictionaryPayloads(mem memory.Allocator, batch arrow.RecordBatch, isFileFormat bool, emitDictDeltas bool, mapper *dictutils.Mapper, lastWrittenDicts map[int64]arrow.Array, pw PayloadWriter, encoder *recordEncoder) error {
 	dictionaries, err := dictutils.CollectDictionaries(batch, mapper)
 	if err != nil {
 		return err
@@ -467,7 +467,7 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload) error {
 	return <-errch
 }
 
-func (w *recordEncoder) encode(p *Payload, rec arrow.Record) error {
+func (w *recordEncoder) encode(p *Payload, rec arrow.RecordBatch) error {
 	// perform depth-first traversal of the row-batch
 	for i, col := range rec.Columns() {
 		err := w.visit(p, col)
@@ -1033,7 +1033,7 @@ func (w *recordEncoder) rebaseDenseUnionValueOffsets(arr *array.DenseUnion, offs
 	return shiftedOffsetsBuf
 }
 
-func (w *recordEncoder) Encode(p *Payload, rec arrow.Record) error {
+func (w *recordEncoder) Encode(p *Payload, rec arrow.RecordBatch) error {
 	if err := w.encode(p, rec); err != nil {
 		return err
 	}
@@ -1087,7 +1087,7 @@ func needTruncate(offset int64, buf *memory.Buffer, minLength int64) bool {
 
 // GetRecordBatchPayload produces the ipc payload for a given record batch.
 // The resulting payload itself must be released by the caller via the Release
 // method after it is no longer needed.
-func GetRecordBatchPayload(batch arrow.Record, opts ...Option) (Payload, error) {
+func GetRecordBatchPayload(batch arrow.RecordBatch, opts ...Option) (Payload, error) {
 	cfg := newConfig(opts...)
 	var (
 		data = Payload{msg: MessageRecordBatch}
diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go
index d19177d9..b7f95178 100644
--- a/parquet/pqarrow/file_reader.go
+++ b/parquet/pqarrow/file_reader.go
@@ -879,7 +879,10 @@ func (r *recordReader) Next() bool {
 	return r.next()
 }
 
-func (r *recordReader) Record() arrow.Record { return r.cur }
+func (r *recordReader) RecordBatch() arrow.RecordBatch { return r.cur }
+
+// Deprecated: Use [RecordBatch] instead.
+func (r *recordReader) Record() arrow.Record { return r.RecordBatch() }
 
 func (r *recordReader) Err() error {
 	if r.err == io.EOF {