This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ad38d03 fix(parquet/file): use adaptive batch sizing to avoid panic 
(#690)
3ad38d03 is described below

commit 3ad38d03dbc85283f61d60ba2bd8ae8492f0972a
Author: Matt Topol <[email protected]>
AuthorDate: Mon Mar 9 00:35:41 2026 -0400

    fix(parquet/file): use adaptive batch sizing to avoid panic (#690)
    
    ### Rationale for this change
    Fixes the issue reported in
    https://github.com/apache/arrow-go/issues/622#issuecomment-3994896058
    where the data accumulated on a given page exceeds the DataPageSize,
    causing a panic.
    
    ### What changes are included in this PR?
    Removed the broken mid-batch flush in `writeValues`/`writeValuesSpaced`,
    going back to plain `encoder.Put()`/`encoder.PutSpaced()` calls.
    
    Updated `WriteBatch` to use an adaptive batch sizing approach for
    ByteArray/FLBA writing to properly handle v2 data page row-boundary
    alignment without breaking on very large individual values.
    
    ### Are these changes tested?
    Yes, new tests are added to cover this scenario.
    
    ### Are there any user-facing changes?
    No
---
 parquet/file/column_writer.go                |  11 +-
 parquet/file/column_writer_types.gen.go      | 362 +++++++++++++--------------
 parquet/file/column_writer_types.gen.go.tmpl | 218 +++++++++-------
 parquet/file/large_value_test.go             | 271 ++++++++++++++++----
 4 files changed, 532 insertions(+), 330 deletions(-)

diff --git a/parquet/file/column_writer.go b/parquet/file/column_writer.go
index 8d35aaa3..acfe69a1 100644
--- a/parquet/file/column_writer.go
+++ b/parquet/file/column_writer.go
@@ -19,7 +19,9 @@ package file
 import (
        "bytes"
        "encoding/binary"
+       "fmt"
        "io"
+       "math"
        "strconv"
 
        "github.com/apache/arrow-go/v18/arrow"
@@ -303,7 +305,12 @@ func (w *columnWriter) FlushCurrentPage() error {
                repLevelsRLESize = int32(w.repLevelSink.Len())
        }
 
-       uncompressed := defLevelsRLESize + repLevelsRLESize + 
int32(values.Len())
+       uncompressed64 := int64(defLevelsRLESize) + int64(repLevelsRLESize) + 
int64(values.Len())
+       if uncompressed64 > math.MaxInt32 {
+               return fmt.Errorf("parquet: uncompressed page size %d exceeds 
INT32_MAX (%d)",
+                       uncompressed64, int64(math.MaxInt32))
+       }
+       uncompressed := int32(uncompressed64)
        if isV1DataPage {
                err = w.buildDataPageV1(defLevelsRLESize, repLevelsRLESize, 
uncompressed, values.Bytes())
        } else {
@@ -378,7 +385,7 @@ func (w *columnWriter) buildDataPageV2(defLevelsRLESize, 
repLevelsRLESize, uncom
 
        // concatenate uncompressed levels and the possibly compressed values
        var combined bytes.Buffer
-       combined.Grow(int(defLevelsRLESize + repLevelsRLESize + 
int32(len(data))))
+       combined.Grow(int(int64(defLevelsRLESize) + int64(repLevelsRLESize) + 
int64(len(data))))
        w.concatBuffers(defLevelsRLESize, repLevelsRLESize, data, &combined)
 
        pageStats, err := w.getPageStatistics()
diff --git a/parquet/file/column_writer_types.gen.go 
b/parquet/file/column_writer_types.gen.go
index 6530fbeb..a380a51a 100644
--- a/parquet/file/column_writer_types.gen.go
+++ b/parquet/file/column_writer_types.gen.go
@@ -1271,22 +1271,65 @@ func (w *ByteArrayColumnChunkWriter) WriteBatch(values 
[]parquet.ByteArray, defL
        case values != nil:
                n = int64(len(values))
        }
-       w.doBatches(n, repLevels, func(offset, batch int64) {
-               var vals []parquet.ByteArray
+       // For variable-length types, we use adaptive batch sizing to ensure
+       // each batch's encoded data stays well within int32 range. This 
prevents
+       // overflow in FlushCurrentPage when computing uncompressed page size.
+       const maxSafeBatchDataSize int64 = 1 << 30 // 1GB
+
+       batchSize := w.props.WriteBatchSize()
+       maxDefLevel := w.descr.MaxDefinitionLevel()
+       isV2WithRep := w.props.DataPageVersion() != parquet.DataPageV1 &&
+               repLevels != nil && w.descr.MaxRepetitionLevel() > 0
+       levelOffset := int64(0)
+
+       for levelOffset < n {
+               remaining := n - levelOffset
+               batch := min(remaining, batchSize)
+
+               // Scan values to find safe batch boundary
+               if values != nil {
+                       valScan := valueOffset
+                       var cumDataSize int64
+                       for li := int64(0); li < batch; li++ {
+                               isValue := defLevels == nil || maxDefLevel == 0 
||
+                                       defLevels[levelOffset+li] == maxDefLevel
+                               if isValue && valScan < int64(len(values)) {
+                                       valSize := int64(len(values[valScan])) 
+ 4
+                                       if cumDataSize+valSize > 
maxSafeBatchDataSize && li > 0 {
+                                               batch = li
+                                               break
+                                       }
+                                       cumDataSize += valSize
+                                       valScan++
+                               }
+                       }
+               }
 
-               toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, 
offset, batch), levelSliceOrNil(repLevels, offset, batch))
+               // V2 row-boundary alignment
+               if isV2WithRep && levelOffset+batch < n {
+                       for batch > 1 && repLevels[levelOffset+batch] != 0 {
+                               batch--
+                       }
+               }
+               if batch < 1 {
+                       batch = 1
+               }
+
+               // Process batch
+               toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, 
levelOffset, batch),
+                       levelSliceOrNil(repLevels, levelOffset, batch))
+               var vals []parquet.ByteArray
                if values != nil {
                        vals = values[valueOffset : valueOffset+toWrite]
                }
-
                w.writeValues(vals, batch-toWrite)
                if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err 
!= nil {
                        panic(err)
                }
-
                valueOffset += toWrite
                w.checkDictionarySizeLimit()
-       })
+               levelOffset += batch
+       }
        return
 }
 
@@ -1311,11 +1354,38 @@ func (w *ByteArrayColumnChunkWriter) 
WriteBatchSpaced(values []parquet.ByteArray
        if defLevels == nil {
                length = len(values)
        }
-       doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch 
int64) {
-               var vals []parquet.ByteArray
-               info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, 
offset, batch), batch)
+       // For variable-length types, use adaptive batch sizing to keep encoded
+       // data within int32 range per page.
+       const maxSafeBatchDataSize int64 = 1 << 30 // 1GB
 
-               w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, 
batch), levelSliceOrNil(repLevels, offset, batch))
+       batchSize := w.props.WriteBatchSize()
+       levelOffset := int64(0)
+       n := int64(length)
+
+       for levelOffset < n {
+               remaining := n - levelOffset
+               batch := min(remaining, batchSize)
+
+               // Conservative scan: estimate data size from values starting 
at valueOffset
+               if values != nil {
+                       var cumDataSize int64
+                       for vi := int64(0); vi < batch && valueOffset+vi < 
int64(len(values)); vi++ {
+                               valSize := int64(len(values[valueOffset+vi])) + 
4
+                               if cumDataSize+valSize > maxSafeBatchDataSize 
&& vi > 0 {
+                                       batch = vi
+                                       break
+                               }
+                               cumDataSize += valSize
+                       }
+               }
+               if batch < 1 {
+                       batch = 1
+               }
+
+               info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, 
levelOffset, batch), batch)
+
+               w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, 
levelOffset, batch), levelSliceOrNil(repLevels, levelOffset, batch))
+               var vals []parquet.ByteArray
                if values != nil {
                        vals = values[valueOffset : 
valueOffset+info.numSpaced()]
                }
@@ -1325,11 +1395,14 @@ func (w *ByteArrayColumnChunkWriter) 
WriteBatchSpaced(values []parquet.ByteArray
                } else {
                        w.writeValuesSpaced(vals, info.batchNum, batch, 
validBits, validBitsOffset+valueOffset)
                }
-               w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
+               if err := w.commitWriteAndCheckPageLimit(batch, 
info.numSpaced()); err != nil {
+                       panic(err)
+               }
                valueOffset += info.numSpaced()
 
                w.checkDictionarySizeLimit()
-       })
+               levelOffset += batch
+       }
 }
 
 func (w *ByteArrayColumnChunkWriter) WriteDictIndices(indices arrow.Array, 
defLevels, repLevels []int16) (err error) {
@@ -1371,50 +1444,7 @@ func (w *ByteArrayColumnChunkWriter) 
WriteDictIndices(indices arrow.Array, defLe
 }
 
 func (w *ByteArrayColumnChunkWriter) writeValues(values []parquet.ByteArray, 
numNulls int64) {
-       // For variable-length types, we need to check buffer size to prevent 
int32 overflow
-       // For small values (<1MB), checking frequently adds negligible overhead
-       // For large values (>1MB), we MUST check before each value
-       const maxSafeBufferSize = 1.0 * 1024 * 1024 * 1024 // 1GB threshold
-       const largeValueThreshold = 1.0 * 1024 * 1024      // 1MB
-
-       encoder := w.currentEncoder.(encoding.ByteArrayEncoder)
-       currentSize := w.currentEncoder.EstimatedDataEncodedSize()
-
-       // Batch process small values, check individually for large values
-       batchStart := 0
-       for i := 0; i < len(values); i++ {
-               valueSize := int64(len(values[i]))
-
-               // If this value might cause overflow, flush first
-               if currentSize+valueSize >= maxSafeBufferSize {
-                       // Add accumulated batch before flushing
-                       if i > batchStart {
-                               encoder.Put(values[batchStart:i])
-                               currentSize = 
w.currentEncoder.EstimatedDataEncodedSize()
-                       }
-                       // Flush the page
-                       if err := w.FlushCurrentPage(); err != nil {
-                               panic(err)
-                       }
-                       batchStart = i
-                       currentSize = 0
-               }
-
-               // Track size estimate
-               currentSize += valueSize + 4 // +4 for length prefix
-
-               // For large values, add and flush immediately if needed
-               if valueSize >= largeValueThreshold {
-                       encoder.Put(values[i : i+1])
-                       batchStart = i + 1
-                       currentSize = 
w.currentEncoder.EstimatedDataEncodedSize()
-               }
-       }
-
-       // Add remaining batch
-       if batchStart < len(values) {
-               encoder.Put(values[batchStart:])
-       }
+       w.currentEncoder.(encoding.ByteArrayEncoder).Put(values)
        if w.pageStatistics != nil {
                w.pageStatistics.(*metadata.ByteArrayStatistics).Update(values, 
numNulls)
        }
@@ -1425,41 +1455,10 @@ func (w *ByteArrayColumnChunkWriter) writeValues(values 
[]parquet.ByteArray, num
 }
 
 func (w *ByteArrayColumnChunkWriter) writeValuesSpaced(spacedValues 
[]parquet.ByteArray, numRead, numValues int64, validBits []byte, 
validBitsOffset int64) {
-       // For variable-length types, we need to check buffer size to prevent 
int32 overflow
-       // For small values (<1MB), checking frequently adds negligible overhead
-       // For large values (>1MB), we MUST check before each value
-       const maxSafeBufferSize = 1.0 * 1024 * 1024 * 1024 // 1GB threshold
-       const largeValueThreshold = 1.0 * 1024 * 1024      // 1MB
-
-       encoder := w.currentEncoder.(encoding.ByteArrayEncoder)
-       currentSize := w.currentEncoder.EstimatedDataEncodedSize()
-
-       for i := 0; i < len(spacedValues); i++ {
-               valueSize := int64(len(spacedValues[i]))
-
-               // If this value might cause overflow, flush first
-               if currentSize+valueSize >= maxSafeBufferSize {
-                       if err := w.FlushCurrentPage(); err != nil {
-                               // If flush fails, panic will be caught by 
WriteBatch's defer recover
-                               panic(err)
-                       }
-                       currentSize = 0
-               }
-
-               // Add the value
-               chunk := spacedValues[i : i+1]
-               if len(spacedValues) != int(numRead) && validBits != nil {
-                       encoder.PutSpaced(chunk, validBits, 
validBitsOffset+int64(i))
-               } else {
-                       encoder.Put(chunk)
-               }
-
-               // Track size estimate (only update for large values or every 
100 values)
-               if valueSize >= largeValueThreshold || i%100 == 0 {
-                       currentSize = 
w.currentEncoder.EstimatedDataEncodedSize()
-               } else {
-                       currentSize += valueSize + 4 // +4 for length prefix
-               }
+       if len(spacedValues) != int(numRead) {
+               
w.currentEncoder.(encoding.ByteArrayEncoder).PutSpaced(spacedValues, validBits, 
validBitsOffset)
+       } else {
+               w.currentEncoder.(encoding.ByteArrayEncoder).Put(spacedValues)
        }
        if w.pageStatistics != nil {
                nulls := numValues - numRead
@@ -1543,22 +1542,65 @@ func (w *FixedLenByteArrayColumnChunkWriter) 
WriteBatch(values []parquet.FixedLe
        case values != nil:
                n = int64(len(values))
        }
-       w.doBatches(n, repLevels, func(offset, batch int64) {
-               var vals []parquet.FixedLenByteArray
+       // For variable-length types, we use adaptive batch sizing to ensure
+       // each batch's encoded data stays well within int32 range. This 
prevents
+       // overflow in FlushCurrentPage when computing uncompressed page size.
+       const maxSafeBatchDataSize int64 = 1 << 30 // 1GB
 
-               toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, 
offset, batch), levelSliceOrNil(repLevels, offset, batch))
+       batchSize := w.props.WriteBatchSize()
+       maxDefLevel := w.descr.MaxDefinitionLevel()
+       isV2WithRep := w.props.DataPageVersion() != parquet.DataPageV1 &&
+               repLevels != nil && w.descr.MaxRepetitionLevel() > 0
+       levelOffset := int64(0)
+
+       for levelOffset < n {
+               remaining := n - levelOffset
+               batch := min(remaining, batchSize)
+
+               // Scan values to find safe batch boundary
                if values != nil {
-                       vals = values[valueOffset : valueOffset+toWrite]
+                       valScan := valueOffset
+                       var cumDataSize int64
+                       for li := int64(0); li < batch; li++ {
+                               isValue := defLevels == nil || maxDefLevel == 0 
||
+                                       defLevels[levelOffset+li] == maxDefLevel
+                               if isValue && valScan < int64(len(values)) {
+                                       valSize := int64(w.descr.TypeLength()) 
+ 4
+                                       if cumDataSize+valSize > 
maxSafeBatchDataSize && li > 0 {
+                                               batch = li
+                                               break
+                                       }
+                                       cumDataSize += valSize
+                                       valScan++
+                               }
+                       }
+               }
+
+               // V2 row-boundary alignment
+               if isV2WithRep && levelOffset+batch < n {
+                       for batch > 1 && repLevels[levelOffset+batch] != 0 {
+                               batch--
+                       }
+               }
+               if batch < 1 {
+                       batch = 1
                }
 
+               // Process batch
+               toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, 
levelOffset, batch),
+                       levelSliceOrNil(repLevels, levelOffset, batch))
+               var vals []parquet.FixedLenByteArray
+               if values != nil {
+                       vals = values[valueOffset : valueOffset+toWrite]
+               }
                w.writeValues(vals, batch-toWrite)
                if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err 
!= nil {
                        panic(err)
                }
-
                valueOffset += toWrite
                w.checkDictionarySizeLimit()
-       })
+               levelOffset += batch
+       }
        return
 }
 
@@ -1583,11 +1625,38 @@ func (w *FixedLenByteArrayColumnChunkWriter) 
WriteBatchSpaced(values []parquet.F
        if defLevels == nil {
                length = len(values)
        }
-       doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch 
int64) {
-               var vals []parquet.FixedLenByteArray
-               info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, 
offset, batch), batch)
+       // For variable-length types, use adaptive batch sizing to keep encoded
+       // data within int32 range per page.
+       const maxSafeBatchDataSize int64 = 1 << 30 // 1GB
 
-               w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, 
batch), levelSliceOrNil(repLevels, offset, batch))
+       batchSize := w.props.WriteBatchSize()
+       levelOffset := int64(0)
+       n := int64(length)
+
+       for levelOffset < n {
+               remaining := n - levelOffset
+               batch := min(remaining, batchSize)
+
+               // Conservative scan: estimate data size from values starting 
at valueOffset
+               if values != nil {
+                       var cumDataSize int64
+                       for vi := int64(0); vi < batch && valueOffset+vi < 
int64(len(values)); vi++ {
+                               valSize := int64(w.descr.TypeLength()) + 4
+                               if cumDataSize+valSize > maxSafeBatchDataSize 
&& vi > 0 {
+                                       batch = vi
+                                       break
+                               }
+                               cumDataSize += valSize
+                       }
+               }
+               if batch < 1 {
+                       batch = 1
+               }
+
+               info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, 
levelOffset, batch), batch)
+
+               w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, 
levelOffset, batch), levelSliceOrNil(repLevels, levelOffset, batch))
+               var vals []parquet.FixedLenByteArray
                if values != nil {
                        vals = values[valueOffset : 
valueOffset+info.numSpaced()]
                }
@@ -1597,11 +1666,14 @@ func (w *FixedLenByteArrayColumnChunkWriter) 
WriteBatchSpaced(values []parquet.F
                } else {
                        w.writeValuesSpaced(vals, info.batchNum, batch, 
validBits, validBitsOffset+valueOffset)
                }
-               w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
+               if err := w.commitWriteAndCheckPageLimit(batch, 
info.numSpaced()); err != nil {
+                       panic(err)
+               }
                valueOffset += info.numSpaced()
 
                w.checkDictionarySizeLimit()
-       })
+               levelOffset += batch
+       }
 }
 
 func (w *FixedLenByteArrayColumnChunkWriter) WriteDictIndices(indices 
arrow.Array, defLevels, repLevels []int16) (err error) {
@@ -1643,50 +1715,7 @@ func (w *FixedLenByteArrayColumnChunkWriter) 
WriteDictIndices(indices arrow.Arra
 }
 
 func (w *FixedLenByteArrayColumnChunkWriter) writeValues(values 
[]parquet.FixedLenByteArray, numNulls int64) {
-       // For variable-length types, we need to check buffer size to prevent 
int32 overflow
-       // For small values (<1MB), checking frequently adds negligible overhead
-       // For large values (>1MB), we MUST check before each value
-       const maxSafeBufferSize = 1.0 * 1024 * 1024 * 1024 // 1GB threshold
-       const largeValueThreshold = 1.0 * 1024 * 1024      // 1MB
-
-       encoder := w.currentEncoder.(encoding.FixedLenByteArrayEncoder)
-       currentSize := w.currentEncoder.EstimatedDataEncodedSize()
-
-       // Batch process small values, check individually for large values
-       batchStart := 0
-       for i := 0; i < len(values); i++ {
-               valueSize := int64(w.descr.TypeLength())
-
-               // If this value might cause overflow, flush first
-               if currentSize+valueSize >= maxSafeBufferSize {
-                       // Add accumulated batch before flushing
-                       if i > batchStart {
-                               encoder.Put(values[batchStart:i])
-                               currentSize = 
w.currentEncoder.EstimatedDataEncodedSize()
-                       }
-                       // Flush the page
-                       if err := w.FlushCurrentPage(); err != nil {
-                               panic(err)
-                       }
-                       batchStart = i
-                       currentSize = 0
-               }
-
-               // Track size estimate
-               currentSize += valueSize + 4 // +4 for length prefix
-
-               // For large values, add and flush immediately if needed
-               if valueSize >= largeValueThreshold {
-                       encoder.Put(values[i : i+1])
-                       batchStart = i + 1
-                       currentSize = 
w.currentEncoder.EstimatedDataEncodedSize()
-               }
-       }
-
-       // Add remaining batch
-       if batchStart < len(values) {
-               encoder.Put(values[batchStart:])
-       }
+       w.currentEncoder.(encoding.FixedLenByteArrayEncoder).Put(values)
        if w.pageStatistics != nil {
                if w.Descr().LogicalType().Equals(schema.Float16LogicalType{}) {
                        
w.pageStatistics.(*metadata.Float16Statistics).Update(values, numNulls)
@@ -1701,41 +1730,10 @@ func (w *FixedLenByteArrayColumnChunkWriter) 
writeValues(values []parquet.FixedL
 }
 
 func (w *FixedLenByteArrayColumnChunkWriter) writeValuesSpaced(spacedValues 
[]parquet.FixedLenByteArray, numRead, numValues int64, validBits []byte, 
validBitsOffset int64) {
-       // For variable-length types, we need to check buffer size to prevent 
int32 overflow
-       // For small values (<1MB), checking frequently adds negligible overhead
-       // For large values (>1MB), we MUST check before each value
-       const maxSafeBufferSize = 1.0 * 1024 * 1024 * 1024 // 1GB threshold
-       const largeValueThreshold = 1.0 * 1024 * 1024      // 1MB
-
-       encoder := w.currentEncoder.(encoding.FixedLenByteArrayEncoder)
-       currentSize := w.currentEncoder.EstimatedDataEncodedSize()
-
-       for i := 0; i < len(spacedValues); i++ {
-               valueSize := int64(w.descr.TypeLength())
-
-               // If this value might cause overflow, flush first
-               if currentSize+valueSize >= maxSafeBufferSize {
-                       if err := w.FlushCurrentPage(); err != nil {
-                               // If flush fails, panic will be caught by 
WriteBatch's defer recover
-                               panic(err)
-                       }
-                       currentSize = 0
-               }
-
-               // Add the value
-               chunk := spacedValues[i : i+1]
-               if len(spacedValues) != int(numRead) && validBits != nil {
-                       encoder.PutSpaced(chunk, validBits, 
validBitsOffset+int64(i))
-               } else {
-                       encoder.Put(chunk)
-               }
-
-               // Track size estimate (only update for large values or every 
100 values)
-               if valueSize >= largeValueThreshold || i%100 == 0 {
-                       currentSize = 
w.currentEncoder.EstimatedDataEncodedSize()
-               } else {
-                       currentSize += valueSize + 4 // +4 for length prefix
-               }
+       if len(spacedValues) != int(numRead) {
+               
w.currentEncoder.(encoding.FixedLenByteArrayEncoder).PutSpaced(spacedValues, 
validBits, validBitsOffset)
+       } else {
+               
w.currentEncoder.(encoding.FixedLenByteArrayEncoder).Put(spacedValues)
        }
        if w.pageStatistics != nil {
                nulls := numValues - numRead
diff --git a/parquet/file/column_writer_types.gen.go.tmpl 
b/parquet/file/column_writer_types.gen.go.tmpl
index 936920b4..f6f2ce81 100644
--- a/parquet/file/column_writer_types.gen.go.tmpl
+++ b/parquet/file/column_writer_types.gen.go.tmpl
@@ -17,8 +17,6 @@
 package file
 
 import (
-    "fmt"
-
     "github.com/apache/arrow-go/v18/internal/utils"
     "github.com/apache/arrow-go/v18/parquet"
     "github.com/apache/arrow-go/v18/parquet/metadata"
@@ -85,6 +83,71 @@ func (w *{{.Name}}ColumnChunkWriter) WriteBatch(values 
[]{{.name}}, defLevels, r
   case values != nil:
     n = int64(len(values))
   }
+{{- if .isByteArray}}
+  // For variable-length types, we use adaptive batch sizing to ensure
+  // each batch's encoded data stays well within int32 range. This prevents
+  // overflow in FlushCurrentPage when computing uncompressed page size.
+  const maxSafeBatchDataSize int64 = 1 << 30 // 1GB
+
+  batchSize := w.props.WriteBatchSize()
+  maxDefLevel := w.descr.MaxDefinitionLevel()
+  isV2WithRep := w.props.DataPageVersion() != parquet.DataPageV1 &&
+    repLevels != nil && w.descr.MaxRepetitionLevel() > 0
+  levelOffset := int64(0)
+
+  for levelOffset < n {
+    remaining := n - levelOffset
+    batch := min(remaining, batchSize)
+
+    // Scan values to find safe batch boundary
+    if values != nil {
+      valScan := valueOffset
+      var cumDataSize int64
+      for li := int64(0); li < batch; li++ {
+        isValue := defLevels == nil || maxDefLevel == 0 ||
+          defLevels[levelOffset+li] == maxDefLevel
+        if isValue && valScan < int64(len(values)) {
+{{- if eq .Name "ByteArray"}}
+          valSize := int64(len(values[valScan])) + 4
+{{- else}}
+          valSize := int64(w.descr.TypeLength()) + 4
+{{- end}}
+          if cumDataSize+valSize > maxSafeBatchDataSize && li > 0 {
+            batch = li
+            break
+          }
+          cumDataSize += valSize
+          valScan++
+        }
+      }
+    }
+
+    // V2 row-boundary alignment
+    if isV2WithRep && levelOffset+batch < n {
+      for batch > 1 && repLevels[levelOffset+batch] != 0 {
+        batch--
+      }
+    }
+    if batch < 1 {
+      batch = 1
+    }
+
+    // Process batch
+    toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, levelOffset, 
batch),
+      levelSliceOrNil(repLevels, levelOffset, batch))
+    var vals []{{.name}}
+    if values != nil {
+      vals = values[valueOffset : valueOffset+toWrite]
+    }
+    w.writeValues(vals, batch-toWrite)
+    if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil {
+      panic(err)
+    }
+    valueOffset += toWrite
+    w.checkDictionarySizeLimit()
+    levelOffset += batch
+  }
+{{- else}}
   w.doBatches(n, repLevels, func(offset, batch int64) {
     var vals []{{.name}}
 
@@ -101,6 +164,7 @@ func (w *{{.Name}}ColumnChunkWriter) WriteBatch(values 
[]{{.name}}, defLevels, r
     valueOffset += toWrite
     w.checkDictionarySizeLimit()
   })
+{{- end}}
   return
 }
 
@@ -125,6 +189,61 @@ func (w *{{.Name}}ColumnChunkWriter) 
WriteBatchSpaced(values []{{.name}}, defLev
   if defLevels == nil {
     length = len(values)
   }
+{{- if .isByteArray}}
+  // For variable-length types, use adaptive batch sizing to keep encoded
+  // data within int32 range per page.
+  const maxSafeBatchDataSize int64 = 1 << 30 // 1GB
+
+  batchSize := w.props.WriteBatchSize()
+  levelOffset := int64(0)
+  n := int64(length)
+
+  for levelOffset < n {
+    remaining := n - levelOffset
+    batch := min(remaining, batchSize)
+
+    // Conservative scan: estimate data size from values starting at 
valueOffset
+    if values != nil {
+      var cumDataSize int64
+      for vi := int64(0); vi < batch && valueOffset+vi < int64(len(values)); 
vi++ {
+{{- if eq .Name "ByteArray"}}
+        valSize := int64(len(values[valueOffset+vi])) + 4
+{{- else}}
+        valSize := int64(w.descr.TypeLength()) + 4
+{{- end}}
+        if cumDataSize+valSize > maxSafeBatchDataSize && vi > 0 {
+          batch = vi
+          break
+        }
+        cumDataSize += valSize
+      }
+    }
+    if batch < 1 {
+      batch = 1
+    }
+
+    info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, 
levelOffset, batch), batch)
+
+    w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, levelOffset, batch), 
levelSliceOrNil(repLevels, levelOffset, batch))
+    var vals []{{.name}}
+    if values != nil {
+      vals = values[valueOffset : valueOffset+info.numSpaced()]
+    }
+
+    if w.bitsBuffer != nil {
+      w.writeValuesSpaced(vals, info.batchNum, batch, w.bitsBuffer.Bytes(), 0)
+    } else {
+      w.writeValuesSpaced(vals, info.batchNum, batch, validBits, 
validBitsOffset+valueOffset)
+    }
+    if err := w.commitWriteAndCheckPageLimit(batch, info.numSpaced()); err != 
nil {
+      panic(err)
+    }
+    valueOffset += info.numSpaced()
+
+    w.checkDictionarySizeLimit()
+    levelOffset += batch
+  }
+{{- else}}
   doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) 
{
     var vals []{{.name}}
     info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, 
batch), batch)
@@ -144,6 +263,7 @@ func (w *{{.Name}}ColumnChunkWriter) 
WriteBatchSpaced(values []{{.name}}, defLev
 
     w.checkDictionarySizeLimit()
   })
+{{- end}}
 }
 
 func (w *{{.Name}}ColumnChunkWriter) WriteDictIndices(indices arrow.Array, 
defLevels, repLevels []int16) (err error) {
@@ -185,58 +305,7 @@ func (w *{{.Name}}ColumnChunkWriter) 
WriteDictIndices(indices arrow.Array, defLe
 }
 
 func (w *{{.Name}}ColumnChunkWriter) writeValues(values []{{.name}}, numNulls 
int64) {
-{{- if or (eq .Name "ByteArray") (eq .Name "FixedLenByteArray")}}
-  // For variable-length types, we need to check buffer size to prevent int32 
overflow
-  // For small values (<1MB), checking frequently adds negligible overhead
-  // For large values (>1MB), we MUST check before each value
-  const maxSafeBufferSize = 1.0 * 1024 * 1024 * 1024 // 1GB threshold
-  const largeValueThreshold = 1.0 * 1024 * 1024      // 1MB
-
-  encoder := w.currentEncoder.(encoding.{{.Name}}Encoder)
-  currentSize := w.currentEncoder.EstimatedDataEncodedSize()
-
-  // Batch process small values, check individually for large values
-  batchStart := 0
-  for i := 0; i < len(values); i++ {
-{{- if eq .Name "ByteArray"}}
-    valueSize := int64(len(values[i]))
-{{- else}}
-    valueSize := int64(w.descr.TypeLength())
-{{- end}}
-
-    // If this value might cause overflow, flush first
-    if currentSize + valueSize >= maxSafeBufferSize {
-      // Add accumulated batch before flushing
-      if i > batchStart {
-        encoder.Put(values[batchStart:i])
-        currentSize = w.currentEncoder.EstimatedDataEncodedSize()
-      }
-      // Flush the page
-      if err := w.FlushCurrentPage(); err != nil {
-        panic(err)
-      }
-      batchStart = i
-      currentSize = 0
-    }
-
-    // Track size estimate
-    currentSize += valueSize + 4 // +4 for length prefix
-
-    // For large values, add and flush immediately if needed
-    if valueSize >= largeValueThreshold {
-      encoder.Put(values[i:i+1])
-      batchStart = i + 1
-      currentSize = w.currentEncoder.EstimatedDataEncodedSize()
-    }
-  }
-
-  // Add remaining batch
-  if batchStart < len(values) {
-    encoder.Put(values[batchStart:])
-  }
-{{- else}}
   w.currentEncoder.(encoding.{{.Name}}Encoder).Put(values)
-{{- end}}
   if w.pageStatistics != nil {
 {{- if ne .Name "FixedLenByteArray"}}
     w.pageStatistics.(*metadata.{{.Name}}Statistics).Update(values, numNulls)
@@ -255,54 +324,11 @@ func (w *{{.Name}}ColumnChunkWriter) writeValues(values 
[]{{.name}}, numNulls in
 }
 
 func (w *{{.Name}}ColumnChunkWriter) writeValuesSpaced(spacedValues 
[]{{.name}}, numRead, numValues int64, validBits []byte, validBitsOffset int64) 
{
-{{- if or (eq .Name "ByteArray") (eq .Name "FixedLenByteArray")}}
-  // For variable-length types, we need to check buffer size to prevent int32 
overflow
-  // For small values (<1MB), checking frequently adds negligible overhead
-  // For large values (>1MB), we MUST check before each value
-  const maxSafeBufferSize = 1.0 * 1024 * 1024 * 1024 // 1GB threshold
-  const largeValueThreshold = 1.0 * 1024 * 1024      // 1MB
-
-  encoder := w.currentEncoder.(encoding.{{.Name}}Encoder)
-  currentSize := w.currentEncoder.EstimatedDataEncodedSize()
-
-  for i := 0; i < len(spacedValues); i++ {
-{{- if eq .Name "ByteArray"}}
-    valueSize := int64(len(spacedValues[i]))
-{{- else}}
-    valueSize := int64(w.descr.TypeLength())
-{{- end}}
-
-    // If this value might cause overflow, flush first
-    if currentSize + valueSize >= maxSafeBufferSize {
-      if err := w.FlushCurrentPage(); err != nil {
-        // If flush fails, panic will be caught by WriteBatch's defer recover
-        panic(err)
-      }
-      currentSize = 0
-    }
-
-    // Add the value
-    chunk := spacedValues[i:i+1]
-    if len(spacedValues) != int(numRead) && validBits != nil {
-      encoder.PutSpaced(chunk, validBits, validBitsOffset+int64(i))
-    } else {
-      encoder.Put(chunk)
-    }
-
-    // Track size estimate (only update for large values or every 100 values)
-    if valueSize >= largeValueThreshold || i % 100 == 0 {
-      currentSize = w.currentEncoder.EstimatedDataEncodedSize()
-    } else {
-      currentSize += valueSize + 4 // +4 for length prefix
-    }
-  }
-{{- else}}
   if len(spacedValues) != int(numRead) {
     w.currentEncoder.(encoding.{{.Name}}Encoder).PutSpaced(spacedValues, 
validBits, validBitsOffset)
   } else {
     w.currentEncoder.(encoding.{{.Name}}Encoder).Put(spacedValues)
   }
-{{- end}}
   if w.pageStatistics != nil {
     nulls := numValues - numRead
 {{- if ne .Name "FixedLenByteArray"}}
diff --git a/parquet/file/large_value_test.go b/parquet/file/large_value_test.go
index 3def3dc6..66925f11 100644
--- a/parquet/file/large_value_test.go
+++ b/parquet/file/large_value_test.go
@@ -18,6 +18,8 @@ package file_test
 
 import (
        "bytes"
+       "context"
+       "fmt"
        "runtime"
        "testing"
 
@@ -33,24 +35,26 @@ import (
 )
 
 // TestLargeByteArrayValuesDoNotOverflowInt32 tests that writing large byte 
array
-// values that would exceed the 1GB flush threshold does not cause an int32 
overflow panic.
-// The fix ensures pages are flushed automatically before buffer size exceeds 
safe limits.
+// values totalling over 1GB in a single WriteBatch call triggers adaptive 
batch
+// sizing and does not cause an int32 overflow panic in FlushCurrentPage.
+//
+// Memory note: input values all share one 1.5MB buffer so input memory is low,
+// but the parquet output buffer grows to ~1GB (unavoidable for this boundary 
test).
 func TestLargeByteArrayValuesDoNotOverflowInt32(t *testing.T) {
        if runtime.GOARCH == "386" {
                t.Skip("Skipping test on 32-bit architecture")
        }
 
-       // Create schema with a single byte array column
        sc := schema.NewSchema(schema.MustGroup(schema.NewGroupNode("schema", 
parquet.Repetitions.Required, schema.FieldList{
                schema.Must(schema.NewPrimitiveNode("large_data", 
parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)),
        }, -1)))
 
        props := parquet.NewWriterProperties(
-               parquet.WithStats(false), // Disable stats to focus on core 
issue
+               parquet.WithStats(false),
                parquet.WithVersion(parquet.V2_LATEST),
                parquet.WithDataPageVersion(parquet.DataPageV2),
-               parquet.WithDictionaryDefault(false), // Plain encoding
-               parquet.WithDataPageSize(1024*1024),  // 1MB page size
+               parquet.WithDictionaryDefault(false),
+               parquet.WithDataPageSize(1024*1024),
        )
 
        out := &bytes.Buffer{}
@@ -60,47 +64,33 @@ func TestLargeByteArrayValuesDoNotOverflowInt32(t 
*testing.T) {
        rgw := writer.AppendRowGroup()
        colWriter, _ := rgw.NextColumn()
 
-       // Create 700 values of 1.5MB each (1.05GB total)
-       // This exceeds the 1GB flush threshold, triggering automatic page 
flushes
-       // Uses minimal memory (single 1.5MB buffer reused) while testing loop 
logic thoroughly
-       const valueSize = 1.5 * 1024 * 1024 // 1.5MB per value (>= 1MB 
threshold for large value handling)
-       const numValues = 700               // 700 values = 1.05GB total
+       // 700 values × 1.5MB = 1.05GB total, triggers adaptive batch split at 
~1GB.
+       // All values share the same underlying buffer (only ~1.5MB of input 
memory).
+       const valueSize = 1.5 * 1024 * 1024
+       const numValues = 700
 
-       // Create a single 1.5MB buffer and reuse it (only allocates 1.5MB!)
-       largeValue := make([]byte, valueSize)
+       largeValue := make([]byte, int(valueSize))
        for i := range largeValue {
                largeValue[i] = byte(i % 256)
        }
 
        values := make([]parquet.ByteArray, numValues)
        for i := range values {
-               values[i] = largeValue // Reuse same buffer (memory efficient: 
2MB total, writes 1.1GB)
+               values[i] = largeValue
        }
 
-       // This should NOT panic with int32 overflow
-       // Expected behavior: automatically flush pages at 1GB threshold
        byteArrayWriter := colWriter.(*file.ByteArrayColumnChunkWriter)
        _, err := byteArrayWriter.WriteBatch(values, nil, nil)
-
-       // Should succeed without panic
-       assert.NoError(t, err)
-
-       err = colWriter.Close()
-       assert.NoError(t, err)
-
-       err = rgw.Close()
        assert.NoError(t, err)
 
-       err = writer.Close()
-       assert.NoError(t, err)
-
-       // Verify we wrote data successfully
-       assert.Greater(t, out.Len(), 0, "should have written data to buffer")
+       assert.NoError(t, colWriter.Close())
+       assert.NoError(t, rgw.Close())
+       assert.NoError(t, writer.Close())
+       assert.Greater(t, out.Len(), 0)
 }
 
-// TestLargeStringArrayWithArrow tests the same issue using Arrow arrays
-// This tests the pqarrow integration path which is commonly used.
-// Uses LARGE_STRING type (int64 offsets) to handle >1GB of string data 
without overflow.
+// TestLargeStringArrayWithArrow tests the pqarrow integration path with large 
values.
+// Writes >1GB total through multiple small batches via the Arrow writer.
 func TestLargeStringArrayWithArrow(t *testing.T) {
        if runtime.GOARCH == "386" {
                t.Skip("Skipping test on 32-bit architecture")
@@ -108,11 +98,9 @@ func TestLargeStringArrayWithArrow(t *testing.T) {
 
        mem := memory.NewGoAllocator()
 
-       // Create Arrow schema with LARGE_STRING field (uses int64 offsets, can 
handle >2GB)
        field := arrow.Field{Name: "large_strings", Type: 
arrow.BinaryTypes.LargeString, Nullable: true}
        arrowSchema := arrow.NewSchema([]arrow.Field{field}, nil)
 
-       // Write to Parquet
        out := &bytes.Buffer{}
        props := parquet.NewWriterProperties(
                parquet.WithStats(false),
@@ -125,39 +113,222 @@ func TestLargeStringArrayWithArrow(t *testing.T) {
        pqw, err := pqarrow.NewFileWriter(arrowSchema, out, props, 
pqarrow.NewArrowWriterProperties())
        require.NoError(t, err)
 
-       // Write in multiple batches to reduce memory usage
-       // Each batch: 10 values × 10MB = 100MB
-       // Total: 11 batches = 1.1GB written (only 100MB memory at a time!)
-       const valueSize = 10 * 1024 * 1024 // 10MB per string (realistic large 
blob)
-       const valuesPerBatch = 10          // 10 values per batch
-       const numBatches = 11              // 11 batches = 1.1GB total
+       // 11 batches × 10 values × 10MB = 1.1GB total.
+       // Only one batch (~100MB) is live at a time.
+       const valueSize = 10 * 1024 * 1024
+       const valuesPerBatch = 10
+       const numBatches = 11
 
        largeStr := string(make([]byte, valueSize))
 
        for batchNum := 0; batchNum < numBatches; batchNum++ {
-               // Build a small batch
                builder := array.NewLargeStringBuilder(mem)
                for i := 0; i < valuesPerBatch; i++ {
                        builder.Append(largeStr)
                }
                arr := builder.NewArray()
-
                rec := array.NewRecordBatch(arrowSchema, []arrow.Array{arr}, 
int64(valuesPerBatch))
 
-               // Write batch - this should NOT panic with int32 overflow
                err = pqw.Write(rec)
-
-               // Clean up batch resources
                rec.Release()
                arr.Release()
                builder.Release()
-
                assert.NoError(t, err)
        }
 
-       err = pqw.Close()
-       assert.NoError(t, err)
+       assert.NoError(t, pqw.Close())
+       assert.Greater(t, out.Len(), 0)
+}
 
-       // Verify we wrote data successfully
-       assert.Greater(t, out.Len(), 0, "should have written data to buffer")
+// TestLargeByteArrayRoundTripCorrectness verifies that ByteArray values
+// written through the pqarrow path are read back correctly. This ensures the
+// adaptive batch sizing loop in WriteBatch produces valid data pages where
+// levels and values stay in sync.
+//
+// Uses modest value sizes (~10KB each, ~2MB total) to keep memory low while
+// still exercising the full write→encode→flush→read→decode→compare path.
+// The >1GB adaptive-split boundary is tested by TestLargeByteArrayValuesDoNotOverflowInt32.
+func TestLargeByteArrayRoundTripCorrectness(t *testing.T) {
+       mem := memory.NewGoAllocator()
+
+       field := arrow.Field{Name: "data", Type: arrow.BinaryTypes.LargeString, 
Nullable: false}
+       arrowSchema := arrow.NewSchema([]arrow.Field{field}, nil)
+
+       out := &bytes.Buffer{}
+       props := parquet.NewWriterProperties(
+               parquet.WithStats(true),
+               parquet.WithVersion(parquet.V2_LATEST),
+               parquet.WithDataPageVersion(parquet.DataPageV2),
+               parquet.WithDictionaryDefault(false),
+               parquet.WithDataPageSize(1024*1024),
+       )
+
+       pqw, err := pqarrow.NewFileWriter(arrowSchema, out, props, 
pqarrow.NewArrowWriterProperties())
+       require.NoError(t, err)
+
+       // 200 values × 10KB = ~2MB total. Each value has a unique, deterministic
+       // pattern so we can regenerate expected content during verification
+       // without storing all values in memory.
+       const valueSize = 10 * 1024
+       const numValues = 200
+
+       makeValue := func(idx int) string {
+               buf := make([]byte, valueSize)
+               // First 4 bytes: index for identification
+               buf[0] = byte(idx >> 24)
+               buf[1] = byte(idx >> 16)
+               buf[2] = byte(idx >> 8)
+               buf[3] = byte(idx)
+               // Remaining bytes: deterministic pattern
+               for j := 4; j < valueSize; j++ {
+                       buf[j] = byte(idx*31 + j)
+               }
+               return string(buf)
+       }
+
+       builder := array.NewLargeStringBuilder(mem)
+       for i := 0; i < numValues; i++ {
+               builder.Append(makeValue(i))
+       }
+       arr := builder.NewArray()
+       rec := array.NewRecordBatch(arrowSchema, []arrow.Array{arr}, 
int64(numValues))
+
+       err = pqw.Write(rec)
+       require.NoError(t, err)
+
+       rec.Release()
+       arr.Release()
+       builder.Release()
+
+       require.NoError(t, pqw.Close())
+
+       // Read back and verify
+       rdr, err := file.NewParquetReader(bytes.NewReader(out.Bytes()))
+       require.NoError(t, err)
+       defer rdr.Close()
+
+       arrowRdr, err := pqarrow.NewFileReader(rdr, 
pqarrow.ArrowReadProperties{}, mem)
+       require.NoError(t, err)
+
+       tbl, err := arrowRdr.ReadTable(context.Background())
+       require.NoError(t, err)
+       defer tbl.Release()
+
+       require.EqualValues(t, numValues, tbl.NumRows())
+       require.Equal(t, 1, int(tbl.NumCols()))
+
+       col := tbl.Column(0)
+       rowIdx := 0
+       for _, chunk := range col.Data().Chunks() {
+               strArr := chunk.(*array.String)
+               for j := 0; j < strArr.Len(); j++ {
+                       got := strArr.Value(j)
+                       expected := makeValue(rowIdx)
+                       require.Equal(t, len(expected), len(got),
+                               "value %d: length mismatch", rowIdx)
+                       require.Equal(t, expected[:4], got[:4],
+                               "value %d: header mismatch (data corruption)", rowIdx)
+                       require.Equal(t, expected, got,
+                               "value %d: full content mismatch", rowIdx)
+                       rowIdx++
+               }
+       }
+       require.Equal(t, numValues, rowIdx, "did not read back all values")
+}
+
+// TestLargeByteArrayRoundTripWithNulls verifies correctness of the
+// WriteBatchSpaced path (nullable column) with moderately-sized values.
+// Every 3rd value is null (100 of 300), leaving 200 values of 10KB each (~2MB total).
+func TestLargeByteArrayRoundTripWithNulls(t *testing.T) {
+       mem := memory.NewGoAllocator()
+
+       field := arrow.Field{Name: "data", Type: arrow.BinaryTypes.LargeString, 
Nullable: true}
+       arrowSchema := arrow.NewSchema([]arrow.Field{field}, nil)
+
+       out := &bytes.Buffer{}
+       props := parquet.NewWriterProperties(
+               parquet.WithStats(true),
+               parquet.WithVersion(parquet.V2_LATEST),
+               parquet.WithDataPageVersion(parquet.DataPageV2),
+               parquet.WithDictionaryDefault(false),
+               parquet.WithDataPageSize(1024*1024),
+       )
+
+       pqw, err := pqarrow.NewFileWriter(arrowSchema, out, props, 
pqarrow.NewArrowWriterProperties())
+       require.NoError(t, err)
+
+       const valueSize = 10 * 1024
+       const numValues = 300
+
+       makeValue := func(idx int) string {
+               buf := make([]byte, valueSize)
+               buf[0] = byte(idx >> 24)
+               buf[1] = byte(idx >> 16)
+               buf[2] = byte(idx >> 8)
+               buf[3] = byte(idx)
+               for j := 4; j < valueSize; j++ {
+                       buf[j] = byte(idx*17 + j)
+               }
+               return string(buf)
+       }
+
+       // Track which indices are null for verification
+       isNull := func(i int) bool { return i%3 == 0 }
+
+       builder := array.NewLargeStringBuilder(mem)
+       for i := 0; i < numValues; i++ {
+               if isNull(i) {
+                       builder.AppendNull()
+               } else {
+                       builder.Append(makeValue(i))
+               }
+       }
+       arr := builder.NewArray()
+       rec := array.NewRecordBatch(arrowSchema, []arrow.Array{arr}, 
int64(numValues))
+
+       err = pqw.Write(rec)
+       require.NoError(t, err)
+
+       rec.Release()
+       arr.Release()
+       builder.Release()
+
+       require.NoError(t, pqw.Close())
+
+       // Read back and verify
+       rdr, err := file.NewParquetReader(bytes.NewReader(out.Bytes()))
+       require.NoError(t, err)
+       defer rdr.Close()
+
+       arrowRdr, err := pqarrow.NewFileReader(rdr, 
pqarrow.ArrowReadProperties{}, mem)
+       require.NoError(t, err)
+
+       tbl, err := arrowRdr.ReadTable(context.Background())
+       require.NoError(t, err)
+       defer tbl.Release()
+
+       require.EqualValues(t, numValues, tbl.NumRows())
+
+       col := tbl.Column(0)
+       rowIdx := 0
+       for _, chunk := range col.Data().Chunks() {
+               strArr := chunk.(*array.String)
+               for j := 0; j < strArr.Len(); j++ {
+                       if isNull(rowIdx) {
+                               require.True(t, strArr.IsNull(j), "value %d: expected null", rowIdx)
+                       } else {
+                               require.False(t, strArr.IsNull(j), "value %d: unexpected null", rowIdx)
+                               got := strArr.Value(j)
+                               expected := makeValue(rowIdx)
+                               require.Equal(t, len(expected), len(got),
+                                       "value %d: length mismatch", rowIdx)
+                               require.Equal(t, expected[:4], got[:4],
+                                       "value %d: header mismatch (data corruption)", rowIdx)
+                               require.Equal(t, expected, got,
+                                       fmt.Sprintf("value %d: full content mismatch", rowIdx))
+                       }
+                       rowIdx++
+               }
+       }
+       require.Equal(t, numValues, rowIdx, "did not read back all values")
 }


Reply via email to