This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 3ad38d03 fix(parquet/file): use adaptive batch sizing to avoid panic
(#690)
3ad38d03 is described below
commit 3ad38d03dbc85283f61d60ba2bd8ae8492f0972a
Author: Matt Topol <[email protected]>
AuthorDate: Mon Mar 9 00:35:41 2026 -0400
fix(parquet/file): use adaptive batch sizing to avoid panic (#690)
### Rationale for this change
Issue reported in
https://github.com/apache/arrow-go/issues/622#issuecomment-3994896058
where accumulated data on a given page exceeds the DataPageSize.
### What changes are included in this PR?
Removes the broken mid-batch flush logic in `writeValues`/`writeValuesSpaced`,
reverting to plain `encoder.Put()`/`encoder.PutSpaced()` calls.
Updated `WriteBatch` to use an adaptive batch sizing approach for
ByteArray/FLBA writing to properly handle v2 data page row-boundary
alignment without breaking on very large individual values.
### Are these changes tested?
New tests are added to cover this scenario.
### Are there any user-facing changes?
No
---
parquet/file/column_writer.go | 11 +-
parquet/file/column_writer_types.gen.go | 362 +++++++++++++--------------
parquet/file/column_writer_types.gen.go.tmpl | 218 +++++++++-------
parquet/file/large_value_test.go | 271 ++++++++++++++++----
4 files changed, 532 insertions(+), 330 deletions(-)
diff --git a/parquet/file/column_writer.go b/parquet/file/column_writer.go
index 8d35aaa3..acfe69a1 100644
--- a/parquet/file/column_writer.go
+++ b/parquet/file/column_writer.go
@@ -19,7 +19,9 @@ package file
import (
"bytes"
"encoding/binary"
+ "fmt"
"io"
+ "math"
"strconv"
"github.com/apache/arrow-go/v18/arrow"
@@ -303,7 +305,12 @@ func (w *columnWriter) FlushCurrentPage() error {
repLevelsRLESize = int32(w.repLevelSink.Len())
}
- uncompressed := defLevelsRLESize + repLevelsRLESize +
int32(values.Len())
+ uncompressed64 := int64(defLevelsRLESize) + int64(repLevelsRLESize) +
int64(values.Len())
+ if uncompressed64 > math.MaxInt32 {
+ return fmt.Errorf("parquet: uncompressed page size %d exceeds
INT32_MAX (%d)",
+ uncompressed64, int64(math.MaxInt32))
+ }
+ uncompressed := int32(uncompressed64)
if isV1DataPage {
err = w.buildDataPageV1(defLevelsRLESize, repLevelsRLESize,
uncompressed, values.Bytes())
} else {
@@ -378,7 +385,7 @@ func (w *columnWriter) buildDataPageV2(defLevelsRLESize,
repLevelsRLESize, uncom
// concatenate uncompressed levels and the possibly compressed values
var combined bytes.Buffer
- combined.Grow(int(defLevelsRLESize + repLevelsRLESize +
int32(len(data))))
+ combined.Grow(int(int64(defLevelsRLESize) + int64(repLevelsRLESize) +
int64(len(data))))
w.concatBuffers(defLevelsRLESize, repLevelsRLESize, data, &combined)
pageStats, err := w.getPageStatistics()
diff --git a/parquet/file/column_writer_types.gen.go
b/parquet/file/column_writer_types.gen.go
index 6530fbeb..a380a51a 100644
--- a/parquet/file/column_writer_types.gen.go
+++ b/parquet/file/column_writer_types.gen.go
@@ -1271,22 +1271,65 @@ func (w *ByteArrayColumnChunkWriter) WriteBatch(values
[]parquet.ByteArray, defL
case values != nil:
n = int64(len(values))
}
- w.doBatches(n, repLevels, func(offset, batch int64) {
- var vals []parquet.ByteArray
+ // For variable-length types, we use adaptive batch sizing to ensure
+ // each batch's encoded data stays well within int32 range. This
prevents
+ // overflow in FlushCurrentPage when computing uncompressed page size.
+ const maxSafeBatchDataSize int64 = 1 << 30 // 1GB
+
+ batchSize := w.props.WriteBatchSize()
+ maxDefLevel := w.descr.MaxDefinitionLevel()
+ isV2WithRep := w.props.DataPageVersion() != parquet.DataPageV1 &&
+ repLevels != nil && w.descr.MaxRepetitionLevel() > 0
+ levelOffset := int64(0)
+
+ for levelOffset < n {
+ remaining := n - levelOffset
+ batch := min(remaining, batchSize)
+
+ // Scan values to find safe batch boundary
+ if values != nil {
+ valScan := valueOffset
+ var cumDataSize int64
+ for li := int64(0); li < batch; li++ {
+ isValue := defLevels == nil || maxDefLevel == 0
||
+ defLevels[levelOffset+li] == maxDefLevel
+ if isValue && valScan < int64(len(values)) {
+ valSize := int64(len(values[valScan]))
+ 4
+ if cumDataSize+valSize >
maxSafeBatchDataSize && li > 0 {
+ batch = li
+ break
+ }
+ cumDataSize += valSize
+ valScan++
+ }
+ }
+ }
- toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels,
offset, batch), levelSliceOrNil(repLevels, offset, batch))
+ // V2 row-boundary alignment
+ if isV2WithRep && levelOffset+batch < n {
+ for batch > 1 && repLevels[levelOffset+batch] != 0 {
+ batch--
+ }
+ }
+ if batch < 1 {
+ batch = 1
+ }
+
+ // Process batch
+ toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels,
levelOffset, batch),
+ levelSliceOrNil(repLevels, levelOffset, batch))
+ var vals []parquet.ByteArray
if values != nil {
vals = values[valueOffset : valueOffset+toWrite]
}
-
w.writeValues(vals, batch-toWrite)
if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err
!= nil {
panic(err)
}
-
valueOffset += toWrite
w.checkDictionarySizeLimit()
- })
+ levelOffset += batch
+ }
return
}
@@ -1311,11 +1354,38 @@ func (w *ByteArrayColumnChunkWriter)
WriteBatchSpaced(values []parquet.ByteArray
if defLevels == nil {
length = len(values)
}
- doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch
int64) {
- var vals []parquet.ByteArray
- info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels,
offset, batch), batch)
+ // For variable-length types, use adaptive batch sizing to keep encoded
+ // data within int32 range per page.
+ const maxSafeBatchDataSize int64 = 1 << 30 // 1GB
- w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset,
batch), levelSliceOrNil(repLevels, offset, batch))
+ batchSize := w.props.WriteBatchSize()
+ levelOffset := int64(0)
+ n := int64(length)
+
+ for levelOffset < n {
+ remaining := n - levelOffset
+ batch := min(remaining, batchSize)
+
+ // Conservative scan: estimate data size from values starting
at valueOffset
+ if values != nil {
+ var cumDataSize int64
+ for vi := int64(0); vi < batch && valueOffset+vi <
int64(len(values)); vi++ {
+ valSize := int64(len(values[valueOffset+vi])) +
4
+ if cumDataSize+valSize > maxSafeBatchDataSize
&& vi > 0 {
+ batch = vi
+ break
+ }
+ cumDataSize += valSize
+ }
+ }
+ if batch < 1 {
+ batch = 1
+ }
+
+ info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels,
levelOffset, batch), batch)
+
+ w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels,
levelOffset, batch), levelSliceOrNil(repLevels, levelOffset, batch))
+ var vals []parquet.ByteArray
if values != nil {
vals = values[valueOffset :
valueOffset+info.numSpaced()]
}
@@ -1325,11 +1395,14 @@ func (w *ByteArrayColumnChunkWriter)
WriteBatchSpaced(values []parquet.ByteArray
} else {
w.writeValuesSpaced(vals, info.batchNum, batch,
validBits, validBitsOffset+valueOffset)
}
- w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
+ if err := w.commitWriteAndCheckPageLimit(batch,
info.numSpaced()); err != nil {
+ panic(err)
+ }
valueOffset += info.numSpaced()
w.checkDictionarySizeLimit()
- })
+ levelOffset += batch
+ }
}
func (w *ByteArrayColumnChunkWriter) WriteDictIndices(indices arrow.Array,
defLevels, repLevels []int16) (err error) {
@@ -1371,50 +1444,7 @@ func (w *ByteArrayColumnChunkWriter)
WriteDictIndices(indices arrow.Array, defLe
}
func (w *ByteArrayColumnChunkWriter) writeValues(values []parquet.ByteArray,
numNulls int64) {
- // For variable-length types, we need to check buffer size to prevent
int32 overflow
- // For small values (<1MB), checking frequently adds negligible overhead
- // For large values (>1MB), we MUST check before each value
- const maxSafeBufferSize = 1.0 * 1024 * 1024 * 1024 // 1GB threshold
- const largeValueThreshold = 1.0 * 1024 * 1024 // 1MB
-
- encoder := w.currentEncoder.(encoding.ByteArrayEncoder)
- currentSize := w.currentEncoder.EstimatedDataEncodedSize()
-
- // Batch process small values, check individually for large values
- batchStart := 0
- for i := 0; i < len(values); i++ {
- valueSize := int64(len(values[i]))
-
- // If this value might cause overflow, flush first
- if currentSize+valueSize >= maxSafeBufferSize {
- // Add accumulated batch before flushing
- if i > batchStart {
- encoder.Put(values[batchStart:i])
- currentSize =
w.currentEncoder.EstimatedDataEncodedSize()
- }
- // Flush the page
- if err := w.FlushCurrentPage(); err != nil {
- panic(err)
- }
- batchStart = i
- currentSize = 0
- }
-
- // Track size estimate
- currentSize += valueSize + 4 // +4 for length prefix
-
- // For large values, add and flush immediately if needed
- if valueSize >= largeValueThreshold {
- encoder.Put(values[i : i+1])
- batchStart = i + 1
- currentSize =
w.currentEncoder.EstimatedDataEncodedSize()
- }
- }
-
- // Add remaining batch
- if batchStart < len(values) {
- encoder.Put(values[batchStart:])
- }
+ w.currentEncoder.(encoding.ByteArrayEncoder).Put(values)
if w.pageStatistics != nil {
w.pageStatistics.(*metadata.ByteArrayStatistics).Update(values,
numNulls)
}
@@ -1425,41 +1455,10 @@ func (w *ByteArrayColumnChunkWriter) writeValues(values
[]parquet.ByteArray, num
}
func (w *ByteArrayColumnChunkWriter) writeValuesSpaced(spacedValues
[]parquet.ByteArray, numRead, numValues int64, validBits []byte,
validBitsOffset int64) {
- // For variable-length types, we need to check buffer size to prevent
int32 overflow
- // For small values (<1MB), checking frequently adds negligible overhead
- // For large values (>1MB), we MUST check before each value
- const maxSafeBufferSize = 1.0 * 1024 * 1024 * 1024 // 1GB threshold
- const largeValueThreshold = 1.0 * 1024 * 1024 // 1MB
-
- encoder := w.currentEncoder.(encoding.ByteArrayEncoder)
- currentSize := w.currentEncoder.EstimatedDataEncodedSize()
-
- for i := 0; i < len(spacedValues); i++ {
- valueSize := int64(len(spacedValues[i]))
-
- // If this value might cause overflow, flush first
- if currentSize+valueSize >= maxSafeBufferSize {
- if err := w.FlushCurrentPage(); err != nil {
- // If flush fails, panic will be caught by
WriteBatch's defer recover
- panic(err)
- }
- currentSize = 0
- }
-
- // Add the value
- chunk := spacedValues[i : i+1]
- if len(spacedValues) != int(numRead) && validBits != nil {
- encoder.PutSpaced(chunk, validBits,
validBitsOffset+int64(i))
- } else {
- encoder.Put(chunk)
- }
-
- // Track size estimate (only update for large values or every
100 values)
- if valueSize >= largeValueThreshold || i%100 == 0 {
- currentSize =
w.currentEncoder.EstimatedDataEncodedSize()
- } else {
- currentSize += valueSize + 4 // +4 for length prefix
- }
+ if len(spacedValues) != int(numRead) {
+
w.currentEncoder.(encoding.ByteArrayEncoder).PutSpaced(spacedValues, validBits,
validBitsOffset)
+ } else {
+ w.currentEncoder.(encoding.ByteArrayEncoder).Put(spacedValues)
}
if w.pageStatistics != nil {
nulls := numValues - numRead
@@ -1543,22 +1542,65 @@ func (w *FixedLenByteArrayColumnChunkWriter)
WriteBatch(values []parquet.FixedLe
case values != nil:
n = int64(len(values))
}
- w.doBatches(n, repLevels, func(offset, batch int64) {
- var vals []parquet.FixedLenByteArray
+ // For variable-length types, we use adaptive batch sizing to ensure
+ // each batch's encoded data stays well within int32 range. This
prevents
+ // overflow in FlushCurrentPage when computing uncompressed page size.
+ const maxSafeBatchDataSize int64 = 1 << 30 // 1GB
- toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels,
offset, batch), levelSliceOrNil(repLevels, offset, batch))
+ batchSize := w.props.WriteBatchSize()
+ maxDefLevel := w.descr.MaxDefinitionLevel()
+ isV2WithRep := w.props.DataPageVersion() != parquet.DataPageV1 &&
+ repLevels != nil && w.descr.MaxRepetitionLevel() > 0
+ levelOffset := int64(0)
+
+ for levelOffset < n {
+ remaining := n - levelOffset
+ batch := min(remaining, batchSize)
+
+ // Scan values to find safe batch boundary
if values != nil {
- vals = values[valueOffset : valueOffset+toWrite]
+ valScan := valueOffset
+ var cumDataSize int64
+ for li := int64(0); li < batch; li++ {
+ isValue := defLevels == nil || maxDefLevel == 0
||
+ defLevels[levelOffset+li] == maxDefLevel
+ if isValue && valScan < int64(len(values)) {
+ valSize := int64(w.descr.TypeLength())
+ 4
+ if cumDataSize+valSize >
maxSafeBatchDataSize && li > 0 {
+ batch = li
+ break
+ }
+ cumDataSize += valSize
+ valScan++
+ }
+ }
+ }
+
+ // V2 row-boundary alignment
+ if isV2WithRep && levelOffset+batch < n {
+ for batch > 1 && repLevels[levelOffset+batch] != 0 {
+ batch--
+ }
+ }
+ if batch < 1 {
+ batch = 1
}
+ // Process batch
+ toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels,
levelOffset, batch),
+ levelSliceOrNil(repLevels, levelOffset, batch))
+ var vals []parquet.FixedLenByteArray
+ if values != nil {
+ vals = values[valueOffset : valueOffset+toWrite]
+ }
w.writeValues(vals, batch-toWrite)
if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err
!= nil {
panic(err)
}
-
valueOffset += toWrite
w.checkDictionarySizeLimit()
- })
+ levelOffset += batch
+ }
return
}
@@ -1583,11 +1625,38 @@ func (w *FixedLenByteArrayColumnChunkWriter)
WriteBatchSpaced(values []parquet.F
if defLevels == nil {
length = len(values)
}
- doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch
int64) {
- var vals []parquet.FixedLenByteArray
- info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels,
offset, batch), batch)
+ // For variable-length types, use adaptive batch sizing to keep encoded
+ // data within int32 range per page.
+ const maxSafeBatchDataSize int64 = 1 << 30 // 1GB
- w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset,
batch), levelSliceOrNil(repLevels, offset, batch))
+ batchSize := w.props.WriteBatchSize()
+ levelOffset := int64(0)
+ n := int64(length)
+
+ for levelOffset < n {
+ remaining := n - levelOffset
+ batch := min(remaining, batchSize)
+
+ // Conservative scan: estimate data size from values starting
at valueOffset
+ if values != nil {
+ var cumDataSize int64
+ for vi := int64(0); vi < batch && valueOffset+vi <
int64(len(values)); vi++ {
+ valSize := int64(w.descr.TypeLength()) + 4
+ if cumDataSize+valSize > maxSafeBatchDataSize
&& vi > 0 {
+ batch = vi
+ break
+ }
+ cumDataSize += valSize
+ }
+ }
+ if batch < 1 {
+ batch = 1
+ }
+
+ info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels,
levelOffset, batch), batch)
+
+ w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels,
levelOffset, batch), levelSliceOrNil(repLevels, levelOffset, batch))
+ var vals []parquet.FixedLenByteArray
if values != nil {
vals = values[valueOffset :
valueOffset+info.numSpaced()]
}
@@ -1597,11 +1666,14 @@ func (w *FixedLenByteArrayColumnChunkWriter)
WriteBatchSpaced(values []parquet.F
} else {
w.writeValuesSpaced(vals, info.batchNum, batch,
validBits, validBitsOffset+valueOffset)
}
- w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
+ if err := w.commitWriteAndCheckPageLimit(batch,
info.numSpaced()); err != nil {
+ panic(err)
+ }
valueOffset += info.numSpaced()
w.checkDictionarySizeLimit()
- })
+ levelOffset += batch
+ }
}
func (w *FixedLenByteArrayColumnChunkWriter) WriteDictIndices(indices
arrow.Array, defLevels, repLevels []int16) (err error) {
@@ -1643,50 +1715,7 @@ func (w *FixedLenByteArrayColumnChunkWriter)
WriteDictIndices(indices arrow.Arra
}
func (w *FixedLenByteArrayColumnChunkWriter) writeValues(values
[]parquet.FixedLenByteArray, numNulls int64) {
- // For variable-length types, we need to check buffer size to prevent
int32 overflow
- // For small values (<1MB), checking frequently adds negligible overhead
- // For large values (>1MB), we MUST check before each value
- const maxSafeBufferSize = 1.0 * 1024 * 1024 * 1024 // 1GB threshold
- const largeValueThreshold = 1.0 * 1024 * 1024 // 1MB
-
- encoder := w.currentEncoder.(encoding.FixedLenByteArrayEncoder)
- currentSize := w.currentEncoder.EstimatedDataEncodedSize()
-
- // Batch process small values, check individually for large values
- batchStart := 0
- for i := 0; i < len(values); i++ {
- valueSize := int64(w.descr.TypeLength())
-
- // If this value might cause overflow, flush first
- if currentSize+valueSize >= maxSafeBufferSize {
- // Add accumulated batch before flushing
- if i > batchStart {
- encoder.Put(values[batchStart:i])
- currentSize =
w.currentEncoder.EstimatedDataEncodedSize()
- }
- // Flush the page
- if err := w.FlushCurrentPage(); err != nil {
- panic(err)
- }
- batchStart = i
- currentSize = 0
- }
-
- // Track size estimate
- currentSize += valueSize + 4 // +4 for length prefix
-
- // For large values, add and flush immediately if needed
- if valueSize >= largeValueThreshold {
- encoder.Put(values[i : i+1])
- batchStart = i + 1
- currentSize =
w.currentEncoder.EstimatedDataEncodedSize()
- }
- }
-
- // Add remaining batch
- if batchStart < len(values) {
- encoder.Put(values[batchStart:])
- }
+ w.currentEncoder.(encoding.FixedLenByteArrayEncoder).Put(values)
if w.pageStatistics != nil {
if w.Descr().LogicalType().Equals(schema.Float16LogicalType{}) {
w.pageStatistics.(*metadata.Float16Statistics).Update(values, numNulls)
@@ -1701,41 +1730,10 @@ func (w *FixedLenByteArrayColumnChunkWriter)
writeValues(values []parquet.FixedL
}
func (w *FixedLenByteArrayColumnChunkWriter) writeValuesSpaced(spacedValues
[]parquet.FixedLenByteArray, numRead, numValues int64, validBits []byte,
validBitsOffset int64) {
- // For variable-length types, we need to check buffer size to prevent
int32 overflow
- // For small values (<1MB), checking frequently adds negligible overhead
- // For large values (>1MB), we MUST check before each value
- const maxSafeBufferSize = 1.0 * 1024 * 1024 * 1024 // 1GB threshold
- const largeValueThreshold = 1.0 * 1024 * 1024 // 1MB
-
- encoder := w.currentEncoder.(encoding.FixedLenByteArrayEncoder)
- currentSize := w.currentEncoder.EstimatedDataEncodedSize()
-
- for i := 0; i < len(spacedValues); i++ {
- valueSize := int64(w.descr.TypeLength())
-
- // If this value might cause overflow, flush first
- if currentSize+valueSize >= maxSafeBufferSize {
- if err := w.FlushCurrentPage(); err != nil {
- // If flush fails, panic will be caught by
WriteBatch's defer recover
- panic(err)
- }
- currentSize = 0
- }
-
- // Add the value
- chunk := spacedValues[i : i+1]
- if len(spacedValues) != int(numRead) && validBits != nil {
- encoder.PutSpaced(chunk, validBits,
validBitsOffset+int64(i))
- } else {
- encoder.Put(chunk)
- }
-
- // Track size estimate (only update for large values or every
100 values)
- if valueSize >= largeValueThreshold || i%100 == 0 {
- currentSize =
w.currentEncoder.EstimatedDataEncodedSize()
- } else {
- currentSize += valueSize + 4 // +4 for length prefix
- }
+ if len(spacedValues) != int(numRead) {
+
w.currentEncoder.(encoding.FixedLenByteArrayEncoder).PutSpaced(spacedValues,
validBits, validBitsOffset)
+ } else {
+
w.currentEncoder.(encoding.FixedLenByteArrayEncoder).Put(spacedValues)
}
if w.pageStatistics != nil {
nulls := numValues - numRead
diff --git a/parquet/file/column_writer_types.gen.go.tmpl
b/parquet/file/column_writer_types.gen.go.tmpl
index 936920b4..f6f2ce81 100644
--- a/parquet/file/column_writer_types.gen.go.tmpl
+++ b/parquet/file/column_writer_types.gen.go.tmpl
@@ -17,8 +17,6 @@
package file
import (
- "fmt"
-
"github.com/apache/arrow-go/v18/internal/utils"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/metadata"
@@ -85,6 +83,71 @@ func (w *{{.Name}}ColumnChunkWriter) WriteBatch(values
[]{{.name}}, defLevels, r
case values != nil:
n = int64(len(values))
}
+{{- if .isByteArray}}
+ // For variable-length types, we use adaptive batch sizing to ensure
+ // each batch's encoded data stays well within int32 range. This prevents
+ // overflow in FlushCurrentPage when computing uncompressed page size.
+ const maxSafeBatchDataSize int64 = 1 << 30 // 1GB
+
+ batchSize := w.props.WriteBatchSize()
+ maxDefLevel := w.descr.MaxDefinitionLevel()
+ isV2WithRep := w.props.DataPageVersion() != parquet.DataPageV1 &&
+ repLevels != nil && w.descr.MaxRepetitionLevel() > 0
+ levelOffset := int64(0)
+
+ for levelOffset < n {
+ remaining := n - levelOffset
+ batch := min(remaining, batchSize)
+
+ // Scan values to find safe batch boundary
+ if values != nil {
+ valScan := valueOffset
+ var cumDataSize int64
+ for li := int64(0); li < batch; li++ {
+ isValue := defLevels == nil || maxDefLevel == 0 ||
+ defLevels[levelOffset+li] == maxDefLevel
+ if isValue && valScan < int64(len(values)) {
+{{- if eq .Name "ByteArray"}}
+ valSize := int64(len(values[valScan])) + 4
+{{- else}}
+ valSize := int64(w.descr.TypeLength()) + 4
+{{- end}}
+ if cumDataSize+valSize > maxSafeBatchDataSize && li > 0 {
+ batch = li
+ break
+ }
+ cumDataSize += valSize
+ valScan++
+ }
+ }
+ }
+
+ // V2 row-boundary alignment
+ if isV2WithRep && levelOffset+batch < n {
+ for batch > 1 && repLevels[levelOffset+batch] != 0 {
+ batch--
+ }
+ }
+ if batch < 1 {
+ batch = 1
+ }
+
+ // Process batch
+ toWrite := w.writeLevels(batch, levelSliceOrNil(defLevels, levelOffset,
batch),
+ levelSliceOrNil(repLevels, levelOffset, batch))
+ var vals []{{.name}}
+ if values != nil {
+ vals = values[valueOffset : valueOffset+toWrite]
+ }
+ w.writeValues(vals, batch-toWrite)
+ if err := w.commitWriteAndCheckPageLimit(batch, toWrite); err != nil {
+ panic(err)
+ }
+ valueOffset += toWrite
+ w.checkDictionarySizeLimit()
+ levelOffset += batch
+ }
+{{- else}}
w.doBatches(n, repLevels, func(offset, batch int64) {
var vals []{{.name}}
@@ -101,6 +164,7 @@ func (w *{{.Name}}ColumnChunkWriter) WriteBatch(values
[]{{.name}}, defLevels, r
valueOffset += toWrite
w.checkDictionarySizeLimit()
})
+{{- end}}
return
}
@@ -125,6 +189,61 @@ func (w *{{.Name}}ColumnChunkWriter)
WriteBatchSpaced(values []{{.name}}, defLev
if defLevels == nil {
length = len(values)
}
+{{- if .isByteArray}}
+ // For variable-length types, use adaptive batch sizing to keep encoded
+ // data within int32 range per page.
+ const maxSafeBatchDataSize int64 = 1 << 30 // 1GB
+
+ batchSize := w.props.WriteBatchSize()
+ levelOffset := int64(0)
+ n := int64(length)
+
+ for levelOffset < n {
+ remaining := n - levelOffset
+ batch := min(remaining, batchSize)
+
+ // Conservative scan: estimate data size from values starting at
valueOffset
+ if values != nil {
+ var cumDataSize int64
+ for vi := int64(0); vi < batch && valueOffset+vi < int64(len(values));
vi++ {
+{{- if eq .Name "ByteArray"}}
+ valSize := int64(len(values[valueOffset+vi])) + 4
+{{- else}}
+ valSize := int64(w.descr.TypeLength()) + 4
+{{- end}}
+ if cumDataSize+valSize > maxSafeBatchDataSize && vi > 0 {
+ batch = vi
+ break
+ }
+ cumDataSize += valSize
+ }
+ }
+ if batch < 1 {
+ batch = 1
+ }
+
+ info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels,
levelOffset, batch), batch)
+
+ w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, levelOffset, batch),
levelSliceOrNil(repLevels, levelOffset, batch))
+ var vals []{{.name}}
+ if values != nil {
+ vals = values[valueOffset : valueOffset+info.numSpaced()]
+ }
+
+ if w.bitsBuffer != nil {
+ w.writeValuesSpaced(vals, info.batchNum, batch, w.bitsBuffer.Bytes(), 0)
+ } else {
+ w.writeValuesSpaced(vals, info.batchNum, batch, validBits,
validBitsOffset+valueOffset)
+ }
+ if err := w.commitWriteAndCheckPageLimit(batch, info.numSpaced()); err !=
nil {
+ panic(err)
+ }
+ valueOffset += info.numSpaced()
+
+ w.checkDictionarySizeLimit()
+ levelOffset += batch
+ }
+{{- else}}
doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64)
{
var vals []{{.name}}
info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset,
batch), batch)
@@ -144,6 +263,7 @@ func (w *{{.Name}}ColumnChunkWriter)
WriteBatchSpaced(values []{{.name}}, defLev
w.checkDictionarySizeLimit()
})
+{{- end}}
}
func (w *{{.Name}}ColumnChunkWriter) WriteDictIndices(indices arrow.Array,
defLevels, repLevels []int16) (err error) {
@@ -185,58 +305,7 @@ func (w *{{.Name}}ColumnChunkWriter)
WriteDictIndices(indices arrow.Array, defLe
}
func (w *{{.Name}}ColumnChunkWriter) writeValues(values []{{.name}}, numNulls
int64) {
-{{- if or (eq .Name "ByteArray") (eq .Name "FixedLenByteArray")}}
- // For variable-length types, we need to check buffer size to prevent int32
overflow
- // For small values (<1MB), checking frequently adds negligible overhead
- // For large values (>1MB), we MUST check before each value
- const maxSafeBufferSize = 1.0 * 1024 * 1024 * 1024 // 1GB threshold
- const largeValueThreshold = 1.0 * 1024 * 1024 // 1MB
-
- encoder := w.currentEncoder.(encoding.{{.Name}}Encoder)
- currentSize := w.currentEncoder.EstimatedDataEncodedSize()
-
- // Batch process small values, check individually for large values
- batchStart := 0
- for i := 0; i < len(values); i++ {
-{{- if eq .Name "ByteArray"}}
- valueSize := int64(len(values[i]))
-{{- else}}
- valueSize := int64(w.descr.TypeLength())
-{{- end}}
-
- // If this value might cause overflow, flush first
- if currentSize + valueSize >= maxSafeBufferSize {
- // Add accumulated batch before flushing
- if i > batchStart {
- encoder.Put(values[batchStart:i])
- currentSize = w.currentEncoder.EstimatedDataEncodedSize()
- }
- // Flush the page
- if err := w.FlushCurrentPage(); err != nil {
- panic(err)
- }
- batchStart = i
- currentSize = 0
- }
-
- // Track size estimate
- currentSize += valueSize + 4 // +4 for length prefix
-
- // For large values, add and flush immediately if needed
- if valueSize >= largeValueThreshold {
- encoder.Put(values[i:i+1])
- batchStart = i + 1
- currentSize = w.currentEncoder.EstimatedDataEncodedSize()
- }
- }
-
- // Add remaining batch
- if batchStart < len(values) {
- encoder.Put(values[batchStart:])
- }
-{{- else}}
w.currentEncoder.(encoding.{{.Name}}Encoder).Put(values)
-{{- end}}
if w.pageStatistics != nil {
{{- if ne .Name "FixedLenByteArray"}}
w.pageStatistics.(*metadata.{{.Name}}Statistics).Update(values, numNulls)
@@ -255,54 +324,11 @@ func (w *{{.Name}}ColumnChunkWriter) writeValues(values
[]{{.name}}, numNulls in
}
func (w *{{.Name}}ColumnChunkWriter) writeValuesSpaced(spacedValues
[]{{.name}}, numRead, numValues int64, validBits []byte, validBitsOffset int64)
{
-{{- if or (eq .Name "ByteArray") (eq .Name "FixedLenByteArray")}}
- // For variable-length types, we need to check buffer size to prevent int32
overflow
- // For small values (<1MB), checking frequently adds negligible overhead
- // For large values (>1MB), we MUST check before each value
- const maxSafeBufferSize = 1.0 * 1024 * 1024 * 1024 // 1GB threshold
- const largeValueThreshold = 1.0 * 1024 * 1024 // 1MB
-
- encoder := w.currentEncoder.(encoding.{{.Name}}Encoder)
- currentSize := w.currentEncoder.EstimatedDataEncodedSize()
-
- for i := 0; i < len(spacedValues); i++ {
-{{- if eq .Name "ByteArray"}}
- valueSize := int64(len(spacedValues[i]))
-{{- else}}
- valueSize := int64(w.descr.TypeLength())
-{{- end}}
-
- // If this value might cause overflow, flush first
- if currentSize + valueSize >= maxSafeBufferSize {
- if err := w.FlushCurrentPage(); err != nil {
- // If flush fails, panic will be caught by WriteBatch's defer recover
- panic(err)
- }
- currentSize = 0
- }
-
- // Add the value
- chunk := spacedValues[i:i+1]
- if len(spacedValues) != int(numRead) && validBits != nil {
- encoder.PutSpaced(chunk, validBits, validBitsOffset+int64(i))
- } else {
- encoder.Put(chunk)
- }
-
- // Track size estimate (only update for large values or every 100 values)
- if valueSize >= largeValueThreshold || i % 100 == 0 {
- currentSize = w.currentEncoder.EstimatedDataEncodedSize()
- } else {
- currentSize += valueSize + 4 // +4 for length prefix
- }
- }
-{{- else}}
if len(spacedValues) != int(numRead) {
w.currentEncoder.(encoding.{{.Name}}Encoder).PutSpaced(spacedValues,
validBits, validBitsOffset)
} else {
w.currentEncoder.(encoding.{{.Name}}Encoder).Put(spacedValues)
}
-{{- end}}
if w.pageStatistics != nil {
nulls := numValues - numRead
{{- if ne .Name "FixedLenByteArray"}}
diff --git a/parquet/file/large_value_test.go b/parquet/file/large_value_test.go
index 3def3dc6..66925f11 100644
--- a/parquet/file/large_value_test.go
+++ b/parquet/file/large_value_test.go
@@ -18,6 +18,8 @@ package file_test
import (
"bytes"
+ "context"
+ "fmt"
"runtime"
"testing"
@@ -33,24 +35,26 @@ import (
)
// TestLargeByteArrayValuesDoNotOverflowInt32 tests that writing large byte
array
-// values that would exceed the 1GB flush threshold does not cause an int32
overflow panic.
-// The fix ensures pages are flushed automatically before buffer size exceeds
safe limits.
+// values totalling over 1GB in a single WriteBatch call triggers adaptive
batch
+// sizing and does not cause an int32 overflow panic in FlushCurrentPage.
+//
+// Memory note: input values all share one 1.5MB buffer so input memory is low,
+// but the parquet output buffer grows to ~1GB (unavoidable for this boundary
test).
func TestLargeByteArrayValuesDoNotOverflowInt32(t *testing.T) {
if runtime.GOARCH == "386" {
t.Skip("Skipping test on 32-bit architecture")
}
- // Create schema with a single byte array column
sc := schema.NewSchema(schema.MustGroup(schema.NewGroupNode("schema",
parquet.Repetitions.Required, schema.FieldList{
schema.Must(schema.NewPrimitiveNode("large_data",
parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)),
}, -1)))
props := parquet.NewWriterProperties(
- parquet.WithStats(false), // Disable stats to focus on core
issue
+ parquet.WithStats(false),
parquet.WithVersion(parquet.V2_LATEST),
parquet.WithDataPageVersion(parquet.DataPageV2),
- parquet.WithDictionaryDefault(false), // Plain encoding
- parquet.WithDataPageSize(1024*1024), // 1MB page size
+ parquet.WithDictionaryDefault(false),
+ parquet.WithDataPageSize(1024*1024),
)
out := &bytes.Buffer{}
@@ -60,47 +64,33 @@ func TestLargeByteArrayValuesDoNotOverflowInt32(t
*testing.T) {
rgw := writer.AppendRowGroup()
colWriter, _ := rgw.NextColumn()
- // Create 700 values of 1.5MB each (1.05GB total)
- // This exceeds the 1GB flush threshold, triggering automatic page
flushes
- // Uses minimal memory (single 1.5MB buffer reused) while testing loop
logic thoroughly
- const valueSize = 1.5 * 1024 * 1024 // 1.5MB per value (>= 1MB
threshold for large value handling)
- const numValues = 700 // 700 values = 1.05GB total
+ // 700 values × 1.5MB = 1.05GB total, triggers adaptive batch split at
~1GB.
+ // All values share the same underlying buffer (only ~1.5MB of input
memory).
+ const valueSize = 1.5 * 1024 * 1024
+ const numValues = 700
- // Create a single 1.5MB buffer and reuse it (only allocates 1.5MB!)
- largeValue := make([]byte, valueSize)
+ largeValue := make([]byte, int(valueSize))
for i := range largeValue {
largeValue[i] = byte(i % 256)
}
values := make([]parquet.ByteArray, numValues)
for i := range values {
- values[i] = largeValue // Reuse same buffer (memory efficient:
2MB total, writes 1.1GB)
+ values[i] = largeValue
}
- // This should NOT panic with int32 overflow
- // Expected behavior: automatically flush pages at 1GB threshold
byteArrayWriter := colWriter.(*file.ByteArrayColumnChunkWriter)
_, err := byteArrayWriter.WriteBatch(values, nil, nil)
-
- // Should succeed without panic
- assert.NoError(t, err)
-
- err = colWriter.Close()
- assert.NoError(t, err)
-
- err = rgw.Close()
assert.NoError(t, err)
- err = writer.Close()
- assert.NoError(t, err)
-
- // Verify we wrote data successfully
- assert.Greater(t, out.Len(), 0, "should have written data to buffer")
+ assert.NoError(t, colWriter.Close())
+ assert.NoError(t, rgw.Close())
+ assert.NoError(t, writer.Close())
+ assert.Greater(t, out.Len(), 0)
}
-// TestLargeStringArrayWithArrow tests the same issue using Arrow arrays
-// This tests the pqarrow integration path which is commonly used.
-// Uses LARGE_STRING type (int64 offsets) to handle >1GB of string data
without overflow.
+// TestLargeStringArrayWithArrow tests the pqarrow integration path with large
values.
+// Writes >1GB total through multiple small batches via the Arrow writer.
func TestLargeStringArrayWithArrow(t *testing.T) {
if runtime.GOARCH == "386" {
t.Skip("Skipping test on 32-bit architecture")
@@ -108,11 +98,9 @@ func TestLargeStringArrayWithArrow(t *testing.T) {
mem := memory.NewGoAllocator()
- // Create Arrow schema with LARGE_STRING field (uses int64 offsets, can
handle >2GB)
field := arrow.Field{Name: "large_strings", Type:
arrow.BinaryTypes.LargeString, Nullable: true}
arrowSchema := arrow.NewSchema([]arrow.Field{field}, nil)
- // Write to Parquet
out := &bytes.Buffer{}
props := parquet.NewWriterProperties(
parquet.WithStats(false),
@@ -125,39 +113,222 @@ func TestLargeStringArrayWithArrow(t *testing.T) {
pqw, err := pqarrow.NewFileWriter(arrowSchema, out, props,
pqarrow.NewArrowWriterProperties())
require.NoError(t, err)
- // Write in multiple batches to reduce memory usage
- // Each batch: 10 values × 10MB = 100MB
- // Total: 11 batches = 1.1GB written (only 100MB memory at a time!)
- const valueSize = 10 * 1024 * 1024 // 10MB per string (realistic large
blob)
- const valuesPerBatch = 10 // 10 values per batch
- const numBatches = 11 // 11 batches = 1.1GB total
+ // 11 batches × 10 values × 10MB = 1.1GB total.
+ // Only one batch (~100MB) is live at a time.
+ const valueSize = 10 * 1024 * 1024
+ const valuesPerBatch = 10
+ const numBatches = 11
largeStr := string(make([]byte, valueSize))
for batchNum := 0; batchNum < numBatches; batchNum++ {
- // Build a small batch
builder := array.NewLargeStringBuilder(mem)
for i := 0; i < valuesPerBatch; i++ {
builder.Append(largeStr)
}
arr := builder.NewArray()
-
rec := array.NewRecordBatch(arrowSchema, []arrow.Array{arr},
int64(valuesPerBatch))
- // Write batch - this should NOT panic with int32 overflow
err = pqw.Write(rec)
-
- // Clean up batch resources
rec.Release()
arr.Release()
builder.Release()
-
assert.NoError(t, err)
}
- err = pqw.Close()
- assert.NoError(t, err)
+ assert.NoError(t, pqw.Close())
+ assert.Greater(t, out.Len(), 0)
+}
- // Verify we wrote data successfully
- assert.Greater(t, out.Len(), 0, "should have written data to buffer")
+// TestLargeByteArrayRoundTripCorrectness verifies that ByteArray values
+// written through the pqarrow path are read back correctly. This ensures the
+// adaptive batch sizing loop in WriteBatch produces valid data pages where
+// levels and values stay in sync.
+//
+// Uses modest value sizes (~10KB each, ~2MB total) to keep memory low while
+// still exercising the full write→encode→flush→read→decode→compare path.
+// The >1GB adaptive-split boundary is tested by
TestLargeByteArrayValuesDoNotOverflowInt32.
+func TestLargeByteArrayRoundTripCorrectness(t *testing.T) {
+ mem := memory.NewGoAllocator()
+
+ field := arrow.Field{Name: "data", Type: arrow.BinaryTypes.LargeString,
Nullable: false}
+ arrowSchema := arrow.NewSchema([]arrow.Field{field}, nil)
+
+ out := &bytes.Buffer{}
+ props := parquet.NewWriterProperties(
+ parquet.WithStats(true),
+ parquet.WithVersion(parquet.V2_LATEST),
+ parquet.WithDataPageVersion(parquet.DataPageV2),
+ parquet.WithDictionaryDefault(false),
+ parquet.WithDataPageSize(1024*1024),
+ )
+
+ pqw, err := pqarrow.NewFileWriter(arrowSchema, out, props,
pqarrow.NewArrowWriterProperties())
+ require.NoError(t, err)
+
+ // 200 values × 10KB = ~2MB total. Each value has a unique,
deterministic
+ // pattern so we can regenerate expected content during verification
+ // without storing all values in memory.
+ const valueSize = 10 * 1024
+ const numValues = 200
+
+ makeValue := func(idx int) string {
+ buf := make([]byte, valueSize)
+ // First 4 bytes: index for identification
+ buf[0] = byte(idx >> 24)
+ buf[1] = byte(idx >> 16)
+ buf[2] = byte(idx >> 8)
+ buf[3] = byte(idx)
+ // Remaining bytes: deterministic pattern
+ for j := 4; j < valueSize; j++ {
+ buf[j] = byte(idx*31 + j)
+ }
+ return string(buf)
+ }
+
+ builder := array.NewLargeStringBuilder(mem)
+ for i := 0; i < numValues; i++ {
+ builder.Append(makeValue(i))
+ }
+ arr := builder.NewArray()
+ rec := array.NewRecordBatch(arrowSchema, []arrow.Array{arr},
int64(numValues))
+
+ err = pqw.Write(rec)
+ require.NoError(t, err)
+
+ rec.Release()
+ arr.Release()
+ builder.Release()
+
+ require.NoError(t, pqw.Close())
+
+ // Read back and verify
+ rdr, err := file.NewParquetReader(bytes.NewReader(out.Bytes()))
+ require.NoError(t, err)
+ defer rdr.Close()
+
+ arrowRdr, err := pqarrow.NewFileReader(rdr,
pqarrow.ArrowReadProperties{}, mem)
+ require.NoError(t, err)
+
+ tbl, err := arrowRdr.ReadTable(context.Background())
+ require.NoError(t, err)
+ defer tbl.Release()
+
+ require.EqualValues(t, numValues, tbl.NumRows())
+ require.Equal(t, 1, int(tbl.NumCols()))
+
+ col := tbl.Column(0)
+ rowIdx := 0
+ for _, chunk := range col.Data().Chunks() {
+ strArr := chunk.(*array.String)
+ for j := 0; j < strArr.Len(); j++ {
+ got := strArr.Value(j)
+ expected := makeValue(rowIdx)
+ require.Equal(t, len(expected), len(got),
+ "value %d: length mismatch", rowIdx)
+ require.Equal(t, expected[:4], got[:4],
+ "value %d: header mismatch (data corruption)",
rowIdx)
+ require.Equal(t, expected, got,
+ "value %d: full content mismatch", rowIdx)
+ rowIdx++
+ }
+ }
+ require.Equal(t, numValues, rowIdx, "did not read back all values")
+}
+
+// TestLargeByteArrayRoundTripWithNulls verifies correctness of the
+// WriteBatchSpaced path (nullable column) with moderately-sized values.
+// Every 3rd value is null. Uses ~3MB total.
+func TestLargeByteArrayRoundTripWithNulls(t *testing.T) {
+ mem := memory.NewGoAllocator()
+
+ field := arrow.Field{Name: "data", Type: arrow.BinaryTypes.LargeString,
Nullable: true}
+ arrowSchema := arrow.NewSchema([]arrow.Field{field}, nil)
+
+ out := &bytes.Buffer{}
+ props := parquet.NewWriterProperties(
+ parquet.WithStats(true),
+ parquet.WithVersion(parquet.V2_LATEST),
+ parquet.WithDataPageVersion(parquet.DataPageV2),
+ parquet.WithDictionaryDefault(false),
+ parquet.WithDataPageSize(1024*1024),
+ )
+
+ pqw, err := pqarrow.NewFileWriter(arrowSchema, out, props,
pqarrow.NewArrowWriterProperties())
+ require.NoError(t, err)
+
+ const valueSize = 10 * 1024
+ const numValues = 300
+
+ makeValue := func(idx int) string {
+ buf := make([]byte, valueSize)
+ buf[0] = byte(idx >> 24)
+ buf[1] = byte(idx >> 16)
+ buf[2] = byte(idx >> 8)
+ buf[3] = byte(idx)
+ for j := 4; j < valueSize; j++ {
+ buf[j] = byte(idx*17 + j)
+ }
+ return string(buf)
+ }
+
+ // Track which indices are null for verification
+ isNull := func(i int) bool { return i%3 == 0 }
+
+ builder := array.NewLargeStringBuilder(mem)
+ for i := 0; i < numValues; i++ {
+ if isNull(i) {
+ builder.AppendNull()
+ } else {
+ builder.Append(makeValue(i))
+ }
+ }
+ arr := builder.NewArray()
+ rec := array.NewRecordBatch(arrowSchema, []arrow.Array{arr},
int64(numValues))
+
+ err = pqw.Write(rec)
+ require.NoError(t, err)
+
+ rec.Release()
+ arr.Release()
+ builder.Release()
+
+ require.NoError(t, pqw.Close())
+
+ // Read back and verify
+ rdr, err := file.NewParquetReader(bytes.NewReader(out.Bytes()))
+ require.NoError(t, err)
+ defer rdr.Close()
+
+ arrowRdr, err := pqarrow.NewFileReader(rdr,
pqarrow.ArrowReadProperties{}, mem)
+ require.NoError(t, err)
+
+ tbl, err := arrowRdr.ReadTable(context.Background())
+ require.NoError(t, err)
+ defer tbl.Release()
+
+ require.EqualValues(t, numValues, tbl.NumRows())
+
+ col := tbl.Column(0)
+ rowIdx := 0
+ for _, chunk := range col.Data().Chunks() {
+ strArr := chunk.(*array.String)
+ for j := 0; j < strArr.Len(); j++ {
+ if isNull(rowIdx) {
+ require.True(t, strArr.IsNull(j), "value %d:
expected null", rowIdx)
+ } else {
+ require.False(t, strArr.IsNull(j), "value %d:
unexpected null", rowIdx)
+ got := strArr.Value(j)
+ expected := makeValue(rowIdx)
+ require.Equal(t, len(expected), len(got),
+ "value %d: length mismatch", rowIdx)
+ require.Equal(t, expected[:4], got[:4],
+ "value %d: header mismatch (data
corruption)", rowIdx)
+ require.Equal(t, expected, got,
+ fmt.Sprintf("value %d: full content
mismatch", rowIdx))
+ }
+ rowIdx++
+ }
+ }
+ require.Equal(t, numValues, rowIdx, "did not read back all values")
}