This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 2d3b5b4d fix: support INT32/INT64 physical types for decimal columns
(#686)
2d3b5b4d is described below
commit 2d3b5b4d48751ffe55993a76a4227a8fe9b9e949
Author: Karthic Rao <[email protected]>
AuthorDate: Thu Jan 22 22:53:08 2026 +0530
fix: support INT32/INT64 physical types for decimal columns (#686)
## Summary
Parquet decimals can be stored using multiple physical types depending
on precision:
- `INT32` for precision <= 9
- `INT64` for precision <= 18
- `FIXED_LEN_BYTE_ARRAY` for any precision
- `BYTE_ARRAY` for any precision
The previous implementation only accepted `FIXED_LEN_BYTE_ARRAY` for all
decimals and rejected valid Parquet files with an error:
```
unexpected physical type INT32 for decimal(7, 2), expected
FIXED_LEN_BYTE_ARRAY
```
This caused `AddFiles` to fail when importing datasets (like TPC-DS)
that use INT32/INT64 for small-precision decimals, which is valid per
the Parquet specification.
## Changes
- Refactors `createStatsAgg` to switch on Iceberg logical type first,
then handle physical representations (matches iceberg-java's
`ParquetConversions.java` approach)
- For `DecimalType`, accepts all valid Parquet physical types
- Updates `DataFileStatsFromMeta` to handle INT32/INT64 decimal
statistics
- Adds `wrappedDecByteArrayStats` for BYTE_ARRAY encoded decimals
## Test plan
- [x] Existing tests pass
- [x] Build succeeds
- [x] Tested with TPC-DS parquet files that use INT32 decimals
---
table/internal/parquet_files.go | 121 ++++++++++++++++++---------
table/internal/parquet_files_test.go | 155 +++++++++++++++++++++++++++++++++++
2 files changed, 235 insertions(+), 41 deletions(-)
diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go
index 7ed2f309..8e7cf252 100644
--- a/table/internal/parquet_files.go
+++ b/table/internal/parquet_files.go
@@ -101,59 +101,66 @@ func (parquetFormat) PathToIDMapping(sc *iceberg.Schema)
(map[string]int, error)
}
func (p parquetFormat) createStatsAgg(typ iceberg.PrimitiveType,
physicalTypeStr string, truncLen int) (StatsAgg, error) {
- expectedPhysical := p.PrimitiveTypeToPhysicalType(typ)
- if physicalTypeStr != expectedPhysical {
- switch {
- case physicalTypeStr == "INT32" && expectedPhysical == "INT64":
- case physicalTypeStr == "FLOAT" && expectedPhysical == "DOUBLE":
- default:
- return nil, fmt.Errorf("unexpected physical type %s for
%s, expected %s",
- physicalTypeStr, typ, expectedPhysical)
- }
- }
-
- switch physicalTypeStr {
- case "BOOLEAN":
+ // Switch on Iceberg logical type first, then handle physical
representations.
+ // This matches iceberg-java's approach in ParquetConversions.java.
+ switch typ.(type) {
+ case iceberg.BooleanType:
return newStatAgg[bool](typ, truncLen), nil
- case "INT32":
- switch typ.(type) {
- case iceberg.DecimalType:
- return &decAsIntAgg[int32]{
- newStatAgg[int32](typ,
truncLen).(*statsAggregator[int32]),
- }, nil
- }
+ case iceberg.Int32Type, iceberg.DateType:
return newStatAgg[int32](typ, truncLen), nil
- case "INT64":
- switch typ.(type) {
- case iceberg.DecimalType:
- return &decAsIntAgg[int64]{
- newStatAgg[int64](typ,
truncLen).(*statsAggregator[int64]),
- }, nil
+
+ case iceberg.Int64Type, iceberg.TimeType, iceberg.TimestampType,
iceberg.TimestampTzType:
+ // Allow INT32 physical for INT64 logical (promotion)
+ if physicalTypeStr == "INT32" {
+ return newStatAgg[int32](typ, truncLen), nil
}
return newStatAgg[int64](typ, truncLen), nil
- case "FLOAT":
+
+ case iceberg.Float32Type:
return newStatAgg[float32](typ, truncLen), nil
- case "DOUBLE":
+
+ case iceberg.Float64Type:
+ // Allow FLOAT physical for DOUBLE logical (promotion)
+ if physicalTypeStr == "FLOAT" {
+ return newStatAgg[float32](typ, truncLen), nil
+ }
+
return newStatAgg[float64](typ, truncLen), nil
- case "FIXED_LEN_BYTE_ARRAY":
- switch typ.(type) {
- case iceberg.UUIDType:
- return newStatAgg[uuid.UUID](typ, truncLen), nil
- case iceberg.DecimalType:
+
+ case iceberg.StringType:
+ return newStatAgg[string](typ, truncLen), nil
+
+ case iceberg.BinaryType:
+ return newStatAgg[[]byte](typ, truncLen), nil
+
+ case iceberg.UUIDType:
+ return newStatAgg[uuid.UUID](typ, truncLen), nil
+
+ case iceberg.FixedType:
+ return newStatAgg[[]byte](typ, truncLen), nil
+
+ case iceberg.DecimalType:
+ // Decimals can be stored as INT32 (precision <= 9), INT64
(precision <= 18),
+ // FIXED_LEN_BYTE_ARRAY, or BYTE_ARRAY per Parquet spec.
+ switch physicalTypeStr {
+ case "INT32":
+ return &decAsIntAgg[int32]{
+ newStatAgg[int32](typ,
truncLen).(*statsAggregator[int32]),
+ }, nil
+ case "INT64":
+ return &decAsIntAgg[int64]{
+ newStatAgg[int64](typ,
truncLen).(*statsAggregator[int64]),
+ }, nil
+ case "FIXED_LEN_BYTE_ARRAY", "BYTE_ARRAY":
return newStatAgg[iceberg.Decimal](typ, truncLen), nil
default:
- return newStatAgg[[]byte](typ, truncLen), nil
- }
- case "BYTE_ARRAY":
- if typ.Equals(iceberg.PrimitiveTypes.String) {
- return newStatAgg[string](typ, truncLen), nil
+ return nil, fmt.Errorf("unsupported physical type %s
for decimal", physicalTypeStr)
}
- return newStatAgg[[]byte](typ, truncLen), nil
default:
- return nil, fmt.Errorf("unsupported physical type: %s",
physicalTypeStr)
+ return nil, fmt.Errorf("unsupported iceberg type: %s", typ)
}
}
@@ -400,6 +407,29 @@ func (w wrappedDecStats) Max() iceberg.Decimal {
return iceberg.Decimal{Val: dec, Scale: w.scale}
}
+type wrappedDecByteArrayStats struct {
+ *metadata.ByteArrayStatistics
+ scale int
+}
+
+func (w wrappedDecByteArrayStats) Min() iceberg.Decimal {
+ dec, err := BigEndianToDecimal(w.ByteArrayStatistics.Min())
+ if err != nil {
+ panic(err)
+ }
+
+ return iceberg.Decimal{Val: dec, Scale: w.scale}
+}
+
+func (w wrappedDecByteArrayStats) Max() iceberg.Decimal {
+ dec, err := BigEndianToDecimal(w.ByteArrayStatistics.Max())
+ if err != nil {
+ panic(err)
+ }
+
+ return iceberg.Decimal{Val: dec, Scale: w.scale}
+}
+
func (p parquetFormat) DataFileStatsFromMeta(meta Metadata, statsCols
map[int]StatisticsCollector, colMapping map[string]int) *DataFileStatistics {
pqmeta := meta.(*metadata.FileMetaData)
var (
@@ -487,7 +517,16 @@ func (p parquetFormat) DataFileStatsFromMeta(meta
Metadata, statsCols map[int]St
case iceberg.FixedType:
stats =
&wrappedFLBAStats{stats.(*metadata.FixedLenByteArrayStatistics)}
case iceberg.DecimalType:
- stats =
&wrappedDecStats{stats.(*metadata.FixedLenByteArrayStatistics), t.Scale()}
+ // Decimals can be stored as INT32/INT64 (small
precision) or FIXED_LEN_BYTE_ARRAY/BYTE_ARRAY.
+ // Only wrap FIXED_LEN_BYTE_ARRAY and
BYTE_ARRAY statistics; INT32/INT64 stats
+ // are used directly by decAsIntAgg.
+ switch s := stats.(type) {
+ case *metadata.FixedLenByteArrayStatistics:
+ stats = &wrappedDecStats{s, t.Scale()}
+ case *metadata.ByteArrayStatistics:
+ stats = &wrappedDecByteArrayStats{s,
t.Scale()}
+ // INT32/INT64 statistics are used
directly by decAsIntAgg
+ }
}
agg.Update(stats)
diff --git a/table/internal/parquet_files_test.go
b/table/internal/parquet_files_test.go
index 47772cc8..85480da0 100644
--- a/table/internal/parquet_files_test.go
+++ b/table/internal/parquet_files_test.go
@@ -20,6 +20,7 @@ package internal_test
import (
"bytes"
"context"
+ "fmt"
"math/big"
"strings"
"testing"
@@ -33,6 +34,7 @@ import (
"github.com/apache/arrow-go/v18/parquet/file"
"github.com/apache/arrow-go/v18/parquet/metadata"
"github.com/apache/arrow-go/v18/parquet/pqarrow"
+ "github.com/apache/arrow-go/v18/parquet/schema"
"github.com/apache/iceberg-go"
internal2 "github.com/apache/iceberg-go/internal"
"github.com/apache/iceberg-go/table"
@@ -330,6 +332,159 @@ func TestMetricsPrimitiveTypes(t *testing.T) {
})
}
+// TestDecimalPhysicalTypes tests that decimals stored as INT32/INT64 physical
types
+// are correctly handled. This is important because Parquet allows decimals
with
+// precision <= 9 to be stored as INT32, and precision <= 18 as INT64.
+func TestDecimalPhysicalTypes(t *testing.T) {
+ format := internal.GetFileFormat(iceberg.ParquetFile)
+
+ tests := []struct {
+ name string
+ precision int
+ scale int
+ physicalType parquet.Type
+ values []int64 // unscaled values
+ expectedMin int64
+ expectedMax int64
+ }{
+ {
+ name: "decimal_as_int32",
+ precision: 7,
+ scale: 2,
+ physicalType: parquet.Types.Int32,
+ values: []int64{12345, 67890}, // represents
123.45, 678.90
+ expectedMin: 12345,
+ expectedMax: 67890,
+ },
+ {
+ name: "decimal_as_int64",
+ precision: 15,
+ scale: 2,
+ physicalType: parquet.Types.Int64,
+ values: []int64{123456789012345, 987654321098765},
+ expectedMin: 123456789012345,
+ expectedMax: 987654321098765,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Create a Parquet file with decimal stored as INT32
or INT64
+ var buf bytes.Buffer
+
+ // Build a custom schema with decimal logical type
+ decType :=
schema.NewDecimalLogicalType(int32(tt.precision), int32(tt.scale))
+ var node schema.Node
+ var err error
+ if tt.physicalType == parquet.Types.Int32 {
+ node, err =
schema.NewPrimitiveNodeLogical("value", parquet.Repetitions.Required,
+ decType, parquet.Types.Int32, 0, 1)
+ } else {
+ node, err =
schema.NewPrimitiveNodeLogical("value", parquet.Repetitions.Required,
+ decType, parquet.Types.Int64, 0, 1)
+ }
+ require.NoError(t, err)
+
+ rootNode, err := schema.NewGroupNode("schema",
parquet.Repetitions.Required, schema.FieldList{node}, -1)
+ require.NoError(t, err)
+
+ // Write the parquet file
+ writer := file.NewParquetWriter(&buf,
+ rootNode,
+
file.WithWriterProps(parquet.NewWriterProperties(parquet.WithStats(true))))
+
+ rgw := writer.AppendRowGroup()
+ colWriter, err := rgw.NextColumn()
+ require.NoError(t, err)
+
+ if tt.physicalType == parquet.Types.Int32 {
+ int32Writer :=
colWriter.(*file.Int32ColumnChunkWriter)
+ vals := make([]int32, len(tt.values))
+ for i, v := range tt.values {
+ vals[i] = int32(v)
+ }
+ _, err = int32Writer.WriteBatch(vals, nil, nil)
+ } else {
+ int64Writer :=
colWriter.(*file.Int64ColumnChunkWriter)
+ _, err = int64Writer.WriteBatch(tt.values, nil,
nil)
+ }
+ require.NoError(t, err)
+
+ require.NoError(t, colWriter.Close())
+ require.NoError(t, rgw.Close())
+ require.NoError(t, writer.Close())
+
+ // Read back and get metadata
+ rdr, err :=
file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+ require.NoError(t, err)
+ defer rdr.Close()
+
+ meta := rdr.MetaData()
+
+ // Create table metadata with decimal type
+ tableMeta, err :=
table.ParseMetadataString(fmt.Sprintf(`{
+ "format-version": 2,
+ "location": "s3://bucket/test/location",
+ "last-column-id": 1,
+ "current-schema-id": 0,
+ "schemas": [
+ {
+ "type": "struct",
+ "schema-id": 0,
+ "fields": [
+ {"id": 1, "name":
"value", "required": true, "type": "decimal(%d, %d)"}
+ ]
+ }
+ ],
+ "last-partition-id": 0,
+ "last-updated-ms": -1,
+ "default-spec-id": 0,
+ "default-sort-order-id": 0,
+ "sort-orders": [{"order-id": 0, "fields": []}],
+ "partition-specs": [{"spec-id": 0, "fields":
[]}],
+ "properties": {}
+ }`, tt.precision, tt.scale))
+ require.NoError(t, err)
+
+ mapping, err :=
format.PathToIDMapping(tableMeta.CurrentSchema())
+ require.NoError(t, err)
+
+ collector := map[int]internal.StatisticsCollector{
+ 1: {
+ FieldID: 1,
+ Mode: internal.MetricsMode{Typ:
internal.MetricModeFull},
+ ColName: "value",
+ IcebergTyp:
iceberg.DecimalTypeOf(tt.precision, tt.scale),
+ },
+ }
+
+ // This should not panic - the fix allows INT32/INT64
physical types for decimals
+ stats :=
format.DataFileStatsFromMeta(internal.Metadata(meta), collector, mapping)
+ require.NotNil(t, stats)
+
+ df := stats.ToDataFile(tableMeta.CurrentSchema(),
tableMeta.PartitionSpec(), "test.parquet",
+ iceberg.ParquetFile, meta.GetSourceFileSize(),
nil)
+
+ // Verify bounds are correctly extracted
+ require.Contains(t, df.LowerBoundValues(), 1)
+ require.Contains(t, df.UpperBoundValues(), 1)
+
+ // Verify the actual values
+ minLit, err :=
iceberg.LiteralFromBytes(iceberg.DecimalTypeOf(tt.precision, tt.scale),
df.LowerBoundValues()[1])
+ require.NoError(t, err)
+ minDec :=
minLit.(iceberg.TypedLiteral[iceberg.Decimal]).Value()
+ assert.Equal(t, uint64(tt.expectedMin),
minDec.Val.LowBits())
+ assert.Equal(t, tt.scale, minDec.Scale)
+
+ maxLit, err :=
iceberg.LiteralFromBytes(iceberg.DecimalTypeOf(tt.precision, tt.scale),
df.UpperBoundValues()[1])
+ require.NoError(t, err)
+ maxDec :=
maxLit.(iceberg.TypedLiteral[iceberg.Decimal]).Value()
+ assert.Equal(t, uint64(tt.expectedMax),
maxDec.Val.LowBits())
+ assert.Equal(t, tt.scale, maxDec.Scale)
+ })
+ }
+}
+
func TestWriteDataFileErrOnClose(t *testing.T) {
ctx := context.Background()
fm := internal.GetFileFormat(iceberg.ParquetFile)