This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new b168e015 feat(catalog,io): refresh vended credentials (#795)
b168e015 is described below

commit b168e015f81456dd81ee7cc9f60991101d893da6
Author: Tyler Rockwood <[email protected]>
AuthorDate: Wed Mar 25 09:52:51 2026 -0500

    feat(catalog,io): refresh vended credentials (#795)
    
    Currently credential lifetimes are tied to the table instead, only load
    table refreshes the credentials. This commit caches and refreshes the
    credentials based on the response included in the table load response
    and refreshes it dynamically.
    
    This does still have the effect that holding on to a table IO for a long
    time does not refresh the table, but this makes steps in that direction.
    
    Fixes: https://github.com/apache/iceberg-go/issues/792
---
 catalog/rest/rest.go              |  72 +++++--
 catalog/rest/vended_creds.go      | 149 +++++++++++++++
 catalog/rest/vended_creds_test.go | 386 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 594 insertions(+), 13 deletions(-)

diff --git a/catalog/rest/rest.go b/catalog/rest/rest.go
index dd181969..104ff16f 100644
--- a/catalog/rest/rest.go
+++ b/catalog/rest/rest.go
@@ -46,6 +46,7 @@ import (
        "github.com/aws/aws-sdk-go-v2/config"
        "golang.org/x/oauth2"
        "golang.org/x/oauth2/clientcredentials"
+       "golang.org/x/sync/semaphore"
 )
 
 var (
@@ -139,11 +140,21 @@ func (t *commitTableResponse) UnmarshalJSON(b []byte) 
(err error) {
        return err
 }
 
+type storageCredential struct {
+       Prefix string             `json:"prefix"`
+       Config iceberg.Properties `json:"config"`
+}
+
 type loadTableResponse struct {
-       MetadataLoc string             `json:"metadata-location"`
-       RawMetadata json.RawMessage    `json:"metadata"`
-       Config      iceberg.Properties `json:"config"`
-       Metadata    table.Metadata     `json:"-"`
+       MetadataLoc        string              `json:"metadata-location"`
+       RawMetadata        json.RawMessage     `json:"metadata"`
+       Config             iceberg.Properties  `json:"config"`
+       StorageCredentials []storageCredential `json:"storage-credentials"`
+       Metadata           table.Metadata      `json:"-"`
+}
+
+type loadCredentialsResponse struct {
+       StorageCredentials []storageCredential `json:"storage-credentials"`
 }
 
 func (t *loadTableResponse) UnmarshalJSON(b []byte) (err error) {
@@ -678,16 +689,47 @@ func checkValidNamespace(ident table.Identifier) error {
        return nil
 }
 
-func (r *Catalog) tableFromResponse(ctx context.Context, identifier []string, 
metadata table.Metadata, loc string, config iceberg.Properties) (*table.Table, 
error) {
+func (r *Catalog) tableFromResponse(_ context.Context, identifier []string, 
metadata table.Metadata, loc string, config iceberg.Properties, credsVended 
bool) (*table.Table, error) {
+       var fsF func(context.Context) (iceio.IO, error)
+       if credsVended {
+               refresher := &vendedCredentialRefresher{
+                       mu:         semaphore.NewWeighted(1),
+                       identifier: identifier,
+                       location:   loc,
+                       props:      config,
+                       fetchCreds: func(ctx context.Context, ident []string) 
(iceberg.Properties, error) {
+                               return r.fetchTableCreds(ctx, ident, loc)
+                       },
+               }
+               fsF = refresher.loadFS
+       } else {
+               fsF = iceio.LoadFSFunc(config, loc)
+       }
+
        return table.New(
                identifier,
                metadata,
                loc,
-               iceio.LoadFSFunc(config, loc),
+               fsF,
                r,
        ), nil
 }
 
+func (r *Catalog) fetchTableCreds(ctx context.Context, ident []string, 
location string) (iceberg.Properties, error) {
+       ns, tbl, err := splitIdentForPath(ident)
+       if err != nil {
+               return nil, err
+       }
+
+       ret, err := doGet[loadCredentialsResponse](ctx, r.baseURI, 
[]string{"namespaces", ns, "tables", tbl, "credentials"},
+               r.cl, map[int]error{http.StatusNotFound: 
catalog.ErrNoSuchTable})
+       if err != nil {
+               return nil, err
+       }
+
+       return resolveStorageCredentials(ret.StorageCredentials, location), nil
+}
+
 func (r *Catalog) ListTables(ctx context.Context, namespace table.Identifier) 
iter.Seq2[table.Identifier, error] {
        return func(yield func(table.Identifier, error) bool) {
                pageSize := r.getPageSize(ctx)
@@ -799,8 +841,10 @@ func (r *Catalog) CreateTable(ctx context.Context, 
identifier table.Identifier,
        config := maps.Clone(r.props)
        maps.Copy(config, ret.Metadata.Properties())
        maps.Copy(config, ret.Config)
+       credsVended := len(ret.StorageCredentials) > 0
+       maps.Copy(config, resolveStorageCredentials(ret.StorageCredentials, 
ret.MetadataLoc))
 
-       return r.tableFromResponse(ctx, identifier, ret.Metadata, 
ret.MetadataLoc, config)
+       return r.tableFromResponse(ctx, identifier, ret.Metadata, 
ret.MetadataLoc, config, credsVended)
 }
 
 // commitStagedCreate performs the second phase of a staged table
@@ -973,8 +1017,10 @@ func (r *Catalog) RegisterTable(ctx context.Context, 
identifier table.Identifier
        config := maps.Clone(r.props)
        maps.Copy(config, ret.Metadata.Properties())
        maps.Copy(config, ret.Config)
+       credsVended := len(ret.StorageCredentials) > 0
+       maps.Copy(config, resolveStorageCredentials(ret.StorageCredentials, 
ret.MetadataLoc))
 
-       return r.tableFromResponse(ctx, identifier, ret.Metadata, 
ret.MetadataLoc, config)
+       return r.tableFromResponse(ctx, identifier, ret.Metadata, 
ret.MetadataLoc, config, credsVended)
 }
 
 func (r *Catalog) LoadTable(ctx context.Context, identifier table.Identifier) 
(*table.Table, error) {
@@ -991,11 +1037,11 @@ func (r *Catalog) LoadTable(ctx context.Context, 
identifier table.Identifier) (*
 
        config := maps.Clone(r.props)
        maps.Copy(config, ret.Metadata.Properties())
-       for k, v := range ret.Config {
-               config[k] = v
-       }
+       maps.Copy(config, ret.Config)
+       credsVended := len(ret.StorageCredentials) > 0
+       maps.Copy(config, resolveStorageCredentials(ret.StorageCredentials, 
ret.MetadataLoc))
 
-       return r.tableFromResponse(ctx, identifier, ret.Metadata, 
ret.MetadataLoc, config)
+       return r.tableFromResponse(ctx, identifier, ret.Metadata, 
ret.MetadataLoc, config, credsVended)
 }
 
 func (r *Catalog) UpdateTable(ctx context.Context, ident table.Identifier, 
requirements []table.Requirement, updates []table.Update) (*table.Table, error) 
{
@@ -1030,7 +1076,7 @@ func (r *Catalog) UpdateTable(ctx context.Context, ident 
table.Identifier, requi
        config := maps.Clone(r.props)
        maps.Copy(config, ret.Metadata.Properties())
 
-       return r.tableFromResponse(ctx, ident, ret.Metadata, ret.MetadataLoc, 
config)
+       return r.tableFromResponse(ctx, ident, ret.Metadata, ret.MetadataLoc, 
config, false)
 }
 
 func (r *Catalog) DropTable(ctx context.Context, identifier table.Identifier) 
error {
diff --git a/catalog/rest/vended_creds.go b/catalog/rest/vended_creds.go
new file mode 100644
index 00000000..958a2db2
--- /dev/null
+++ b/catalog/rest/vended_creds.go
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package rest
+
+import (
+       "context"
+       "maps"
+       "strconv"
+       "strings"
+       "time"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "golang.org/x/sync/semaphore"
+)
+
+const (
+       keyS3TokenExpiresAtMs = "s3.session-token-expires-at-ms"
+       keyAdlsSasExpiresAtMs = "adls.sas-token-expires-at-ms"
+       keyGcsOAuthExpiresAt  = "gcs.oauth2.token-expires-at"
+       keyExpirationTime     = "expiration-time"
+
+       defaultVendedCredentialsTTL = 60 * time.Minute
+)
+
+// resolveStorageCredentials finds the best-matching credential for the given
+// location using longest-prefix match, mirroring the Java and Python 
implementations.
+func resolveStorageCredentials(creds []storageCredential, location string) 
iceberg.Properties {
+       var best *storageCredential
+       for i := range creds {
+               c := &creds[i]
+               if strings.HasPrefix(location, c.Prefix) {
+                       if best == nil || len(c.Prefix) > len(best.Prefix) {
+                               best = c
+                       }
+               }
+       }
+       if best == nil {
+               return nil
+       }
+
+       return best.Config
+}
+
+var credentialExpiryKeys = []string{
+       keyS3TokenExpiresAtMs,
+       keyAdlsSasExpiresAtMs,
+       keyGcsOAuthExpiresAt,
+       keyExpirationTime,
+}
+
+func parseCredentialExpiry(config iceberg.Properties) (time.Time, bool) {
+       for _, key := range credentialExpiryKeys {
+               if v, ok := config[key]; ok {
+                       ms, err := strconv.ParseInt(v, 10, 64)
+                       if err == nil && ms > 0 {
+                               return time.UnixMilli(ms), true
+                       }
+               }
+       }
+
+       return time.Time{}, false
+}
+
+type vendedCredentialRefresher struct {
+       // Use a weighted semaphore with a single unit to use as an exclusive 
lock
+       // but cancellation (via context) is supported. This is important as we 
do IO
+       // while holding this lock and we want to allow others to cancel during 
acquisition.
+       mu        *semaphore.Weighted
+       cachedIO  iceio.IO
+       expiresAt time.Time
+
+       identifier []string
+       location   string
+       props      iceberg.Properties
+
+       fetchCreds func(ctx context.Context, ident []string) 
(iceberg.Properties, error)
+
+       nowFunc func() time.Time // for testing
+}
+
+func (v *vendedCredentialRefresher) now() time.Time {
+       if v.nowFunc != nil {
+               return v.nowFunc()
+       }
+
+       return time.Now()
+}
+
+func (v *vendedCredentialRefresher) loadFS(ctx context.Context) (iceio.IO, 
error) {
+       if err := v.mu.Acquire(ctx, 1); err != nil {
+               return nil, err
+       }
+       defer v.mu.Release(1)
+
+       if v.cachedIO != nil && !v.now().After(v.expiresAt) {
+               return v.cachedIO, nil
+       }
+
+       var config iceberg.Properties
+       if v.cachedIO == nil {
+               config = v.props
+       } else {
+               freshCreds, err := v.fetchCreds(ctx, v.identifier)
+               if err != nil {
+                       return v.cachedIO, nil
+               }
+
+               config = maps.Clone(v.props)
+               maps.Copy(config, freshCreds)
+       }
+
+       newIO, err := iceio.LoadFS(ctx, config, v.location)
+       if err != nil {
+               if v.cachedIO != nil {
+                       return v.cachedIO, nil
+               }
+
+               return nil, err
+       }
+
+       v.cachedIO = newIO
+       v.expiresAt = v.expiresAtFromConfig(config)
+
+       return v.cachedIO, nil
+}
+
+func (v *vendedCredentialRefresher) expiresAtFromConfig(config 
iceberg.Properties) time.Time {
+       if exp, ok := parseCredentialExpiry(config); ok {
+               return exp
+       }
+
+       return v.now().Add(defaultVendedCredentialsTTL)
+}
diff --git a/catalog/rest/vended_creds_test.go 
b/catalog/rest/vended_creds_test.go
new file mode 100644
index 00000000..2fdf0876
--- /dev/null
+++ b/catalog/rest/vended_creds_test.go
@@ -0,0 +1,386 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package rest
+
+import (
+       "context"
+       "errors"
+       "strconv"
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "golang.org/x/sync/semaphore"
+)
+
+func newTestRefresher(fetchCreds func(ctx context.Context, ident []string) 
(iceberg.Properties, error)) *vendedCredentialRefresher {
+       return &vendedCredentialRefresher{
+               mu:         semaphore.NewWeighted(1),
+               identifier: []string{"db", "tbl"},
+               location:   "file:///tmp/test",
+               props:      iceberg.Properties{},
+               fetchCreds: fetchCreds,
+       }
+}
+
+func TestVendedCredsCachedIOReturnedWhenNotExpired(t *testing.T) {
+       t.Parallel()
+
+       var callCount atomic.Int32
+
+       r := newTestRefresher(func(ctx context.Context, ident []string) 
(iceberg.Properties, error) {
+               callCount.Add(1)
+
+               return iceberg.Properties{}, nil
+       })
+
+       // Seed cached IO and set expiry in the future.
+       r.cachedIO = iceio.LocalFS{}
+       r.expiresAt = time.Now().Add(time.Hour)
+
+       io1, err := r.loadFS(context.Background())
+       require.NoError(t, err)
+       assert.Equal(t, r.cachedIO, io1)
+
+       io2, err := r.loadFS(context.Background())
+       require.NoError(t, err)
+       assert.Equal(t, io1, io2)
+
+       assert.Equal(t, int32(0), callCount.Load(),
+               "fetchConfig should not be called when credentials have not 
expired")
+}
+
+func TestVendedCredsInitialLoadUsesProps(t *testing.T) {
+       t.Parallel()
+
+       var callCount atomic.Int32
+
+       r := newTestRefresher(func(ctx context.Context, ident []string) 
(iceberg.Properties, error) {
+               callCount.Add(1)
+
+               return iceberg.Properties{}, nil
+       })
+
+       // First call with no cached IO should create IO from props,
+       // not call fetchConfig.
+       io1, err := r.loadFS(context.Background())
+       require.NoError(t, err)
+       assert.NotNil(t, io1)
+       assert.Equal(t, int32(0), callCount.Load(),
+               "fetchConfig should not be called on initial load")
+
+       // Second call should return cached IO.
+       io2, err := r.loadFS(context.Background())
+       require.NoError(t, err)
+       assert.Equal(t, io1, io2)
+}
+
+func TestVendedCredsRefreshTriggeredOnExpiry(t *testing.T) {
+       t.Parallel()
+
+       var callCount atomic.Int32
+       now := time.Now()
+
+       r := newTestRefresher(func(ctx context.Context, ident []string) 
(iceberg.Properties, error) {
+               callCount.Add(1)
+
+               return iceberg.Properties{}, nil
+       })
+       r.nowFunc = func() time.Time { return now }
+
+       // Seed with expired IO.
+       r.cachedIO = iceio.LocalFS{}
+       r.expiresAt = now.Add(-time.Second)
+
+       _, err := r.loadFS(context.Background())
+       require.NoError(t, err)
+       assert.Equal(t, int32(1), callCount.Load(),
+               "fetchConfig should be called once on expired credentials")
+
+       // Second call should use cached IO (expiry was reset).
+       _, err = r.loadFS(context.Background())
+       require.NoError(t, err)
+       assert.Equal(t, int32(1), callCount.Load(),
+               "fetchConfig should not be called again within expiry window")
+}
+
+func TestVendedCredsConcurrentAccess(t *testing.T) {
+       t.Parallel()
+
+       var callCount atomic.Int32
+
+       r := newTestRefresher(func(ctx context.Context, ident []string) 
(iceberg.Properties, error) {
+               callCount.Add(1)
+
+               return iceberg.Properties{}, nil
+       })
+
+       // No cached IO — concurrent initial loads should only create IO once.
+       var wg sync.WaitGroup
+       for range 10 {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       _, err := r.loadFS(context.Background())
+                       assert.NoError(t, err)
+               }()
+       }
+       wg.Wait()
+
+       // Initial load uses props directly, not fetchConfig.
+       assert.Equal(t, int32(0), callCount.Load(),
+               "fetchConfig should not be called during initial load")
+       assert.NotNil(t, r.cachedIO, "cachedIO should be set after initial 
load")
+}
+
+func TestVendedCredsGracefulDegradation(t *testing.T) {
+       t.Parallel()
+
+       fetchErr := errors.New("network error")
+       now := time.Now()
+
+       r := newTestRefresher(func(ctx context.Context, ident []string) 
(iceberg.Properties, error) {
+               return nil, fetchErr
+       })
+       r.nowFunc = func() time.Time { return now }
+
+       // Seed with valid cached IO but expired.
+       existingIO := iceio.LocalFS{}
+       r.cachedIO = existingIO
+       r.expiresAt = now.Add(-time.Second)
+
+       got, err := r.loadFS(context.Background())
+       require.NoError(t, err, "should not return error when cached IO exists 
and refresh fails")
+       assert.Equal(t, existingIO, got, "should return cached IO on refresh 
failure")
+}
+
+func TestVendedCredsErrorWhenInitialLoadFails(t *testing.T) {
+       t.Parallel()
+
+       r := newTestRefresher(func(ctx context.Context, ident []string) 
(iceberg.Properties, error) {
+               return iceberg.Properties{}, nil
+       })
+       // Use an unregistered scheme so LoadFS fails on the initial load.
+       r.location = "notascheme://bucket/path"
+
+       got, err := r.loadFS(context.Background())
+       require.Error(t, err)
+       assert.Nil(t, got)
+}
+
+func TestParseCredentialExpiry(t *testing.T) {
+       t.Parallel()
+
+       epoch := time.Date(2025, 6, 15, 12, 0, 0, 0, time.UTC)
+       epochMs := strconv.FormatInt(epoch.UnixMilli(), 10)
+
+       tests := []struct {
+               name   string
+               config iceberg.Properties
+               want   time.Time
+               found  bool
+       }{
+               {
+                       name:   "s3 token expiry",
+                       config: iceberg.Properties{keyS3TokenExpiresAtMs: 
epochMs},
+                       want:   epoch,
+                       found:  true,
+               },
+               {
+                       name:   "adls sas expiry",
+                       config: iceberg.Properties{keyAdlsSasExpiresAtMs: 
epochMs},
+                       want:   epoch,
+                       found:  true,
+               },
+               {
+                       name:   "gcs oauth expiry",
+                       config: iceberg.Properties{keyGcsOAuthExpiresAt: 
epochMs},
+                       want:   epoch,
+                       found:  true,
+               },
+               {
+                       name:   "generic expiration-time",
+                       config: iceberg.Properties{keyExpirationTime: epochMs},
+                       want:   epoch,
+                       found:  true,
+               },
+               {
+                       name:   "s3 takes precedence over generic",
+                       config: iceberg.Properties{keyS3TokenExpiresAtMs: 
epochMs, keyExpirationTime: "9999999999999"},
+                       want:   epoch,
+                       found:  true,
+               },
+               {
+                       name:   "no expiry keys",
+                       config: iceberg.Properties{"some-other-key": "value"},
+                       found:  false,
+               },
+               {
+                       name:   "invalid value ignored",
+                       config: iceberg.Properties{keyS3TokenExpiresAtMs: 
"not-a-number"},
+                       found:  false,
+               },
+               {
+                       name:   "zero value ignored",
+                       config: iceberg.Properties{keyS3TokenExpiresAtMs: "0"},
+                       found:  false,
+               },
+               {
+                       name:   "empty config",
+                       config: iceberg.Properties{},
+                       found:  false,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       t.Parallel()
+                       got, found := parseCredentialExpiry(tt.config)
+                       assert.Equal(t, tt.found, found)
+                       if tt.found {
+                               assert.Equal(t, tt.want.UnixMilli(), 
got.UnixMilli())
+                       }
+               })
+       }
+}
+
+func TestVendedCredsExpiresAtFromConfig(t *testing.T) {
+       t.Parallel()
+
+       now := time.Now()
+
+       t.Run("uses server-provided expiry", func(t *testing.T) {
+               t.Parallel()
+               serverExpiry := now.Add(30 * time.Minute)
+               config := iceberg.Properties{
+                       keyS3TokenExpiresAtMs: 
strconv.FormatInt(serverExpiry.UnixMilli(), 10),
+               }
+               r := &vendedCredentialRefresher{
+                       mu:      semaphore.NewWeighted(1),
+                       nowFunc: func() time.Time { return now },
+               }
+               got := r.expiresAtFromConfig(config)
+               assert.Equal(t, serverExpiry.UnixMilli(), got.UnixMilli())
+       })
+
+       t.Run("falls back to default 60m when no expiry key", func(t 
*testing.T) {
+               t.Parallel()
+               r := &vendedCredentialRefresher{
+                       mu:      semaphore.NewWeighted(1),
+                       nowFunc: func() time.Time { return now },
+               }
+               got := r.expiresAtFromConfig(iceberg.Properties{})
+               assert.Equal(t, now.Add(60*time.Minute).UnixMilli(), 
got.UnixMilli())
+       })
+}
+
+func TestVendedCredsServerExpiryUsedOnRefresh(t *testing.T) {
+       t.Parallel()
+
+       now := time.Now()
+       serverExpiry := now.Add(20 * time.Minute)
+
+       var callCount atomic.Int32
+       r := newTestRefresher(func(ctx context.Context, ident []string) 
(iceberg.Properties, error) {
+               callCount.Add(1)
+
+               return iceberg.Properties{
+                       keyS3TokenExpiresAtMs: 
strconv.FormatInt(serverExpiry.UnixMilli(), 10),
+               }, nil
+       })
+       r.nowFunc = func() time.Time { return now }
+
+       // Seed with expired IO to trigger a refresh (not initial load).
+       r.cachedIO = iceio.LocalFS{}
+       r.expiresAt = now.Add(-time.Second)
+
+       _, err := r.loadFS(context.Background())
+       require.NoError(t, err)
+       assert.Equal(t, int32(1), callCount.Load())
+
+       // expiresAt should be the server-provided value, not now+default.
+       assert.Equal(t, serverExpiry.UnixMilli(), r.expiresAt.UnixMilli())
+}
+
+func TestResolveStorageCredentials(t *testing.T) {
+       t.Parallel()
+
+       s3Creds := iceberg.Properties{"s3.access-key-id": "AKID", 
"s3.secret-access-key": "secret"}
+       specificCreds := iceberg.Properties{"s3.access-key-id": "SPECIFIC"}
+
+       tests := []struct {
+               name     string
+               creds    []storageCredential
+               location string
+               want     iceberg.Properties
+       }{
+               {
+                       name:     "empty credentials",
+                       creds:    nil,
+                       location: "s3://bucket/path",
+                       want:     nil,
+               },
+               {
+                       name: "matching prefix",
+                       creds: []storageCredential{
+                               {Prefix: "s3://bucket/", Config: s3Creds},
+                       },
+                       location: "s3://bucket/path/to/file",
+                       want:     s3Creds,
+               },
+               {
+                       name: "no matching prefix",
+                       creds: []storageCredential{
+                               {Prefix: "s3://other-bucket/", Config: s3Creds},
+                       },
+                       location: "s3://bucket/path",
+                       want:     nil,
+               },
+               {
+                       name: "longest prefix wins",
+                       creds: []storageCredential{
+                               {Prefix: "s3://bucket/", Config: s3Creds},
+                               {Prefix: "s3://bucket/specific/", Config: 
specificCreds},
+                       },
+                       location: "s3://bucket/specific/path",
+                       want:     specificCreds,
+               },
+               {
+                       name: "longest prefix wins regardless of order",
+                       creds: []storageCredential{
+                               {Prefix: "s3://bucket/specific/", Config: 
specificCreds},
+                               {Prefix: "s3://bucket/", Config: s3Creds},
+                       },
+                       location: "s3://bucket/specific/path",
+                       want:     specificCreds,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       t.Parallel()
+                       got := resolveStorageCredentials(tt.creds, tt.location)
+                       assert.Equal(t, tt.want, got)
+               })
+       }
+}

Reply via email to