This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 23f3c6cb9 chore(go/adbc): Bump github.com/apache/arrow-go/v18 from 18.4.0 to 18.4.1 in /go/adbc (#3410)
23f3c6cb9 is described below
commit 23f3c6cb9ae7a01f9f9452a99ada3ac3bb3fb39f
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
AuthorDate: Tue Sep 9 10:43:25 2025 +0900
chore(go/adbc): Bump github.com/apache/arrow-go/v18 from 18.4.0 to 18.4.1 in /go/adbc (#3410)
Bumps [github.com/apache/arrow-go/v18](https://github.com/apache/arrow-go) from 18.4.0 to 18.4.1.
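The user-visible change this bump picks up is the move from `arrow.Record` to `arrow.RecordBatch` (and from `Record()`/`NewRecord` to `RecordBatch()`/`NewRecordBatch`), which the diff below applies mechanically across the drivers. As a minimal, illustrative sketch (not part of this commit; it only assumes the arrow-go v18.4.1 names exercised in the diff), building and reading a batch now looks like:

```go
package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	schema := arrow.NewSchema([]arrow.Field{{Name: "ints", Type: arrow.PrimitiveTypes.Int64}}, nil)

	bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer bldr.Release()
	bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)

	// Builders now produce an arrow.RecordBatch (NewRecordBatch in place of NewRecord).
	rec := bldr.NewRecordBatch()
	defer rec.Release()

	rdr, err := array.NewRecordReader(schema, []arrow.RecordBatch{rec})
	if err != nil {
		panic(err)
	}
	defer rdr.Release()

	for rdr.Next() {
		// Record readers expose RecordBatch() where Record() was used before.
		batch := rdr.RecordBatch()
		fmt.Println(batch.NumRows())
	}
}
```

The module bump itself is the usual Go workflow, roughly `go get github.com/apache/arrow-go/[email protected]` followed by `go mod tidy`, which is what updates go.mod and go.sum below.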
---------
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: David Li <[email protected]>
---
go/adbc/README.md | 8 +-
go/adbc/adbc.go | 2 +-
go/adbc/driver/bigquery/driver_test.go | 42 +++++------
go/adbc/driver/bigquery/record_reader.go | 30 +++++---
go/adbc/driver/bigquery/statement.go | 10 +--
go/adbc/driver/flightsql/cmd/testserver/main.go | 8 +-
go/adbc/driver/flightsql/example_usage_test.go | 2 +-
.../driver/flightsql/flightsql_adbc_server_test.go | 14 ++--
go/adbc/driver/flightsql/flightsql_adbc_test.go | 8 +-
go/adbc/driver/flightsql/flightsql_connection.go | 18 ++---
go/adbc/driver/flightsql/flightsql_database.go | 2 +-
go/adbc/driver/flightsql/flightsql_statement.go | 2 +-
go/adbc/driver/flightsql/record_reader.go | 24 +++---
go/adbc/driver/flightsql/record_reader_test.go | 4 +-
go/adbc/driver/internal/driverbase/connection.go | 12 +--
go/adbc/driver/internal/driverbase/driver_test.go | 6 +-
go/adbc/driver/internal/shared_utils.go | 4 +-
go/adbc/driver/panicdummy/panicdummy_adbc.go | 2 +-
go/adbc/driver/snowflake/binding.go | 6 +-
go/adbc/driver/snowflake/bulk_ingestion.go | 14 ++--
go/adbc/driver/snowflake/bulk_ingestion_test.go | 10 +--
go/adbc/driver/snowflake/concat_reader.go | 9 ++-
go/adbc/driver/snowflake/driver_test.go | 88 +++++++++++-----------
go/adbc/driver/snowflake/record_reader.go | 36 +++++----
go/adbc/driver/snowflake/statement.go | 4 +-
go/adbc/drivermgr/wrapper.go | 2 +-
go/adbc/drivermgr/wrapper_sqlite_test.go | 32 ++++----
go/adbc/ext.go | 2 +-
go/adbc/go.mod | 4 +-
go/adbc/go.sum | 8 +-
go/adbc/pkg/_tmpl/driver.go.tmpl | 2 +-
go/adbc/pkg/bigquery/driver.go | 2 +-
go/adbc/pkg/flightsql/driver.go | 2 +-
go/adbc/pkg/panicdummy/driver.go | 2 +-
go/adbc/pkg/snowflake/driver.go | 2 +-
go/adbc/sqldriver/driver.go | 10 +--
go/adbc/sqldriver/driver_internals_test.go | 18 ++---
go/adbc/validation/validation.go | 42 +++++------
38 files changed, 258 insertions(+), 235 deletions(-)
diff --git a/go/adbc/README.md b/go/adbc/README.md
index 26902149f..8c7fc78bc 100644
--- a/go/adbc/README.md
+++ b/go/adbc/README.md
@@ -115,7 +115,7 @@ We can execute a query and get the results as Arrow data:
fmt.Println("Rows affected: ", n)
for reader.Next() {
- record := reader.Record()
+ record := reader.RecordBatch()
// Extract our three columns
col0 := record.Column(0)
@@ -199,7 +199,7 @@ First, let's prepare some data.
}
defer table.Release()
- reader3, err := array.NewRecordReader(schema, []arrow.Record{table})
+ reader3, err := array.NewRecordReader(schema, []arrow.RecordBatch{table})
if err != nil {
return err
}
@@ -235,7 +235,7 @@ We can get information about the driver and the database:
// Process the info results...
for infoReader.Next() {
- record := infoReader.Record()
+ record := infoReader.RecordBatch()
// Extract vendor name, driver name, etc. from the record
}
```
@@ -261,7 +261,7 @@ Note: `GetObjects` takes an optional set of filters which control which objects
// Process the objects results to get catalog/schema/table information
for objectsReader.Next() {
- record := objectsReader.Record()
+ record := objectsReader.RecordBatch()
// Navigate the nested structure for catalogs, schemas, tables, columns
}
```
diff --git a/go/adbc/adbc.go b/go/adbc/adbc.go
index 075a27814..e64decf26 100644
--- a/go/adbc/adbc.go
+++ b/go/adbc/adbc.go
@@ -678,7 +678,7 @@ type Statement interface {
// The driver will call release on the passed in Record when it is done,
// but it may not do this until the statement is closed or another
// record is bound.
- Bind(ctx context.Context, values arrow.Record) error
+ Bind(ctx context.Context, values arrow.RecordBatch) error
// BindStream uses a record batch stream to bind parameters for this
// query. This can be used for bulk inserts or prepared statements.
diff --git a/go/adbc/driver/bigquery/driver_test.go
b/go/adbc/driver/bigquery/driver_test.go
index 69c35b288..bc9b6d754 100644
--- a/go/adbc/driver/bigquery/driver_test.go
+++ b/go/adbc/driver/bigquery/driver_test.go
@@ -52,7 +52,7 @@ type BigQueryQuirks struct {
schemaName string
}
-func (q *BigQueryQuirks) CreateSampleTable(tableName string, r arrow.Record) (err error) {
+func (q *BigQueryQuirks) CreateSampleTable(tableName string, r arrow.RecordBatch) (err error) {
var buf bytes.Buffer
w, err := pqarrow.NewFileWriter(
@@ -148,7 +148,7 @@ func (q *BigQueryQuirks) quoteTblName(name string) string {
return fmt.Sprintf("`%s.%s`", q.schemaName, strings.ReplaceAll(name,
"\"", "\"\""))
}
-func (q *BigQueryQuirks) CreateSampleTableWithRecords(tableName string, r arrow.Record) error {
+func (q *BigQueryQuirks) CreateSampleTableWithRecords(tableName string, r arrow.RecordBatch) error {
var b strings.Builder
b.WriteString("CREATE OR REPLACE TABLE ")
b.WriteString(q.quoteTblName(tableName))
@@ -501,7 +501,7 @@ func samplePrimitiveTypeSchema() (*arrow.Schema,
*arrow.Schema) {
return input, expected
}
-func buildSamplePrimitiveTypeRecord(mem memory.Allocator, schema, bigquery *arrow.Schema) (arrow.Record, arrow.Record) {
+func buildSamplePrimitiveTypeRecord(mem memory.Allocator, schema, bigquery *arrow.Schema) (arrow.RecordBatch, arrow.RecordBatch) {
bldr := array.NewRecordBuilder(mem, schema)
defer bldr.Release()
@@ -551,7 +551,7 @@ func buildSamplePrimitiveTypeRecord(mem memory.Allocator,
schema, bigquery *arro
bldr2.Field(11).(*array.TimestampBuilder).AppendValues(bigQueryTimestamps, nil)
bldr2.Field(12).(*array.TimestampBuilder).AppendValues(bigQueryTimestamps, nil)
- return bldr.NewRecord(), bldr2.NewRecord()
+ return bldr.NewRecordBatch(), bldr2.NewRecordBatch()
}
func withQuirks(t *testing.T, fn func(quirks *BigQueryQuirks)) {
@@ -695,7 +695,7 @@ func (suite *BigQueryTests) TestEmptyResultSet() {
recv := int64(0)
for rdr.Next() {
- recv += rdr.Record().NumRows()
+ recv += rdr.RecordBatch().NumRows()
}
// verify that we got the expected number of rows if we sum up
@@ -721,7 +721,7 @@ func (suite *BigQueryTests) TestSqlBulkInsertRecords() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- resultBind := rdr.Record()
+ resultBind := rdr.RecordBatch()
suite.Truef(array.RecordEqual(expectedRec, resultBind), "expected:
%s\ngot: %s", expectedRec, resultBind)
suite.False(rdr.Next())
@@ -736,7 +736,7 @@ func (suite *BigQueryTests) TestSqlBulkInsertStreams() {
defer rec.Release()
defer expectedRec.Release()
- stream, err := array.NewRecordReader(input, []arrow.Record{rec})
+ stream, err := array.NewRecordReader(input, []arrow.RecordBatch{rec})
suite.Require().NoError(err)
defer stream.Release()
@@ -750,7 +750,7 @@ func (suite *BigQueryTests) TestSqlBulkInsertStreams() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- resultBindStream := rdr.Record()
+ resultBindStream := rdr.RecordBatch()
suite.Truef(array.RecordEqual(expectedRec, resultBindStream),
"expected: %s\ngot: %s", expectedRec, resultBindStream)
suite.False(rdr.Next())
@@ -804,7 +804,7 @@ func (suite *BigQueryTests) TestSqlIngestTimestampTypes() {
bldr.Field(5).(*array.TimestampBuilder).AppendValues([]arrow.Timestamp{1, 2,
3}, nil)
bldr.Field(6).(*array.TimestampBuilder).AppendValues([]arrow.Timestamp{1, 2,
3}, nil)
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
err := suite.Quirks.CreateSampleTableWithRecords(tableName, rec)
@@ -817,7 +817,7 @@ func (suite *BigQueryTests) TestSqlIngestTimestampTypes() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- result := rdr.Record()
+ result := rdr.RecordBatch()
expectedSchema := arrow.NewSchema([]arrow.Field{
{
@@ -911,7 +911,7 @@ func (suite *BigQueryTests) TestSqlIngestDate64Type() {
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
bldr.Field(1).(*array.Date64Builder).AppendValues([]arrow.Date64{86400000,
172800000, 259200000}, nil) // 1,2,3 days of milliseconds
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
err := suite.Quirks.CreateSampleTableWithRecords(tableName, rec)
@@ -924,7 +924,7 @@ func (suite *BigQueryTests) TestSqlIngestDate64Type() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- result := rdr.Record()
+ result := rdr.RecordBatch()
expectedSchema := arrow.NewSchema([]arrow.Field{
{
@@ -1016,7 +1016,7 @@ func (suite *BigQueryTests) TestSqlIngestDecimal() {
suite.Require().NoError(err)
bldr.Field(5).(*array.Decimal256Builder).AppendValues([]decimal256.Num{d256num1,
d256num2, d256num3}, nil)
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
err = suite.Quirks.CreateSampleTableWithRecords(tableName, rec)
@@ -1029,7 +1029,7 @@ func (suite *BigQueryTests) TestSqlIngestDecimal() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- result := rdr.Record()
+ result := rdr.RecordBatch()
expectedSchema := arrow.NewSchema([]arrow.Field{
{
@@ -1096,7 +1096,7 @@ func (suite *BigQueryTests) TestSqlIngestDecimal() {
suite.Require().NoError(err)
bldr2.Field(5).(*array.Decimal256Builder).AppendValues([]decimal256.Num{expectedD256Num1,
expectedD256Num2, expectedD256Num3}, nil)
- expectedRec := bldr2.NewRecord()
+ expectedRec := bldr2.NewRecordBatch()
defer expectedRec.Release()
suite.Truef(array.RecordEqual(expectedRec, result), "expected: %s\ngot:
%s", expectedRec, result)
@@ -1151,7 +1151,7 @@ func (suite *BigQueryTests) TestSqlIngestListType() {
listvalbldr.AppendValues(strRow3, nil)
listbldr.AppendValues(offsets, valid)
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
err := suite.Quirks.CreateSampleTableWithRecords(tableName, rec)
@@ -1164,7 +1164,7 @@ func (suite *BigQueryTests) TestSqlIngestListType() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- result := rdr.Record()
+ result := rdr.RecordBatch()
// Array cannot be NULL
expectedSchema := arrow.NewSchema([]arrow.Field{
@@ -1259,7 +1259,7 @@ func (suite *BigQueryTests) TestSqlIngestStructType() {
struct3bldr.FieldBuilder(0).(*array.Int64Builder).AppendValues([]int64{1, 2,
3}, nil)
struct3bldr.FieldBuilder(1).(*array.BooleanBuilder).AppendValues([]bool{true,
false, false}, nil)
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
err := suite.Quirks.CreateSampleTableWithRecords(tableName, rec)
@@ -1272,7 +1272,7 @@ func (suite *BigQueryTests) TestSqlIngestStructType() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- result := rdr.Record()
+ result := rdr.RecordBatch()
expectedRecord, _, err := array.RecordFromJSON(suite.Quirks.Alloc(),
sc, bytes.NewReader([]byte(`
[
@@ -1347,7 +1347,7 @@ func (suite *BigQueryTests)
TestMetadataGetObjectsColumnsXdbc() {
bldr.Field(2).(*array.StringBuilder).AppendValues([]string{"foo", "",
""}, []bool{true, false, true})
bldr.Field(3).(*array.TimestampBuilder).AppendValues([]arrow.Timestamp{arrow.Timestamp(1),
arrow.Timestamp(2), arrow.Timestamp(3)}, nil)
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
suite.Require().NoError(suite.Quirks.CreateSampleTable("bulk_ingest",
rec))
@@ -1410,7 +1410,7 @@ func (suite *BigQueryTests)
TestMetadataGetObjectsColumnsXdbc() {
suite.Truef(adbc.GetObjectsSchema.Equal(rdr.Schema()), "expected:
%s\ngot: %s", adbc.GetObjectsSchema, rdr.Schema())
suite.True(rdr.Next())
- rec = rdr.Record()
+ rec = rdr.RecordBatch()
suite.Greater(rec.NumRows(), int64(0))
suite.True(rec.Schema().Equal(adbc.GetObjectsSchema))
var (
diff --git a/go/adbc/driver/bigquery/record_reader.go
b/go/adbc/driver/bigquery/record_reader.go
index 388976564..366cb31cb 100644
--- a/go/adbc/driver/bigquery/record_reader.go
+++ b/go/adbc/driver/bigquery/record_reader.go
@@ -36,9 +36,9 @@ import (
type reader struct {
refCount int64
schema *arrow.Schema
- chs []chan arrow.Record
+ chs []chan arrow.RecordBatch
curChIndex int
- rec arrow.Record
+ rec arrow.RecordBatch
err error
cancelFn context.CancelFunc
@@ -100,7 +100,7 @@ func ipcReaderFromArrowIterator(arrowIterator
bigquery.ArrowIterator, alloc memo
return ipc.NewReader(arrowItReader, ipc.WithAllocator(alloc))
}
-func getQueryParameter(values arrow.Record, row int, parameterMode string) ([]bigquery.QueryParameter, error) {
+func getQueryParameter(values arrow.RecordBatch, row int, parameterMode string) ([]bigquery.QueryParameter, error) {
parameters := make([]bigquery.QueryParameter, values.NumCols())
includeName := parameterMode == OptionValueQueryParameterModeNamed
schema := values.Schema()
@@ -127,9 +127,9 @@ func runPlainQuery(ctx context.Context, query
*bigquery.Query, alloc memory.Allo
return nil, -1, err
}
- chs := make([]chan arrow.Record, 1)
+ chs := make([]chan arrow.RecordBatch, 1)
ctx, cancelFn := context.WithCancel(ctx)
- ch := make(chan arrow.Record, resultRecordBufferSize)
+ ch := make(chan arrow.RecordBatch, resultRecordBufferSize)
chs[0] = ch
defer func() {
@@ -151,7 +151,7 @@ func runPlainQuery(ctx context.Context, query
*bigquery.Query, alloc memory.Allo
go func() {
defer rdr.Release()
for rdr.Next() && ctx.Err() == nil {
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
rec.Retain()
ch <- rec
}
@@ -162,7 +162,7 @@ func runPlainQuery(ctx context.Context, query
*bigquery.Query, alloc memory.Allo
return bigqueryRdr, totalRows, nil
}
-func queryRecordWithSchemaCallback(ctx context.Context, group *errgroup.Group, query *bigquery.Query, rec arrow.Record, ch chan arrow.Record, parameterMode string, alloc memory.Allocator, rdrSchema func(schema *arrow.Schema)) (int64, error) {
+func queryRecordWithSchemaCallback(ctx context.Context, group *errgroup.Group, query *bigquery.Query, rec arrow.RecordBatch, ch chan arrow.RecordBatch, parameterMode string, alloc memory.Allocator, rdrSchema func(schema *arrow.Schema)) (int64, error) {
totalRows := int64(-1)
for i := 0; i < int(rec.NumRows()); i++ {
parameters, err := getQueryParameter(rec, i, parameterMode)
@@ -186,7 +186,7 @@ func queryRecordWithSchemaCallback(ctx context.Context,
group *errgroup.Group, q
group.Go(func() error {
defer rdr.Release()
for rdr.Next() && ctx.Err() == nil {
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
rec.Retain()
ch <- rec
}
@@ -208,9 +208,9 @@ func newRecordReader(ctx context.Context, query
*bigquery.Query, boundParameters
// BigQuery can expose result sets as multiple streams when using
certain APIs
// for now lets keep this and set the number of channels to 1
// when we need to adapt to multiple streams we can change the value
here
- chs := make([]chan arrow.Record, 1)
+ chs := make([]chan arrow.RecordBatch, 1)
- ch := make(chan arrow.Record, resultRecordBufferSize)
+ ch := make(chan arrow.RecordBatch, resultRecordBufferSize)
group, ctx := errgroup.WithContext(ctx)
group.SetLimit(prefetchConcurrency)
ctx, cancelFn := context.WithCancel(ctx)
@@ -232,7 +232,7 @@ func newRecordReader(ctx context.Context, query
*bigquery.Query, boundParameters
}
for boundParameters.Next() {
- rec := boundParameters.Record()
+ rec := boundParameters.RecordBatch()
// Each call to Record() on the record reader is allowed to
release the previous record
// and since we're doing this sequentially
// we don't need to call rec.Retain() here and call call
rec.Release() in queryRecordWithSchemaCallback
@@ -294,7 +294,11 @@ func (r *reader) Schema() *arrow.Schema {
return r.schema
}
-func (r *reader) Record() arrow.Record {
+func (r *reader) Record() arrow.RecordBatch {
+ return r.rec
+}
+
+func (r *reader) RecordBatch() arrow.RecordBatch {
return r.rec
}
@@ -323,3 +327,5 @@ func (e emptyArrowIterator) SerializedArrowSchema() []byte {
return buf.Bytes()
}
+
+var _ array.RecordReader = (*reader)(nil)
diff --git a/go/adbc/driver/bigquery/statement.go
b/go/adbc/driver/bigquery/statement.go
index 58c3c1f57..a33c8bf46 100644
--- a/go/adbc/driver/bigquery/statement.go
+++ b/go/adbc/driver/bigquery/statement.go
@@ -47,7 +47,7 @@ type statement struct {
queryConfig bigquery.QueryConfig
parameterMode string
- paramBinding arrow.Record
+ paramBinding arrow.RecordBatch
streamBinding array.RecordReader
resultRecordBufferSize int
prefetchConcurrency int
@@ -329,7 +329,7 @@ func (st *statement) ExecuteUpdate(ctx context.Context)
(int64, error) {
} else {
totalRows := int64(0)
for boundParameters.Next() {
- values := boundParameters.Record()
+ values := boundParameters.RecordBatch()
for i := 0; i < int(values.NumRows()); i++ {
parameters, err := getQueryParameter(values, i,
st.parameterMode)
if err != nil {
@@ -663,7 +663,7 @@ func arrowValueToQueryParameterValue(field arrow.Field,
value arrow.Array, i int
func (st *statement) getBoundParameterReader() (array.RecordReader, error) {
if st.paramBinding != nil {
- rdr, err := array.NewRecordReader(st.paramBinding.Schema(),
[]arrow.Record{st.paramBinding})
+ rdr, err := array.NewRecordReader(st.paramBinding.Schema(),
[]arrow.RecordBatch{st.paramBinding})
if err != nil {
return nil, err
}
@@ -694,7 +694,7 @@ func (st *statement) clearParameters() {
// from under the statement. Release will be called on a previous binding
// record or reader if it existed, and will be called upon calling Close on the
// PreparedStatement.
-func (st *statement) SetParameters(binding arrow.Record) {
+func (st *statement) SetParameters(binding arrow.RecordBatch) {
st.clearParameters()
st.paramBinding = binding
if st.paramBinding != nil {
@@ -721,7 +721,7 @@ func (st *statement) SetRecordReader(binding
array.RecordReader) {
// The driver will call release on the passed in Record when it is done,
// but it may not do this until the statement is closed or another
// record is bound.
-func (st *statement) Bind(_ context.Context, values arrow.Record) error {
+func (st *statement) Bind(_ context.Context, values arrow.RecordBatch) error {
st.SetParameters(values)
return nil
}
diff --git a/go/adbc/driver/flightsql/cmd/testserver/main.go
b/go/adbc/driver/flightsql/cmd/testserver/main.go
index 7bb4fc7c1..6d82bd0c8 100644
--- a/go/adbc/driver/flightsql/cmd/testserver/main.go
+++ b/go/adbc/driver/flightsql/cmd/testserver/main.go
@@ -300,7 +300,7 @@ func (srv *ExampleServer) DoGetPreparedStatement(ctx
context.Context, cmd flight
case "forever":
ch := make(chan flight.StreamChunk)
schema = arrow.NewSchema([]arrow.Field{{Name: "ints", Type:
arrow.PrimitiveTypes.Int32, Nullable: true}}, nil)
- var rec arrow.Record
+ var rec arrow.RecordBatch
rec, _, err = array.RecordFromJSON(memory.DefaultAllocator,
schema, strings.NewReader(`[{"a": 5}]`))
go func() {
// wait for client cancel
@@ -342,7 +342,7 @@ func (srv *ExampleServer) DoGetPreparedStatement(ctx
context.Context, cmd flight
}
srv.headers = make([]RecordedHeader, 0)
- rec := array.NewRecord(recordedHeadersSchema, []arrow.Array{
+ rec := array.NewRecordBatch(recordedHeadersSchema, []arrow.Array{
methods.NewArray(),
headers.NewArray(),
values.NewArray(),
@@ -452,7 +452,7 @@ func (srv *ExampleServer) DoGetCatalogs(ctx
context.Context) (*arrow.Schema, <-c
}
defer catalogs.Release()
- batch := array.NewRecord(schema, []arrow.Array{catalogs}, 1)
+ batch := array.NewRecordBatch(schema, []arrow.Array{catalogs}, 1)
ch <- flight.StreamChunk{Data: batch}
close(ch)
return schema, ch, nil
@@ -489,7 +489,7 @@ func (srv *ExampleServer) DoGetDBSchemas(ctx
context.Context, req flightsql.GetD
}
defer dbSchemas.Release()
- batch := array.NewRecord(schema, []arrow.Array{catalogs, dbSchemas}, 1)
+ batch := array.NewRecordBatch(schema, []arrow.Array{catalogs, dbSchemas}, 1)
ch <- flight.StreamChunk{Data: batch}
}
close(ch)
diff --git a/go/adbc/driver/flightsql/example_usage_test.go
b/go/adbc/driver/flightsql/example_usage_test.go
index aac0c6f15..53eefa15a 100644
--- a/go/adbc/driver/flightsql/example_usage_test.go
+++ b/go/adbc/driver/flightsql/example_usage_test.go
@@ -87,7 +87,7 @@ func FlightSQLExample(uri string) (err error) {
defer reader.Release()
for reader.Next() {
- arr, ok := reader.Record().Column(0).(*array.Int64)
+ arr, ok := reader.RecordBatch().Column(0).(*array.Int64)
if !ok {
return fmt.Errorf("result data was not int64")
}
diff --git a/go/adbc/driver/flightsql/flightsql_adbc_server_test.go
b/go/adbc/driver/flightsql/flightsql_adbc_server_test.go
index d7d4c73b9..52a2c3e09 100644
--- a/go/adbc/driver/flightsql/flightsql_adbc_server_test.go
+++ b/go/adbc/driver/flightsql/flightsql_adbc_server_test.go
@@ -1732,7 +1732,7 @@ func (ts *TimeoutTests) TestDontTimeout() {
defer rr.Release()
ts.True(rr.Next())
- rec := rr.Record()
+ rec := rr.RecordBatch()
sc := arrow.NewSchema([]arrow.Field{{Name: "a", Type:
arrow.PrimitiveTypes.Int32, Nullable: true}}, nil)
expected, _, err := array.RecordFromJSON(memory.DefaultAllocator, sc,
strings.NewReader(`[{"a": 5}]`))
@@ -1914,7 +1914,7 @@ var (
func (server *DataTypeTestServer) DoGetStatement(ctx context.Context, tkt
flightsql.StatementQueryTicket) (*arrow.Schema, <-chan flight.StreamChunk,
error) {
var schema *arrow.Schema
- var record arrow.Record
+ var record arrow.RecordBatch
var err error
cmd := string(tkt.GetStatementHandle())
@@ -2041,7 +2041,7 @@ func (server *MultiTableTestServer) DoGetTables(ctx
context.Context, cmd flights
bldr.Field(4).(*array.BinaryBuilder).AppendValues([][]byte{buf1, buf2},
nil)
defer bldr.Release()
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
ch := make(chan flight.StreamChunk)
go func() {
@@ -2387,12 +2387,12 @@ func (srv *GetObjectsTestServer) DoGetTables(ctx
context.Context, cmd flightsql.
tablesRecord, _, _ := array.RecordFromJSON(srv.Alloc,
schema_ref.Tables, strings.NewReader(jsonStr))
defer tablesRecord.Release()
- tablesRecordWithSchema := array.NewRecord(schema_ref.TablesWithIncludedSchema, append(tablesRecord.Columns(), schemaCol), tablesRecord.NumRows())
+ tablesRecordWithSchema := array.NewRecordBatch(schema_ref.TablesWithIncludedSchema, append(tablesRecord.Columns(), schemaCol), tablesRecord.NumRows())
defer tablesRecordWithSchema.Release()
ch := make(chan flight.StreamChunk)
- rdr, err := array.NewRecordReader(schema_ref.TablesWithIncludedSchema, []arrow.Record{tablesRecordWithSchema})
+ rdr, err := array.NewRecordReader(schema_ref.TablesWithIncludedSchema, []arrow.RecordBatch{tablesRecordWithSchema})
go flight.StreamChunksFromReader(rdr, ch)
return schema_ref.TablesWithIncludedSchema, ch, err
}
@@ -2413,7 +2413,7 @@ func (srv *GetObjectsTestServer) DoGetDBSchemas(ctx
context.Context, cmd flights
}
defer dbSchemas.Release()
- batch := array.NewRecord(schema, []arrow.Array{catalogs, dbSchemas}, 1)
+ batch := array.NewRecordBatch(schema, []arrow.Array{catalogs, dbSchemas}, 1)
ch <- flight.StreamChunk{Data: batch}
close(ch)
return schema, ch, nil
@@ -2560,7 +2560,7 @@ func (suite *GetObjectsTests)
TestMetadataGetObjectsColumnsXdbc() {
suite.Truef(adbc.GetObjectsSchema.Equal(rdr.Schema()),
"expected: %s\ngot: %s", adbc.GetObjectsSchema, rdr.Schema())
suite.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
suite.Greater(rec.NumRows(), int64(0))
var (
foundExpected = false
diff --git a/go/adbc/driver/flightsql/flightsql_adbc_test.go
b/go/adbc/driver/flightsql/flightsql_adbc_test.go
index 17fc59c4e..9f9baf87f 100644
--- a/go/adbc/driver/flightsql/flightsql_adbc_test.go
+++ b/go/adbc/driver/flightsql/flightsql_adbc_test.go
@@ -173,7 +173,7 @@ func writeTo(arr arrow.Array, idx int, w io.Writer) {
}
}
-func (s *FlightSQLQuirks) CreateSampleTable(tableName string, r arrow.Record) error {
+func (s *FlightSQLQuirks) CreateSampleTable(tableName string, r arrow.RecordBatch) error {
var b strings.Builder
b.WriteString("CREATE TABLE ")
b.WriteString(tableName)
@@ -1011,10 +1011,10 @@ func (suite *ConnectionTests) TestGetInfo() {
driverVersion := false
driverArrowVersion := false
for reader.Next() {
- code := reader.Record().Column(0).(*array.Uint32)
- values := reader.Record().Column(1).(*array.DenseUnion)
+ code := reader.RecordBatch().Column(0).(*array.Uint32)
+ values := reader.RecordBatch().Column(1).(*array.DenseUnion)
stringValues := values.Field(0).(*array.String)
- for i := 0; i < int(reader.Record().NumRows()); i++ {
+ for i := 0; i < int(reader.RecordBatch().NumRows()); i++ {
switch adbc.InfoCode(code.Value(i)) {
case adbc.InfoDriverName:
{
diff --git a/go/adbc/driver/flightsql/flightsql_connection.go
b/go/adbc/driver/flightsql/flightsql_connection.go
index e70c2a54d..024950d3e 100644
--- a/go/adbc/driver/flightsql/flightsql_connection.go
+++ b/go/adbc/driver/flightsql/flightsql_connection.go
@@ -648,7 +648,7 @@ func (c *connectionImpl) PrepareDriverInfo(ctx
context.Context, infoCodes []adbc
}
for rdr.Next() {
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
field := rec.Column(0).(*array.Uint32)
info := rec.Column(1).(*array.DenseUnion)
@@ -742,7 +742,7 @@ func (c *connectionImpl) GetObjectsCatalogs(ctx
context.Context, catalog *string
catalogs := make([]string, 0, numCatalogs)
for rdr.Next() {
- arr := rdr.Record().Column(0).(*array.String)
+ arr := rdr.RecordBatch().Column(0).(*array.String)
for i := 0; i < arr.Len(); i++ {
// XXX: force copy since accessor is unsafe
catalogName := string([]byte(arr.Value(i)))
@@ -781,9 +781,9 @@ func (c *connectionImpl) GetObjectsDbSchemas(ctx
context.Context, depth adbc.Obj
for rdr.Next() {
// Nullable
- catalog := rdr.Record().Column(0).(*array.String)
+ catalog := rdr.RecordBatch().Column(0).(*array.String)
// Non-nullable
- dbSchema := rdr.Record().Column(1).(*array.String)
+ dbSchema := rdr.RecordBatch().Column(1).(*array.String)
for i := 0; i < catalog.Len(); i++ {
catalogName := ""
@@ -834,11 +834,11 @@ func (c *connectionImpl) GetObjectsTables(ctx
context.Context, depth adbc.Object
for rdr.Next() {
// Nullable
- catalog := rdr.Record().Column(0).(*array.String)
- dbSchema := rdr.Record().Column(1).(*array.String)
+ catalog := rdr.RecordBatch().Column(0).(*array.String)
+ dbSchema := rdr.RecordBatch().Column(1).(*array.String)
// Non-nullable
- tableName := rdr.Record().Column(2).(*array.String)
- tableType := rdr.Record().Column(3).(*array.String)
+ tableName := rdr.RecordBatch().Column(2).(*array.String)
+ tableType := rdr.RecordBatch().Column(3).(*array.String)
for i := 0; i < catalog.Len(); i++ {
catalogName := ""
@@ -856,7 +856,7 @@ func (c *connectionImpl) GetObjectsTables(ctx
context.Context, depth adbc.Object
var schema *arrow.Schema
if includeSchema {
- reader, err := ipc.NewReader(bytes.NewReader(rdr.Record().Column(4).(*array.Binary).Value(i)))
+ reader, err := ipc.NewReader(bytes.NewReader(rdr.RecordBatch().Column(4).(*array.Binary).Value(i)))
if err != nil {
return nil, adbc.Error{
Msg: err.Error(),
diff --git a/go/adbc/driver/flightsql/flightsql_database.go
b/go/adbc/driver/flightsql/flightsql_database.go
index d8070c251..f79db14a9 100644
--- a/go/adbc/driver/flightsql/flightsql_database.go
+++ b/go/adbc/driver/flightsql/flightsql_database.go
@@ -511,7 +511,7 @@ func (d *databaseImpl) Open(ctx context.Context)
(adbc.Connection, error) {
defer rdr.Release()
for rdr.Next() {
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
codes := rec.Column(0).(*array.Uint32)
values := rec.Column(1).(*array.DenseUnion)
int32Value :=
values.Field(int32code).(*array.Int32)
diff --git a/go/adbc/driver/flightsql/flightsql_statement.go
b/go/adbc/driver/flightsql/flightsql_statement.go
index 0deada8e9..30363af25 100644
--- a/go/adbc/driver/flightsql/flightsql_statement.go
+++ b/go/adbc/driver/flightsql/flightsql_statement.go
@@ -520,7 +520,7 @@ func (s *statement) SetSubstraitPlan(plan []byte) error {
// The driver will call release on the passed in Record when it is done,
// but it may not do this until the statement is closed or another
// record is bound.
-func (s *statement) Bind(_ context.Context, values arrow.Record) error {
+func (s *statement) Bind(_ context.Context, values arrow.RecordBatch) error {
// TODO: handle bulk insert situation
if s.prepared == nil {
diff --git a/go/adbc/driver/flightsql/record_reader.go
b/go/adbc/driver/flightsql/record_reader.go
index 92e37bde4..a0c2cc481 100644
--- a/go/adbc/driver/flightsql/record_reader.go
+++ b/go/adbc/driver/flightsql/record_reader.go
@@ -38,9 +38,9 @@ import (
type reader struct {
refCount int64
schema *arrow.Schema
- chs []chan arrow.Record
+ chs []chan arrow.RecordBatch
curChIndex int
- rec arrow.Record
+ rec arrow.RecordBatch
err error
cancelFn context.CancelFunc
@@ -67,10 +67,10 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, cl *flightsql.
Code: adbc.StatusInternal,
}
}
- return array.NewRecordReader(schema, []arrow.Record{})
+ return array.NewRecordReader(schema, []arrow.RecordBatch{})
}
- ch := make(chan arrow.Record, bufferSize)
+ ch := make(chan arrow.RecordBatch, bufferSize)
group, ctx := errgroup.WithContext(ctx)
ctx, cancelFn := context.WithCancel(ctx)
// We may mutate endpoints below
@@ -104,7 +104,7 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, cl *flightsql.
}
for rdr.Next() && ctx.Err() == nil {
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
rec.Retain()
ch <- rec
}
@@ -117,7 +117,7 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, cl *flightsql.
endpoints = endpoints[1:]
}
- chs := make([]chan arrow.Record, numEndpoints)
+ chs := make([]chan arrow.RecordBatch, numEndpoints)
chs[0] = ch
reader := &reader{
refCount: 1,
@@ -133,7 +133,7 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, cl *flightsql.
for i, ep := range endpoints {
endpoint := ep
endpointIndex := i
- chs[endpointIndex] = make(chan arrow.Record, bufferSize)
+ chs[endpointIndex] = make(chan arrow.RecordBatch, bufferSize)
group.Go(func() error {
// Close channels (except the last) so that Next can
move on to the next channel properly
if endpointIndex != lastChannelIndex {
@@ -152,7 +152,7 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, cl *flightsql.
}
for rdr.Next() && ctx.Err() == nil {
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
rec.Retain()
chs[endpointIndex] <- rec
}
@@ -220,6 +220,12 @@ func (r *reader) Schema() *arrow.Schema {
return r.schema
}
-func (r *reader) Record() arrow.Record {
+func (r *reader) Record() arrow.RecordBatch {
return r.rec
}
+
+func (r *reader) RecordBatch() arrow.RecordBatch {
+ return r.rec
+}
+
+var _ array.RecordReader = (*reader)(nil)
diff --git a/go/adbc/driver/flightsql/record_reader_test.go
b/go/adbc/driver/flightsql/record_reader_test.go
index 9607bc549..ab11e0553 100644
--- a/go/adbc/driver/flightsql/record_reader_test.go
+++ b/go/adbc/driver/flightsql/record_reader_test.go
@@ -73,7 +73,7 @@ func (f *testFlightService) DoGet(request *flight.Ticket,
stream flight.FlightSe
epIndex.Append(int8(request.Ticket[0]))
batchIndex.Append(idx)
- rec := builder.NewRecord()
+ rec := builder.NewRecordBatch()
defer rec.Release()
if err := wr.Write(rec); err != nil {
return err
@@ -346,7 +346,7 @@ func (suite *RecordReaderTests) TestOrdering() {
for epIdx := int8(0); epIdx < 4; epIdx++ {
for batchIdx := int8(0); batchIdx < 4; batchIdx++ {
suite.True(reader.Next())
- rec := reader.Record()
+ rec := reader.RecordBatch()
// don't need to manually release this record because
we never
// call retain. Each call to Next releases the previous
record
diff --git a/go/adbc/driver/internal/driverbase/connection.go
b/go/adbc/driver/internal/driverbase/connection.go
index 2e1277852..272504c10 100644
--- a/go/adbc/driver/internal/driverbase/connection.go
+++ b/go/adbc/driver/internal/driverbase/connection.go
@@ -209,10 +209,10 @@ func (base *ConnectionImplBase) GetInfo(ctx
context.Context, infoCodes []adbc.In
}
}
- final := bldr.NewRecord()
+ final := bldr.NewRecordBatch()
defer final.Release()
- reader, err = array.NewRecordReader(adbc.GetInfoSchema, []arrow.Record{final})
+ reader, err = array.NewRecordReader(adbc.GetInfoSchema, []arrow.RecordBatch{final})
return reader, err
}
@@ -584,9 +584,9 @@ func (cnxn *connection) GetTableTypes(ctx context.Context)
(array.RecordReader,
defer bldr.Release()
bldr.Field(0).(*array.StringBuilder).AppendValues(tableTypes, nil)
- final := bldr.NewRecord()
+ final := bldr.NewRecordBatch()
defer final.Release()
- return array.NewRecordReader(adbc.TableTypesSchema, []arrow.Record{final})
+ return array.NewRecordReader(adbc.TableTypesSchema, []arrow.RecordBatch{final})
}
func (cnxn *connection) Commit(ctx context.Context) error {
@@ -744,10 +744,10 @@ CATALOGLOOP:
}
}
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
- return array.NewRecordReader(adbc.GetObjectsSchema, []arrow.Record{rec})
+ return array.NewRecordReader(adbc.GetObjectsSchema, []arrow.RecordBatch{rec})
}
func PatternToNamedArg(name string, pattern *string) sql.NamedArg {
diff --git a/go/adbc/driver/internal/driverbase/driver_test.go
b/go/adbc/driver/internal/driverbase/driver_test.go
index f33b1fbcd..1f1a24f5f 100644
--- a/go/adbc/driver/internal/driverbase/driver_test.go
+++ b/go/adbc/driver/internal/driverbase/driver_test.go
@@ -533,7 +533,7 @@ func (base *statement) Base() *driverbase.StatementImplBase
{
return &base.StatementImplBase
}
-func (base *statement) Bind(ctx context.Context, values arrow.Record) error {
+func (base *statement) Bind(ctx context.Context, values arrow.RecordBatch) error {
return base.Base().ErrorHelper.Errorf(adbc.StatusNotImplemented, "Bind")
}
@@ -772,9 +772,9 @@ func messagesEqual(expected, actual logMessage) bool {
func tableFromRecordReader(rdr array.RecordReader) arrow.Table {
defer rdr.Release()
- recs := make([]arrow.Record, 0)
+ recs := make([]arrow.RecordBatch, 0)
for rdr.Next() {
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
rec.Retain()
defer rec.Release()
recs = append(recs, rec)
diff --git a/go/adbc/driver/internal/shared_utils.go
b/go/adbc/driver/internal/shared_utils.go
index 0fda8f630..6ea07fcbe 100644
--- a/go/adbc/driver/internal/shared_utils.go
+++ b/go/adbc/driver/internal/shared_utils.go
@@ -256,10 +256,10 @@ func (g *GetObjects) Release() {
}
func (g *GetObjects) Finish() (array.RecordReader, error) {
- record := g.builder.NewRecord()
+ record := g.builder.NewRecordBatch()
defer record.Release()
- result, err := array.NewRecordReader(g.builder.Schema(), []arrow.Record{record})
+ result, err := array.NewRecordReader(g.builder.Schema(), []arrow.RecordBatch{record})
if err != nil {
return nil, adbc.Error{
Msg: err.Error(),
diff --git a/go/adbc/driver/panicdummy/panicdummy_adbc.go
b/go/adbc/driver/panicdummy/panicdummy_adbc.go
index cef79b50b..13d32660a 100644
--- a/go/adbc/driver/panicdummy/panicdummy_adbc.go
+++ b/go/adbc/driver/panicdummy/panicdummy_adbc.go
@@ -165,7 +165,7 @@ func (s *statement) SetSubstraitPlan(plan []byte) error {
return nil
}
-func (s *statement) Bind(_ context.Context, values arrow.Record) error {
+func (s *statement) Bind(_ context.Context, values arrow.RecordBatch) error {
maybePanic("StatementBind")
values.Release()
return nil
diff --git a/go/adbc/driver/snowflake/binding.go
b/go/adbc/driver/snowflake/binding.go
index 421fb1eb4..dbdeebec2 100644
--- a/go/adbc/driver/snowflake/binding.go
+++ b/go/adbc/driver/snowflake/binding.go
@@ -28,7 +28,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/array"
)
-func convertArrowToNamedValue(batch arrow.Record, index int) ([]driver.NamedValue, error) {
+func convertArrowToNamedValue(batch arrow.RecordBatch, index int) ([]driver.NamedValue, error) {
// see goTypeToSnowflake in gosnowflake
// technically, snowflake can bind an array of values at once, but
// only for INSERT, so we can't take advantage of that without
@@ -97,7 +97,7 @@ func convertArrowToNamedValue(batch arrow.Record, index int)
([]driver.NamedValu
type snowflakeBindReader struct {
doQuery func([]driver.NamedValue) (array.RecordReader, error)
- currentBatch arrow.Record
+ currentBatch arrow.RecordBatch
nextIndex int64
// may be nil if we bound only a batch
stream array.RecordReader
@@ -135,7 +135,7 @@ func (r *snowflakeBindReader) NextParams()
([]driver.NamedValue, error) {
}
r.currentBatch = nil
if r.stream != nil && r.stream.Next() {
- r.currentBatch = r.stream.Record()
+ r.currentBatch = r.stream.RecordBatch()
r.currentBatch.Retain()
r.nextIndex = 0
continue
diff --git a/go/adbc/driver/snowflake/bulk_ingestion.go
b/go/adbc/driver/snowflake/bulk_ingestion.go
index 04f93999d..4415aea3e 100644
--- a/go/adbc/driver/snowflake/bulk_ingestion.go
+++ b/go/adbc/driver/snowflake/bulk_ingestion.go
@@ -150,7 +150,7 @@ func (st *statement) ingestRecord(ctx context.Context)
(nrows int64, err error)
g := errgroup.Group{}
// writeParquet takes a channel of Records, but we only have one Record
to write
- recordCh := make(chan arrow.Record, 1)
+ recordCh := make(chan arrow.RecordBatch, 1)
recordCh <- st.bound
close(recordCh)
@@ -243,7 +243,7 @@ func (st *statement) ingestStream(ctx context.Context)
(nrows int64, err error)
g, gCtx := errgroup.WithContext(ctx)
// Read records into channel
- records := make(chan arrow.Record, st.ingestOptions.writerConcurrency)
+ records := make(chan arrow.RecordBatch, st.ingestOptions.writerConcurrency)
g.Go(func() error {
return readRecords(gCtx, st.streamBind, records)
})
@@ -308,11 +308,11 @@ func newWriterProps(mem memory.Allocator, opts
*ingestOptions) (*parquet.WriterP
return parquetProps, arrowProps
}
-func readRecords(ctx context.Context, rdr array.RecordReader, out chan<- arrow.Record) error {
+func readRecords(ctx context.Context, rdr array.RecordReader, out chan<- arrow.RecordBatch) error {
defer close(out)
for rdr.Next() {
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
rec.Retain()
select {
@@ -325,7 +325,7 @@ func readRecords(ctx context.Context, rdr
array.RecordReader, out chan<- arrow.R
return rdr.Err()
}
-func writeRecordToParquet(wr *pqarrow.FileWriter, rec arrow.Record) (int64, error) {
+func writeRecordToParquet(wr *pqarrow.FileWriter, rec arrow.RecordBatch) (int64, error) {
if rec.NumRows() == 0 {
rec.Release()
return 0, nil
@@ -343,7 +343,7 @@ func writeRecordToParquet(wr *pqarrow.FileWriter, rec
arrow.Record) (int64, erro
func writeParquet(
schema *arrow.Schema,
w io.Writer,
- in <-chan arrow.Record,
+ in <-chan arrow.RecordBatch,
targetSize int,
parquetProps *parquet.WriterProperties,
arrowProps pqarrow.ArrowWriterProperties,
@@ -396,7 +396,7 @@ func runParallelParquetWriters(
parquetProps *parquet.WriterProperties,
arrowProps pqarrow.ArrowWriterProperties,
buffers *bufferPool,
- in <-chan arrow.Record,
+ in <-chan arrow.RecordBatch,
out chan<- *bytes.Buffer,
) error {
var once sync.Once
diff --git a/go/adbc/driver/snowflake/bulk_ingestion_test.go
b/go/adbc/driver/snowflake/bulk_ingestion_test.go
index 669a770e4..40dbbb0d9 100644
--- a/go/adbc/driver/snowflake/bulk_ingestion_test.go
+++ b/go/adbc/driver/snowflake/bulk_ingestion_test.go
@@ -59,7 +59,7 @@ func TestIngestBatchedParquetWithFileLimit(t *testing.T) {
expectedRowGroupSize := buf.Len()
require.NoError(t, tempWriter.Close())
- recs := make([]arrow.Record, nRecs)
+ recs := make([]arrow.RecordBatch, nRecs)
for i := 0; i < nRecs; i++ {
recs[i] = rec
}
@@ -68,7 +68,7 @@ func TestIngestBatchedParquetWithFileLimit(t *testing.T) {
require.NoError(t, err)
defer rdr.Release()
- records := make(chan arrow.Record)
+ records := make(chan arrow.RecordBatch)
go func() { assert.NoError(t, readRecords(ctx, rdr, records)) }()
buf.Reset()
@@ -83,7 +83,7 @@ func TestIngestBatchedParquetWithFileLimit(t *testing.T) {
require.ErrorIs(t, writeParquet(rdr.Schema(), &buf, records, -1,
parquetProps, arrowProps), io.EOF)
}
-func makeRec(mem memory.Allocator, nCols, nRows int) arrow.Record {
+func makeRec(mem memory.Allocator, nCols, nRows int) arrow.RecordBatch {
vals := make([]int8, nRows)
for val := 0; val < nRows; val++ {
vals[val] = int8(val)
@@ -100,9 +100,9 @@ func makeRec(mem memory.Allocator, nCols, nRows int)
arrow.Record {
cols := make([]arrow.Array, nCols)
for i := 0; i < nCols; i++ {
fields[i] = arrow.Field{Name: fmt.Sprintf("field_%d", i), Type:
arrow.PrimitiveTypes.Int8}
- cols[i] = arr // array.NewRecord will retain these
+ cols[i] = arr // array.NewRecordBatch will retain these
}
schema := arrow.NewSchema(fields, nil)
- return array.NewRecord(schema, cols, int64(nRows))
+ return array.NewRecordBatch(schema, cols, int64(nRows))
}
diff --git a/go/adbc/driver/snowflake/concat_reader.go
b/go/adbc/driver/snowflake/concat_reader.go
index a94a2ded8..c7745cfaa 100644
--- a/go/adbc/driver/snowflake/concat_reader.go
+++ b/go/adbc/driver/snowflake/concat_reader.go
@@ -99,9 +99,14 @@ func (r *concatReader) Next() bool {
}
return true
}
-func (r *concatReader) Record() arrow.Record {
- return r.currentReader.Record()
+func (r *concatReader) Record() arrow.RecordBatch {
+ return r.currentReader.RecordBatch()
+}
+func (r *concatReader) RecordBatch() arrow.RecordBatch {
+ return r.currentReader.RecordBatch()
}
func (r *concatReader) Err() error {
return r.err
}
+
+var _ array.RecordReader = (*concatReader)(nil)
diff --git a/go/adbc/driver/snowflake/driver_test.go
b/go/adbc/driver/snowflake/driver_test.go
index 141eacb2f..147a92efd 100644
--- a/go/adbc/driver/snowflake/driver_test.go
+++ b/go/adbc/driver/snowflake/driver_test.go
@@ -151,7 +151,7 @@ func quoteTblName(name string) string {
return "\"" + strings.ReplaceAll(name, "\"", "\"\"") + "\""
}
-func (s *SnowflakeQuirks) CreateSampleTable(tableName string, r arrow.Record) (err error) {
+func (s *SnowflakeQuirks) CreateSampleTable(tableName string, r arrow.RecordBatch) (err error) {
var b strings.Builder
b.WriteString("CREATE OR REPLACE TABLE ")
b.WriteString(quoteTblName(tableName))
@@ -453,7 +453,7 @@ func (suite *SnowflakeTests) TestSqlIngestTimestamp() {
ibldr := bldr.Field(2).(*array.Int64Builder)
ibldr.AppendValues([]int64{-1, 25, 0}, []bool{true, true, false})
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
suite.Require().NoError(suite.stmt.Bind(suite.ctx, rec))
@@ -469,7 +469,7 @@ func (suite *SnowflakeTests) TestSqlIngestTimestamp() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- result := rdr.Record()
+ result := rdr.RecordBatch()
suite.Truef(array.RecordEqual(rec, result), "expected: %s\ngot: %s",
rec, result)
suite.False(rdr.Next())
@@ -552,10 +552,10 @@ func (suite *SnowflakeTests)
TestSqlIngestRecordAndStreamAreEquivalent() {
bldr.Field(11).(*array.TimestampBuilder).AppendValues([]arrow.Timestamp{1, 2,
3}, nil)
bldr.Field(12).(*array.TimestampBuilder).AppendValues([]arrow.Timestamp{1, 2,
3}, nil)
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
- stream, err := array.NewRecordReader(sc, []arrow.Record{rec})
+ stream, err := array.NewRecordReader(sc, []arrow.RecordBatch{rec})
suite.Require().NoError(err)
defer stream.Release()
@@ -572,7 +572,7 @@ func (suite *SnowflakeTests)
TestSqlIngestRecordAndStreamAreEquivalent() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- resultBind := rdr.Record()
+ resultBind := rdr.RecordBatch()
// New session to clean up TEMPORARY resources in Snowflake associated
with the previous one
suite.NoError(suite.stmt.Close())
@@ -595,7 +595,7 @@ func (suite *SnowflakeTests)
TestSqlIngestRecordAndStreamAreEquivalent() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- resultBindStream := rdr.Record()
+ resultBindStream := rdr.RecordBatch()
suite.Truef(array.RecordEqual(resultBind, resultBindStream), "expected:
%s\ngot: %s", resultBind, resultBindStream)
suite.False(rdr.Next())
@@ -663,7 +663,7 @@ func (suite *SnowflakeTests) TestSqlIngestRoundtripTypes() {
bldr.Field(8).(*array.Time32Builder).AppendValues([]arrow.Time32{1, 2,
3}, nil)
bldr.Field(9).(*array.Time32Builder).AppendValues([]arrow.Time32{1, 2,
3}, nil)
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
suite.Require().NoError(suite.stmt.Bind(suite.ctx, rec))
@@ -679,7 +679,7 @@ func (suite *SnowflakeTests) TestSqlIngestRoundtripTypes() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- result := rdr.Record()
+ result := rdr.RecordBatch()
suite.Truef(array.RecordEqual(rec, result), "expected: %s\ngot: %s",
rec, result)
suite.False(rdr.Next())
@@ -736,7 +736,7 @@ func (suite *SnowflakeTests) TestSqlIngestTimestampTypes() {
bldr.Field(5).(*array.TimestampBuilder).AppendValues([]arrow.Timestamp{1, 2,
3}, nil)
bldr.Field(6).(*array.TimestampBuilder).AppendValues([]arrow.Timestamp{1, 2,
3}, nil)
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
suite.Require().NoError(suite.stmt.Bind(suite.ctx, rec))
@@ -752,7 +752,7 @@ func (suite *SnowflakeTests) TestSqlIngestTimestampTypes() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- result := rdr.Record()
+ result := rdr.RecordBatch()
expectedSchema := arrow.NewSchema([]arrow.Field{
{
@@ -845,7 +845,7 @@ func (suite *SnowflakeTests) TestSqlIngestDate64Type() {
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
bldr.Field(1).(*array.Date64Builder).AppendValues([]arrow.Date64{86400000,
172800000, 259200000}, nil) // 1,2,3 days of milliseconds
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
suite.Require().NoError(suite.stmt.Bind(suite.ctx, rec))
@@ -861,7 +861,7 @@ func (suite *SnowflakeTests) TestSqlIngestDate64Type() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- result := rdr.Record()
+ result := rdr.RecordBatch()
expectedSchema := arrow.NewSchema([]arrow.Field{
{
@@ -935,7 +935,7 @@ func (suite *SnowflakeTests) TestSqlIngestHighPrecision() {
suite.Require().NoError(err)
bldr.Field(3).(*array.Decimal128Builder).AppendValues([]decimal128.Num{num1,
num2, num3}, nil)
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
suite.Require().NoError(suite.stmt.Bind(suite.ctx, rec))
@@ -955,7 +955,7 @@ func (suite *SnowflakeTests) TestSqlIngestHighPrecision() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- result := rdr.Record()
+ result := rdr.RecordBatch()
expectedSchema := arrow.NewSchema([]arrow.Field{
{ // INT64 -> DECIMAL(38, 0) on roundtrip
@@ -1043,7 +1043,7 @@ func (suite *SnowflakeTests) TestSqlIngestLowPrecision() {
suite.Require().NoError(err)
bldr.Field(3).(*array.Decimal128Builder).AppendValues([]decimal128.Num{num1,
num2, num3}, nil)
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
suite.Require().NoError(suite.stmt.Bind(suite.ctx, rec))
@@ -1060,7 +1060,7 @@ func (suite *SnowflakeTests) TestSqlIngestLowPrecision() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- result := rdr.Record()
+ result := rdr.RecordBatch()
expectedSchema := arrow.NewSchema([]arrow.Field{
{ // Preserved on roundtrip
@@ -1158,7 +1158,7 @@ func (suite *SnowflakeTests) TestSqlIngestStructType() {
struct3bldr.FieldBuilder(0).(*array.Int64Builder).AppendValues([]int64{1, 0,
3}, nil)
struct3bldr.FieldBuilder(1).(*array.BooleanBuilder).AppendValues([]bool{true,
false, false}, nil)
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
suite.Require().NoError(suite.stmt.Bind(suite.ctx, rec))
@@ -1174,7 +1174,7 @@ func (suite *SnowflakeTests) TestSqlIngestStructType() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- result := rdr.Record()
+ result := rdr.RecordBatch()
expectedSchema := arrow.NewSchema([]arrow.Field{
{
@@ -1259,7 +1259,7 @@ func (suite *SnowflakeTests) TestSqlIngestMapType() {
keybldr.Append("key3")
itembldr.Append(3)
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
suite.Require().NoError(suite.stmt.Bind(suite.ctx, rec))
@@ -1275,7 +1275,7 @@ func (suite *SnowflakeTests) TestSqlIngestMapType() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- result := rdr.Record()
+ result := rdr.RecordBatch()
expectedSchema := arrow.NewSchema([]arrow.Field{
{
@@ -1345,7 +1345,7 @@ func (suite *SnowflakeTests) TestSqlIngestListType() {
listbldr.Append(true)
listvalbldr.Append("three")
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
suite.Require().NoError(suite.stmt.Bind(suite.ctx, rec))
@@ -1361,7 +1361,7 @@ func (suite *SnowflakeTests) TestSqlIngestListType() {
suite.EqualValues(3, n)
suite.True(rdr.Next())
- result := rdr.Record()
+ result := rdr.RecordBatch()
expectedSchema := arrow.NewSchema([]arrow.Field{
{
@@ -1413,7 +1413,7 @@ func (suite *SnowflakeTests)
TestStatementEmptyResultSet() {
defer rdr.Release()
suite.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
suite.Equal(n, rec.NumRows())
// Snowflake may add more columns to the result of this query in the
future,
@@ -1526,7 +1526,7 @@ func (suite *SnowflakeTests)
TestMetadataGetObjectsColumnsXdbc() {
suite.Truef(adbc.GetObjectsSchema.Equal(rdr.Schema()), "expected:
%s\ngot: %s", adbc.GetObjectsSchema, rdr.Schema())
suite.True(rdr.Next())
- rec = rdr.Record()
+ rec = rdr.RecordBatch()
suite.Greater(rec.NumRows(), int64(0))
var (
foundExpected = false
@@ -1693,7 +1693,7 @@ func (suite *SnowflakeTests) TestTimestampSnow() {
defer rdr.Release()
suite.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
for _, f := range rec.Schema().Fields() {
st, ok := f.Metadata.GetValue("SNOWFLAKE_TYPE")
if !ok {
@@ -1755,7 +1755,7 @@ func (suite *SnowflakeTests) TestTimestampPrecisionJson()
{
suite.Require().NoError(err)
suite.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
suite.Equal(1, int(rec.NumRows()))
@@ -1860,7 +1860,7 @@ func (suite *SnowflakeTests) queryTimestamps(query
string, expectedMicrosecondRe
suite.validateTimestamps(query, rec, nil) // dont expect any results
}
-func (suite *SnowflakeTests) getTimestamps(query string, maxTimestampPrecision string) arrow.Record {
+func (suite *SnowflakeTests) getTimestamps(query string, maxTimestampPrecision string) arrow.RecordBatch {
// with max microseconds precision
opts := suite.Quirks.DatabaseOptions()
@@ -1886,12 +1886,12 @@ func (suite *SnowflakeTests) getTimestamps(query
string, maxTimestampPrecision s
}
suite.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
return rec
}
-func (suite *SnowflakeTests) validateTimestamps(query string, rec arrow.Record, expected []arrow.Timestamp) {
+func (suite *SnowflakeTests) validateTimestamps(query string, rec arrow.RecordBatch, expected []arrow.Timestamp) {
if expected != nil {
for i := 0; i < int(rec.NumCols()); i++ {
col := rec.Column(i).(*array.Timestamp)
@@ -1936,7 +1936,7 @@ func (suite *SnowflakeTests) TestUseHighPrecision() {
suite.Truef(arrow.TypeEqual(arrow.PrimitiveTypes.Int64,
rdr.Schema().Field(0).Type), "expected int64, got %s",
rdr.Schema().Field(0).Type)
suite.Truef(arrow.TypeEqual(arrow.PrimitiveTypes.Float64,
rdr.Schema().Field(1).Type), "expected float64, got %s",
rdr.Schema().Field(1).Type)
suite.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
suite.Equal(1234567.89, rec.Column(1).(*array.Float64).Value(0))
suite.Equal(9876543210.99, rec.Column(1).(*array.Float64).Value(1))
@@ -1963,7 +1963,7 @@ func (suite *SnowflakeTests) TestDecimalHighPrecision() {
suite.EqualValues(1, n)
suite.Truef(arrow.TypeEqual(&arrow.Decimal128Type{Precision: int32(precision),
Scale: int32(scale)}, rdr.Schema().Field(0).Type), "expected decimal(%d, %d),
got %s", precision, scale, rdr.Schema().Field(0).Type)
suite.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
suite.Equal(number,
rec.Column(0).(*array.Decimal128).Value(0))
}
@@ -1993,7 +1993,7 @@ func (suite *SnowflakeTests)
TestNonIntDecimalLowPrecision() {
suite.EqualValues(1, n)
suite.Truef(arrow.TypeEqual(arrow.PrimitiveTypes.Float64,
rdr.Schema().Field(0).Type), "expected float64, got %s",
rdr.Schema().Field(0).Type)
suite.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
value := rec.Column(0).(*array.Float64).Value(0)
difference := math.Abs(number - value)
@@ -2028,7 +2028,7 @@ func (suite *SnowflakeTests) TestIntDecimalLowPrecision()
{
suite.EqualValues(1, n)
suite.Truef(arrow.TypeEqual(arrow.PrimitiveTypes.Int64,
rdr.Schema().Field(0).Type), "expected int64, got %s",
rdr.Schema().Field(0).Type)
suite.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
value := rec.Column(0).(*array.Int64).Value(0)
suite.Equal(number, value)
@@ -2060,7 +2060,7 @@ func (suite *SnowflakeTests) TestAdditionalDriverInfo() {
var totalRows int64
for rdr.Next() {
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
totalRows += rec.NumRows()
code := rec.Column(0).(*array.Uint32)
info := rec.Column(1).(*array.DenseUnion)
@@ -2274,7 +2274,7 @@ func (suite *SnowflakeTests) TestMetadataOnlyQuery() {
recv := int64(0)
for rdr.Next() {
- recv += rdr.Record().NumRows()
+ recv += rdr.RecordBatch().NumRows()
}
// verify that we got the expected number of rows if we sum up
@@ -2292,7 +2292,7 @@ func (suite *SnowflakeTests) TestEmptyResultSet() {
recv := int64(0)
for rdr.Next() {
- recv += rdr.Record().NumRows()
+ recv += rdr.RecordBatch().NumRows()
}
// verify that we got the expected number of rows if we sum up
@@ -2314,17 +2314,17 @@ func (suite *SnowflakeTests) TestIngestEmptyChunk() {
bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
defer bldr.Release()
- emptyRec := bldr.NewRecord()
+ emptyRec := bldr.NewRecordBatch()
defer emptyRec.Release()
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
// See https://github.com/apache/arrow-adbc/issues/1847
// Snowflake does not properly handle empty row groups, so need to make
sure we don't send any.
- rdr, err := array.NewRecordReader(sc, []arrow.Record{emptyRec, rec})
+ rdr, err := array.NewRecordReader(sc, []arrow.RecordBatch{emptyRec, rec})
suite.Require().NoError(err)
defer rdr.Release()
@@ -2367,10 +2367,10 @@ func TestIngestCancelContext(t *testing.T) {
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2,
3}, nil)
- rec := bldr.NewRecord()
+ rec := bldr.NewRecordBatch()
defer rec.Release()
- rdr, err := array.NewRecordReader(sc, []arrow.Record{rec})
+ rdr, err := array.NewRecordReader(sc, []arrow.RecordBatch{rec})
require.NoError(t, err)
defer rdr.Release()
@@ -2569,7 +2569,7 @@ ORDER BY start_time;
suite.EqualValues(1, n)
suite.True(rdr.Next())
- result := rdr.Record()
+ result := rdr.RecordBatch()
suite.Require().Equal("SELECT 1",
result.Column(0).(*array.String).Value(0))
suite.False(rdr.Next())
suite.Require().NoError(rdr.Err())
@@ -2592,7 +2592,7 @@ func (suite *SnowflakeTests) TestGetObjectsVector() {
defer rdr.Release()
suite.Require().True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
for i := 0; int64(i) < rec.NumRows(); i++ {
// list<db_schema_schema>
diff --git a/go/adbc/driver/snowflake/record_reader.go
b/go/adbc/driver/snowflake/record_reader.go
index 140b1050a..c1f4979fc 100644
--- a/go/adbc/driver/snowflake/record_reader.go
+++ b/go/adbc/driver/snowflake/record_reader.go
@@ -48,11 +48,11 @@ func identCol(_ context.Context, a arrow.Array)
(arrow.Array, error) {
return a, nil
}
-type recordTransformer = func(context.Context, arrow.Record) (arrow.Record, error)
+type recordTransformer = func(context.Context, arrow.RecordBatch) (arrow.RecordBatch, error)
type colTransformer = func(context.Context, arrow.Array) (arrow.Array, error)
func getRecTransformer(sc *arrow.Schema, tr []colTransformer)
recordTransformer {
- return func(ctx context.Context, r arrow.Record) (arrow.Record, error) {
+ return func(ctx context.Context, r arrow.RecordBatch) (arrow.RecordBatch, error) {
if len(tr) != int(r.NumCols()) {
return nil, adbc.Error{
Msg: "mismatch in record cols and
transformers",
@@ -71,7 +71,7 @@ func getRecTransformer(sc *arrow.Schema, tr []colTransformer)
recordTransformer
defer cols[i].Release()
}
- return array.NewRecord(sc, cols, r.NumRows()), nil
+ return array.NewRecordBatch(sc, cols, r.NumRows()), nil
}
}
@@ -400,7 +400,7 @@ func extractTimestamp(src *string) (sec, nsec int64, err
error) {
return
}
-func jsonDataToArrow(_ context.Context, bldr *array.RecordBuilder, rawData [][]*string, maxTimestampPrecision MaxTimestampPrecision) (arrow.Record, error) {
+func jsonDataToArrow(_ context.Context, bldr *array.RecordBuilder, rawData [][]*string, maxTimestampPrecision MaxTimestampPrecision) (arrow.RecordBatch, error) {
fieldBuilders := bldr.Fields()
for _, rec := range rawData {
for i, col := range rec {
@@ -507,15 +507,15 @@ func jsonDataToArrow(_ context.Context, bldr
*array.RecordBuilder, rawData [][]*
}
}
}
- return bldr.NewRecord(), nil
+ return bldr.NewRecordBatch(), nil
}
type reader struct {
refCount int64
schema *arrow.Schema
- chs []chan arrow.Record
+ chs []chan arrow.RecordBatch
curChIndex int
- rec arrow.Record
+ rec arrow.RecordBatch
err error
cancelFn context.CancelFunc
@@ -542,7 +542,7 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, ld gosnowflake
}
if ld.TotalRows() == 0 {
- return array.NewRecordReader(schema, []arrow.Record{})
+ return array.NewRecordReader(schema, []arrow.RecordBatch{})
}
bldr := array.NewRecordBuilder(alloc, schema)
@@ -554,7 +554,7 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, ld gosnowflake
}
defer rec.Release()
- results := []arrow.Record{rec}
+ results := []arrow.RecordBatch{rec}
for _, b := range batches {
rdr, err := b.GetStream(ctx)
if err != nil {
@@ -626,7 +626,7 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, ld gosnowflake
return array.NewRecordReader(schema, results)
}
- ch := make(chan arrow.Record, bufferSize)
+ ch := make(chan arrow.RecordBatch, bufferSize)
group, ctx := errgroup.WithContext(compute.WithAllocator(ctx, alloc))
ctx, cancelFn := context.WithCancel(ctx)
group.SetLimit(prefetchConcurrency)
@@ -638,7 +638,7 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, ld gosnowflake
}
}()
- chs := make([]chan arrow.Record, len(batches))
+ chs := make([]chan arrow.RecordBatch, len(batches))
rdr := &reader{
refCount: 1,
chs: chs,
@@ -681,7 +681,7 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, ld gosnowflake
}
for rr.Next() && ctx.Err() == nil {
- rec := rr.Record()
+ rec := rr.RecordBatch()
rec, err = recTransform(ctx, rec)
if err != nil {
return err
@@ -697,7 +697,7 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, ld gosnowflake
go func() {
for i, b := range batches[1:] {
batch, batchIdx := b, i+1
- chs[batchIdx] = make(chan arrow.Record, bufferSize)
+ chs[batchIdx] = make(chan arrow.RecordBatch, bufferSize)
group.Go(func() (err error) {
 // close channels (except the last) so that Next can move on to the next channel properly
if batchIdx != lastChannelIndex {
@@ -719,7 +719,7 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, ld gosnowflake
defer rr.Release()
for rr.Next() && ctx.Err() == nil {
- rec := rr.Record()
+ rec := rr.RecordBatch()
rec, err = recTransform(ctx, rec)
if err != nil {
return err
@@ -748,7 +748,11 @@ func (r *reader) Schema() *arrow.Schema {
return r.schema
}
-func (r *reader) Record() arrow.Record {
+func (r *reader) Record() arrow.RecordBatch {
+ return r.rec
+}
+
+func (r *reader) RecordBatch() arrow.RecordBatch {
return r.rec
}
@@ -793,3 +797,5 @@ func (r *reader) Release() {
}
}
}
+
+var _ array.RecordReader = (*reader)(nil)
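
The hunk above keeps Record() as a thin alias of the new RecordBatch() accessor and adds a compile-time check that the reader still satisfies array.RecordReader. On the consumer side the rename looks like the sketch below; it assumes arrow-go v18.4.1, where array.RecordReader exposes RecordBatch() alongside the older Record(), and the function name consumeAll is illustrative:

    package example

    import "github.com/apache/arrow-go/v18/arrow/array"

    // consumeAll takes ownership of rdr, drains it with the renamed accessor,
    // and reports the total row count plus any stream error.
    func consumeAll(rdr array.RecordReader) (int64, error) {
    	defer rdr.Release()
    	var rows int64
    	for rdr.Next() {
    		rec := rdr.RecordBatch() // previously rdr.Record()
    		rows += rec.NumRows()
    	}
    	return rows, rdr.Err()
    }
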
diff --git a/go/adbc/driver/snowflake/statement.go
b/go/adbc/driver/snowflake/statement.go
index 71f04b253..6b7e09560 100644
--- a/go/adbc/driver/snowflake/statement.go
+++ b/go/adbc/driver/snowflake/statement.go
@@ -63,7 +63,7 @@ type statement struct {
ingestOptions *ingestOptions
queryTag string
- bound arrow.Record
+ bound arrow.RecordBatch
streamBind array.RecordReader
}
@@ -696,7 +696,7 @@ func (st *statement) SetSubstraitPlan(plan []byte) error {
// The driver will call release on the passed in Record when it is done,
// but it may not do this until the statement is closed or another
// record is bound.
-func (st *statement) Bind(_ context.Context, values arrow.Record) error {
+func (st *statement) Bind(_ context.Context, values arrow.RecordBatch) error {
if st.streamBind != nil {
st.streamBind.Release()
st.streamBind = nil
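
The doc comment on Bind (the driver releases the bound batch once the statement is closed or another batch is bound) implies that a caller which still needs the batch afterwards should take its own reference first. A hedged sketch of that caller-side discipline follows; bindKeepingOwnership is a hypothetical helper, not part of this patch:

    package example

    import (
    	"context"

    	"github.com/apache/arrow-adbc/go/adbc"
    	"github.com/apache/arrow-go/v18/arrow"
    )

    // bindKeepingOwnership binds rec while keeping the caller's reference
    // usable after the statement releases its own copy.
    func bindKeepingOwnership(ctx context.Context, stmt adbc.Statement, rec arrow.RecordBatch) error {
    	// The statement will Release the batch when it is closed or rebound,
    	// so hand it an extra reference and keep ours for later use.
    	rec.Retain()
    	return stmt.Bind(ctx, rec)
    }
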
diff --git a/go/adbc/drivermgr/wrapper.go b/go/adbc/drivermgr/wrapper.go
index 518089c81..d2a8e029f 100644
--- a/go/adbc/drivermgr/wrapper.go
+++ b/go/adbc/drivermgr/wrapper.go
@@ -527,7 +527,7 @@ func (s *stmt) SetSubstraitPlan(plan []byte) error {
return &adbc.Error{Code: adbc.StatusNotImplemented}
}
-func (s *stmt) Bind(_ context.Context, values arrow.Record) error {
+func (s *stmt) Bind(_ context.Context, values arrow.RecordBatch) error {
var (
arr = C.allocArr()
schema C.struct_ArrowSchema
diff --git a/go/adbc/drivermgr/wrapper_sqlite_test.go
b/go/adbc/drivermgr/wrapper_sqlite_test.go
index 779371c22..0e0b3de0a 100644
--- a/go/adbc/drivermgr/wrapper_sqlite_test.go
+++ b/go/adbc/drivermgr/wrapper_sqlite_test.go
@@ -132,7 +132,7 @@ func (dm *DriverMgrSuite) TestGetObjects() {
dm.True(expSchema.Equal(rdr.Schema()))
dm.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
dm.Equal(int64(1), rec.NumRows())
expRec, _, err := array.RecordFromJSON(
memory.DefaultAllocator,
@@ -204,7 +204,7 @@ func (dm *DriverMgrSuite) TestGetObjectsDBSchema() {
dm.True(expSchema.Equal(rdr.Schema()))
dm.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
dm.Equal(int64(1), rec.NumRows())
expRec, _, err := array.RecordFromJSON(
memory.DefaultAllocator,
@@ -233,7 +233,7 @@ func (dm *DriverMgrSuite) TestGetObjectsTableName() {
dm.True(expSchema.Equal(rdr.Schema()))
dm.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
dm.Equal(int64(1), rec.NumRows())
expRec, _, err := array.RecordFromJSON(
memory.DefaultAllocator,
@@ -267,7 +267,7 @@ func (dm *DriverMgrSuite) TestGetObjectsColumnName() {
dm.True(expSchema.Equal(rdr.Schema()))
dm.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
dm.Equal(int64(1), rec.NumRows())
expRec, _, err := array.RecordFromJSON(
memory.DefaultAllocator,
@@ -320,7 +320,7 @@ func (dm *DriverMgrSuite) TestGetObjectsTableType() {
dm.True(expSchema.Equal(rdr.Schema()))
dm.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
dm.Equal(int64(1), rec.NumRows())
expRec, _, err := array.RecordFromJSON(
memory.DefaultAllocator,
@@ -384,7 +384,7 @@ func (dm *DriverMgrSuite) TestGetTableTypes() {
dm.True(expSchema.Equal(rdr.Schema()))
dm.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
dm.Equal(int64(2), rec.NumRows())
expTableTypes := []string{"table", "view"}
@@ -439,7 +439,7 @@ func (dm *DriverMgrSuite) TestSqlExecute() {
defer expRec.Release()
dm.True(rdr.Next())
- dm.Truef(array.RecordEqual(expRec, rdr.Record()), "expected: %s\ngot: %s", expRec, rdr.Record())
+ dm.Truef(array.RecordEqual(expRec, rdr.RecordBatch()), "expected: %s\ngot: %s", expRec, rdr.RecordBatch())
dm.False(rdr.Next())
}
@@ -480,7 +480,7 @@ func (dm *DriverMgrSuite) TestSqlPrepare() {
defer expRec.Release()
dm.True(rdr.Next())
- dm.Truef(array.RecordEqual(expRec, rdr.Record()), "expected: %s\ngot: %s", expRec, rdr.Record())
+ dm.Truef(array.RecordEqual(expRec, rdr.RecordBatch()), "expected: %s\ngot: %s", expRec, rdr.RecordBatch())
dm.False(rdr.Next())
}
@@ -511,7 +511,7 @@ func (dm *DriverMgrSuite) TestSqlPrepareMultipleParams() {
defer rdr.Release()
dm.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
 dm.Truef(array.RecordEqual(params, rec), "expected: %s\ngot: %s", params, rec)
dm.False(rdr.Next())
}
@@ -553,16 +553,16 @@ func (dm *DriverMgrSuite) TestBindStream() {
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
 bldr.Field(1).(*array.StringBuilder).AppendValues([]string{"one", "two", "three"}, nil)
- rec1 := bldr.NewRecord()
+ rec1 := bldr.NewRecordBatch()
defer rec1.Release()
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{4, 5, 6}, nil)
 bldr.Field(1).(*array.StringBuilder).AppendValues([]string{"four", "five", "six"}, nil)
- rec2 := bldr.NewRecord()
+ rec2 := bldr.NewRecordBatch()
defer rec2.Release()
- recsIn := []arrow.Record{rec1, rec2}
+ recsIn := []arrow.RecordBatch{rec1, rec2}
rdrIn, err := array.NewRecordReader(schema, recsIn)
dm.NoError(err)
@@ -572,9 +572,9 @@ func (dm *DriverMgrSuite) TestBindStream() {
dm.NoError(err)
defer rdrOut.Release()
- recsOut := make([]arrow.Record, 0)
+ recsOut := make([]arrow.RecordBatch, 0)
for rdrOut.Next() {
- rec := rdrOut.Record()
+ rec := rdrOut.RecordBatch()
rec.Retain()
defer rec.Release()
recsOut = append(recsOut, rec)
@@ -658,7 +658,7 @@ func (dm *DriverMgrSuite) TestIngestStream() {
dm.Require().NoError(err)
defer rec2.Release()
- rdr, err := array.NewRecordReader(schema, []arrow.Record{rec1, rec2})
+ rdr, err := array.NewRecordReader(schema, []arrow.RecordBatch{rec1, rec2})
dm.Require().NoError(err)
defer rdr.Release()
@@ -678,7 +678,7 @@ func (dm *DriverMgrSuite) TestIngestStream() {
defer rdr2.Release()
dm.True(rdr2.Next(), "expected one row with the count")
- recCount := rdr2.Record()
+ recCount := rdr2.RecordBatch()
cntArr := recCount.Column(0).(*array.Int64)
dm.Equal(int64(5), cntArr.Value(0), "table should contain 5 rows")
dm.False(rdr2.Next(), "no more rows expected")
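
The Retain/Release pattern in TestBindStream above generalizes: a reader only guarantees the current batch stays valid until the next call to Next, so batches that are kept around need their own reference. A minimal sketch, again assuming the v18.4.1 RecordBatch() accessor; collectBatches is an illustrative name:

    package example

    import (
    	"github.com/apache/arrow-go/v18/arrow"
    	"github.com/apache/arrow-go/v18/arrow/array"
    )

    // collectBatches gathers every batch from a reader into a slice, retaining
    // each one so it outlives the reader's own reference.
    func collectBatches(rdr array.RecordReader) []arrow.RecordBatch {
    	var out []arrow.RecordBatch
    	for rdr.Next() {
    		rec := rdr.RecordBatch()
    		rec.Retain() // keep the batch alive past the next call to Next
    		out = append(out, rec)
    	}
    	return out
    }
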
diff --git a/go/adbc/ext.go b/go/adbc/ext.go
index bea3d7e24..7fcbadcdb 100644
--- a/go/adbc/ext.go
+++ b/go/adbc/ext.go
@@ -208,7 +208,7 @@ func GetDriverInfo(ctx context.Context, cnxn Connection)
(DriverInfo, error) {
}
for stream.Next() {
- batch := stream.Record()
+ batch := stream.RecordBatch()
codeArr := batch.Column(0).(*array.Uint32)
unionArr := batch.Column(1).(*array.DenseUnion)
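
GetDriverInfo above walks the standard ADBC GetInfo stream: column 0 is the uint32 info code and column 1 a dense union of typed values. A rough sketch of pulling out only the string-valued entries; it assumes union child 0 is string_value (per the ADBC spec) and uses the arrow-go dense-union accessors ChildID, ValueOffset, and Field:

    package example

    import (
    	"fmt"

    	"github.com/apache/arrow-go/v18/arrow"
    	"github.com/apache/arrow-go/v18/arrow/array"
    )

    // printStringInfo prints every GetInfo entry whose union value is a string.
    func printStringInfo(batch arrow.RecordBatch) {
    	codes := batch.Column(0).(*array.Uint32)
    	values := batch.Column(1).(*array.DenseUnion)
    	for i := 0; i < int(batch.NumRows()); i++ {
    		if values.ChildID(i) != 0 { // child 0 is assumed to be string_value
    			continue
    		}
    		strs := values.Field(0).(*array.String)
    		fmt.Printf("info %d = %s\n", codes.Value(i), strs.Value(int(values.ValueOffset(i))))
    	}
    }
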
diff --git a/go/adbc/go.mod b/go/adbc/go.mod
index a3f90d1c3..6d977f903 100644
--- a/go/adbc/go.mod
+++ b/go/adbc/go.mod
@@ -23,7 +23,7 @@ toolchain go1.24.1
require (
cloud.google.com/go/bigquery v1.69.0
- github.com/apache/arrow-go/v18 v18.4.0
+ github.com/apache/arrow-go/v18 v18.4.1
github.com/bluele/gcache v0.0.2
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.6.0
@@ -102,7 +102,7 @@ require (
 github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.18.0 // indirect
- github.com/klauspost/cpuid/v2 v2.2.11 // indirect
+ github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
 github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
diff --git a/go/adbc/go.sum b/go/adbc/go.sum
index 36052c5e6..fb4cd3f94 100644
--- a/go/adbc/go.sum
+++ b/go/adbc/go.sum
@@ -46,8 +46,8 @@
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapp
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping
v0.51.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0=
github.com/andybalholm/brotli v1.2.0
h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
github.com/andybalholm/brotli v1.2.0/go.mod
h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
-github.com/apache/arrow-go/v18 v18.4.0 h1:/RvkGqH517iY8bZKc4FD5/kkdwXJGjxf28JIXbJ/oB0=
-github.com/apache/arrow-go/v18 v18.4.0/go.mod h1:Aawvwhj8x2jURIzD9Moy72cF0FyJXOpkYpdmGRHcw14=
+github.com/apache/arrow-go/v18 v18.4.1 h1:q/jVkBWCJOB9reDgaIZIdruLQUb1kbkvOnOFezVH1C4=
+github.com/apache/arrow-go/v18 v18.4.1/go.mod h1:tLyFubsAl17bvFdUAy24bsSvA/6ww95Iqi67fTpGu3E=
github.com/apache/arrow/go/v15 v15.0.2
h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE=
github.com/apache/arrow/go/v15 v15.0.2/go.mod
h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA=
github.com/apache/thrift v0.22.0
h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc=
@@ -158,8 +158,8 @@ github.com/klauspost/asmfmt v1.3.2
h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK
github.com/klauspost/asmfmt v1.3.2/go.mod
h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.18.0
h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod
h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
-github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU=
-github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
+github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
+github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod
h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
diff --git a/go/adbc/pkg/_tmpl/driver.go.tmpl b/go/adbc/pkg/_tmpl/driver.go.tmpl
index 70c078e59..d68170f2d 100644
--- a/go/adbc/pkg/_tmpl/driver.go.tmpl
+++ b/go/adbc/pkg/_tmpl/driver.go.tmpl
@@ -390,7 +390,7 @@ func {{.Prefix}}ArrayStreamGetNext(stream
*C.struct_ArrowArrayStream, array *C.s
}
cStream := getFromHandle[cArrayStream](stream.private_data)
if cStream.rdr.Next() {
- cdata.ExportArrowRecordBatch(cStream.rdr.Record(), toCdataArray(array), nil)
+ cdata.ExportArrowRecordBatch(cStream.rdr.RecordBatch(), toCdataArray(array), nil)
return 0;
}
array.release = nil
diff --git a/go/adbc/pkg/bigquery/driver.go b/go/adbc/pkg/bigquery/driver.go
index 1c9157efa..31204a47e 100644
--- a/go/adbc/pkg/bigquery/driver.go
+++ b/go/adbc/pkg/bigquery/driver.go
@@ -385,7 +385,7 @@ func BigQueryArrayStreamGetNext(stream
*C.struct_ArrowArrayStream, array *C.stru
}
cStream := getFromHandle[cArrayStream](stream.private_data)
if cStream.rdr.Next() {
- cdata.ExportArrowRecordBatch(cStream.rdr.Record(), toCdataArray(array), nil)
+ cdata.ExportArrowRecordBatch(cStream.rdr.RecordBatch(), toCdataArray(array), nil)
return 0
}
array.release = nil
diff --git a/go/adbc/pkg/flightsql/driver.go b/go/adbc/pkg/flightsql/driver.go
index ea2507374..defe870ff 100644
--- a/go/adbc/pkg/flightsql/driver.go
+++ b/go/adbc/pkg/flightsql/driver.go
@@ -393,7 +393,7 @@ func FlightSQLArrayStreamGetNext(stream
*C.struct_ArrowArrayStream, array *C.str
}
cStream := getFromHandle[cArrayStream](stream.private_data)
if cStream.rdr.Next() {
- cdata.ExportArrowRecordBatch(cStream.rdr.Record(), toCdataArray(array), nil)
+ cdata.ExportArrowRecordBatch(cStream.rdr.RecordBatch(), toCdataArray(array), nil)
return 0
}
array.release = nil
diff --git a/go/adbc/pkg/panicdummy/driver.go b/go/adbc/pkg/panicdummy/driver.go
index 9f67857d1..f7b55eda1 100644
--- a/go/adbc/pkg/panicdummy/driver.go
+++ b/go/adbc/pkg/panicdummy/driver.go
@@ -393,7 +393,7 @@ func PanicDummyArrayStreamGetNext(stream
*C.struct_ArrowArrayStream, array *C.st
}
cStream := getFromHandle[cArrayStream](stream.private_data)
if cStream.rdr.Next() {
- cdata.ExportArrowRecordBatch(cStream.rdr.Record(), toCdataArray(array), nil)
+ cdata.ExportArrowRecordBatch(cStream.rdr.RecordBatch(), toCdataArray(array), nil)
return 0
}
array.release = nil
diff --git a/go/adbc/pkg/snowflake/driver.go b/go/adbc/pkg/snowflake/driver.go
index ef282e4ac..30b9fe04f 100644
--- a/go/adbc/pkg/snowflake/driver.go
+++ b/go/adbc/pkg/snowflake/driver.go
@@ -393,7 +393,7 @@ func SnowflakeArrayStreamGetNext(stream
*C.struct_ArrowArrayStream, array *C.str
}
cStream := getFromHandle[cArrayStream](stream.private_data)
if cStream.rdr.Next() {
- cdata.ExportArrowRecordBatch(cStream.rdr.Record(), toCdataArray(array), nil)
+ cdata.ExportArrowRecordBatch(cStream.rdr.RecordBatch(), toCdataArray(array), nil)
return 0
}
array.release = nil
diff --git a/go/adbc/sqldriver/driver.go b/go/adbc/sqldriver/driver.go
index 03b6eecb6..ad7e79730 100644
--- a/go/adbc/sqldriver/driver.go
+++ b/go/adbc/sqldriver/driver.go
@@ -528,7 +528,7 @@ func arrFromVal(val any, dt arrow.DataType) (arrow.Array,
error) {
return array.MakeFromData(data), nil
}
-func createBoundRecord(values []driver.NamedValue, schema *arrow.Schema) (arrow.Record, error) {
+func createBoundRecord(values []driver.NamedValue, schema *arrow.Schema) (arrow.RecordBatch, error) {
fields := make([]arrow.Field, len(values))
cols := make([]arrow.Array, len(values))
if schema == nil {
@@ -548,7 +548,7 @@ func createBoundRecord(values []driver.NamedValue, schema
*arrow.Schema) (arrow.
cols[v.Ordinal-1] = arr
}
- return array.NewRecord(arrow.NewSchema(fields, nil), cols, 1), nil
+ return array.NewRecordBatch(arrow.NewSchema(fields, nil), cols, 1), nil
}
for _, v := range values {
@@ -572,7 +572,7 @@ func createBoundRecord(values []driver.NamedValue, schema
*arrow.Schema) (arrow.
f.Type = arr.DataType()
cols[idx] = arr
}
- return array.NewRecord(arrow.NewSchema(fields, nil), cols, 1), nil
+ return array.NewRecordBatch(arrow.NewSchema(fields, nil), cols, 1), nil
}
func (s *stmt) ExecContext(ctx context.Context, args []driver.NamedValue)
(driver.Result, error) {
@@ -618,7 +618,7 @@ func (s *stmt) QueryContext(ctx context.Context, args
[]driver.NamedValue) (driv
type rows struct {
rdr array.RecordReader
curRow int64
- curRecord arrow.Record
+ curRecord arrow.RecordBatch
rowsAffected int64
stmt *stmt
}
@@ -654,7 +654,7 @@ func (r *rows) Next(dest []driver.Value) error {
}
return io.EOF
}
- r.curRecord = r.rdr.Record()
+ r.curRecord = r.rdr.RecordBatch()
r.curRow = 0
if r.curRecord.NumRows() == 0 {
r.curRecord = nil
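
The rows cursor above pulls one RecordBatch at a time from the underlying reader and hands out rows until the current batch is exhausted. A simplified sketch of that shape, again assuming the v18.4.1 RecordBatch() accessor; this is not the sqldriver implementation itself, and the type and method names are illustrative:

    package example

    import (
    	"github.com/apache/arrow-go/v18/arrow"
    	"github.com/apache/arrow-go/v18/arrow/array"
    )

    // cursor walks row-by-row across the batches produced by a RecordReader.
    type cursor struct {
    	rdr    array.RecordReader
    	cur    arrow.RecordBatch
    	curRow int64
    }

    // next returns the batch and row index of the next row, or false once the
    // stream is exhausted. Zero-row batches are skipped.
    func (c *cursor) next() (arrow.RecordBatch, int64, bool) {
    	for c.cur == nil || c.curRow >= c.cur.NumRows() {
    		if !c.rdr.Next() {
    			return nil, 0, false
    		}
    		c.cur = c.rdr.RecordBatch()
    		c.curRow = 0
    	}
    	row := c.curRow
    	c.curRow++
    	return c.cur, row, true
    }
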
diff --git a/go/adbc/sqldriver/driver_internals_test.go
b/go/adbc/sqldriver/driver_internals_test.go
index 0c42e5ac2..33a85062f 100644
--- a/go/adbc/sqldriver/driver_internals_test.go
+++ b/go/adbc/sqldriver/driver_internals_test.go
@@ -317,7 +317,7 @@ func TestNextRowTypes(t *testing.T) {
 recordBuilder := array.NewRecordBuilder(memory.DefaultAllocator, schema)
t.Cleanup(recordBuilder.Release)
test.arrowValueFunc(t, recordBuilder.Field(0))
- record := recordBuilder.NewRecord()
+ record := recordBuilder.NewRecordBatch()
t.Cleanup(record.Release)
r := &rows{curRecord: record}
@@ -428,49 +428,49 @@ func TestArrFromVal(t *testing.T) {
 value: tstampSec,
 inputDataType: &arrow.TimestampType{Unit: arrow.Second},
 expectedDataType: &arrow.TimestampType{Unit: arrow.Second},
- expectedStringValue: testTime.UTC().Truncate(time.Second).Format("2006-01-02 15:04:05Z"),
+ expectedStringValue: testTime.UTC().Truncate(time.Second).Format("2006-01-02T15:04:05Z"),
 },
 {
 value: tstampMilli,
 inputDataType: &arrow.TimestampType{Unit: arrow.Millisecond},
 expectedDataType: &arrow.TimestampType{Unit: arrow.Millisecond},
- expectedStringValue: testTime.UTC().Truncate(time.Millisecond).Format("2006-01-02 15:04:05.000Z"),
+ expectedStringValue: testTime.UTC().Truncate(time.Millisecond).Format("2006-01-02T15:04:05.000Z"),
 },
 {
 value: tstampMicro,
 inputDataType: &arrow.TimestampType{Unit: arrow.Microsecond},
 expectedDataType: &arrow.TimestampType{Unit: arrow.Microsecond},
- expectedStringValue: testTime.UTC().Truncate(time.Microsecond).Format("2006-01-02 15:04:05.000000Z"),
+ expectedStringValue: testTime.UTC().Truncate(time.Microsecond).Format("2006-01-02T15:04:05.000000Z"),
 },
 {
 value: tstampNano,
 inputDataType: &arrow.TimestampType{Unit: arrow.Nanosecond},
 expectedDataType: &arrow.TimestampType{Unit: arrow.Nanosecond},
- expectedStringValue: testTime.UTC().Truncate(time.Nanosecond).Format("2006-01-02 15:04:05.000000000Z"),
+ expectedStringValue: testTime.UTC().Truncate(time.Nanosecond).Format("2006-01-02T15:04:05.000000000Z"),
 },
 {
 value: testTime,
 inputDataType: &arrow.TimestampType{Unit: arrow.Second},
 expectedDataType: &arrow.TimestampType{Unit: arrow.Second},
- expectedStringValue: testTime.UTC().Truncate(time.Second).Format("2006-01-02 15:04:05Z"),
+ expectedStringValue: testTime.UTC().Truncate(time.Second).Format("2006-01-02T15:04:05Z"),
 },
 {
 value: testTime,
 inputDataType: &arrow.TimestampType{Unit: arrow.Millisecond},
 expectedDataType: &arrow.TimestampType{Unit: arrow.Millisecond},
- expectedStringValue: testTime.UTC().Truncate(time.Millisecond).Format("2006-01-02 15:04:05.000Z"),
+ expectedStringValue: testTime.UTC().Truncate(time.Millisecond).Format("2006-01-02T15:04:05.000Z"),
 },
 {
 value: testTime,
 inputDataType: &arrow.TimestampType{Unit: arrow.Microsecond},
 expectedDataType: &arrow.TimestampType{Unit: arrow.Microsecond},
- expectedStringValue: testTime.UTC().Truncate(time.Microsecond).Format("2006-01-02 15:04:05.000000Z"),
+ expectedStringValue: testTime.UTC().Truncate(time.Microsecond).Format("2006-01-02T15:04:05.000000Z"),
 },
 {
 value: testTime,
 inputDataType: &arrow.TimestampType{Unit: arrow.Nanosecond},
 expectedDataType: &arrow.TimestampType{Unit: arrow.Nanosecond},
- expectedStringValue: testTime.UTC().Truncate(time.Nanosecond).Format("2006-01-02 15:04:05.000000000Z"),
+ expectedStringValue: testTime.UTC().Truncate(time.Nanosecond).Format("2006-01-02T15:04:05.000000000Z"),
 },
 }
for i, test := range tests {
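
The only substantive change in this test file is the expected timestamp rendering: arrow-go v18.4.1 appears to format timestamps with a "T" separator (RFC 3339 style) rather than a space, so the Go reference-time layouts change accordingly. A small standalone illustration of the two layouts (the sample date is arbitrary):

    package main

    import (
    	"fmt"
    	"time"
    )

    func main() {
    	ts := time.Date(2025, 9, 9, 10, 43, 25, 0, time.UTC)
    	// Old expectation: "2025-09-09 10:43:25Z"
    	fmt.Println(ts.Format("2006-01-02 15:04:05Z"))
    	// New expectation: "2025-09-09T10:43:25Z"
    	fmt.Println(ts.Format("2006-01-02T15:04:05Z"))
    }
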
diff --git a/go/adbc/validation/validation.go b/go/adbc/validation/validation.go
index fdd212796..39b91474d 100644
--- a/go/adbc/validation/validation.go
+++ b/go/adbc/validation/validation.go
@@ -71,7 +71,7 @@ type DriverQuirks interface {
// Expected Metadata responses
GetMetadata(adbc.InfoCode) interface{}
// Create a sample table from an arrow record
- CreateSampleTable(tableName string, r arrow.Record) error
+ CreateSampleTable(tableName string, r arrow.RecordBatch) error
// Field Metadata for Sample Table for comparison
 SampleTableSchemaMetadata(tblName string, dt arrow.DataType) arrow.Metadata
// have the driver drop a table with the correct SQL syntax
@@ -318,7 +318,7 @@ func (c *ConnectionTests) TestMetadataGetInfo() {
adbc.GetInfoSchema, rdr.Schema())
for rdr.Next() {
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
codeCol := rec.Column(0).(*array.Uint32)
valUnion := rec.Column(1).(*array.DenseUnion)
for i := 0; i < int(rec.NumRows()); i++ {
@@ -562,7 +562,7 @@ func (c *ConnectionTests) TestMetadataGetObjectsColumns() {
 c.Truef(adbc.GetObjectsSchema.Equal(rdr.Schema()), "expected: %s\ngot: %s", adbc.GetObjectsSchema, rdr.Schema())
c.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
var (
foundCatalog = false
foundDbSchema = false
@@ -754,7 +754,7 @@ func (s *StatementTests) TestSqlPartitionedInts() {
s.Len(sc.Fields(), 1)
s.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
s.EqualValues(1, rec.NumCols())
s.EqualValues(1, rec.NumRows())
@@ -814,7 +814,7 @@ func (s *StatementTests) TestSQLPrepareSelectParams() {
defer bldr.Release()
 bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{42, -42, 0}, []bool{true, true, false})
 bldr.Field(1).(*array.StringBuilder).AppendValues([]string{"", "", "bar"}, []bool{true, false, true})
- batch := bldr.NewRecord()
+ batch := bldr.NewRecordBatch()
defer batch.Release()
s.Require().NoError(stmt.Bind(s.ctx, batch))
@@ -825,7 +825,7 @@ func (s *StatementTests) TestSQLPrepareSelectParams() {
var nrows int64
for rdr.Next() {
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
s.Require().NotNil(rec)
s.EqualValues(2, rec.NumCols())
@@ -863,7 +863,7 @@ func (s *StatementTests) TestSQLPrepareSelectNoParams() {
s.Len(sc.Fields(), 1)
s.True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
s.EqualValues(1, rec.NumCols())
s.EqualValues(1, rec.NumRows())
@@ -895,7 +895,7 @@ func (s *StatementTests)
TestSqlPrepareErrorParamCountMismatch() {
defer batchbldr.Release()
bldr := batchbldr.Field(0).(*array.Int64Builder)
bldr.AppendValues([]int64{42, -42, 0}, []bool{true, true, false})
- batch := batchbldr.NewRecord()
+ batch := batchbldr.NewRecordBatch()
defer batch.Release()
s.NoError(stmt.Bind(s.ctx, batch))
@@ -917,7 +917,7 @@ func (s *StatementTests) TestSqlIngestInts() {
defer batchbldr.Release()
bldr := batchbldr.Field(0).(*array.Int64Builder)
bldr.AppendValues([]int64{42, -42, 0}, []bool{true, true, false})
- batch := batchbldr.NewRecord()
+ batch := batchbldr.NewRecordBatch()
defer batch.Release()
stmt, err := s.Cnxn.NewStatement()
@@ -944,7 +944,7 @@ func (s *StatementTests) TestSqlIngestInts() {
 s.Truef(schema.Equal(utils.RemoveSchemaMetadata(rdr.Schema())), "expected: %s\n got: %s", schema, rdr.Schema())
s.Require().True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
s.EqualValues(3, rec.NumRows())
s.EqualValues(1, rec.NumCols())
@@ -968,7 +968,7 @@ func (s *StatementTests) TestSqlIngestAppend() {
defer batchbldr.Release()
bldr := batchbldr.Field(0).(*array.Int64Builder)
bldr.AppendValues([]int64{42}, []bool{true})
- batch := batchbldr.NewRecord()
+ batch := batchbldr.NewRecordBatch()
defer batch.Release()
// ingest and create table
@@ -987,7 +987,7 @@ func (s *StatementTests) TestSqlIngestAppend() {
// now append
bldr.AppendValues([]int64{-42, 0}, []bool{true, false})
- batch2 := batchbldr.NewRecord()
+ batch2 := batchbldr.NewRecordBatch()
defer batch2.Release()
s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable,
"bulk_ingest"))
@@ -1015,7 +1015,7 @@ func (s *StatementTests) TestSqlIngestAppend() {
s.Truef(schema.Equal(utils.RemoveSchemaMetadata(rdr.Schema())),
"expected: %s\n got: %s", schema, rdr.Schema())
s.Require().True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
s.EqualValues(3, rec.NumRows())
s.EqualValues(1, rec.NumCols())
@@ -1042,7 +1042,7 @@ func (s *StatementTests) TestSqlIngestReplace() {
defer batchbldr.Release()
bldr := batchbldr.Field(0).(*array.Int64Builder)
bldr.AppendValues([]int64{42}, []bool{true})
- batch := batchbldr.NewRecord()
+ batch := batchbldr.NewRecordBatch()
defer batch.Release()
// ingest and create table
@@ -1066,7 +1066,7 @@ func (s *StatementTests) TestSqlIngestReplace() {
defer batchbldr2.Release()
bldr2 := batchbldr2.Field(0).(*array.Int64Builder)
bldr2.AppendValues([]int64{42}, []bool{true})
- batch2 := batchbldr2.NewRecord()
+ batch2 := batchbldr2.NewRecordBatch()
defer batch2.Release()
s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable,
"bulk_ingest"))
@@ -1089,7 +1089,7 @@ func (s *StatementTests) TestSqlIngestReplace() {
s.Truef(schema.Equal(utils.RemoveSchemaMetadata(rdr.Schema())),
"expected: %s\n got: %s", schema, rdr.Schema())
s.Require().True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
s.EqualValues(1, rec.NumRows())
s.EqualValues(1, rec.NumCols())
col, ok := rec.Column(0).(*array.Int64)
@@ -1114,7 +1114,7 @@ func (s *StatementTests) TestSqlIngestCreateAppend() {
defer batchbldr.Release()
bldr := batchbldr.Field(0).(*array.Int64Builder)
bldr.AppendValues([]int64{42}, []bool{true})
- batch := batchbldr.NewRecord()
+ batch := batchbldr.NewRecordBatch()
defer batch.Release()
// ingest and create table
@@ -1154,7 +1154,7 @@ func (s *StatementTests) TestSqlIngestCreateAppend() {
s.Truef(schema.Equal(utils.RemoveSchemaMetadata(rdr.Schema())),
"expected: %s\n got: %s", schema, rdr.Schema())
s.Require().True(rdr.Next())
- rec := rdr.Record()
+ rec := rdr.RecordBatch()
s.EqualValues(2, rec.NumRows())
s.EqualValues(1, rec.NumCols())
col, ok := rec.Column(0).(*array.Int64)
@@ -1199,7 +1199,7 @@ func (s *StatementTests) TestSqlIngestErrors() {
defer batchbldr.Release()
bldr := batchbldr.Field(0).(*array.Int64Builder)
bldr.AppendValues([]int64{42, -42, 0}, []bool{true, true,
false})
- batch := batchbldr.NewRecord()
+ batch := batchbldr.NewRecordBatch()
defer batch.Release()
s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable,
"bulk_ingest"))
@@ -1227,7 +1227,7 @@ func (s *StatementTests) TestSqlIngestErrors() {
defer batchbldr.Release()
bldr := batchbldr.Field(0).(*array.Int64Builder)
bldr.AppendValues([]int64{42, -42, 0}, []bool{true, true,
false})
- batch := batchbldr.NewRecord()
+ batch := batchbldr.NewRecordBatch()
defer batch.Release()
s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable,
"bulk_ingest"))
@@ -1252,7 +1252,7 @@ func (s *StatementTests) TestSqlIngestErrors() {
defer batchbldr.Release()
batchbldr.Field(0).AppendNull()
batchbldr.Field(1).AppendNull()
- batch = batchbldr.NewRecord()
+ batch = batchbldr.NewRecordBatch()
defer batch.Release()
 if !s.Quirks.SupportsBulkIngest(adbc.OptionValueIngestModeCreate) {