This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new c62af3c6 feat(table): support zstd metadata compression codec (#1020)
c62af3c6 is described below
commit c62af3c624bcf7afd9f75123fbc6ce60362f8da8
Author: Tanmay Rauth <[email protected]>
AuthorDate: Thu May 7 12:26:32 2026 -0700
feat(table): support zstd metadata compression codec (#1020)
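Adds MetadataCompressionCodecZstd alongside gzip/none and wires it
through both the read path (decompressing metadata files whose names end
in .zstd.metadata.json or metadata.json.zstd) and the write path
(write.metadata.compression-codec=zstd, which produces
.zstd.metadata.json files). Uses klauspost/compress/zstd, which was
already present as a transitive dependency and is now a direct one.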
Closes #1005
---
catalog/internal/utils.go | 20 ++-
go.mod | 4 +-
table/locations.go | 2 +
table/metadata_internal_test.go | 23 +++
table/properties.go | 1 +
table/table.go | 46 ++++--
table/table_test.go | 176 +++++++++++++++++++++
.../TableMetadataV2Valid.zstd.metadata.json | Bin 0 -> 374 bytes
table/testdata/TableMetadataV2ValidMinimal.json | 1 +
9 files changed, 255 insertions(+), 18 deletions(-)
diff --git a/catalog/internal/utils.go b/catalog/internal/utils.go
index 6e1f2b39..87383bf1 100644
--- a/catalog/internal/utils.go
+++ b/catalog/internal/utils.go
@@ -37,11 +37,12 @@ import (
icebergio "github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table"
"github.com/google/uuid"
+ "github.com/klauspost/compress/zstd"
)
func WriteTableMetadata(metadata table.Metadata, fs icebergio.WriteFileIO, loc
string, compression string) (err error) {
switch compression {
- case table.MetadataCompressionCodecNone,
table.MetadataCompressionCodecGzip:
+ case table.MetadataCompressionCodecNone,
table.MetadataCompressionCodecGzip, table.MetadataCompressionCodecZstd:
// supported codecs
default:
return fmt.Errorf("unsupported write metadata compression
codec: %s", compression)
@@ -54,11 +55,18 @@ func WriteTableMetadata(metadata table.Metadata, fs
icebergio.WriteFileIO, loc s
defer internal.CheckedClose(out, &err)
var writer io.Writer = out
- var compressWriter io.WriteCloser
- if compression == table.MetadataCompressionCodecGzip {
- compressWriter = gzip.NewWriter(out)
- writer = compressWriter
- defer internal.CheckedClose(compressWriter, &err)
+ switch compression {
+ case table.MetadataCompressionCodecGzip:
+ gzw := gzip.NewWriter(out)
+ writer = gzw
+ defer internal.CheckedClose(gzw, &err)
+ case table.MetadataCompressionCodecZstd:
+ enc, zErr := zstd.NewWriter(out)
+ if zErr != nil {
+ return zErr
+ }
+ writer = enc
+ defer internal.CheckedClose(enc, &err)
}
err = json.NewEncoder(writer).Encode(metadata)
diff --git a/go.mod b/go.mod
index 96115538..ccbcc823 100644
--- a/go.mod
+++ b/go.mod
@@ -23,6 +23,7 @@ require (
cloud.google.com/go/storage v1.62.1
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4
+ github.com/alexflint/go-arg v1.6.1
github.com/apache/arrow-go/v18 v18.6.0
github.com/aws/aws-sdk-go-v2 v1.41.6
github.com/aws/aws-sdk-go-v2/config v1.32.16
@@ -34,9 +35,9 @@ require (
github.com/beltran/gohive v1.8.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/docker/docker v28.5.2+incompatible
- github.com/alexflint/go-arg v1.6.1
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
+ github.com/klauspost/compress v1.18.5
github.com/pterm/pterm v0.12.83
github.com/stretchr/testify v1.11.1
github.com/substrait-io/substrait-go/v8 v8.1.0
@@ -169,7 +170,6 @@ require (
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf //
indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jonboulle/clockwork v0.5.0 // indirect
- github.com/klauspost/compress v1.18.5 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/lithammer/fuzzysearch v1.1.8 // indirect
diff --git a/table/locations.go b/table/locations.go
index 1047b46c..bdd0b0f1 100644
--- a/table/locations.go
+++ b/table/locations.go
@@ -70,6 +70,8 @@ func (slp *simpleLocationProvider)
NewTableMetadataFileLocation(newVersion int)
ext = ".metadata.json"
case MetadataCompressionCodecGzip:
ext = ".gz.metadata.json"
+ case MetadataCompressionCodecZstd:
+ ext = ".zstd.metadata.json"
default:
return "", fmt.Errorf("unsupported write metadata compression
codec: %s", compression)
}
diff --git a/table/metadata_internal_test.go b/table/metadata_internal_test.go
index 174ed330..43e50f94 100644
--- a/table/metadata_internal_test.go
+++ b/table/metadata_internal_test.go
@@ -18,7 +18,9 @@
package table
import (
+ "bytes"
"encoding/json"
+ "io"
"os"
"path"
"slices"
@@ -29,6 +31,7 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
+ "github.com/klauspost/compress/zstd"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -1831,3 +1834,23 @@ func TestV3PartitionStatisticsRoundTrip(t *testing.T) {
assert.JSONEq(t, string(raw), string(serialized))
}
+
+func TestZstdGoldenFixture(t *testing.T) {
+ compressed, err := os.ReadFile(path.Join("testdata",
"TableMetadataV2Valid.zstd.metadata.json"))
+ require.NoError(t, err)
+
+ dec, err := zstd.NewReader(bytes.NewReader(compressed))
+ require.NoError(t, err)
+ defer dec.Close()
+
+ data, err := io.ReadAll(dec)
+ require.NoError(t, err)
+
+ meta, err := ParseMetadataBytes(data)
+ require.NoError(t, err)
+
+ expected, err :=
getTestTableMetadata("TableMetadataV2ValidMinimal.json")
+ require.NoError(t, err)
+
+ assert.True(t, expected.Equals(meta))
+}
diff --git a/table/properties.go b/table/properties.go
index 6c5d04c1..c3878f36 100644
--- a/table/properties.go
+++ b/table/properties.go
@@ -157,6 +157,7 @@ var ReservedProperties = [9]string{
const (
MetadataCompressionCodecNone = "none"
MetadataCompressionCodecGzip = "gzip"
+ MetadataCompressionCodecZstd = "zstd"
)
// Write modes
diff --git a/table/table.go b/table/table.go
index c4c2a5de..3dd78057 100644
--- a/table/table.go
+++ b/table/table.go
@@ -38,6 +38,7 @@ import (
"github.com/apache/iceberg-go/internal"
icebergio "github.com/apache/iceberg-go/io"
tblutils "github.com/apache/iceberg-go/table/internal"
+ "github.com/klauspost/compress/zstd"
"golang.org/x/sync/errgroup"
)
@@ -864,13 +865,14 @@ func NewFromLocation(
return nil, err
}
- if isGzippedMetadataJson(metalocation) {
- gz, err := gzip.NewReader(bytes.NewReader(data))
+ if codec := metadataCompressionCodec(metalocation); codec != ""
{
+ rc, err := newDecompressor(bytes.NewReader(data), codec)
if err != nil {
return nil, err
}
- defer gz.Close()
- data, err = io.ReadAll(gz)
+ defer rc.Close()
+
+ data, err = io.ReadAll(rc)
if err != nil {
return nil, err
}
@@ -887,13 +889,14 @@ func NewFromLocation(
defer internal.CheckedClose(f, &err)
var r io.Reader = f
- if isGzippedMetadataJson(metalocation) {
- gz, err := gzip.NewReader(f)
+ if codec := metadataCompressionCodec(metalocation); codec != ""
{
+ rc, err := newDecompressor(f, codec)
if err != nil {
return nil, err
}
- defer gz.Close()
- r = gz
+ defer rc.Close()
+
+ r = rc
}
if meta, err = ParseMetadata(r); err != nil {
@@ -904,6 +907,29 @@ func NewFromLocation(
return New(ident, meta, metalocation, fsysF, cat), nil
}
-func isGzippedMetadataJson(location string) bool {
- return strings.HasSuffix(location, ".gz.metadata.json") ||
strings.HasSuffix(location, "metadata.json.gz")
+func metadataCompressionCodec(location string) string {
+ switch {
+ case strings.HasSuffix(location, ".gz.metadata.json") ||
strings.HasSuffix(location, "metadata.json.gz"):
+ return MetadataCompressionCodecGzip
+ case strings.HasSuffix(location, ".zstd.metadata.json") ||
strings.HasSuffix(location, "metadata.json.zstd"):
+ return MetadataCompressionCodecZstd
+ default:
+ return ""
+ }
+}
+
+func newDecompressor(r io.Reader, codec string) (io.ReadCloser, error) {
+ switch codec {
+ case MetadataCompressionCodecGzip:
+ return gzip.NewReader(r)
+ case MetadataCompressionCodecZstd:
+ dec, err := zstd.NewReader(r)
+ if err != nil {
+ return nil, err
+ }
+
+ return dec.IOReadCloser(), nil
+ default:
+ return nil, fmt.Errorf("unsupported metadata decompression
codec: %s", codec)
+ }
}
diff --git a/table/table_test.go b/table/table_test.go
index f10c8019..17c5bd16 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -50,6 +50,7 @@ import (
iceio "github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table"
"github.com/google/uuid"
+ "github.com/klauspost/compress/zstd"
"github.com/pterm/pterm"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
@@ -171,6 +172,126 @@ func (t *TableTestSuite)
TestNewTableFromReadFileGzipped() {
t.True(t.tbl.Metadata().Equals(tbl2.Metadata()))
}
+func (t *TableTestSuite) TestNewTableFromReadFileZstd() {
+ var b bytes.Buffer
+ enc, err := zstd.NewWriter(&b)
+ t.Require().NoError(err)
+
+ _, err = enc.Write([]byte(table.ExampleTableMetadataV2))
+ t.Require().NoError(err)
+ t.Require().NoError(enc.Close())
+
+ var mockfsReadFile internal.MockFSReadFile
+ mockfsReadFile.Test(t.T())
+ mockfsReadFile.On("ReadFile",
"s3://bucket/test/location/uuid.zstd.metadata.json").
+ Return(b.Bytes(), nil)
+ defer mockfsReadFile.AssertExpectations(t.T())
+
+ tbl2, err := table.NewFromLocation(
+ t.T().Context(),
+ []string{"foo"},
+ "s3://bucket/test/location/uuid.zstd.metadata.json",
+ func(ctx context.Context) (iceio.IO, error) {
+ return &mockfsReadFile, nil
+ },
+ nil,
+ )
+ t.Require().NoError(err)
+ t.Require().NotNil(tbl2)
+
+ t.True(t.tbl.Metadata().Equals(tbl2.Metadata()))
+}
+
+func (t *TableTestSuite) TestNewTableFromReadFileZstdAlternateSuffix() {
+ var b bytes.Buffer
+ enc, err := zstd.NewWriter(&b)
+ t.Require().NoError(err)
+
+ _, err = enc.Write([]byte(table.ExampleTableMetadataV2))
+ t.Require().NoError(err)
+ t.Require().NoError(enc.Close())
+
+ var mockfsReadFile internal.MockFSReadFile
+ mockfsReadFile.Test(t.T())
+ mockfsReadFile.On("ReadFile",
"s3://bucket/test/location/uuid.metadata.json.zstd").
+ Return(b.Bytes(), nil)
+ defer mockfsReadFile.AssertExpectations(t.T())
+
+ tbl2, err := table.NewFromLocation(
+ t.T().Context(),
+ []string{"foo"},
+ "s3://bucket/test/location/uuid.metadata.json.zstd",
+ func(ctx context.Context) (iceio.IO, error) {
+ return &mockfsReadFile, nil
+ },
+ nil,
+ )
+ t.Require().NoError(err)
+ t.Require().NotNil(tbl2)
+
+ t.True(t.tbl.Metadata().Equals(tbl2.Metadata()))
+}
+
+func (t *TableTestSuite) TestNewTableFromOpenZstd() {
+ var b bytes.Buffer
+ enc, err := zstd.NewWriter(&b)
+ t.Require().NoError(err)
+
+ _, err = enc.Write([]byte(table.ExampleTableMetadataV2))
+ t.Require().NoError(err)
+ t.Require().NoError(enc.Close())
+
+ var mockfs internal.MockFS
+ mockfs.Test(t.T())
+ mockfs.On("Open", "s3://bucket/test/location/uuid.zstd.metadata.json").
+ Return(&internal.MockFile{Contents:
bytes.NewReader(b.Bytes())}, nil)
+ defer mockfs.AssertExpectations(t.T())
+
+ tbl2, err := table.NewFromLocation(
+ t.T().Context(),
+ []string{"foo"},
+ "s3://bucket/test/location/uuid.zstd.metadata.json",
+ func(ctx context.Context) (iceio.IO, error) {
+ return &mockfs, nil
+ },
+ nil,
+ )
+ t.Require().NoError(err)
+ t.Require().NotNil(tbl2)
+
+ t.True(t.tbl.Metadata().Equals(tbl2.Metadata()))
+}
+
+func (t *TableTestSuite) TestNewTableFromOpenZstdAlternateSuffix() {
+ var b bytes.Buffer
+ enc, err := zstd.NewWriter(&b)
+ t.Require().NoError(err)
+
+ _, err = enc.Write([]byte(table.ExampleTableMetadataV2))
+ t.Require().NoError(err)
+ t.Require().NoError(enc.Close())
+
+ var mockfs internal.MockFS
+ mockfs.Test(t.T())
+ mockfs.On("Open", "s3://bucket/test/location/uuid.metadata.json.zstd").
+ Return(&internal.MockFile{Contents:
bytes.NewReader(b.Bytes())}, nil)
+ defer mockfs.AssertExpectations(t.T())
+
+ tbl2, err := table.NewFromLocation(
+ t.T().Context(),
+ []string{"foo"},
+ "s3://bucket/test/location/uuid.metadata.json.zstd",
+ func(ctx context.Context) (iceio.IO, error) {
+ return &mockfs, nil
+ },
+ nil,
+ )
+ t.Require().NoError(err)
+ t.Require().NotNil(tbl2)
+
+ t.True(t.tbl.Metadata().Equals(tbl2.Metadata()))
+}
+
func (t *TableTestSuite) TestSchema() {
t.True(t.tbl.Schema().Equals(iceberg.NewSchemaWithIdentifiers(1,
[]int{1, 2},
iceberg.NestedField{ID: 1, Name: "x", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
@@ -3063,6 +3184,61 @@ func (t *TableTestSuite)
TestMetadataCompressionRoundTrip() {
t.True(tbl.Equals(*tbl2))
}
+func (t *TableTestSuite) TestMetadataCompressionRoundTripZstd() {
+ cat, err := catalog.Load(context.Background(), "default",
iceberg.Properties{
+ "uri": ":memory:",
+ "type": "sql",
+ sql.DriverKey: sqliteshim.ShimName,
+ sql.DialectKey: string(sql.SQLite),
+ "warehouse": "file://" + t.T().TempDir(),
+ })
+ t.Require().NoError(err)
+
+ ident := table.Identifier{"test", "zstd_compression_table"}
+ t.Require().NoError(cat.CreateNamespace(context.Background(),
catalog.NamespaceFromIdent(ident), nil))
+
+ tbl, err := cat.CreateTable(context.Background(), ident, t.tbl.Schema(),
+ catalog.WithProperties(iceberg.Properties{
+ table.MetadataCompressionKey: "zstd",
+ }))
+ t.Require().NoError(err)
+ t.Require().NotNil(tbl)
+
+ metadataLoc := tbl.MetadataLocation()
+ t.Contains(metadataLoc, ".zstd.metadata.json")
+
+ fs, err := tbl.FS(context.Background())
+ t.Require().NoError(err)
+
+ file, err := fs.Open(metadataLoc)
+ t.Require().NoError(err)
+ defer file.Close()
+
+ metadataBytes, err := io.ReadAll(file)
+ t.Require().NoError(err)
+
+ dec, err := zstd.NewReader(bytes.NewReader(metadataBytes))
+ t.Require().NoError(err)
+ defer dec.Close()
+
+ decompressed, err := io.ReadAll(dec)
+ t.Require().NoError(err)
+
+ var metadata map[string]any
+ err = json.Unmarshal(decompressed, &metadata)
+ t.Require().NoError(err)
+
+ t.Contains(metadata, "format-version")
+ t.Contains(metadata, "table-uuid")
+ t.Contains(metadata, "location")
+
+ tbl2, err := cat.LoadTable(context.Background(), ident)
+ t.Require().NoError(err)
+ t.Require().NotNil(tbl2)
+
+ t.True(tbl.Equals(*tbl2))
+}
+
type snapshotSummaryMatcher struct{}
func (m *snapshotSummaryMatcher) Matches(expected *table.Summary, actual
*table.Summary) bool {
diff --git a/table/testdata/TableMetadataV2Valid.zstd.metadata.json
b/table/testdata/TableMetadataV2Valid.zstd.metadata.json
new file mode 100644
index 00000000..4709076c
Binary files /dev/null and
b/table/testdata/TableMetadataV2Valid.zstd.metadata.json differ
diff --git a/table/testdata/TableMetadataV2ValidMinimal.json
b/table/testdata/TableMetadataV2ValidMinimal.json
new file mode 100644
index 00000000..9d82d000
--- /dev/null
+++ b/table/testdata/TableMetadataV2ValidMinimal.json
@@ -0,0 +1 @@
+{"current-schema-id":0,"current-snapshot-id":-1,"default-sort-order-id":0,"default-spec-id":0,"format-version":2,"last-column-id":3,"last-partition-id":999,"last-sequence-number":0,"last-updated-ms":1602638573590,"location":"s3://bucket/test/location","metadata-log":[],"partition-specs":[{"fields":[],"spec-id":0}],"properties":{},"refs":{},"schemas":[{"fields":[{"id":1,"name":"x","required":true,"type":"long"},{"doc":"comment","id":2,"name":"y","required":true,"type":"long"},{"id":3,"nam
[...]
\ No newline at end of file