This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new b168e015 feat(catalog,io): refresh vended credentials (#795)
b168e015 is described below
commit b168e015f81456dd81ee7cc9f60991101d893da6
Author: Tyler Rockwood <[email protected]>
AuthorDate: Wed Mar 25 09:52:51 2026 -0500
feat(catalog,io): refresh vended credentials (#795)
Currently credential lifetimes are tied to the table: only a load-table
call refreshes the credentials. This commit caches the credentials
included in the table load response and refreshes them dynamically
based on their expiration time.
This does still have the effect that holding on to a table IO for a long
time does not refresh the table itself, but it is a step in that direction.
Fixes: https://github.com/apache/iceberg-go/issues/792
---
catalog/rest/rest.go | 72 +++++--
catalog/rest/vended_creds.go | 149 +++++++++++++++
catalog/rest/vended_creds_test.go | 386 ++++++++++++++++++++++++++++++++++++++
3 files changed, 594 insertions(+), 13 deletions(-)
diff --git a/catalog/rest/rest.go b/catalog/rest/rest.go
index dd181969..104ff16f 100644
--- a/catalog/rest/rest.go
+++ b/catalog/rest/rest.go
@@ -46,6 +46,7 @@ import (
"github.com/aws/aws-sdk-go-v2/config"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
+ "golang.org/x/sync/semaphore"
)
var (
@@ -139,11 +140,21 @@ func (t *commitTableResponse) UnmarshalJSON(b []byte)
(err error) {
return err
}
+type storageCredential struct {
+ Prefix string `json:"prefix"`
+ Config iceberg.Properties `json:"config"`
+}
+
type loadTableResponse struct {
- MetadataLoc string `json:"metadata-location"`
- RawMetadata json.RawMessage `json:"metadata"`
- Config iceberg.Properties `json:"config"`
- Metadata table.Metadata `json:"-"`
+ MetadataLoc string `json:"metadata-location"`
+ RawMetadata json.RawMessage `json:"metadata"`
+ Config iceberg.Properties `json:"config"`
+ StorageCredentials []storageCredential `json:"storage-credentials"`
+ Metadata table.Metadata `json:"-"`
+}
+
+type loadCredentialsResponse struct {
+ StorageCredentials []storageCredential `json:"storage-credentials"`
}
func (t *loadTableResponse) UnmarshalJSON(b []byte) (err error) {
@@ -678,16 +689,47 @@ func checkValidNamespace(ident table.Identifier) error {
return nil
}
-func (r *Catalog) tableFromResponse(ctx context.Context, identifier []string,
metadata table.Metadata, loc string, config iceberg.Properties) (*table.Table,
error) {
+func (r *Catalog) tableFromResponse(_ context.Context, identifier []string,
metadata table.Metadata, loc string, config iceberg.Properties, credsVended
bool) (*table.Table, error) {
+ var fsF func(context.Context) (iceio.IO, error)
+ if credsVended {
+ refresher := &vendedCredentialRefresher{
+ mu: semaphore.NewWeighted(1),
+ identifier: identifier,
+ location: loc,
+ props: config,
+ fetchCreds: func(ctx context.Context, ident []string)
(iceberg.Properties, error) {
+ return r.fetchTableCreds(ctx, ident, loc)
+ },
+ }
+ fsF = refresher.loadFS
+ } else {
+ fsF = iceio.LoadFSFunc(config, loc)
+ }
+
return table.New(
identifier,
metadata,
loc,
- iceio.LoadFSFunc(config, loc),
+ fsF,
r,
), nil
}
+func (r *Catalog) fetchTableCreds(ctx context.Context, ident []string,
location string) (iceberg.Properties, error) {
+ ns, tbl, err := splitIdentForPath(ident)
+ if err != nil {
+ return nil, err
+ }
+
+ ret, err := doGet[loadCredentialsResponse](ctx, r.baseURI,
[]string{"namespaces", ns, "tables", tbl, "credentials"},
+ r.cl, map[int]error{http.StatusNotFound:
catalog.ErrNoSuchTable})
+ if err != nil {
+ return nil, err
+ }
+
+ return resolveStorageCredentials(ret.StorageCredentials, location), nil
+}
+
func (r *Catalog) ListTables(ctx context.Context, namespace table.Identifier)
iter.Seq2[table.Identifier, error] {
return func(yield func(table.Identifier, error) bool) {
pageSize := r.getPageSize(ctx)
@@ -799,8 +841,10 @@ func (r *Catalog) CreateTable(ctx context.Context,
identifier table.Identifier,
config := maps.Clone(r.props)
maps.Copy(config, ret.Metadata.Properties())
maps.Copy(config, ret.Config)
+ credsVended := len(ret.StorageCredentials) > 0
+ maps.Copy(config, resolveStorageCredentials(ret.StorageCredentials,
ret.MetadataLoc))
- return r.tableFromResponse(ctx, identifier, ret.Metadata,
ret.MetadataLoc, config)
+ return r.tableFromResponse(ctx, identifier, ret.Metadata,
ret.MetadataLoc, config, credsVended)
}
// commitStagedCreate performs the second phase of a staged table
@@ -973,8 +1017,10 @@ func (r *Catalog) RegisterTable(ctx context.Context,
identifier table.Identifier
config := maps.Clone(r.props)
maps.Copy(config, ret.Metadata.Properties())
maps.Copy(config, ret.Config)
+ credsVended := len(ret.StorageCredentials) > 0
+ maps.Copy(config, resolveStorageCredentials(ret.StorageCredentials,
ret.MetadataLoc))
- return r.tableFromResponse(ctx, identifier, ret.Metadata,
ret.MetadataLoc, config)
+ return r.tableFromResponse(ctx, identifier, ret.Metadata,
ret.MetadataLoc, config, credsVended)
}
func (r *Catalog) LoadTable(ctx context.Context, identifier table.Identifier)
(*table.Table, error) {
@@ -991,11 +1037,11 @@ func (r *Catalog) LoadTable(ctx context.Context,
identifier table.Identifier) (*
config := maps.Clone(r.props)
maps.Copy(config, ret.Metadata.Properties())
- for k, v := range ret.Config {
- config[k] = v
- }
+ maps.Copy(config, ret.Config)
+ credsVended := len(ret.StorageCredentials) > 0
+ maps.Copy(config, resolveStorageCredentials(ret.StorageCredentials,
ret.MetadataLoc))
- return r.tableFromResponse(ctx, identifier, ret.Metadata,
ret.MetadataLoc, config)
+ return r.tableFromResponse(ctx, identifier, ret.Metadata,
ret.MetadataLoc, config, credsVended)
}
func (r *Catalog) UpdateTable(ctx context.Context, ident table.Identifier,
requirements []table.Requirement, updates []table.Update) (*table.Table, error)
{
@@ -1030,7 +1076,7 @@ func (r *Catalog) UpdateTable(ctx context.Context, ident
table.Identifier, requi
config := maps.Clone(r.props)
maps.Copy(config, ret.Metadata.Properties())
- return r.tableFromResponse(ctx, ident, ret.Metadata, ret.MetadataLoc,
config)
+ return r.tableFromResponse(ctx, ident, ret.Metadata, ret.MetadataLoc,
config, false)
}
func (r *Catalog) DropTable(ctx context.Context, identifier table.Identifier)
error {
diff --git a/catalog/rest/vended_creds.go b/catalog/rest/vended_creds.go
new file mode 100644
index 00000000..958a2db2
--- /dev/null
+++ b/catalog/rest/vended_creds.go
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package rest
+
+import (
+ "context"
+ "maps"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+ "golang.org/x/sync/semaphore"
+)
+
+const (
+ keyS3TokenExpiresAtMs = "s3.session-token-expires-at-ms"
+ keyAdlsSasExpiresAtMs = "adls.sas-token-expires-at-ms"
+ keyGcsOAuthExpiresAt = "gcs.oauth2.token-expires-at"
+ keyExpirationTime = "expiration-time"
+
+ defaultVendedCredentialsTTL = 60 * time.Minute
+)
+
+// resolveStorageCredentials finds the best-matching credential for the given
+// location using longest-prefix match, mirroring the Java and Python
implementations.
+func resolveStorageCredentials(creds []storageCredential, location string)
iceberg.Properties {
+ var best *storageCredential
+ for i := range creds {
+ c := &creds[i]
+ if strings.HasPrefix(location, c.Prefix) {
+ if best == nil || len(c.Prefix) > len(best.Prefix) {
+ best = c
+ }
+ }
+ }
+ if best == nil {
+ return nil
+ }
+
+ return best.Config
+}
+
+var credentialExpiryKeys = []string{
+ keyS3TokenExpiresAtMs,
+ keyAdlsSasExpiresAtMs,
+ keyGcsOAuthExpiresAt,
+ keyExpirationTime,
+}
+
+func parseCredentialExpiry(config iceberg.Properties) (time.Time, bool) {
+ for _, key := range credentialExpiryKeys {
+ if v, ok := config[key]; ok {
+ ms, err := strconv.ParseInt(v, 10, 64)
+ if err == nil && ms > 0 {
+ return time.UnixMilli(ms), true
+ }
+ }
+ }
+
+ return time.Time{}, false
+}
+
+type vendedCredentialRefresher struct {
+ // Use a weighted semaphore with a single unit to use as an exclusive
lock
+ // but cancellation (via context) is supported. This is important as we
do IO
+ // while holding this lock and we want to allow others to cancel during
acquisition.
+ mu *semaphore.Weighted
+ cachedIO iceio.IO
+ expiresAt time.Time
+
+ identifier []string
+ location string
+ props iceberg.Properties
+
+ fetchCreds func(ctx context.Context, ident []string)
(iceberg.Properties, error)
+
+ nowFunc func() time.Time // for testing
+}
+
+func (v *vendedCredentialRefresher) now() time.Time {
+ if v.nowFunc != nil {
+ return v.nowFunc()
+ }
+
+ return time.Now()
+}
+
+func (v *vendedCredentialRefresher) loadFS(ctx context.Context) (iceio.IO,
error) {
+ if err := v.mu.Acquire(ctx, 1); err != nil {
+ return nil, err
+ }
+ defer v.mu.Release(1)
+
+ if v.cachedIO != nil && !v.now().After(v.expiresAt) {
+ return v.cachedIO, nil
+ }
+
+ var config iceberg.Properties
+ if v.cachedIO == nil {
+ config = v.props
+ } else {
+ freshCreds, err := v.fetchCreds(ctx, v.identifier)
+ if err != nil {
+ return v.cachedIO, nil
+ }
+
+ config = maps.Clone(v.props)
+ maps.Copy(config, freshCreds)
+ }
+
+ newIO, err := iceio.LoadFS(ctx, config, v.location)
+ if err != nil {
+ if v.cachedIO != nil {
+ return v.cachedIO, nil
+ }
+
+ return nil, err
+ }
+
+ v.cachedIO = newIO
+ v.expiresAt = v.expiresAtFromConfig(config)
+
+ return v.cachedIO, nil
+}
+
+func (v *vendedCredentialRefresher) expiresAtFromConfig(config
iceberg.Properties) time.Time {
+ if exp, ok := parseCredentialExpiry(config); ok {
+ return exp
+ }
+
+ return v.now().Add(defaultVendedCredentialsTTL)
+}
diff --git a/catalog/rest/vended_creds_test.go
b/catalog/rest/vended_creds_test.go
new file mode 100644
index 00000000..2fdf0876
--- /dev/null
+++ b/catalog/rest/vended_creds_test.go
@@ -0,0 +1,386 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package rest
+
+import (
+ "context"
+ "errors"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "golang.org/x/sync/semaphore"
+)
+
+func newTestRefresher(fetchCreds func(ctx context.Context, ident []string)
(iceberg.Properties, error)) *vendedCredentialRefresher {
+ return &vendedCredentialRefresher{
+ mu: semaphore.NewWeighted(1),
+ identifier: []string{"db", "tbl"},
+ location: "file:///tmp/test",
+ props: iceberg.Properties{},
+ fetchCreds: fetchCreds,
+ }
+}
+
+func TestVendedCredsCachedIOReturnedWhenNotExpired(t *testing.T) {
+ t.Parallel()
+
+ var callCount atomic.Int32
+
+ r := newTestRefresher(func(ctx context.Context, ident []string)
(iceberg.Properties, error) {
+ callCount.Add(1)
+
+ return iceberg.Properties{}, nil
+ })
+
+ // Seed cached IO and set expiry in the future.
+ r.cachedIO = iceio.LocalFS{}
+ r.expiresAt = time.Now().Add(time.Hour)
+
+ io1, err := r.loadFS(context.Background())
+ require.NoError(t, err)
+ assert.Equal(t, r.cachedIO, io1)
+
+ io2, err := r.loadFS(context.Background())
+ require.NoError(t, err)
+ assert.Equal(t, io1, io2)
+
+ assert.Equal(t, int32(0), callCount.Load(),
+ "fetchConfig should not be called when credentials have not
expired")
+}
+
+func TestVendedCredsInitialLoadUsesProps(t *testing.T) {
+ t.Parallel()
+
+ var callCount atomic.Int32
+
+ r := newTestRefresher(func(ctx context.Context, ident []string)
(iceberg.Properties, error) {
+ callCount.Add(1)
+
+ return iceberg.Properties{}, nil
+ })
+
+ // First call with no cached IO should create IO from props,
+ // not call fetchConfig.
+ io1, err := r.loadFS(context.Background())
+ require.NoError(t, err)
+ assert.NotNil(t, io1)
+ assert.Equal(t, int32(0), callCount.Load(),
+ "fetchConfig should not be called on initial load")
+
+ // Second call should return cached IO.
+ io2, err := r.loadFS(context.Background())
+ require.NoError(t, err)
+ assert.Equal(t, io1, io2)
+}
+
+func TestVendedCredsRefreshTriggeredOnExpiry(t *testing.T) {
+ t.Parallel()
+
+ var callCount atomic.Int32
+ now := time.Now()
+
+ r := newTestRefresher(func(ctx context.Context, ident []string)
(iceberg.Properties, error) {
+ callCount.Add(1)
+
+ return iceberg.Properties{}, nil
+ })
+ r.nowFunc = func() time.Time { return now }
+
+ // Seed with expired IO.
+ r.cachedIO = iceio.LocalFS{}
+ r.expiresAt = now.Add(-time.Second)
+
+ _, err := r.loadFS(context.Background())
+ require.NoError(t, err)
+ assert.Equal(t, int32(1), callCount.Load(),
+ "fetchConfig should be called once on expired credentials")
+
+ // Second call should use cached IO (expiry was reset).
+ _, err = r.loadFS(context.Background())
+ require.NoError(t, err)
+ assert.Equal(t, int32(1), callCount.Load(),
+ "fetchConfig should not be called again within expiry window")
+}
+
+func TestVendedCredsConcurrentAccess(t *testing.T) {
+ t.Parallel()
+
+ var callCount atomic.Int32
+
+ r := newTestRefresher(func(ctx context.Context, ident []string)
(iceberg.Properties, error) {
+ callCount.Add(1)
+
+ return iceberg.Properties{}, nil
+ })
+
+ // No cached IO — concurrent initial loads should only create IO once.
+ var wg sync.WaitGroup
+ for range 10 {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ _, err := r.loadFS(context.Background())
+ assert.NoError(t, err)
+ }()
+ }
+ wg.Wait()
+
+ // Initial load uses props directly, not fetchConfig.
+ assert.Equal(t, int32(0), callCount.Load(),
+ "fetchConfig should not be called during initial load")
+ assert.NotNil(t, r.cachedIO, "cachedIO should be set after initial
load")
+}
+
+func TestVendedCredsGracefulDegradation(t *testing.T) {
+ t.Parallel()
+
+ fetchErr := errors.New("network error")
+ now := time.Now()
+
+ r := newTestRefresher(func(ctx context.Context, ident []string)
(iceberg.Properties, error) {
+ return nil, fetchErr
+ })
+ r.nowFunc = func() time.Time { return now }
+
+ // Seed with valid cached IO but expired.
+ existingIO := iceio.LocalFS{}
+ r.cachedIO = existingIO
+ r.expiresAt = now.Add(-time.Second)
+
+ got, err := r.loadFS(context.Background())
+ require.NoError(t, err, "should not return error when cached IO exists
and refresh fails")
+ assert.Equal(t, existingIO, got, "should return cached IO on refresh
failure")
+}
+
+func TestVendedCredsErrorWhenInitialLoadFails(t *testing.T) {
+ t.Parallel()
+
+ r := newTestRefresher(func(ctx context.Context, ident []string)
(iceberg.Properties, error) {
+ return iceberg.Properties{}, nil
+ })
+ // Use an unregistered scheme so LoadFS fails on the initial load.
+ r.location = "notascheme://bucket/path"
+
+ got, err := r.loadFS(context.Background())
+ require.Error(t, err)
+ assert.Nil(t, got)
+}
+
+func TestParseCredentialExpiry(t *testing.T) {
+ t.Parallel()
+
+ epoch := time.Date(2025, 6, 15, 12, 0, 0, 0, time.UTC)
+ epochMs := strconv.FormatInt(epoch.UnixMilli(), 10)
+
+ tests := []struct {
+ name string
+ config iceberg.Properties
+ want time.Time
+ found bool
+ }{
+ {
+ name: "s3 token expiry",
+ config: iceberg.Properties{keyS3TokenExpiresAtMs:
epochMs},
+ want: epoch,
+ found: true,
+ },
+ {
+ name: "adls sas expiry",
+ config: iceberg.Properties{keyAdlsSasExpiresAtMs:
epochMs},
+ want: epoch,
+ found: true,
+ },
+ {
+ name: "gcs oauth expiry",
+ config: iceberg.Properties{keyGcsOAuthExpiresAt:
epochMs},
+ want: epoch,
+ found: true,
+ },
+ {
+ name: "generic expiration-time",
+ config: iceberg.Properties{keyExpirationTime: epochMs},
+ want: epoch,
+ found: true,
+ },
+ {
+ name: "s3 takes precedence over generic",
+ config: iceberg.Properties{keyS3TokenExpiresAtMs:
epochMs, keyExpirationTime: "9999999999999"},
+ want: epoch,
+ found: true,
+ },
+ {
+ name: "no expiry keys",
+ config: iceberg.Properties{"some-other-key": "value"},
+ found: false,
+ },
+ {
+ name: "invalid value ignored",
+ config: iceberg.Properties{keyS3TokenExpiresAtMs:
"not-a-number"},
+ found: false,
+ },
+ {
+ name: "zero value ignored",
+ config: iceberg.Properties{keyS3TokenExpiresAtMs: "0"},
+ found: false,
+ },
+ {
+ name: "empty config",
+ config: iceberg.Properties{},
+ found: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
+ got, found := parseCredentialExpiry(tt.config)
+ assert.Equal(t, tt.found, found)
+ if tt.found {
+ assert.Equal(t, tt.want.UnixMilli(),
got.UnixMilli())
+ }
+ })
+ }
+}
+
+func TestVendedCredsExpiresAtFromConfig(t *testing.T) {
+ t.Parallel()
+
+ now := time.Now()
+
+ t.Run("uses server-provided expiry", func(t *testing.T) {
+ t.Parallel()
+ serverExpiry := now.Add(30 * time.Minute)
+ config := iceberg.Properties{
+ keyS3TokenExpiresAtMs:
strconv.FormatInt(serverExpiry.UnixMilli(), 10),
+ }
+ r := &vendedCredentialRefresher{
+ mu: semaphore.NewWeighted(1),
+ nowFunc: func() time.Time { return now },
+ }
+ got := r.expiresAtFromConfig(config)
+ assert.Equal(t, serverExpiry.UnixMilli(), got.UnixMilli())
+ })
+
+ t.Run("falls back to default 60m when no expiry key", func(t
*testing.T) {
+ t.Parallel()
+ r := &vendedCredentialRefresher{
+ mu: semaphore.NewWeighted(1),
+ nowFunc: func() time.Time { return now },
+ }
+ got := r.expiresAtFromConfig(iceberg.Properties{})
+ assert.Equal(t, now.Add(60*time.Minute).UnixMilli(),
got.UnixMilli())
+ })
+}
+
+func TestVendedCredsServerExpiryUsedOnRefresh(t *testing.T) {
+ t.Parallel()
+
+ now := time.Now()
+ serverExpiry := now.Add(20 * time.Minute)
+
+ var callCount atomic.Int32
+ r := newTestRefresher(func(ctx context.Context, ident []string)
(iceberg.Properties, error) {
+ callCount.Add(1)
+
+ return iceberg.Properties{
+ keyS3TokenExpiresAtMs:
strconv.FormatInt(serverExpiry.UnixMilli(), 10),
+ }, nil
+ })
+ r.nowFunc = func() time.Time { return now }
+
+ // Seed with expired IO to trigger a refresh (not initial load).
+ r.cachedIO = iceio.LocalFS{}
+ r.expiresAt = now.Add(-time.Second)
+
+ _, err := r.loadFS(context.Background())
+ require.NoError(t, err)
+ assert.Equal(t, int32(1), callCount.Load())
+
+ // expiresAt should be the server-provided value, not now+default.
+ assert.Equal(t, serverExpiry.UnixMilli(), r.expiresAt.UnixMilli())
+}
+
+func TestResolveStorageCredentials(t *testing.T) {
+ t.Parallel()
+
+ s3Creds := iceberg.Properties{"s3.access-key-id": "AKID",
"s3.secret-access-key": "secret"}
+ specificCreds := iceberg.Properties{"s3.access-key-id": "SPECIFIC"}
+
+ tests := []struct {
+ name string
+ creds []storageCredential
+ location string
+ want iceberg.Properties
+ }{
+ {
+ name: "empty credentials",
+ creds: nil,
+ location: "s3://bucket/path",
+ want: nil,
+ },
+ {
+ name: "matching prefix",
+ creds: []storageCredential{
+ {Prefix: "s3://bucket/", Config: s3Creds},
+ },
+ location: "s3://bucket/path/to/file",
+ want: s3Creds,
+ },
+ {
+ name: "no matching prefix",
+ creds: []storageCredential{
+ {Prefix: "s3://other-bucket/", Config: s3Creds},
+ },
+ location: "s3://bucket/path",
+ want: nil,
+ },
+ {
+ name: "longest prefix wins",
+ creds: []storageCredential{
+ {Prefix: "s3://bucket/", Config: s3Creds},
+ {Prefix: "s3://bucket/specific/", Config:
specificCreds},
+ },
+ location: "s3://bucket/specific/path",
+ want: specificCreds,
+ },
+ {
+ name: "longest prefix wins regardless of order",
+ creds: []storageCredential{
+ {Prefix: "s3://bucket/specific/", Config:
specificCreds},
+ {Prefix: "s3://bucket/", Config: s3Creds},
+ },
+ location: "s3://bucket/specific/path",
+ want: specificCreds,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
+ got := resolveStorageCredentials(tt.creds, tt.location)
+ assert.Equal(t, tt.want, got)
+ })
+ }
+}