This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 75e392744 fix(go/adbc/driver/snowflake): Records dropped on ingestion
when empty batch is present (#1866)
75e392744 is described below
commit 75e3927444cb48f90769f198af118a8b20c0fae2
Author: Joel Lubinitsky <[email protected]>
AuthorDate: Tue May 21 11:54:14 2024 -0400
fix(go/adbc/driver/snowflake): Records dropped on ingestion when empty
batch is present (#1866)
Reproduces and fixes: #1847
Parquet files with empty row groups are valid per the spec, but
Snowflake does not currently handle them properly. To mitigate this, we
buffer writes to the Parquet file so that a row group is not written
until some amount of data has been received.
The CheckedAllocator was enabled for all tests as part of this fix. It
detected a leak in the BufferWriter, which was fixed in:
[https://github.com/apache/arrow/pull/41698](https://github.com/apache/arrow/pull/41698).
Enabling the CheckedAllocator also surfaced an unrelated test failure
involving the casting of decimals of certain precision. The fix for
that failure is included in this PR as well.
---
go/adbc/driver/snowflake/bulk_ingestion.go | 2 +-
go/adbc/driver/snowflake/driver_test.go | 111 +++++++++++++++--------------
go/adbc/driver/snowflake/record_reader.go | 1 +
go/adbc/go.mod | 2 +-
go/adbc/go.sum | 4 +-
5 files changed, 64 insertions(+), 56 deletions(-)
diff --git a/go/adbc/driver/snowflake/bulk_ingestion.go
b/go/adbc/driver/snowflake/bulk_ingestion.go
index 17d76195d..2e18428bd 100644
--- a/go/adbc/driver/snowflake/bulk_ingestion.go
+++ b/go/adbc/driver/snowflake/bulk_ingestion.go
@@ -342,7 +342,7 @@ func writeParquet(
defer pqWriter.Close()
for rec := range in {
- err = pqWriter.Write(rec)
+ err = pqWriter.WriteBuffered(rec)
rec.Release()
if err != nil {
return err
diff --git a/go/adbc/driver/snowflake/driver_test.go
b/go/adbc/driver/snowflake/driver_test.go
index af94e6108..abc738306 100644
--- a/go/adbc/driver/snowflake/driver_test.go
+++ b/go/adbc/driver/snowflake/driver_test.go
@@ -325,19 +325,14 @@ type SnowflakeTests struct {
stmt adbc.Statement
}
-func (suite *SnowflakeTests) SetupSuite() {
+func (suite *SnowflakeTests) SetupTest() {
var err error
suite.ctx = context.Background()
suite.driver = suite.Quirks.SetupDriver(suite.T())
suite.db, err = suite.driver.NewDatabase(suite.Quirks.DatabaseOptions())
suite.NoError(err)
-}
-
-func (suite *SnowflakeTests) SetupTest() {
- var err error
suite.cnxn, err = suite.db.Open(suite.ctx)
suite.NoError(err)
-
suite.stmt, err = suite.cnxn.NewStatement()
suite.NoError(err)
}
@@ -345,11 +340,11 @@ func (suite *SnowflakeTests) SetupTest() {
func (suite *SnowflakeTests) TearDownTest() {
suite.NoError(suite.stmt.Close())
suite.NoError(suite.cnxn.Close())
-}
-
-func (suite *SnowflakeTests) TearDownSuite() {
+ suite.Quirks.TearDownDriver(suite.T(), suite.driver)
+ suite.cnxn = nil
suite.NoError(suite.db.Close())
suite.db = nil
+ suite.driver = nil
}
func (suite *SnowflakeTests) TestSqlIngestTimestamp() {
@@ -409,9 +404,6 @@ func (suite *SnowflakeTests)
TestSqlIngestRecordAndStreamAreEquivalent() {
suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn,
"bulk_ingest_bind"))
suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn,
"bulk_ingest_bind_stream"))
- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
- defer mem.AssertSize(suite.T(), 0)
-
sc := arrow.NewSchema([]arrow.Field{
{
Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -467,7 +459,7 @@ func (suite *SnowflakeTests)
TestSqlIngestRecordAndStreamAreEquivalent() {
},
}, nil)
- bldr := array.NewRecordBuilder(mem, sc)
+ bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
defer bldr.Release()
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{-1, 0, 25},
nil)
@@ -538,9 +530,6 @@ func (suite *SnowflakeTests)
TestSqlIngestRecordAndStreamAreEquivalent() {
func (suite *SnowflakeTests) TestSqlIngestRoundtripTypes() {
suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn,
"bulk_ingest_roundtrip"))
- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
- defer mem.AssertSize(suite.T(), 0)
-
sc := arrow.NewSchema([]arrow.Field{
{
Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -584,7 +573,7 @@ func (suite *SnowflakeTests) TestSqlIngestRoundtripTypes() {
},
}, nil)
- bldr := array.NewRecordBuilder(mem, sc)
+ bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
defer bldr.Release()
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{-1, 0, 25},
nil)
@@ -624,9 +613,6 @@ func (suite *SnowflakeTests) TestSqlIngestRoundtripTypes() {
func (suite *SnowflakeTests) TestSqlIngestTimestampTypes() {
suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn,
"bulk_ingest_timestamps"))
- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
- defer mem.AssertSize(suite.T(), 0)
-
sessionTimezone := "America/Phoenix"
suite.Require().NoError(suite.stmt.SetSqlQuery(fmt.Sprintf(`ALTER
SESSION SET TIMEZONE = "%s"`, sessionTimezone)))
_, err := suite.stmt.ExecuteUpdate(suite.ctx)
@@ -663,7 +649,7 @@ func (suite *SnowflakeTests) TestSqlIngestTimestampTypes() {
},
}, nil)
- bldr := array.NewRecordBuilder(mem, sc)
+ bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
defer bldr.Release()
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
@@ -723,7 +709,7 @@ func (suite *SnowflakeTests) TestSqlIngestTimestampTypes() {
},
}, nil)
- expectedRecord, _, err := array.RecordFromJSON(mem, expectedSchema,
bytes.NewReader([]byte(`
+ expectedRecord, _, err := array.RecordFromJSON(suite.Quirks.Alloc(),
expectedSchema, bytes.NewReader([]byte(`
[
{
"col_int64": 1,
@@ -766,9 +752,6 @@ func (suite *SnowflakeTests) TestSqlIngestTimestampTypes() {
func (suite *SnowflakeTests) TestSqlIngestDate64Type() {
suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn,
"bulk_ingest_date64"))
- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
- defer mem.AssertSize(suite.T(), 0)
-
sc := arrow.NewSchema([]arrow.Field{
{
Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -780,7 +763,7 @@ func (suite *SnowflakeTests) TestSqlIngestDate64Type() {
},
}, nil)
- bldr := array.NewRecordBuilder(mem, sc)
+ bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
defer bldr.Release()
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
@@ -815,7 +798,7 @@ func (suite *SnowflakeTests) TestSqlIngestDate64Type() {
},
}, nil)
- expectedRecord, _, err := array.RecordFromJSON(mem, expectedSchema,
bytes.NewReader([]byte(`
+ expectedRecord, _, err := array.RecordFromJSON(suite.Quirks.Alloc(),
expectedSchema, bytes.NewReader([]byte(`
[
{
"col_int64": 1,
@@ -843,9 +826,6 @@ func (suite *SnowflakeTests) TestSqlIngestDate64Type() {
func (suite *SnowflakeTests) TestSqlIngestHighPrecision() {
suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn,
"bulk_ingest_high_precision"))
- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
- defer mem.AssertSize(suite.T(), 0)
-
sc := arrow.NewSchema([]arrow.Field{
{
Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -865,7 +845,7 @@ func (suite *SnowflakeTests) TestSqlIngestHighPrecision() {
},
}, nil)
- bldr := array.NewRecordBuilder(mem, sc)
+ bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
defer bldr.Release()
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
@@ -920,7 +900,7 @@ func (suite *SnowflakeTests) TestSqlIngestHighPrecision() {
},
}, nil)
- expectedRecord, _, err := array.RecordFromJSON(mem, expectedSchema,
bytes.NewReader([]byte(`
+ expectedRecord, _, err := array.RecordFromJSON(suite.Quirks.Alloc(),
expectedSchema, bytes.NewReader([]byte(`
[
{
"col_int64": 1,
@@ -954,9 +934,6 @@ func (suite *SnowflakeTests) TestSqlIngestHighPrecision() {
func (suite *SnowflakeTests) TestSqlIngestLowPrecision() {
suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn,
"bulk_ingest_high_precision"))
- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
- defer mem.AssertSize(suite.T(), 0)
-
sc := arrow.NewSchema([]arrow.Field{
{
Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -976,7 +953,7 @@ func (suite *SnowflakeTests) TestSqlIngestLowPrecision() {
},
}, nil)
- bldr := array.NewRecordBuilder(mem, sc)
+ bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
defer bldr.Release()
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
@@ -1028,7 +1005,7 @@ func (suite *SnowflakeTests) TestSqlIngestLowPrecision() {
},
}, nil)
- expectedRecord, _, err := array.RecordFromJSON(mem, expectedSchema,
bytes.NewReader([]byte(`
+ expectedRecord, _, err := array.RecordFromJSON(suite.Quirks.Alloc(),
expectedSchema, bytes.NewReader([]byte(`
[
{
"col_int64": 1,
@@ -1062,9 +1039,6 @@ func (suite *SnowflakeTests) TestSqlIngestLowPrecision() {
func (suite *SnowflakeTests) TestSqlIngestStructType() {
suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn,
"bulk_ingest_struct"))
- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
- defer mem.AssertSize(suite.T(), 0)
-
sc := arrow.NewSchema([]arrow.Field{
{
Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -1089,7 +1063,7 @@ func (suite *SnowflakeTests) TestSqlIngestStructType() {
},
}, nil)
- bldr := array.NewRecordBuilder(mem, sc)
+ bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
defer bldr.Release()
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
@@ -1141,7 +1115,7 @@ func (suite *SnowflakeTests) TestSqlIngestStructType() {
},
}, nil)
- expectedRecord, _, err := array.RecordFromJSON(mem, expectedSchema,
bytes.NewReader([]byte(`
+ expectedRecord, _, err := array.RecordFromJSON(suite.Quirks.Alloc(),
expectedSchema, bytes.NewReader([]byte(`
[
{
"col_int64": 1,
@@ -1177,9 +1151,6 @@ func (suite *SnowflakeTests) TestSqlIngestStructType() {
func (suite *SnowflakeTests) TestSqlIngestMapType() {
suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn,
"bulk_ingest_map"))
- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
- defer mem.AssertSize(suite.T(), 0)
-
sc := arrow.NewSchema([]arrow.Field{
{
Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -1191,7 +1162,7 @@ func (suite *SnowflakeTests) TestSqlIngestMapType() {
},
}, nil)
- bldr := array.NewRecordBuilder(mem, sc)
+ bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
defer bldr.Release()
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
@@ -1241,7 +1212,7 @@ func (suite *SnowflakeTests) TestSqlIngestMapType() {
},
}, nil)
- expectedRecord, _, err := array.RecordFromJSON(mem, expectedSchema,
bytes.NewReader([]byte(`
+ expectedRecord, _, err := array.RecordFromJSON(suite.Quirks.Alloc(),
expectedSchema, bytes.NewReader([]byte(`
[
{
"col_int64": 1,
@@ -1272,9 +1243,6 @@ func (suite *SnowflakeTests) TestSqlIngestMapType() {
func (suite *SnowflakeTests) TestSqlIngestListType() {
suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn,
"bulk_ingest_list"))
- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
- defer mem.AssertSize(suite.T(), 0)
-
sc := arrow.NewSchema([]arrow.Field{
{
Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -1286,7 +1254,7 @@ func (suite *SnowflakeTests) TestSqlIngestListType() {
},
}, nil)
- bldr := array.NewRecordBuilder(mem, sc)
+ bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
defer bldr.Release()
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
@@ -1330,7 +1298,7 @@ func (suite *SnowflakeTests) TestSqlIngestListType() {
},
}, nil)
- expectedRecord, _, err := array.RecordFromJSON(mem, expectedSchema,
bytes.NewReader([]byte(`
+ expectedRecord, _, err := array.RecordFromJSON(suite.Quirks.Alloc(),
expectedSchema, bytes.NewReader([]byte(`
[
{
"col_int64": 1,
@@ -1751,6 +1719,8 @@ func (suite *SnowflakeTests)
TestNonIntDecimalLowPrecision() {
value := rec.Column(0).(*array.Float64).Value(0)
difference := math.Abs(number - value)
suite.Truef(difference < 1e-13, "expected %f, got %f",
number, value)
+
+ suite.False(rdr.Next())
}
}
}
@@ -1807,6 +1777,7 @@ func (suite *SnowflakeTests) TestAdditionalDriverInfo() {
},
)
suite.Require().NoError(err)
+ defer rdr.Release()
var totalRows int64
for rdr.Next() {
@@ -2050,3 +2021,39 @@ func (suite *SnowflakeTests) TestEmptyResultSet() {
suite.Equal(n, recv)
suite.Equal(recv, int64(0))
}
+
+func (suite *SnowflakeTests) TestIngestEmptyChunk() {
+ suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn,
"bulk_ingest_empty_chunk"))
+
+ sc := arrow.NewSchema([]arrow.Field{
+ {
+ Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
+ Nullable: true,
+ },
+ }, nil)
+
+ bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
+ defer bldr.Release()
+
+ emptyRec := bldr.NewRecord()
+ defer emptyRec.Release()
+
+ bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
+
+ rec := bldr.NewRecord()
+ defer rec.Release()
+
+ // See https://github.com/apache/arrow-adbc/issues/1847
+ // Snowflake does not properly handle empty row groups, so need to make
sure we don't send any.
+ rdr, err := array.NewRecordReader(sc, []arrow.Record{emptyRec, rec})
+ suite.Require().NoError(err)
+ defer rdr.Release()
+
+ suite.Require().NoError(suite.stmt.BindStream(suite.ctx, rdr))
+
suite.Require().NoError(suite.stmt.SetOption(adbc.OptionKeyIngestTargetTable,
"bulk_ingest_empty_chunk"))
+
suite.Require().NoError(suite.stmt.SetOption(driver.OptionStatementIngestWriterConcurrency,
"1"))
+
+ n, err := suite.stmt.ExecuteUpdate(suite.ctx)
+ suite.Require().NoError(err)
+ suite.EqualValues(int64(3), n)
+}
diff --git a/go/adbc/driver/snowflake/record_reader.go
b/go/adbc/driver/snowflake/record_reader.go
index e404f116d..d3b6f7c3b 100644
--- a/go/adbc/driver/snowflake/record_reader.go
+++ b/go/adbc/driver/snowflake/record_reader.go
@@ -123,6 +123,7 @@ func getTransformer(sc *arrow.Schema, ld
gosnowflake.ArrowStreamLoader, useHighP
if err != nil {
return
nil, err
}
+ defer
result.Release()
return
compute.CastArray(ctx, result, compute.UnsafeCastOptions(f.Type))
}
} else {
diff --git a/go/adbc/go.mod b/go/adbc/go.mod
index 8a1fecb60..206d97e8f 100644
--- a/go/adbc/go.mod
+++ b/go/adbc/go.mod
@@ -20,7 +20,7 @@ module github.com/apache/arrow-adbc/go/adbc
go 1.21
require (
- github.com/apache/arrow/go/v17 v17.0.0-20240503231747-7cd9c6fbd313
+ github.com/apache/arrow/go/v17 v17.0.0-20240520131450-cc3e2db30094
github.com/bluele/gcache v0.0.2
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.6.0
diff --git a/go/adbc/go.sum b/go/adbc/go.sum
index 1f6db2e83..b466e403d 100644
--- a/go/adbc/go.sum
+++ b/go/adbc/go.sum
@@ -20,8 +20,8 @@ github.com/andybalholm/brotli v1.1.0
h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1
github.com/andybalholm/brotli v1.1.0/go.mod
h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/apache/arrow/go/v15 v15.0.0
h1:1zZACWf85oEZY5/kd9dsQS7i+2G5zVQcbKTHgslqHNA=
github.com/apache/arrow/go/v15 v15.0.0/go.mod
h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA=
-github.com/apache/arrow/go/v17 v17.0.0-20240503231747-7cd9c6fbd313
h1:wnD2WBKoiH6iuEuhg33RsaslZ6aqfrviadRza3bNJZ4=
-github.com/apache/arrow/go/v17 v17.0.0-20240503231747-7cd9c6fbd313/go.mod
h1:jeCSgGamSUiG483VAAaKkPn5wa/dTCVrSmCzF6PUlEo=
+github.com/apache/arrow/go/v17 v17.0.0-20240520131450-cc3e2db30094
h1:DnkS2LPX69st/7BvQVwtGUcLR9RTkrVrdlYnMm89AxY=
+github.com/apache/arrow/go/v17 v17.0.0-20240520131450-cc3e2db30094/go.mod
h1:GLRwak999pJQN7WbKiL5F6OOOq046IOQ/HduXhjTaUo=
github.com/apache/thrift v0.20.0
h1:631+KvYbsBZxmuJjYwhezVsrfc/TbqtZV4QcxOX1fOI=
github.com/apache/thrift v0.20.0/go.mod
h1:hOk1BQqcp2OLzGsyVXdfMk7YFlMxK3aoEVhjD06QhB8=
github.com/aws/aws-sdk-go-v2 v1.25.1
h1:P7hU6A5qEdmajGwvae/zDkOq+ULLC9tQBTwqqiwFGpI=