This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 75e392744 fix(go/adbc/driver/snowflake): Records dropped on ingestion 
when empty batch is present (#1866)
75e392744 is described below

commit 75e3927444cb48f90769f198af118a8b20c0fae2
Author: Joel Lubinitsky <[email protected]>
AuthorDate: Tue May 21 11:54:14 2024 -0400

    fix(go/adbc/driver/snowflake): Records dropped on ingestion when empty 
batch is present (#1866)
    
    Reproduces and fixes: #1847
    
    Parquet files with empty row groups are valid per the spec, but
    Snowflake does not currently handle them properly, and records can be
    dropped during ingestion. To mitigate this, we buffer writes to the
    Parquet file (using WriteBuffered instead of Write) so that a row group
    is not written until some amount of data has been received.
    
    As part of this fix, the CheckedAllocator was enabled for all tests.
    It detected a leak in the BufferWriter, which was fixed upstream in
    https://github.com/apache/arrow/pull/41698.
    
    Enabling the CheckedAllocator also surfaced an unrelated test failure
    when casting decimals of certain precisions: in record_reader.go, the
    intermediate array produced before the cast was never released. That
    fix is included in this PR as well.
---
 go/adbc/driver/snowflake/bulk_ingestion.go |   2 +-
 go/adbc/driver/snowflake/driver_test.go    | 111 +++++++++++++++--------------
 go/adbc/driver/snowflake/record_reader.go  |   1 +
 go/adbc/go.mod                             |   2 +-
 go/adbc/go.sum                             |   4 +-
 5 files changed, 64 insertions(+), 56 deletions(-)

diff --git a/go/adbc/driver/snowflake/bulk_ingestion.go 
b/go/adbc/driver/snowflake/bulk_ingestion.go
index 17d76195d..2e18428bd 100644
--- a/go/adbc/driver/snowflake/bulk_ingestion.go
+++ b/go/adbc/driver/snowflake/bulk_ingestion.go
@@ -342,7 +342,7 @@ func writeParquet(
        defer pqWriter.Close()
 
        for rec := range in {
-               err = pqWriter.Write(rec)
+               err = pqWriter.WriteBuffered(rec)
                rec.Release()
                if err != nil {
                        return err
diff --git a/go/adbc/driver/snowflake/driver_test.go 
b/go/adbc/driver/snowflake/driver_test.go
index af94e6108..abc738306 100644
--- a/go/adbc/driver/snowflake/driver_test.go
+++ b/go/adbc/driver/snowflake/driver_test.go
@@ -325,19 +325,14 @@ type SnowflakeTests struct {
        stmt   adbc.Statement
 }
 
-func (suite *SnowflakeTests) SetupSuite() {
+func (suite *SnowflakeTests) SetupTest() {
        var err error
        suite.ctx = context.Background()
        suite.driver = suite.Quirks.SetupDriver(suite.T())
        suite.db, err = suite.driver.NewDatabase(suite.Quirks.DatabaseOptions())
        suite.NoError(err)
-}
-
-func (suite *SnowflakeTests) SetupTest() {
-       var err error
        suite.cnxn, err = suite.db.Open(suite.ctx)
        suite.NoError(err)
-
        suite.stmt, err = suite.cnxn.NewStatement()
        suite.NoError(err)
 }
@@ -345,11 +340,11 @@ func (suite *SnowflakeTests) SetupTest() {
 func (suite *SnowflakeTests) TearDownTest() {
        suite.NoError(suite.stmt.Close())
        suite.NoError(suite.cnxn.Close())
-}
-
-func (suite *SnowflakeTests) TearDownSuite() {
+       suite.Quirks.TearDownDriver(suite.T(), suite.driver)
+       suite.cnxn = nil
        suite.NoError(suite.db.Close())
        suite.db = nil
+       suite.driver = nil
 }
 
 func (suite *SnowflakeTests) TestSqlIngestTimestamp() {
@@ -409,9 +404,6 @@ func (suite *SnowflakeTests) 
TestSqlIngestRecordAndStreamAreEquivalent() {
        suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn, 
"bulk_ingest_bind"))
        suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn, 
"bulk_ingest_bind_stream"))
 
-       mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
-       defer mem.AssertSize(suite.T(), 0)
-
        sc := arrow.NewSchema([]arrow.Field{
                {
                        Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -467,7 +459,7 @@ func (suite *SnowflakeTests) 
TestSqlIngestRecordAndStreamAreEquivalent() {
                },
        }, nil)
 
-       bldr := array.NewRecordBuilder(mem, sc)
+       bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
        defer bldr.Release()
 
        bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{-1, 0, 25}, 
nil)
@@ -538,9 +530,6 @@ func (suite *SnowflakeTests) 
TestSqlIngestRecordAndStreamAreEquivalent() {
 func (suite *SnowflakeTests) TestSqlIngestRoundtripTypes() {
        suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn, 
"bulk_ingest_roundtrip"))
 
-       mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
-       defer mem.AssertSize(suite.T(), 0)
-
        sc := arrow.NewSchema([]arrow.Field{
                {
                        Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -584,7 +573,7 @@ func (suite *SnowflakeTests) TestSqlIngestRoundtripTypes() {
                },
        }, nil)
 
-       bldr := array.NewRecordBuilder(mem, sc)
+       bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
        defer bldr.Release()
 
        bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{-1, 0, 25}, 
nil)
@@ -624,9 +613,6 @@ func (suite *SnowflakeTests) TestSqlIngestRoundtripTypes() {
 func (suite *SnowflakeTests) TestSqlIngestTimestampTypes() {
        suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn, 
"bulk_ingest_timestamps"))
 
-       mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
-       defer mem.AssertSize(suite.T(), 0)
-
        sessionTimezone := "America/Phoenix"
        suite.Require().NoError(suite.stmt.SetSqlQuery(fmt.Sprintf(`ALTER 
SESSION SET TIMEZONE = "%s"`, sessionTimezone)))
        _, err := suite.stmt.ExecuteUpdate(suite.ctx)
@@ -663,7 +649,7 @@ func (suite *SnowflakeTests) TestSqlIngestTimestampTypes() {
                },
        }, nil)
 
-       bldr := array.NewRecordBuilder(mem, sc)
+       bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
        defer bldr.Release()
 
        bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
@@ -723,7 +709,7 @@ func (suite *SnowflakeTests) TestSqlIngestTimestampTypes() {
                },
        }, nil)
 
-       expectedRecord, _, err := array.RecordFromJSON(mem, expectedSchema, 
bytes.NewReader([]byte(`
+       expectedRecord, _, err := array.RecordFromJSON(suite.Quirks.Alloc(), 
expectedSchema, bytes.NewReader([]byte(`
        [
                {
                        "col_int64": 1,
@@ -766,9 +752,6 @@ func (suite *SnowflakeTests) TestSqlIngestTimestampTypes() {
 func (suite *SnowflakeTests) TestSqlIngestDate64Type() {
        suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn, 
"bulk_ingest_date64"))
 
-       mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
-       defer mem.AssertSize(suite.T(), 0)
-
        sc := arrow.NewSchema([]arrow.Field{
                {
                        Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -780,7 +763,7 @@ func (suite *SnowflakeTests) TestSqlIngestDate64Type() {
                },
        }, nil)
 
-       bldr := array.NewRecordBuilder(mem, sc)
+       bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
        defer bldr.Release()
 
        bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
@@ -815,7 +798,7 @@ func (suite *SnowflakeTests) TestSqlIngestDate64Type() {
                },
        }, nil)
 
-       expectedRecord, _, err := array.RecordFromJSON(mem, expectedSchema, 
bytes.NewReader([]byte(`
+       expectedRecord, _, err := array.RecordFromJSON(suite.Quirks.Alloc(), 
expectedSchema, bytes.NewReader([]byte(`
        [
                {
                        "col_int64": 1,
@@ -843,9 +826,6 @@ func (suite *SnowflakeTests) TestSqlIngestDate64Type() {
 func (suite *SnowflakeTests) TestSqlIngestHighPrecision() {
        suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn, 
"bulk_ingest_high_precision"))
 
-       mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
-       defer mem.AssertSize(suite.T(), 0)
-
        sc := arrow.NewSchema([]arrow.Field{
                {
                        Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -865,7 +845,7 @@ func (suite *SnowflakeTests) TestSqlIngestHighPrecision() {
                },
        }, nil)
 
-       bldr := array.NewRecordBuilder(mem, sc)
+       bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
        defer bldr.Release()
 
        bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
@@ -920,7 +900,7 @@ func (suite *SnowflakeTests) TestSqlIngestHighPrecision() {
                },
        }, nil)
 
-       expectedRecord, _, err := array.RecordFromJSON(mem, expectedSchema, 
bytes.NewReader([]byte(`
+       expectedRecord, _, err := array.RecordFromJSON(suite.Quirks.Alloc(), 
expectedSchema, bytes.NewReader([]byte(`
        [
                {
                        "col_int64": 1,
@@ -954,9 +934,6 @@ func (suite *SnowflakeTests) TestSqlIngestHighPrecision() {
 func (suite *SnowflakeTests) TestSqlIngestLowPrecision() {
        suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn, 
"bulk_ingest_high_precision"))
 
-       mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
-       defer mem.AssertSize(suite.T(), 0)
-
        sc := arrow.NewSchema([]arrow.Field{
                {
                        Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -976,7 +953,7 @@ func (suite *SnowflakeTests) TestSqlIngestLowPrecision() {
                },
        }, nil)
 
-       bldr := array.NewRecordBuilder(mem, sc)
+       bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
        defer bldr.Release()
 
        bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
@@ -1028,7 +1005,7 @@ func (suite *SnowflakeTests) TestSqlIngestLowPrecision() {
                },
        }, nil)
 
-       expectedRecord, _, err := array.RecordFromJSON(mem, expectedSchema, 
bytes.NewReader([]byte(`
+       expectedRecord, _, err := array.RecordFromJSON(suite.Quirks.Alloc(), 
expectedSchema, bytes.NewReader([]byte(`
        [
                {
                        "col_int64": 1,
@@ -1062,9 +1039,6 @@ func (suite *SnowflakeTests) TestSqlIngestLowPrecision() {
 func (suite *SnowflakeTests) TestSqlIngestStructType() {
        suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn, 
"bulk_ingest_struct"))
 
-       mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
-       defer mem.AssertSize(suite.T(), 0)
-
        sc := arrow.NewSchema([]arrow.Field{
                {
                        Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -1089,7 +1063,7 @@ func (suite *SnowflakeTests) TestSqlIngestStructType() {
                },
        }, nil)
 
-       bldr := array.NewRecordBuilder(mem, sc)
+       bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
        defer bldr.Release()
 
        bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
@@ -1141,7 +1115,7 @@ func (suite *SnowflakeTests) TestSqlIngestStructType() {
                },
        }, nil)
 
-       expectedRecord, _, err := array.RecordFromJSON(mem, expectedSchema, 
bytes.NewReader([]byte(`
+       expectedRecord, _, err := array.RecordFromJSON(suite.Quirks.Alloc(), 
expectedSchema, bytes.NewReader([]byte(`
        [
                {
                        "col_int64": 1,
@@ -1177,9 +1151,6 @@ func (suite *SnowflakeTests) TestSqlIngestStructType() {
 func (suite *SnowflakeTests) TestSqlIngestMapType() {
        suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn, 
"bulk_ingest_map"))
 
-       mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
-       defer mem.AssertSize(suite.T(), 0)
-
        sc := arrow.NewSchema([]arrow.Field{
                {
                        Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -1191,7 +1162,7 @@ func (suite *SnowflakeTests) TestSqlIngestMapType() {
                },
        }, nil)
 
-       bldr := array.NewRecordBuilder(mem, sc)
+       bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
        defer bldr.Release()
 
        bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
@@ -1241,7 +1212,7 @@ func (suite *SnowflakeTests) TestSqlIngestMapType() {
                },
        }, nil)
 
-       expectedRecord, _, err := array.RecordFromJSON(mem, expectedSchema, 
bytes.NewReader([]byte(`
+       expectedRecord, _, err := array.RecordFromJSON(suite.Quirks.Alloc(), 
expectedSchema, bytes.NewReader([]byte(`
        [
                {
                        "col_int64": 1,
@@ -1272,9 +1243,6 @@ func (suite *SnowflakeTests) TestSqlIngestMapType() {
 func (suite *SnowflakeTests) TestSqlIngestListType() {
        suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn, 
"bulk_ingest_list"))
 
-       mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
-       defer mem.AssertSize(suite.T(), 0)
-
        sc := arrow.NewSchema([]arrow.Field{
                {
                        Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
@@ -1286,7 +1254,7 @@ func (suite *SnowflakeTests) TestSqlIngestListType() {
                },
        }, nil)
 
-       bldr := array.NewRecordBuilder(mem, sc)
+       bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
        defer bldr.Release()
 
        bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
@@ -1330,7 +1298,7 @@ func (suite *SnowflakeTests) TestSqlIngestListType() {
                },
        }, nil)
 
-       expectedRecord, _, err := array.RecordFromJSON(mem, expectedSchema, 
bytes.NewReader([]byte(`
+       expectedRecord, _, err := array.RecordFromJSON(suite.Quirks.Alloc(), 
expectedSchema, bytes.NewReader([]byte(`
        [
                {
                        "col_int64": 1,
@@ -1751,6 +1719,8 @@ func (suite *SnowflakeTests) 
TestNonIntDecimalLowPrecision() {
                        value := rec.Column(0).(*array.Float64).Value(0)
                        difference := math.Abs(number - value)
                        suite.Truef(difference < 1e-13, "expected %f, got %f", 
number, value)
+
+                       suite.False(rdr.Next())
                }
        }
 }
@@ -1807,6 +1777,7 @@ func (suite *SnowflakeTests) TestAdditionalDriverInfo() {
                },
        )
        suite.Require().NoError(err)
+       defer rdr.Release()
 
        var totalRows int64
        for rdr.Next() {
@@ -2050,3 +2021,39 @@ func (suite *SnowflakeTests) TestEmptyResultSet() {
        suite.Equal(n, recv)
        suite.Equal(recv, int64(0))
 }
+
+func (suite *SnowflakeTests) TestIngestEmptyChunk() {
+       suite.Require().NoError(suite.Quirks.DropTable(suite.cnxn, 
"bulk_ingest_empty_chunk"))
+
+       sc := arrow.NewSchema([]arrow.Field{
+               {
+                       Name: "col_int64", Type: arrow.PrimitiveTypes.Int64,
+                       Nullable: true,
+               },
+       }, nil)
+
+       bldr := array.NewRecordBuilder(suite.Quirks.Alloc(), sc)
+       defer bldr.Release()
+
+       emptyRec := bldr.NewRecord()
+       defer emptyRec.Release()
+
+       bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
+
+       rec := bldr.NewRecord()
+       defer rec.Release()
+
+       // See https://github.com/apache/arrow-adbc/issues/1847
+       // Snowflake does not properly handle empty row groups, so need to make 
sure we don't send any.
+       rdr, err := array.NewRecordReader(sc, []arrow.Record{emptyRec, rec})
+       suite.Require().NoError(err)
+       defer rdr.Release()
+
+       suite.Require().NoError(suite.stmt.BindStream(suite.ctx, rdr))
+       
suite.Require().NoError(suite.stmt.SetOption(adbc.OptionKeyIngestTargetTable, 
"bulk_ingest_empty_chunk"))
+       
suite.Require().NoError(suite.stmt.SetOption(driver.OptionStatementIngestWriterConcurrency,
 "1"))
+
+       n, err := suite.stmt.ExecuteUpdate(suite.ctx)
+       suite.Require().NoError(err)
+       suite.EqualValues(int64(3), n)
+}
diff --git a/go/adbc/driver/snowflake/record_reader.go 
b/go/adbc/driver/snowflake/record_reader.go
index e404f116d..d3b6f7c3b 100644
--- a/go/adbc/driver/snowflake/record_reader.go
+++ b/go/adbc/driver/snowflake/record_reader.go
@@ -123,6 +123,7 @@ func getTransformer(sc *arrow.Schema, ld 
gosnowflake.ArrowStreamLoader, useHighP
                                                                if err != nil {
                                                                        return 
nil, err
                                                                }
+                                                               defer 
result.Release()
                                                                return 
compute.CastArray(ctx, result, compute.UnsafeCastOptions(f.Type))
                                                        }
                                                } else {
diff --git a/go/adbc/go.mod b/go/adbc/go.mod
index 8a1fecb60..206d97e8f 100644
--- a/go/adbc/go.mod
+++ b/go/adbc/go.mod
@@ -20,7 +20,7 @@ module github.com/apache/arrow-adbc/go/adbc
 go 1.21
 
 require (
-       github.com/apache/arrow/go/v17 v17.0.0-20240503231747-7cd9c6fbd313
+       github.com/apache/arrow/go/v17 v17.0.0-20240520131450-cc3e2db30094
        github.com/bluele/gcache v0.0.2
        github.com/golang/protobuf v1.5.4
        github.com/google/uuid v1.6.0
diff --git a/go/adbc/go.sum b/go/adbc/go.sum
index 1f6db2e83..b466e403d 100644
--- a/go/adbc/go.sum
+++ b/go/adbc/go.sum
@@ -20,8 +20,8 @@ github.com/andybalholm/brotli v1.1.0 
h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1
 github.com/andybalholm/brotli v1.1.0/go.mod 
h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
 github.com/apache/arrow/go/v15 v15.0.0 
h1:1zZACWf85oEZY5/kd9dsQS7i+2G5zVQcbKTHgslqHNA=
 github.com/apache/arrow/go/v15 v15.0.0/go.mod 
h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA=
-github.com/apache/arrow/go/v17 v17.0.0-20240503231747-7cd9c6fbd313 
h1:wnD2WBKoiH6iuEuhg33RsaslZ6aqfrviadRza3bNJZ4=
-github.com/apache/arrow/go/v17 v17.0.0-20240503231747-7cd9c6fbd313/go.mod 
h1:jeCSgGamSUiG483VAAaKkPn5wa/dTCVrSmCzF6PUlEo=
+github.com/apache/arrow/go/v17 v17.0.0-20240520131450-cc3e2db30094 
h1:DnkS2LPX69st/7BvQVwtGUcLR9RTkrVrdlYnMm89AxY=
+github.com/apache/arrow/go/v17 v17.0.0-20240520131450-cc3e2db30094/go.mod 
h1:GLRwak999pJQN7WbKiL5F6OOOq046IOQ/HduXhjTaUo=
 github.com/apache/thrift v0.20.0 
h1:631+KvYbsBZxmuJjYwhezVsrfc/TbqtZV4QcxOX1fOI=
 github.com/apache/thrift v0.20.0/go.mod 
h1:hOk1BQqcp2OLzGsyVXdfMk7YFlMxK3aoEVhjD06QhB8=
 github.com/aws/aws-sdk-go-v2 v1.25.1 
h1:P7hU6A5qEdmajGwvae/zDkOq+ULLC9tQBTwqqiwFGpI=

Reply via email to