This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 0df56cf6 feat(table): support writing gzipped metadata json (#612)
0df56cf6 is described below
commit 0df56cf6790ae6c86b7feeb8a7d5b75c4ad0ab23
Author: Alex <[email protected]>
AuthorDate: Tue Oct 28 10:11:09 2025 -0600
feat(table): support writing gzipped metadata json (#612)
Follow-up to #610: add write support for gzipped table metadata.
The behavior is optional and is enabled when the table property
`write.metadata.compression-codec` is set to `gzip`.
---
catalog/glue/glue.go | 3 ++-
catalog/internal/utils.go | 47 ++++++++++++++++++++++------------
catalog/sql/sql.go | 3 ++-
table/locations.go | 13 +++++++++-
table/properties.go | 9 +++++++
table/table_test.go | 65 +++++++++++++++++++++++++++++++++++++++++++++++
6 files changed, 121 insertions(+), 19 deletions(-)
diff --git a/catalog/glue/glue.go b/catalog/glue/glue.go
index 4b12608d..bd95d3eb 100644
--- a/catalog/glue/glue.go
+++ b/catalog/glue/glue.go
@@ -250,7 +250,8 @@ func (c *Catalog) CreateTable(ctx context.Context,
identifier table.Identifier,
return nil, errors.New("loaded filesystem IO does not support
writing")
}
- if err := internal.WriteTableMetadata(staged.Metadata(), wfs,
staged.MetadataLocation()); err != nil {
+ compression :=
staged.Table.Properties().Get(table.MetadataCompressionKey,
table.MetadataCompressionDefault)
+ if err := internal.WriteTableMetadata(staged.Metadata(), wfs,
staged.MetadataLocation(), compression); err != nil {
return nil, err
}
diff --git a/catalog/internal/utils.go b/catalog/internal/utils.go
index d899eb96..76aa378a 100644
--- a/catalog/internal/utils.go
+++ b/catalog/internal/utils.go
@@ -18,10 +18,12 @@
package internal
import (
+ "compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
+ "io"
"maps"
"net/url"
"path"
@@ -31,40 +33,53 @@ import (
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
- "github.com/apache/iceberg-go/io"
+ icebergio "github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table"
"github.com/google/uuid"
)
-func GetMetadataLoc(location string, newVersion uint) string {
- return fmt.Sprintf("%s/metadata/%05d-%s.metadata.json",
- location, newVersion, uuid.New().String())
-}
-
-func WriteTableMetadata(metadata table.Metadata, fs io.WriteFileIO, loc
string) error {
+func WriteTableMetadata(metadata table.Metadata, fs icebergio.WriteFileIO, loc
string, compression string) error {
out, err := fs.Create(loc)
if err != nil {
return err
}
- return errors.Join(
- json.NewEncoder(out).Encode(metadata),
- out.Close(),
- )
+ var writer io.Writer = out
+ var compressWriter io.WriteCloser
+ switch compression {
+ case table.MetadataCompressionCodecNone:
+ // no compression
+ case table.MetadataCompressionCodecGzip:
+ compressWriter = gzip.NewWriter(out)
+ writer = compressWriter
+ default:
+ return fmt.Errorf("unsupported write metadata compression
codec: %s", compression)
+ }
+
+ encodeErr := json.NewEncoder(writer).Encode(metadata)
+
+ var compressionCloseErr error
+ if compressWriter != nil {
+ compressionCloseErr = compressWriter.Close()
+ }
+
+ return errors.Join(encodeErr, compressionCloseErr, out.Close())
}
func WriteMetadata(ctx context.Context, metadata table.Metadata, loc string,
props iceberg.Properties) error {
- fs, err := io.LoadFS(ctx, props, loc)
+ fs, err := icebergio.LoadFS(ctx, props, loc)
if err != nil {
return err
}
- wfs, ok := fs.(io.WriteFileIO)
+ wfs, ok := fs.(icebergio.WriteFileIO)
if !ok {
return errors.New("filesystem IO does not support writing")
}
- return WriteTableMetadata(metadata, wfs, loc)
+ compression := props.Get(table.MetadataCompressionKey,
table.MetadataCompressionDefault)
+
+ return WriteTableMetadata(metadata, wfs, loc, compression)
}
func UpdateTableMetadata(base table.Metadata, updates []table.Update,
metadataLoc string) (table.Metadata, error) {
@@ -109,7 +124,7 @@ func CreateStagedTable(ctx context.Context, catprops
iceberg.Properties, nsprops
ident,
metadata,
metadataLoc,
- io.LoadFSFunc(ioProps, metadataLoc),
+ icebergio.LoadFSFunc(ioProps, metadataLoc),
nil,
),
}, nil
@@ -213,7 +228,7 @@ func UpdateAndStageTable(ctx context.Context, current
*table.Table, ident table.
ident,
updated,
newLocation,
- io.LoadFSFunc(updated.Properties(), newLocation),
+ icebergio.LoadFSFunc(updated.Properties(), newLocation),
cat,
),
}, nil
diff --git a/catalog/sql/sql.go b/catalog/sql/sql.go
index 0057bf15..e47f84f3 100644
--- a/catalog/sql/sql.go
+++ b/catalog/sql/sql.go
@@ -299,7 +299,8 @@ func (c *Catalog) CreateTable(ctx context.Context, ident
table.Identifier, sc *i
return nil, errors.New("loaded filesystem IO does not support
writing")
}
- if err := internal.WriteTableMetadata(staged.Metadata(), wfs,
staged.MetadataLocation()); err != nil {
+ compression :=
staged.Table.Properties().Get(table.MetadataCompressionKey,
table.MetadataCompressionDefault)
+ if err := internal.WriteTableMetadata(staged.Metadata(), wfs,
staged.MetadataLocation(), compression); err != nil {
return nil, err
}
diff --git a/table/locations.go b/table/locations.go
index 4e47fcbd..1047b46c 100644
--- a/table/locations.go
+++ b/table/locations.go
@@ -63,7 +63,18 @@ func (slp *simpleLocationProvider)
NewTableMetadataFileLocation(newVersion int)
return "", err
}
- fname := fmt.Sprintf("%05d-%s.metadata.json", newVersion, newUUID)
+ compression := slp.tableProps.Get(MetadataCompressionKey,
MetadataCompressionDefault)
+ var ext string
+ switch compression {
+ case MetadataCompressionCodecNone:
+ ext = ".metadata.json"
+ case MetadataCompressionCodecGzip:
+ ext = ".gz.metadata.json"
+ default:
+ return "", fmt.Errorf("unsupported write metadata compression
codec: %s", compression)
+ }
+
+ fname := fmt.Sprintf("%05d-%s%s", newVersion, newUUID, ext)
return slp.NewMetadataLocation(fname), nil
}
diff --git a/table/properties.go b/table/properties.go
index 8ef3d026..cab74e83 100644
--- a/table/properties.go
+++ b/table/properties.go
@@ -73,6 +73,9 @@ const (
MetadataPreviousVersionsMaxKey =
"write.metadata.previous-versions-max"
MetadataPreviousVersionsMaxDefault = 100
+ MetadataCompressionKey = "write.metadata.compression-codec"
+ MetadataCompressionDefault = "none"
+
WriteTargetFileSizeBytesKey = "write.target-file-size-bytes"
WriteTargetFileSizeBytesDefault = 512 * 1024 * 1024 // 512 MB
@@ -110,3 +113,9 @@ var ReservedProperties = [9]string{
PropertyDefaultPartitionSpec,
PropertyDefaultSortOrder,
}
+
+// Metadata compression codecs
+const (
+ MetadataCompressionCodecNone = "none"
+ MetadataCompressionCodecGzip = "gzip"
+)
diff --git a/table/table_test.go b/table/table_test.go
index 4e8afbd7..718f6956 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -21,7 +21,9 @@ import (
"bytes"
"compress/gzip"
"context"
+ "encoding/json"
"fmt"
+ "io"
"io/fs"
"log"
"os"
@@ -1638,3 +1640,66 @@ func (t *TableTestSuite) TestRefresh() {
t.True(originalSchema.Equals(tbl.Schema()))
t.Equal(originalSpec, tbl.Spec())
}
+
+func (t *TableTestSuite) TestMetadataCompressionRoundTrip() {
+ cat, err := catalog.Load(context.Background(), "default",
iceberg.Properties{
+ "uri": ":memory:",
+ "type": "sql",
+ sql.DriverKey: sqliteshim.ShimName,
+ sql.DialectKey: string(sql.SQLite),
+ "warehouse": "file://" + t.T().TempDir(),
+ })
+ t.Require().NoError(err)
+
+ ident := table.Identifier{"test", "compression_table"}
+ t.Require().NoError(cat.CreateNamespace(context.Background(),
catalog.NamespaceFromIdent(ident), nil))
+
+ // Test with gzip compression enabled
+ tbl, err := cat.CreateTable(context.Background(), ident, t.tbl.Schema(),
+ catalog.WithProperties(iceberg.Properties{
+ table.MetadataCompressionKey: "gzip",
+ }))
+ t.Require().NoError(err)
+ t.Require().NotNil(tbl)
+
+ // Verify the metadata location has the correct extension for gzipped
files
+ metadataLoc := tbl.MetadataLocation()
+ t.Contains(metadataLoc, ".gz.metadata.json")
+
+ // Test that we can read the gzipped metadata
+ fs, err := tbl.FS(context.Background())
+ t.Require().NoError(err)
+
+ // Read the metadata file and verify it's gzipped
+ file, err := fs.Open(metadataLoc)
+ t.Require().NoError(err)
+ defer file.Close()
+
+ metadataBytes, err := io.ReadAll(file)
+ t.Require().NoError(err)
+
+ // Verify it's gzipped by trying to decompress it
+ gzReader, err := gzip.NewReader(bytes.NewReader(metadataBytes))
+ t.Require().NoError(err)
+ defer gzReader.Close()
+
+ decompressed, err := io.ReadAll(gzReader)
+ t.Require().NoError(err)
+
+ // Verify the decompressed content is valid JSON
+ var metadata map[string]interface{}
+ err = json.Unmarshal(decompressed, &metadata)
+ t.Require().NoError(err)
+
+ // Verify it contains expected Iceberg metadata fields
+ t.Contains(metadata, "format-version")
+ t.Contains(metadata, "table-uuid")
+ t.Contains(metadata, "location")
+
+ // Verify that we can load the table from the metadata location
+ tbl2, err := cat.LoadTable(context.Background(), ident)
+ t.Require().NoError(err)
+ t.Require().NotNil(tbl2)
+
+ t.True(tbl.Equals(*tbl2))
+}