This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 85896524 fix(table): support gzipped metadata json (#610)
85896524 is described below

commit 85896524f4fcc6d539e05fa96482e5d8f7eda351
Author: Alex <[email protected]>
AuthorDate: Fri Oct 24 14:15:09 2025 -0600

    fix(table): support gzipped metadata json (#610)
    
    This PR adds support for reading gzip-compressed metadata JSON files.
    
    - Iceberg spec reference:
      https://iceberg.apache.org/spec/#naming-for-gzip-compressed-metadata-json-files
    - PyIceberg implementation:
      https://github.com/apache/iceberg-python/blob/7d5c58d5b2b2ef914cf0cf8577a97b01221afe3a/pyiceberg/serializers.py#L35
    
    While the spec (linked above) implies `.gz.metadata.json` is the
    standard, it mentions that the Java implementation can also read
    `metadata.json.gz`. As written, this PR accepts both suffixes, but I
    will defer to your guidance. FWIW, PyIceberg only supports
    `.gz.metadata.json`.
---
 table/table.go      | 66 ++++++++++++++++++++++++++++++++++++++---------------
 table/table_test.go | 35 ++++++++++++++++++++++++++++
 2 files changed, 83 insertions(+), 18 deletions(-)

diff --git a/table/table.go b/table/table.go
index bd2ab8cb..8abf6fb8 100644
--- a/table/table.go
+++ b/table/table.go
@@ -18,22 +18,26 @@
 package table
 
 import (
+       "bytes"
+       "compress/gzip"
        "context"
+       "io"
        "iter"
        "log"
        "runtime"
        "slices"
+       "strings"
 
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/internal"
-       "github.com/apache/iceberg-go/io"
+       icebergio "github.com/apache/iceberg-go/io"
        tblutils "github.com/apache/iceberg-go/table/internal"
        "golang.org/x/sync/errgroup"
 )
 
-type FSysF func(ctx context.Context) (io.IO, error)
+type FSysF func(ctx context.Context) (icebergio.IO, error)
 
 type Identifier = []string
 
@@ -56,19 +60,19 @@ func (t Table) Equals(other Table) bool {
                t.metadata.Equals(other.metadata)
 }
 
-func (t Table) Identifier() Identifier                { return t.identifier }
-func (t Table) Metadata() Metadata                    { return t.metadata }
-func (t Table) MetadataLocation() string              { return 
t.metadataLocation }
-func (t Table) FS(ctx context.Context) (io.IO, error) { return t.fsF(ctx) }
-func (t Table) Schema() *iceberg.Schema               { return 
t.metadata.CurrentSchema() }
-func (t Table) Spec() iceberg.PartitionSpec           { return 
t.metadata.PartitionSpec() }
-func (t Table) SortOrder() SortOrder                  { return 
t.metadata.SortOrder() }
-func (t Table) Properties() iceberg.Properties        { return 
t.metadata.Properties() }
-func (t Table) NameMapping() iceberg.NameMapping      { return 
t.metadata.NameMapping() }
-func (t Table) Location() string                      { return 
t.metadata.Location() }
-func (t Table) CurrentSnapshot() *Snapshot            { return 
t.metadata.CurrentSnapshot() }
-func (t Table) SnapshotByID(id int64) *Snapshot       { return 
t.metadata.SnapshotByID(id) }
-func (t Table) SnapshotByName(name string) *Snapshot  { return 
t.metadata.SnapshotByName(name) }
+func (t Table) Identifier() Identifier                       { return 
t.identifier }
+func (t Table) Metadata() Metadata                           { return 
t.metadata }
+func (t Table) MetadataLocation() string                     { return 
t.metadataLocation }
+func (t Table) FS(ctx context.Context) (icebergio.IO, error) { return 
t.fsF(ctx) }
+func (t Table) Schema() *iceberg.Schema                      { return 
t.metadata.CurrentSchema() }
+func (t Table) Spec() iceberg.PartitionSpec                  { return 
t.metadata.PartitionSpec() }
+func (t Table) SortOrder() SortOrder                         { return 
t.metadata.SortOrder() }
+func (t Table) Properties() iceberg.Properties               { return 
t.metadata.Properties() }
+func (t Table) NameMapping() iceberg.NameMapping             { return 
t.metadata.NameMapping() }
+func (t Table) Location() string                             { return 
t.metadata.Location() }
+func (t Table) CurrentSnapshot() *Snapshot                   { return 
t.metadata.CurrentSnapshot() }
+func (t Table) SnapshotByID(id int64) *Snapshot              { return 
t.metadata.SnapshotByID(id) }
+func (t Table) SnapshotByName(name string) *Snapshot         { return 
t.metadata.SnapshotByName(name) }
 func (t Table) Schemas() map[int]*iceberg.Schema {
        m := make(map[int]*iceberg.Schema)
        for _, s := range t.metadata.Schemas() {
@@ -256,7 +260,7 @@ func getFiles(it iter.Seq[MetadataLogEntry]) 
iter.Seq[string] {
        }
 }
 
-func deleteOldMetadata(fs io.IO, baseMeta, newMeta Metadata) {
+func deleteOldMetadata(fs icebergio.IO, baseMeta, newMeta Metadata) {
        deleteAfterCommit := 
newMeta.Properties().GetBool(MetadataDeleteAfterCommitEnabledKey,
                MetadataDeleteAfterCommitEnabledDefault)
 
@@ -397,12 +401,24 @@ func NewFromLocation(
        if err != nil {
                return nil, err
        }
-       if rf, ok := fsys.(io.ReadFileIO); ok {
+       if rf, ok := fsys.(icebergio.ReadFileIO); ok {
                data, err := rf.ReadFile(metalocation)
                if err != nil {
                        return nil, err
                }
 
+               if isGzippedMetadataJson(metalocation) {
+                       gz, err := gzip.NewReader(bytes.NewReader(data))
+                       if err != nil {
+                               return nil, err
+                       }
+                       defer gz.Close()
+                       data, err = io.ReadAll(gz)
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+
                if meta, err = ParseMetadataBytes(data); err != nil {
                        return nil, err
                }
@@ -413,10 +429,24 @@ func NewFromLocation(
                }
                defer internal.CheckedClose(f, &err)
 
-               if meta, err = ParseMetadata(f); err != nil {
+               var r io.Reader = f
+               if isGzippedMetadataJson(metalocation) {
+                       gz, err := gzip.NewReader(f)
+                       if err != nil {
+                               return nil, err
+                       }
+                       defer gz.Close()
+                       r = gz
+               }
+
+               if meta, err = ParseMetadata(r); err != nil {
                        return nil, err
                }
        }
 
        return New(ident, meta, metalocation, fsysF, cat), nil
 }
+
+func isGzippedMetadataJson(location string) bool {
+       return strings.HasSuffix(location, ".gz.metadata.json") || 
strings.HasSuffix(location, "metadata.json.gz")
+}
diff --git a/table/table_test.go b/table/table_test.go
index 7f7abc02..4e8afbd7 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -19,6 +19,7 @@ package table_test
 
 import (
        "bytes"
+       "compress/gzip"
        "context"
        "fmt"
        "io/fs"
@@ -115,6 +116,40 @@ func (t *TableTestSuite) TestNewTableFromReadFile() {
        t.True(t.tbl.Equals(*tbl2))
 }
 
+func (t *TableTestSuite) TestNewTableFromReadFileGzipped() {
+       var b bytes.Buffer
+       gzWriter := gzip.NewWriter(&b)
+
+       _, err := gzWriter.Write([]byte(table.ExampleTableMetadataV2))
+       if err != nil {
+               log.Fatalf("Error writing to gzip writer: %v", err)
+       }
+       err = gzWriter.Close()
+       if err != nil {
+               log.Fatalf("Error closing gzip writer: %v", err)
+       }
+
+       var mockfsReadFile internal.MockFSReadFile
+       mockfsReadFile.Test(t.T())
+       mockfsReadFile.On("ReadFile", 
"s3://bucket/test/location/uuid.gz.metadata.json").
+               Return(b.Bytes(), nil)
+       defer mockfsReadFile.AssertExpectations(t.T())
+
+       tbl2, err := table.NewFromLocation(
+               t.T().Context(),
+               []string{"foo"},
+               "s3://bucket/test/location/uuid.gz.metadata.json",
+               func(ctx context.Context) (iceio.IO, error) {
+                       return &mockfsReadFile, nil
+               },
+               nil,
+       )
+       t.Require().NoError(err)
+       t.Require().NotNil(tbl2)
+
+       t.True(t.tbl.Metadata().Equals(tbl2.Metadata()))
+}
+
 func (t *TableTestSuite) TestSchema() {
        t.True(t.tbl.Schema().Equals(iceberg.NewSchemaWithIdentifiers(1, 
[]int{1, 2},
                iceberg.NestedField{ID: 1, Name: "x", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},

Reply via email to