This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new c62af3c6 feat(table): support zstd metadata compression codec (#1020)
c62af3c6 is described below

commit c62af3c624bcf7afd9f75123fbc6ce60362f8da8
Author: Tanmay Rauth <[email protected]>
AuthorDate: Thu May 7 12:26:32 2026 -0700

    feat(table): support zstd metadata compression codec (#1020)
    
    Adds MetadataCompressionCodecZstd alongside gzip/none and wires it
    through both the read path (decompressing *.zstd.metadata.json and
    *.metadata.json.zstd files) and the write path
    (write.metadata.compression-codec=zstd, which emits
    *.zstd.metadata.json). Uses klauspost/compress/zstd, which was
    already present as a transitive dependency and is now a direct one.
    
    Closes #1005
---
 catalog/internal/utils.go                          |  20 ++-
 go.mod                                             |   4 +-
 table/locations.go                                 |   2 +
 table/metadata_internal_test.go                    |  23 +++
 table/properties.go                                |   1 +
 table/table.go                                     |  46 ++++--
 table/table_test.go                                | 176 +++++++++++++++++++++
 .../TableMetadataV2Valid.zstd.metadata.json        | Bin 0 -> 374 bytes
 table/testdata/TableMetadataV2ValidMinimal.json    |   1 +
 9 files changed, 255 insertions(+), 18 deletions(-)

diff --git a/catalog/internal/utils.go b/catalog/internal/utils.go
index 6e1f2b39..87383bf1 100644
--- a/catalog/internal/utils.go
+++ b/catalog/internal/utils.go
@@ -37,11 +37,12 @@ import (
        icebergio "github.com/apache/iceberg-go/io"
        "github.com/apache/iceberg-go/table"
        "github.com/google/uuid"
+       "github.com/klauspost/compress/zstd"
 )
 
 func WriteTableMetadata(metadata table.Metadata, fs icebergio.WriteFileIO, loc 
string, compression string) (err error) {
        switch compression {
-       case table.MetadataCompressionCodecNone, 
table.MetadataCompressionCodecGzip:
+       case table.MetadataCompressionCodecNone, 
table.MetadataCompressionCodecGzip, table.MetadataCompressionCodecZstd:
                // supported codecs
        default:
                return fmt.Errorf("unsupported write metadata compression 
codec: %s", compression)
@@ -54,11 +55,18 @@ func WriteTableMetadata(metadata table.Metadata, fs 
icebergio.WriteFileIO, loc s
        defer internal.CheckedClose(out, &err)
 
        var writer io.Writer = out
-       var compressWriter io.WriteCloser
-       if compression == table.MetadataCompressionCodecGzip {
-               compressWriter = gzip.NewWriter(out)
-               writer = compressWriter
-               defer internal.CheckedClose(compressWriter, &err)
+       switch compression {
+       case table.MetadataCompressionCodecGzip:
+               gzw := gzip.NewWriter(out)
+               writer = gzw
+               defer internal.CheckedClose(gzw, &err)
+       case table.MetadataCompressionCodecZstd:
+               enc, zErr := zstd.NewWriter(out)
+               if zErr != nil {
+                       return zErr
+               }
+               writer = enc
+               defer internal.CheckedClose(enc, &err)
        }
 
        err = json.NewEncoder(writer).Encode(metadata)
diff --git a/go.mod b/go.mod
index 96115538..ccbcc823 100644
--- a/go.mod
+++ b/go.mod
@@ -23,6 +23,7 @@ require (
        cloud.google.com/go/storage v1.62.1
        github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1
        github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4
+       github.com/alexflint/go-arg v1.6.1
        github.com/apache/arrow-go/v18 v18.6.0
        github.com/aws/aws-sdk-go-v2 v1.41.6
        github.com/aws/aws-sdk-go-v2/config v1.32.16
@@ -34,9 +35,9 @@ require (
        github.com/beltran/gohive v1.8.1
        github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
        github.com/docker/docker v28.5.2+incompatible
-       github.com/alexflint/go-arg v1.6.1
        github.com/google/go-cmp v0.7.0
        github.com/google/uuid v1.6.0
+       github.com/klauspost/compress v1.18.5
        github.com/pterm/pterm v0.12.83
        github.com/stretchr/testify v1.11.1
        github.com/substrait-io/substrait-go/v8 v8.1.0
@@ -169,7 +170,6 @@ require (
        github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf // 
indirect
        github.com/jinzhu/inflection v1.0.0 // indirect
        github.com/jonboulle/clockwork v0.5.0 // indirect
-       github.com/klauspost/compress v1.18.5 // indirect
        github.com/klauspost/cpuid/v2 v2.3.0 // indirect
        github.com/kylelemons/godebug v1.1.0 // indirect
        github.com/lithammer/fuzzysearch v1.1.8 // indirect
diff --git a/table/locations.go b/table/locations.go
index 1047b46c..bdd0b0f1 100644
--- a/table/locations.go
+++ b/table/locations.go
@@ -70,6 +70,8 @@ func (slp *simpleLocationProvider) 
NewTableMetadataFileLocation(newVersion int)
                ext = ".metadata.json"
        case MetadataCompressionCodecGzip:
                ext = ".gz.metadata.json"
+       case MetadataCompressionCodecZstd:
+               ext = ".zstd.metadata.json"
        default:
                return "", fmt.Errorf("unsupported write metadata compression 
codec: %s", compression)
        }
diff --git a/table/metadata_internal_test.go b/table/metadata_internal_test.go
index 174ed330..43e50f94 100644
--- a/table/metadata_internal_test.go
+++ b/table/metadata_internal_test.go
@@ -18,7 +18,9 @@
 package table
 
 import (
+       "bytes"
        "encoding/json"
+       "io"
        "os"
        "path"
        "slices"
@@ -29,6 +31,7 @@ import (
        "github.com/davecgh/go-spew/spew"
        "github.com/google/go-cmp/cmp"
        "github.com/google/uuid"
+       "github.com/klauspost/compress/zstd"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 )
@@ -1831,3 +1834,23 @@ func TestV3PartitionStatisticsRoundTrip(t *testing.T) {
 
        assert.JSONEq(t, string(raw), string(serialized))
 }
+
+func TestZstdGoldenFixture(t *testing.T) {
+       compressed, err := os.ReadFile(path.Join("testdata", 
"TableMetadataV2Valid.zstd.metadata.json"))
+       require.NoError(t, err)
+
+       dec, err := zstd.NewReader(bytes.NewReader(compressed))
+       require.NoError(t, err)
+       defer dec.Close()
+
+       data, err := io.ReadAll(dec)
+       require.NoError(t, err)
+
+       meta, err := ParseMetadataBytes(data)
+       require.NoError(t, err)
+
+       expected, err := 
getTestTableMetadata("TableMetadataV2ValidMinimal.json")
+       require.NoError(t, err)
+
+       assert.True(t, expected.Equals(meta))
+}
diff --git a/table/properties.go b/table/properties.go
index 6c5d04c1..c3878f36 100644
--- a/table/properties.go
+++ b/table/properties.go
@@ -157,6 +157,7 @@ var ReservedProperties = [9]string{
 const (
        MetadataCompressionCodecNone = "none"
        MetadataCompressionCodecGzip = "gzip"
+       MetadataCompressionCodecZstd = "zstd"
 )
 
 // Write modes
diff --git a/table/table.go b/table/table.go
index c4c2a5de..3dd78057 100644
--- a/table/table.go
+++ b/table/table.go
@@ -38,6 +38,7 @@ import (
        "github.com/apache/iceberg-go/internal"
        icebergio "github.com/apache/iceberg-go/io"
        tblutils "github.com/apache/iceberg-go/table/internal"
+       "github.com/klauspost/compress/zstd"
        "golang.org/x/sync/errgroup"
 )
 
@@ -864,13 +865,14 @@ func NewFromLocation(
                        return nil, err
                }
 
-               if isGzippedMetadataJson(metalocation) {
-                       gz, err := gzip.NewReader(bytes.NewReader(data))
+               if codec := metadataCompressionCodec(metalocation); codec != "" 
{
+                       rc, err := newDecompressor(bytes.NewReader(data), codec)
                        if err != nil {
                                return nil, err
                        }
-                       defer gz.Close()
-                       data, err = io.ReadAll(gz)
+                       defer rc.Close()
+
+                       data, err = io.ReadAll(rc)
                        if err != nil {
                                return nil, err
                        }
@@ -887,13 +889,14 @@ func NewFromLocation(
                defer internal.CheckedClose(f, &err)
 
                var r io.Reader = f
-               if isGzippedMetadataJson(metalocation) {
-                       gz, err := gzip.NewReader(f)
+               if codec := metadataCompressionCodec(metalocation); codec != "" 
{
+                       rc, err := newDecompressor(f, codec)
                        if err != nil {
                                return nil, err
                        }
-                       defer gz.Close()
-                       r = gz
+                       defer rc.Close()
+
+                       r = rc
                }
 
                if meta, err = ParseMetadata(r); err != nil {
@@ -904,6 +907,29 @@ func NewFromLocation(
        return New(ident, meta, metalocation, fsysF, cat), nil
 }
 
-func isGzippedMetadataJson(location string) bool {
-       return strings.HasSuffix(location, ".gz.metadata.json") || 
strings.HasSuffix(location, "metadata.json.gz")
+func metadataCompressionCodec(location string) string {
+       switch {
+       case strings.HasSuffix(location, ".gz.metadata.json") || 
strings.HasSuffix(location, "metadata.json.gz"):
+               return MetadataCompressionCodecGzip
+       case strings.HasSuffix(location, ".zstd.metadata.json") || 
strings.HasSuffix(location, "metadata.json.zstd"):
+               return MetadataCompressionCodecZstd
+       default:
+               return ""
+       }
+}
+
+func newDecompressor(r io.Reader, codec string) (io.ReadCloser, error) {
+       switch codec {
+       case MetadataCompressionCodecGzip:
+               return gzip.NewReader(r)
+       case MetadataCompressionCodecZstd:
+               dec, err := zstd.NewReader(r)
+               if err != nil {
+                       return nil, err
+               }
+
+               return dec.IOReadCloser(), nil
+       default:
+               return nil, fmt.Errorf("unsupported metadata decompression 
codec: %s", codec)
+       }
 }
diff --git a/table/table_test.go b/table/table_test.go
index f10c8019..17c5bd16 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -50,6 +50,7 @@ import (
        iceio "github.com/apache/iceberg-go/io"
        "github.com/apache/iceberg-go/table"
        "github.com/google/uuid"
+       "github.com/klauspost/compress/zstd"
        "github.com/pterm/pterm"
        "github.com/stretchr/testify/require"
        "github.com/stretchr/testify/suite"
@@ -171,6 +172,126 @@ func (t *TableTestSuite) 
TestNewTableFromReadFileGzipped() {
        t.True(t.tbl.Metadata().Equals(tbl2.Metadata()))
 }
 
+func (t *TableTestSuite) TestNewTableFromReadFileZstd() {
+       var b bytes.Buffer
+       enc, err := zstd.NewWriter(&b)
+       t.Require().NoError(err)
+
+       _, err = enc.Write([]byte(table.ExampleTableMetadataV2))
+       t.Require().NoError(err)
+       t.Require().NoError(enc.Close())
+
+       var mockfsReadFile internal.MockFSReadFile
+       mockfsReadFile.Test(t.T())
+       mockfsReadFile.On("ReadFile", 
"s3://bucket/test/location/uuid.zstd.metadata.json").
+               Return(b.Bytes(), nil)
+       defer mockfsReadFile.AssertExpectations(t.T())
+
+       tbl2, err := table.NewFromLocation(
+               t.T().Context(),
+               []string{"foo"},
+               "s3://bucket/test/location/uuid.zstd.metadata.json",
+               func(ctx context.Context) (iceio.IO, error) {
+                       return &mockfsReadFile, nil
+               },
+               nil,
+       )
+       t.Require().NoError(err)
+       t.Require().NotNil(tbl2)
+
+       t.True(t.tbl.Metadata().Equals(tbl2.Metadata()))
+}
+
+func (t *TableTestSuite) TestNewTableFromReadFileZstdAlternateSuffix() {
+       var b bytes.Buffer
+       enc, err := zstd.NewWriter(&b)
+       t.Require().NoError(err)
+
+       _, err = enc.Write([]byte(table.ExampleTableMetadataV2))
+       t.Require().NoError(err)
+       t.Require().NoError(enc.Close())
+
+       var mockfsReadFile internal.MockFSReadFile
+       mockfsReadFile.Test(t.T())
+       mockfsReadFile.On("ReadFile", 
"s3://bucket/test/location/uuid.metadata.json.zstd").
+               Return(b.Bytes(), nil)
+       defer mockfsReadFile.AssertExpectations(t.T())
+
+       tbl2, err := table.NewFromLocation(
+               t.T().Context(),
+               []string{"foo"},
+               "s3://bucket/test/location/uuid.metadata.json.zstd",
+               func(ctx context.Context) (iceio.IO, error) {
+                       return &mockfsReadFile, nil
+               },
+               nil,
+       )
+       t.Require().NoError(err)
+       t.Require().NotNil(tbl2)
+
+       t.True(t.tbl.Metadata().Equals(tbl2.Metadata()))
+}
+
+func (t *TableTestSuite) TestNewTableFromOpenZstd() {
+       var b bytes.Buffer
+       enc, err := zstd.NewWriter(&b)
+       t.Require().NoError(err)
+
+       _, err = enc.Write([]byte(table.ExampleTableMetadataV2))
+       t.Require().NoError(err)
+       t.Require().NoError(enc.Close())
+
+       var mockfs internal.MockFS
+       mockfs.Test(t.T())
+       mockfs.On("Open", "s3://bucket/test/location/uuid.zstd.metadata.json").
+               Return(&internal.MockFile{Contents: 
bytes.NewReader(b.Bytes())}, nil)
+       defer mockfs.AssertExpectations(t.T())
+
+       tbl2, err := table.NewFromLocation(
+               t.T().Context(),
+               []string{"foo"},
+               "s3://bucket/test/location/uuid.zstd.metadata.json",
+               func(ctx context.Context) (iceio.IO, error) {
+                       return &mockfs, nil
+               },
+               nil,
+       )
+       t.Require().NoError(err)
+       t.Require().NotNil(tbl2)
+
+       t.True(t.tbl.Metadata().Equals(tbl2.Metadata()))
+}
+
+func (t *TableTestSuite) TestNewTableFromOpenZstdAlternateSuffix() {
+       var b bytes.Buffer
+       enc, err := zstd.NewWriter(&b)
+       t.Require().NoError(err)
+
+       _, err = enc.Write([]byte(table.ExampleTableMetadataV2))
+       t.Require().NoError(err)
+       t.Require().NoError(enc.Close())
+
+       var mockfs internal.MockFS
+       mockfs.Test(t.T())
+       mockfs.On("Open", "s3://bucket/test/location/uuid.metadata.json.zstd").
+               Return(&internal.MockFile{Contents: 
bytes.NewReader(b.Bytes())}, nil)
+       defer mockfs.AssertExpectations(t.T())
+
+       tbl2, err := table.NewFromLocation(
+               t.T().Context(),
+               []string{"foo"},
+               "s3://bucket/test/location/uuid.metadata.json.zstd",
+               func(ctx context.Context) (iceio.IO, error) {
+                       return &mockfs, nil
+               },
+               nil,
+       )
+       t.Require().NoError(err)
+       t.Require().NotNil(tbl2)
+
+       t.True(t.tbl.Metadata().Equals(tbl2.Metadata()))
+}
+
 func (t *TableTestSuite) TestSchema() {
        t.True(t.tbl.Schema().Equals(iceberg.NewSchemaWithIdentifiers(1, 
[]int{1, 2},
                iceberg.NestedField{ID: 1, Name: "x", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
@@ -3063,6 +3184,61 @@ func (t *TableTestSuite) 
TestMetadataCompressionRoundTrip() {
        t.True(tbl.Equals(*tbl2))
 }
 
+func (t *TableTestSuite) TestMetadataCompressionRoundTripZstd() {
+       cat, err := catalog.Load(context.Background(), "default", 
iceberg.Properties{
+               "uri":          ":memory:",
+               "type":         "sql",
+               sql.DriverKey:  sqliteshim.ShimName,
+               sql.DialectKey: string(sql.SQLite),
+               "warehouse":    "file://" + t.T().TempDir(),
+       })
+       t.Require().NoError(err)
+
+       ident := table.Identifier{"test", "zstd_compression_table"}
+       t.Require().NoError(cat.CreateNamespace(context.Background(), 
catalog.NamespaceFromIdent(ident), nil))
+
+       tbl, err := cat.CreateTable(context.Background(), ident, t.tbl.Schema(),
+               catalog.WithProperties(iceberg.Properties{
+                       table.MetadataCompressionKey: "zstd",
+               }))
+       t.Require().NoError(err)
+       t.Require().NotNil(tbl)
+
+       metadataLoc := tbl.MetadataLocation()
+       t.Contains(metadataLoc, ".zstd.metadata.json")
+
+       fs, err := tbl.FS(context.Background())
+       t.Require().NoError(err)
+
+       file, err := fs.Open(metadataLoc)
+       t.Require().NoError(err)
+       defer file.Close()
+
+       metadataBytes, err := io.ReadAll(file)
+       t.Require().NoError(err)
+
+       dec, err := zstd.NewReader(bytes.NewReader(metadataBytes))
+       t.Require().NoError(err)
+       defer dec.Close()
+
+       decompressed, err := io.ReadAll(dec)
+       t.Require().NoError(err)
+
+       var metadata map[string]any
+       err = json.Unmarshal(decompressed, &metadata)
+       t.Require().NoError(err)
+
+       t.Contains(metadata, "format-version")
+       t.Contains(metadata, "table-uuid")
+       t.Contains(metadata, "location")
+
+       tbl2, err := cat.LoadTable(context.Background(), ident)
+       t.Require().NoError(err)
+       t.Require().NotNil(tbl2)
+
+       t.True(tbl.Equals(*tbl2))
+}
+
 type snapshotSummaryMatcher struct{}
 
 func (m *snapshotSummaryMatcher) Matches(expected *table.Summary, actual 
*table.Summary) bool {
diff --git a/table/testdata/TableMetadataV2Valid.zstd.metadata.json 
b/table/testdata/TableMetadataV2Valid.zstd.metadata.json
new file mode 100644
index 00000000..4709076c
Binary files /dev/null and 
b/table/testdata/TableMetadataV2Valid.zstd.metadata.json differ
diff --git a/table/testdata/TableMetadataV2ValidMinimal.json 
b/table/testdata/TableMetadataV2ValidMinimal.json
new file mode 100644
index 00000000..9d82d000
--- /dev/null
+++ b/table/testdata/TableMetadataV2ValidMinimal.json
@@ -0,0 +1 @@
+{"current-schema-id":0,"current-snapshot-id":-1,"default-sort-order-id":0,"default-spec-id":0,"format-version":2,"last-column-id":3,"last-partition-id":999,"last-sequence-number":0,"last-updated-ms":1602638573590,"location":"s3://bucket/test/location","metadata-log":[],"partition-specs":[{"fields":[],"spec-id":0}],"properties":{},"refs":{},"schemas":[{"fields":[{"id":1,"name":"x","required":true,"type":"long"},{"doc":"comment","id":2,"name":"y","required":true,"type":"long"},{"id":3,"nam
 [...]
\ No newline at end of file

Reply via email to