This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 0df56cf6 feat(table): support writing gzipped metadata json (#612)
0df56cf6 is described below

commit 0df56cf6790ae6c86b7feeb8a7d5b75c4ad0ab23
Author: Alex <[email protected]>
AuthorDate: Tue Oct 28 10:11:09 2025 -0600

    feat(table): support writing gzipped metadata json (#612)
    
    Follow-up to #610; add write support for gzipped table metadata.
    
    The behavior is optional and is enabled when the table property
    `write.metadata.compression-codec` is set to `gzip`.
---
 catalog/glue/glue.go      |  3 ++-
 catalog/internal/utils.go | 47 ++++++++++++++++++++++------------
 catalog/sql/sql.go        |  3 ++-
 table/locations.go        | 13 +++++++++-
 table/properties.go       |  9 +++++++
 table/table_test.go       | 65 +++++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 121 insertions(+), 19 deletions(-)

diff --git a/catalog/glue/glue.go b/catalog/glue/glue.go
index 4b12608d..bd95d3eb 100644
--- a/catalog/glue/glue.go
+++ b/catalog/glue/glue.go
@@ -250,7 +250,8 @@ func (c *Catalog) CreateTable(ctx context.Context, 
identifier table.Identifier,
                return nil, errors.New("loaded filesystem IO does not support 
writing")
        }
 
-       if err := internal.WriteTableMetadata(staged.Metadata(), wfs, 
staged.MetadataLocation()); err != nil {
+       compression := 
staged.Table.Properties().Get(table.MetadataCompressionKey, 
table.MetadataCompressionDefault)
+       if err := internal.WriteTableMetadata(staged.Metadata(), wfs, 
staged.MetadataLocation(), compression); err != nil {
                return nil, err
        }
 
diff --git a/catalog/internal/utils.go b/catalog/internal/utils.go
index d899eb96..76aa378a 100644
--- a/catalog/internal/utils.go
+++ b/catalog/internal/utils.go
@@ -18,10 +18,12 @@
 package internal
 
 import (
+       "compress/gzip"
        "context"
        "encoding/json"
        "errors"
        "fmt"
+       "io"
        "maps"
        "net/url"
        "path"
@@ -31,40 +33,53 @@ import (
 
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/catalog"
-       "github.com/apache/iceberg-go/io"
+       icebergio "github.com/apache/iceberg-go/io"
        "github.com/apache/iceberg-go/table"
        "github.com/google/uuid"
 )
 
-func GetMetadataLoc(location string, newVersion uint) string {
-       return fmt.Sprintf("%s/metadata/%05d-%s.metadata.json",
-               location, newVersion, uuid.New().String())
-}
-
-func WriteTableMetadata(metadata table.Metadata, fs io.WriteFileIO, loc 
string) error {
+func WriteTableMetadata(metadata table.Metadata, fs icebergio.WriteFileIO, loc 
string, compression string) error {
        out, err := fs.Create(loc)
        if err != nil {
                return err
        }
 
-       return errors.Join(
-               json.NewEncoder(out).Encode(metadata),
-               out.Close(),
-       )
+       var writer io.Writer = out
+       var compressWriter io.WriteCloser
+       switch compression {
+       case table.MetadataCompressionCodecNone:
+               // no compression
+       case table.MetadataCompressionCodecGzip:
+               compressWriter = gzip.NewWriter(out)
+               writer = compressWriter
+       default:
+               return fmt.Errorf("unsupported write metadata compression 
codec: %s", compression)
+       }
+
+       encodeErr := json.NewEncoder(writer).Encode(metadata)
+
+       var compressionCloseErr error
+       if compressWriter != nil {
+               compressionCloseErr = compressWriter.Close()
+       }
+
+       return errors.Join(encodeErr, compressionCloseErr, out.Close())
 }
 
 func WriteMetadata(ctx context.Context, metadata table.Metadata, loc string, 
props iceberg.Properties) error {
-       fs, err := io.LoadFS(ctx, props, loc)
+       fs, err := icebergio.LoadFS(ctx, props, loc)
        if err != nil {
                return err
        }
 
-       wfs, ok := fs.(io.WriteFileIO)
+       wfs, ok := fs.(icebergio.WriteFileIO)
        if !ok {
                return errors.New("filesystem IO does not support writing")
        }
 
-       return WriteTableMetadata(metadata, wfs, loc)
+       compression := props.Get(table.MetadataCompressionKey, 
table.MetadataCompressionDefault)
+
+       return WriteTableMetadata(metadata, wfs, loc, compression)
 }
 
 func UpdateTableMetadata(base table.Metadata, updates []table.Update, 
metadataLoc string) (table.Metadata, error) {
@@ -109,7 +124,7 @@ func CreateStagedTable(ctx context.Context, catprops 
iceberg.Properties, nsprops
                        ident,
                        metadata,
                        metadataLoc,
-                       io.LoadFSFunc(ioProps, metadataLoc),
+                       icebergio.LoadFSFunc(ioProps, metadataLoc),
                        nil,
                ),
        }, nil
@@ -213,7 +228,7 @@ func UpdateAndStageTable(ctx context.Context, current 
*table.Table, ident table.
                        ident,
                        updated,
                        newLocation,
-                       io.LoadFSFunc(updated.Properties(), newLocation),
+                       icebergio.LoadFSFunc(updated.Properties(), newLocation),
                        cat,
                ),
        }, nil
diff --git a/catalog/sql/sql.go b/catalog/sql/sql.go
index 0057bf15..e47f84f3 100644
--- a/catalog/sql/sql.go
+++ b/catalog/sql/sql.go
@@ -299,7 +299,8 @@ func (c *Catalog) CreateTable(ctx context.Context, ident 
table.Identifier, sc *i
                return nil, errors.New("loaded filesystem IO does not support 
writing")
        }
 
-       if err := internal.WriteTableMetadata(staged.Metadata(), wfs, 
staged.MetadataLocation()); err != nil {
+       compression := 
staged.Table.Properties().Get(table.MetadataCompressionKey, 
table.MetadataCompressionDefault)
+       if err := internal.WriteTableMetadata(staged.Metadata(), wfs, 
staged.MetadataLocation(), compression); err != nil {
                return nil, err
        }
 
diff --git a/table/locations.go b/table/locations.go
index 4e47fcbd..1047b46c 100644
--- a/table/locations.go
+++ b/table/locations.go
@@ -63,7 +63,18 @@ func (slp *simpleLocationProvider) 
NewTableMetadataFileLocation(newVersion int)
                return "", err
        }
 
-       fname := fmt.Sprintf("%05d-%s.metadata.json", newVersion, newUUID)
+       compression := slp.tableProps.Get(MetadataCompressionKey, 
MetadataCompressionDefault)
+       var ext string
+       switch compression {
+       case MetadataCompressionCodecNone:
+               ext = ".metadata.json"
+       case MetadataCompressionCodecGzip:
+               ext = ".gz.metadata.json"
+       default:
+               return "", fmt.Errorf("unsupported write metadata compression 
codec: %s", compression)
+       }
+
+       fname := fmt.Sprintf("%05d-%s%s", newVersion, newUUID, ext)
 
        return slp.NewMetadataLocation(fname), nil
 }
diff --git a/table/properties.go b/table/properties.go
index 8ef3d026..cab74e83 100644
--- a/table/properties.go
+++ b/table/properties.go
@@ -73,6 +73,9 @@ const (
        MetadataPreviousVersionsMaxKey     = 
"write.metadata.previous-versions-max"
        MetadataPreviousVersionsMaxDefault = 100
 
+       MetadataCompressionKey     = "write.metadata.compression-codec"
+       MetadataCompressionDefault = "none"
+
        WriteTargetFileSizeBytesKey     = "write.target-file-size-bytes"
        WriteTargetFileSizeBytesDefault = 512 * 1024 * 1024 // 512 MB
 
@@ -110,3 +113,9 @@ var ReservedProperties = [9]string{
        PropertyDefaultPartitionSpec,
        PropertyDefaultSortOrder,
 }
+
+// Metadata compression codecs
+const (
+       MetadataCompressionCodecNone = "none"
+       MetadataCompressionCodecGzip = "gzip"
+)
diff --git a/table/table_test.go b/table/table_test.go
index 4e8afbd7..718f6956 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -21,7 +21,9 @@ import (
        "bytes"
        "compress/gzip"
        "context"
+       "encoding/json"
        "fmt"
+       "io"
        "io/fs"
        "log"
        "os"
@@ -1638,3 +1640,66 @@ func (t *TableTestSuite) TestRefresh() {
        t.True(originalSchema.Equals(tbl.Schema()))
        t.Equal(originalSpec, tbl.Spec())
 }
+
+func (t *TableTestSuite) TestMetadataCompressionRoundTrip() {
+       cat, err := catalog.Load(context.Background(), "default", 
iceberg.Properties{
+               "uri":          ":memory:",
+               "type":         "sql",
+               sql.DriverKey:  sqliteshim.ShimName,
+               sql.DialectKey: string(sql.SQLite),
+               "warehouse":    "file://" + t.T().TempDir(),
+       })
+       t.Require().NoError(err)
+
+       ident := table.Identifier{"test", "compression_table"}
+       t.Require().NoError(cat.CreateNamespace(context.Background(), 
catalog.NamespaceFromIdent(ident), nil))
+
+       // Test with gzip compression enabled
+       tbl, err := cat.CreateTable(context.Background(), ident, t.tbl.Schema(),
+               catalog.WithProperties(iceberg.Properties{
+                       table.MetadataCompressionKey: "gzip",
+               }))
+       t.Require().NoError(err)
+       t.Require().NotNil(tbl)
+
+       // Verify the metadata location has the correct extension for gzipped 
files
+       metadataLoc := tbl.MetadataLocation()
+       t.Contains(metadataLoc, ".gz.metadata.json")
+
+       // Test that we can read the gzipped metadata
+       fs, err := tbl.FS(context.Background())
+       t.Require().NoError(err)
+
+       // Read the metadata file and verify it's gzipped
+       file, err := fs.Open(metadataLoc)
+       t.Require().NoError(err)
+       defer file.Close()
+
+       metadataBytes, err := io.ReadAll(file)
+       t.Require().NoError(err)
+
+       // Verify it's gzipped by trying to decompress it
+       gzReader, err := gzip.NewReader(bytes.NewReader(metadataBytes))
+       t.Require().NoError(err)
+       defer gzReader.Close()
+
+       decompressed, err := io.ReadAll(gzReader)
+       t.Require().NoError(err)
+
+       // Verify the decompressed content is valid JSON
+       var metadata map[string]interface{}
+       err = json.Unmarshal(decompressed, &metadata)
+       t.Require().NoError(err)
+
+       // Verify it contains expected Iceberg metadata fields
+       t.Contains(metadata, "format-version")
+       t.Contains(metadata, "table-uuid")
+       t.Contains(metadata, "location")
+
+       // Verify that we can load the table from the metadata location
+       tbl2, err := cat.LoadTable(context.Background(), ident)
+       t.Require().NoError(err)
+       t.Require().NotNil(tbl2)
+
+       t.True(tbl.Equals(*tbl2))
+}

Reply via email to