This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 97a9566 feat(catalog): Propagate ctx from catalog interface through
call stack (#276)
97a9566 is described below
commit 97a9566b0f37ec904520a8efe59661a4377cad5a
Author: Rick Curtis <[email protected]>
AuthorDate: Wed Jan 29 16:21:31 2025 -0800
feat(catalog): Propagate ctx from catalog interface through call stack
(#276)
In general we should pass `context.Context` through the call path rather
than creating `context.Background()` at arbitrary places.
I made this change in response to [this
comment](https://github.com/apache/iceberg-go/pull/275#issuecomment-2617075856).
I'll fix the original problem in a subsequent PR.
---
catalog/glue/glue.go | 10 +++---
catalog/internal/utils.go | 5 +--
catalog/registry.go | 13 +++----
catalog/registry_test.go | 31 ++++++++++-------
catalog/rest/rest.go | 38 ++++++++++-----------
catalog/rest/rest_internal_test.go | 5 +--
catalog/rest/rest_test.go | 70 ++++++++++++++++++++------------------
catalog/sql/sql.go | 6 ++--
catalog/sql/sql_test.go | 12 +++----
cmd/iceberg/main.go | 53 ++++++++++++++---------------
io/blob.go | 19 ++++++-----
io/io.go | 9 +++--
io/s3.go | 6 ++--
table/arrow_scanner.go | 2 +-
table/arrow_utils.go | 4 +--
table/arrow_utils_test.go | 3 +-
table/scanner_test.go | 2 +-
17 files changed, 149 insertions(+), 139 deletions(-)
diff --git a/catalog/glue/glue.go b/catalog/glue/glue.go
index c9f5950..682d28a 100644
--- a/catalog/glue/glue.go
+++ b/catalog/glue/glue.go
@@ -68,8 +68,8 @@ var (
)
func init() {
- catalog.Register("glue", catalog.RegistrarFunc(func(_ string, props
iceberg.Properties) (catalog.Catalog, error) {
- awsConfig, err := toAwsConfig(props)
+ catalog.Register("glue", catalog.RegistrarFunc(func(ctx
context.Context, _ string, props iceberg.Properties) (catalog.Catalog, error) {
+ awsConfig, err := toAwsConfig(ctx, props)
if err != nil {
return nil, err
}
@@ -78,7 +78,7 @@ func init() {
}))
}
-func toAwsConfig(p iceberg.Properties) (aws.Config, error) {
+func toAwsConfig(ctx context.Context, p iceberg.Properties) (aws.Config,
error) {
opts := make([]func(*config.LoadOptions) error, 0)
for k, v := range p {
@@ -108,7 +108,7 @@ func toAwsConfig(p iceberg.Properties) (aws.Config, error) {
credentials.NewStaticCredentialsProvider(key, secret,
token)))
}
- return config.LoadDefaultConfig(context.Background(), opts...)
+ return config.LoadDefaultConfig(ctx, opts...)
}
type glueAPI interface {
@@ -205,7 +205,7 @@ func (c *Catalog) LoadTable(ctx context.Context, identifier
table.Identifier, pr
}
// TODO: consider providing a way to directly access the S3 iofs to
enable testing of the catalog.
- iofs, err := io.LoadFS(props, location)
+ iofs, err := io.LoadFS(ctx, props, location)
if err != nil {
return nil, fmt.Errorf("failed to load table %s.%s: %w",
database, tableName, err)
}
diff --git a/catalog/internal/utils.go b/catalog/internal/utils.go
index 500368b..9b5e7bd 100644
--- a/catalog/internal/utils.go
+++ b/catalog/internal/utils.go
@@ -18,6 +18,7 @@
package internal
import (
+ "context"
"encoding/json"
"fmt"
@@ -39,8 +40,8 @@ func GetMetadataLoc(location string, newVersion uint) string {
location, newVersion, uuid.New().String())
}
-func WriteMetadata(metadata table.Metadata, loc string, props
iceberg.Properties) error {
- fs, err := io.LoadFS(props, loc)
+func WriteMetadata(ctx context.Context, metadata table.Metadata, loc string,
props iceberg.Properties) error {
+ fs, err := io.LoadFS(ctx, props, loc)
if err != nil {
return err
}
diff --git a/catalog/registry.go b/catalog/registry.go
index e8e6132..bf24e83 100644
--- a/catalog/registry.go
+++ b/catalog/registry.go
@@ -18,6 +18,7 @@
package catalog
import (
+ "context"
"fmt"
"maps"
"net/url"
@@ -64,13 +65,13 @@ var (
// Registrar is a factory for creating Catalog instances, used for registering
to use
// with LoadCatalog.
type Registrar interface {
- GetCatalog(catalogURI string, props iceberg.Properties) (Catalog, error)
+ GetCatalog(ctx context.Context, catalogName string, props
iceberg.Properties) (Catalog, error)
}
-type RegistrarFunc func(string, iceberg.Properties) (Catalog, error)
+type RegistrarFunc func(context.Context, string, iceberg.Properties) (Catalog,
error)
-func (f RegistrarFunc) GetCatalog(catalogURI string, props iceberg.Properties)
(Catalog, error) {
- return f(catalogURI, props)
+func (f RegistrarFunc) GetCatalog(ctx context.Context, catalogName string,
props iceberg.Properties) (Catalog, error) {
+ return f(ctx, catalogName, props)
}
// Register adds the new catalog type to the registry. If the catalog type is
already registered, it will be replaced.
@@ -125,7 +126,7 @@ func GetRegisteredCatalogs() []string {
// as the REST endpoint, otherwise the URI is used as the endpoint. The
REST catalog also
// registers "http" and "https" so that Load with a http/s URI will
automatically
// load the REST Catalog.
-func Load(name string, props iceberg.Properties) (Catalog, error) {
+func Load(ctx context.Context, name string, props iceberg.Properties)
(Catalog, error) {
if name == "" {
name = config.EnvConfig.DefaultCatalog
}
@@ -159,5 +160,5 @@ func Load(name string, props iceberg.Properties) (Catalog,
error) {
return nil, fmt.Errorf("%w: %s", ErrCatalogNotFound,
catalogType)
}
- return cat.GetCatalog(name, props)
+ return cat.GetCatalog(ctx, name, props)
}
diff --git a/catalog/registry_test.go b/catalog/registry_test.go
index 1bc9271..df1d29f 100644
--- a/catalog/registry_test.go
+++ b/catalog/registry_test.go
@@ -18,6 +18,7 @@
package catalog_test
import (
+ "context"
"encoding/json"
"net/http"
"net/http/httptest"
@@ -33,6 +34,7 @@ import (
)
func TestCatalogRegistry(t *testing.T) {
+ ctx := context.Background()
assert.ElementsMatch(t, []string{
"rest",
"http",
@@ -40,7 +42,7 @@ func TestCatalogRegistry(t *testing.T) {
"glue",
}, catalog.GetRegisteredCatalogs())
- catalog.Register("foobar", catalog.RegistrarFunc(func(s string, p
iceberg.Properties) (catalog.Catalog, error) {
+ catalog.Register("foobar", catalog.RegistrarFunc(func(ctx
context.Context, s string, p iceberg.Properties) (catalog.Catalog, error) {
assert.Equal(t, "foobar", s)
assert.Equal(t, "baz", p.Get("foo", ""))
return nil, nil
@@ -54,28 +56,28 @@ func TestCatalogRegistry(t *testing.T) {
"glue",
}, catalog.GetRegisteredCatalogs())
- c, err := catalog.Load("foobar", iceberg.Properties{"foo": "baz"})
+ c, err := catalog.Load(ctx, "foobar", iceberg.Properties{"foo": "baz"})
assert.Nil(t, c)
assert.ErrorIs(t, err, catalog.ErrCatalogNotFound)
- catalog.Register("foobar", catalog.RegistrarFunc(func(s string, p
iceberg.Properties) (catalog.Catalog, error) {
+ catalog.Register("foobar", catalog.RegistrarFunc(func(ctx
context.Context, s string, p iceberg.Properties) (catalog.Catalog, error) {
assert.Equal(t, "not found", s)
assert.Equal(t, "baz", p.Get("foo", ""))
return nil, nil
}))
- c, err = catalog.Load("not found", iceberg.Properties{"type": "foobar",
"foo": "baz"})
+ c, err = catalog.Load(ctx, "not found", iceberg.Properties{"type":
"foobar", "foo": "baz"})
assert.Nil(t, c)
assert.NoError(t, err)
- catalog.Register("foobar", catalog.RegistrarFunc(func(s string, p
iceberg.Properties) (catalog.Catalog, error) {
+ catalog.Register("foobar", catalog.RegistrarFunc(func(ctx
context.Context, s string, p iceberg.Properties) (catalog.Catalog, error) {
assert.Equal(t, "not found", s)
assert.Equal(t, "foobar://helloworld", p.Get("uri", ""))
assert.Equal(t, "baz", p.Get("foo", ""))
return nil, nil
}))
- c, err = catalog.Load("not found", iceberg.Properties{
+ c, err = catalog.Load(ctx, "not found", iceberg.Properties{
"uri": "foobar://helloworld",
"foo": "baz"})
assert.Nil(t, c)
@@ -95,6 +97,7 @@ func TestRegistryPanic(t *testing.T) {
}
func TestCatalogWithEmptyName(t *testing.T) {
+ ctx := context.Background()
config.EnvConfig.DefaultCatalog = "test-default"
config.EnvConfig.Catalogs = map[string]config.CatalogConfig{
"test-default": {
@@ -104,7 +107,7 @@ func TestCatalogWithEmptyName(t *testing.T) {
CatalogType: "mock",
},
}
- catalog.Register("mock", catalog.RegistrarFunc(func(name string, props
iceberg.Properties) (catalog.Catalog, error) {
+ catalog.Register("mock", catalog.RegistrarFunc(func(ctx
context.Context, name string, props iceberg.Properties) (catalog.Catalog,
error) {
// Ensure the correct name and properties are passed
assert.Equal(t, "test-default", name)
assert.Equal(t, "http://localhost:8181/", props.Get("uri", ""))
@@ -112,7 +115,7 @@ func TestCatalogWithEmptyName(t *testing.T) {
assert.Equal(t, "/default/warehouse", props.Get("warehouse",
""))
return nil, nil
}))
- c, err := catalog.Load("", nil)
+ c, err := catalog.Load(ctx, "", nil)
assert.Nil(t, c)
assert.NoError(t, err)
assert.ElementsMatch(t, []string{
@@ -127,6 +130,7 @@ func TestCatalogWithEmptyName(t *testing.T) {
}
func TestCatalogLoadInvalidURI(t *testing.T) {
+ ctx := context.Background()
config.EnvConfig.DefaultCatalog = "default"
config.EnvConfig.Catalogs = map[string]config.CatalogConfig{
"default": {
@@ -137,13 +141,13 @@ func TestCatalogLoadInvalidURI(t *testing.T) {
},
}
- catalog.Register("mock", catalog.RegistrarFunc(func(name string, props
iceberg.Properties) (catalog.Catalog, error) {
+ catalog.Register("mock", catalog.RegistrarFunc(func(ctx
context.Context, name string, props iceberg.Properties) (catalog.Catalog,
error) {
return nil, nil
}))
props := iceberg.Properties{
"uri": "://invalid-uri", // This will cause url.Parse to fail
}
- c, err := catalog.Load("mock", props)
+ c, err := catalog.Load(ctx, "mock", props)
assert.Nil(t, c)
assert.Error(t, err)
@@ -152,6 +156,7 @@ func TestCatalogLoadInvalidURI(t *testing.T) {
}
func TestRegistryFromConfig(t *testing.T) {
+ ctx := context.Background()
var params url.Values
mux := http.NewServeMux()
@@ -178,13 +183,13 @@ func TestRegistryFromConfig(t *testing.T) {
},
}
- c, err := catalog.Load("foobar", nil)
+ c, err := catalog.Load(ctx, "foobar", nil)
assert.NoError(t, err)
assert.IsType(t, &rest.Catalog{}, c)
assert.Equal(t, "foobar", c.(*rest.Catalog).Name())
assert.Equal(t, "catalog_name", params.Get("warehouse"))
- c, err = catalog.Load("foobar", iceberg.Properties{"warehouse":
"overriden"})
+ c, err = catalog.Load(ctx, "foobar", iceberg.Properties{"warehouse":
"overriden"})
assert.NoError(t, err)
assert.IsType(t, &rest.Catalog{}, c)
assert.Equal(t, "foobar", c.(*rest.Catalog).Name())
@@ -195,7 +200,7 @@ func TestRegistryFromConfig(t *testing.T) {
srv2 := httptest.NewServer(mux)
defer srv2.Close()
- c, err = catalog.Load("foobar", iceberg.Properties{"uri": srv2.URL})
+ c, err = catalog.Load(ctx, "foobar", iceberg.Properties{"uri":
srv2.URL})
assert.NoError(t, err)
assert.IsType(t, &rest.Catalog{}, c)
assert.Equal(t, "foobar", c.(*rest.Catalog).Name())
diff --git a/catalog/rest/rest.go b/catalog/rest/rest.go
index a84c395..e0d20e8 100644
--- a/catalog/rest/rest.go
+++ b/catalog/rest/rest.go
@@ -81,8 +81,8 @@ var (
)
func init() {
- reg := catalog.RegistrarFunc(func(name string, p iceberg.Properties)
(catalog.Catalog, error) {
- return newCatalogFromProps(name, p.Get("uri", ""), p)
+ reg := catalog.RegistrarFunc(func(ctx context.Context, name string, p
iceberg.Properties) (catalog.Catalog, error) {
+ return newCatalogFromProps(ctx, name, p.Get("uri", ""), p)
})
catalog.Register(string(catalog.REST), reg)
@@ -439,39 +439,39 @@ type Catalog struct {
props iceberg.Properties
}
-func newCatalogFromProps(name string, uri string, p iceberg.Properties)
(*Catalog, error) {
+func newCatalogFromProps(ctx context.Context, name string, uri string, p
iceberg.Properties) (*Catalog, error) {
ops := fromProps(p)
r := &Catalog{name: name}
- if err := r.init(ops, uri); err != nil {
+ if err := r.init(ctx, ops, uri); err != nil {
return nil, err
}
return r, nil
}
-func NewCatalog(name, uri string, opts ...Option) (*Catalog, error) {
+func NewCatalog(ctx context.Context, name, uri string, opts ...Option)
(*Catalog, error) {
ops := &options{}
for _, o := range opts {
o(ops)
}
r := &Catalog{name: name}
- if err := r.init(ops, uri); err != nil {
+ if err := r.init(ctx, ops, uri); err != nil {
return nil, err
}
return r, nil
}
-func (r *Catalog) init(ops *options, uri string) error {
+func (r *Catalog) init(ctx context.Context, ops *options, uri string) error {
baseuri, err := url.Parse(uri)
if err != nil {
return err
}
r.baseURI = baseuri.JoinPath("v1")
- if r.cl, ops, err = r.fetchConfig(ops); err != nil {
+ if r.cl, ops, err = r.fetchConfig(ctx, ops); err != nil {
return err
}
@@ -535,7 +535,7 @@ func (r *Catalog) fetchAccessToken(cl *http.Client, creds
string, opts *options)
}
}
-func (r *Catalog) createSession(opts *options) (*http.Client, error) {
+func (r *Catalog) createSession(ctx context.Context, opts *options)
(*http.Client, error) {
session := &sessionTransport{
Transport: http.Transport{TLSClientConfig: opts.tlsConfig},
defaultHeaders: http.Header{},
@@ -560,7 +560,7 @@ func (r *Catalog) createSession(opts *options)
(*http.Client, error) {
session.defaultHeaders.Set("X-Iceberg-Access-Delegation",
"vended-credentials")
if opts.enableSigv4 {
- cfg, err := config.LoadDefaultConfig(context.Background())
+ cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return nil, err
}
@@ -576,7 +576,7 @@ func (r *Catalog) createSession(opts *options)
(*http.Client, error) {
return cl, nil
}
-func (r *Catalog) fetchConfig(opts *options) (*http.Client, *options, error) {
+func (r *Catalog) fetchConfig(ctx context.Context, opts *options)
(*http.Client, *options, error) {
params := url.Values{}
if opts.warehouseLocation != "" {
params.Set(keyWarehouseLocation, opts.warehouseLocation)
@@ -585,12 +585,12 @@ func (r *Catalog) fetchConfig(opts *options)
(*http.Client, *options, error) {
route := r.baseURI.JoinPath("config")
route.RawQuery = params.Encode()
- sess, err := r.createSession(opts)
+ sess, err := r.createSession(ctx, opts)
if err != nil {
return nil, nil, err
}
- rsp, err := doGet[configResponse](context.Background(), route,
[]string{}, sess, nil)
+ rsp, err := doGet[configResponse](ctx, route, []string{}, sess, nil)
if err != nil {
return nil, nil, err
}
@@ -627,13 +627,13 @@ func checkValidNamespace(ident table.Identifier) error {
return nil
}
-func (r *Catalog) tableFromResponse(identifier []string, metadata
table.Metadata, loc string, config iceberg.Properties) (*table.Table, error) {
+func (r *Catalog) tableFromResponse(ctx context.Context, identifier []string,
metadata table.Metadata, loc string, config iceberg.Properties) (*table.Table,
error) {
id := identifier
if r.name != "" {
id = append([]string{r.name}, identifier...)
}
- iofs, err := iceio.LoadFS(config, loc)
+ iofs, err := iceio.LoadFS(ctx, config, loc)
if err != nil {
return nil, err
}
@@ -719,7 +719,7 @@ func (r *Catalog) CreateTable(ctx context.Context,
identifier table.Identifier,
maps.Copy(config, ret.Metadata.Properties())
maps.Copy(config, ret.Config)
- return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc,
config)
+ return r.tableFromResponse(ctx, identifier, ret.Metadata,
ret.MetadataLoc, config)
}
func (r *Catalog) CommitTable(ctx context.Context, tbl *table.Table,
requirements []table.Requirement, updates []table.Update) (table.Metadata,
string, error) {
@@ -774,7 +774,7 @@ func (r *Catalog) RegisterTable(ctx context.Context,
identifier table.Identifier
config := maps.Clone(r.props)
maps.Copy(config, ret.Metadata.Properties())
maps.Copy(config, ret.Config)
- return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc,
config)
+ return r.tableFromResponse(ctx, identifier, ret.Metadata,
ret.MetadataLoc, config)
}
func (r *Catalog) LoadTable(ctx context.Context, identifier table.Identifier,
props iceberg.Properties) (*table.Table, error) {
@@ -796,7 +796,7 @@ func (r *Catalog) LoadTable(ctx context.Context, identifier
table.Identifier, pr
config[k] = v
}
- return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc,
config)
+ return r.tableFromResponse(ctx, identifier, ret.Metadata,
ret.MetadataLoc, config)
}
func (r *Catalog) UpdateTable(ctx context.Context, ident table.Identifier,
requirements []table.Requirement, updates []table.Update) (*table.Table, error)
{
@@ -824,7 +824,7 @@ func (r *Catalog) UpdateTable(ctx context.Context, ident
table.Identifier, requi
config := maps.Clone(r.props)
maps.Copy(config, ret.Metadata.Properties())
- return r.tableFromResponse(ident, ret.Metadata, ret.MetadataLoc, config)
+ return r.tableFromResponse(ctx, ident, ret.Metadata, ret.MetadataLoc,
config)
}
func (r *Catalog) DropTable(ctx context.Context, identifier table.Identifier)
error {
diff --git a/catalog/rest/rest_internal_test.go
b/catalog/rest/rest_internal_test.go
index 126b6bd..0a2e6f1 100644
--- a/catalog/rest/rest_internal_test.go
+++ b/catalog/rest/rest_internal_test.go
@@ -18,6 +18,7 @@
package rest
import (
+ "context"
"encoding/json"
"net/http"
"net/http/httptest"
@@ -59,7 +60,7 @@ func TestAuthHeader(t *testing.T) {
})
})
- cat, err := NewCatalog("rest", srv.URL,
+ cat, err := NewCatalog(context.Background(), "rest", srv.URL,
WithCredential("client:secret"))
require.NoError(t, err)
assert.NotNil(t, cat)
@@ -107,7 +108,7 @@ func TestAuthUriHeader(t *testing.T) {
authUri, err := url.Parse(srv.URL)
require.NoError(t, err)
- cat, err := NewCatalog("rest", srv.URL,
+ cat, err := NewCatalog(context.Background(), "rest", srv.URL,
WithCredential("client:secret"),
WithAuthURI(authUri.JoinPath("auth-token-url")))
require.NoError(t, err)
assert.NotNil(t, cat)
diff --git a/catalog/rest/rest_test.go b/catalog/rest/rest_test.go
index 2210ae1..d687cc6 100644
--- a/catalog/rest/rest_test.go
+++ b/catalog/rest/rest_test.go
@@ -107,7 +107,7 @@ func (r *RestCatalogSuite) TestToken200() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithWarehouseLocation("s3://some-bucket"),
rest.WithCredential(TestCreds),
rest.WithScope(scope))
@@ -118,6 +118,7 @@ func (r *RestCatalogSuite) TestToken200() {
}
func (r *RestCatalogSuite) TestLoadRegisteredCatalog() {
+ ctx := context.Background()
r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req
*http.Request) {
r.Equal(http.MethodPost, req.Method)
@@ -140,7 +141,7 @@ func (r *RestCatalogSuite) TestLoadRegisteredCatalog() {
})
})
- cat, err := catalog.Load("restful", iceberg.Properties{
+ cat, err := catalog.Load(ctx, "restful", iceberg.Properties{
"uri": r.srv.URL,
"warehouse": "s3://some-bucket",
"credential": TestCreds,
@@ -165,7 +166,7 @@ func (r *RestCatalogSuite) TestToken400() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithCredential(TestCreds))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithCredential(TestCreds))
r.Nil(cat)
r.ErrorIs(err, rest.ErrRESTError)
@@ -198,7 +199,7 @@ func (r *RestCatalogSuite) TestToken200AuthUrl() {
authUri, err := url.Parse(r.srv.URL)
r.Require().NoError(err)
- cat, err := rest.NewCatalog("rest", r.srv.URL,
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithWarehouseLocation("s3://some-bucket"),
rest.WithCredential(TestCreds),
rest.WithAuthURI(authUri.JoinPath("auth-token-url")))
@@ -222,7 +223,7 @@ func (r *RestCatalogSuite) TestToken401() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithCredential(TestCreds))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithCredential(TestCreds))
r.Nil(cat)
r.ErrorIs(err, rest.ErrRESTError)
@@ -249,7 +250,7 @@ func (r *RestCatalogSuite) TestListTables200() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
tables, err := cat.ListTables(context.Background(),
catalog.ToIdentifier(namespace))
@@ -298,7 +299,7 @@ func (r *RestCatalogSuite) TestListTablesPrefixed200() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithPrefix("prefix"),
rest.WithWarehouseLocation("s3://some-bucket"),
rest.WithCredential(TestCreds))
@@ -331,7 +332,7 @@ func (r *RestCatalogSuite) TestListTables404() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
_, err = cat.ListTables(context.Background(),
catalog.ToIdentifier(namespace))
@@ -354,7 +355,7 @@ func (r *RestCatalogSuite) TestListNamespaces200() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
results, err := cat.ListNamespaces(context.Background(), nil)
@@ -379,7 +380,7 @@ func (r *RestCatalogSuite) TestListNamespaceWithParent200()
{
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
results, err := cat.ListNamespaces(context.Background(),
catalog.ToIdentifier("accounting"))
@@ -406,7 +407,7 @@ func (r *RestCatalogSuite) TestListNamespaces400() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
_, err = cat.ListNamespaces(context.Background(),
catalog.ToIdentifier("accounting"))
@@ -439,7 +440,7 @@ func (r *RestCatalogSuite) TestCreateNamespace200() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
r.Require().NoError(cat.CreateNamespace(context.Background(),
catalog.ToIdentifier("leden"), nil))
@@ -454,7 +455,7 @@ func (r *RestCatalogSuite) TestCheckNamespaceExists204() {
}
w.WriteHeader(http.StatusNoContent)
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
exists, err := cat.CheckNamespaceExists(context.Background(),
catalog.ToIdentifier("leden"))
@@ -483,7 +484,7 @@ func (r *RestCatalogSuite) TestCheckNamespaceExists404() {
}
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
exists, err := cat.CheckNamespaceExists(context.Background(),
catalog.ToIdentifier("noneexistent"))
@@ -516,7 +517,7 @@ func (r *RestCatalogSuite)
TestCreateNamespaceWithProps200() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
r.Require().NoError(cat.CreateNamespace(context.Background(),
catalog.ToIdentifier("leden"), iceberg.Properties{"foo": "bar", "super":
"duper"}))
@@ -552,7 +553,7 @@ func (r *RestCatalogSuite) TestCreateNamespace409() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
err = cat.CreateNamespace(context.Background(),
catalog.ToIdentifier("fokko"), nil)
@@ -571,7 +572,7 @@ func (r *RestCatalogSuite) TestDropNamespace204() {
w.WriteHeader(http.StatusNoContent)
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
r.NoError(cat.DropNamespace(context.Background(),
catalog.ToIdentifier("examples")))
@@ -595,7 +596,7 @@ func (r *RestCatalogSuite) TestDropNamespace404() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
err = cat.DropNamespace(context.Background(),
catalog.ToIdentifier("examples"))
@@ -618,7 +619,7 @@ func (r *RestCatalogSuite) TestLoadNamespaceProps200() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
props, err := cat.LoadNamespaceProperties(context.Background(),
catalog.ToIdentifier("leden"))
@@ -644,7 +645,7 @@ func (r *RestCatalogSuite) TestLoadNamespaceProps404() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
_, err = cat.LoadNamespaceProperties(context.Background(),
catalog.ToIdentifier("leden"))
@@ -667,7 +668,7 @@ func (r *RestCatalogSuite) TestUpdateNamespaceProps200() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
summary, err := cat.UpdateNamespaceProperties(context.Background(),
table.Identifier([]string{"fokko"}),
@@ -699,7 +700,7 @@ func (r *RestCatalogSuite) TestUpdateNamespaceProps404() {
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
_, err = cat.UpdateNamespaceProperties(context.Background(),
@@ -785,7 +786,7 @@ func (r *RestCatalogSuite) TestCreateTable200() {
t := createTableRestExample
_ = t
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
tbl, err := cat.CreateTable(
@@ -827,7 +828,7 @@ func (r *RestCatalogSuite) TestCreateTable409() {
json.NewEncoder(w).Encode(errorResponse)
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
// Attempt to create table with properties
@@ -854,7 +855,7 @@ func (r *RestCatalogSuite) TestCheckTableExists204() {
w.WriteHeader(http.StatusNoContent)
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
exists, err := cat.CheckTableExists(context.Background(),
catalog.ToIdentifier("fokko", "fokko2"))
@@ -879,7 +880,7 @@ func (r *RestCatalogSuite) TestCheckTableExists404() {
},
})
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
exists, err := cat.CheckTableExists(context.Background(),
catalog.ToIdentifier("fokko", "nonexistent"))
@@ -964,7 +965,7 @@ func (r *RestCatalogSuite) TestLoadTable200() {
}`))
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
tbl, err := cat.LoadTable(context.Background(),
catalog.ToIdentifier("fokko", "table"), nil)
@@ -1045,7 +1046,7 @@ func (r *RestCatalogSuite) TestRenameTable200() {
w.Write([]byte(createTableRestExample))
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
fromIdent := catalog.ToIdentifier("fokko", "source")
@@ -1079,7 +1080,7 @@ func (r *RestCatalogSuite) TestDropTable204() {
w.WriteHeader(http.StatusNoContent)
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
err = cat.DropTable(context.Background(), catalog.ToIdentifier("fokko",
"table"))
@@ -1106,7 +1107,7 @@ func (r *RestCatalogSuite) TestDropTable404() {
json.NewEncoder(w).Encode(errorResponse)
})
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Require().NoError(err)
err = cat.DropTable(context.Background(), catalog.ToIdentifier("fokko",
"table"))
@@ -1148,14 +1149,15 @@ func (r *RestTLSCatalogSuite) TearDownTest() {
}
func (r *RestTLSCatalogSuite) TestSSLFail() {
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken))
r.Nil(cat)
r.ErrorContains(err, "tls: failed to verify certificate")
}
func (r *RestTLSCatalogSuite) TestSSLLoadRegisteredCatalog() {
- cat, err := catalog.Load("foobar", iceberg.Properties{
+ ctx := context.Background()
+ cat, err := catalog.Load(ctx, "foobar", iceberg.Properties{
"uri": r.srv.URL,
"warehouse": "s3://some-bucket",
"token": TestToken,
@@ -1168,7 +1170,7 @@ func (r *RestTLSCatalogSuite)
TestSSLLoadRegisteredCatalog() {
}
func (r *RestTLSCatalogSuite) TestSSLConfig() {
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken),
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken),
rest.WithWarehouseLocation("s3://some-bucket"),
rest.WithTLSConfig(&tls.Config{InsecureSkipVerify: true}))
r.NoError(err)
@@ -1187,7 +1189,7 @@ func (r *RestTLSCatalogSuite) TestSSLCerts() {
}
}
- cat, err := rest.NewCatalog("rest", r.srv.URL,
rest.WithOAuthToken(TestToken),
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
rest.WithOAuthToken(TestToken),
rest.WithWarehouseLocation("s3://some-bucket"),
rest.WithTLSConfig(&tls.Config{RootCAs: certs}))
r.NoError(err)
diff --git a/catalog/sql/sql.go b/catalog/sql/sql.go
index 010744c..22edee1 100644
--- a/catalog/sql/sql.go
+++ b/catalog/sql/sql.go
@@ -62,7 +62,7 @@ const (
)
func init() {
- catalog.Register("sql", catalog.RegistrarFunc(func(name string, p
iceberg.Properties) (c catalog.Catalog, err error) {
+ catalog.Register("sql", catalog.RegistrarFunc(func(ctx context.Context,
name string, p iceberg.Properties) (c catalog.Catalog, err error) {
driver, ok := p[DriverKey]
if !ok {
return nil, errors.New("must provide driver to pass to
sql.Open")
@@ -315,7 +315,7 @@ func (c *Catalog) CreateTable(ctx context.Context, ident
table.Identifier, sc *i
return nil, err
}
- if err := internal.WriteMetadata(metadata, metadataLocation, c.props);
err != nil {
+ if err := internal.WriteMetadata(ctx, metadata, metadataLocation,
c.props); err != nil {
return nil, err
}
@@ -379,7 +379,7 @@ func (c *Catalog) LoadTable(ctx context.Context, identifier
table.Identifier, pr
tblProps := maps.Clone(c.props)
maps.Copy(props, tblProps)
- iofs, err := io.LoadFS(tblProps, result.MetadataLocation.String)
+ iofs, err := io.LoadFS(ctx, tblProps, result.MetadataLocation.String)
if err != nil {
return nil, err
}
diff --git a/catalog/sql/sql_test.go b/catalog/sql/sql_test.go
index 5e26c63..2cc4c61 100644
--- a/catalog/sql/sql_test.go
+++ b/catalog/sql/sql_test.go
@@ -90,15 +90,15 @@ var (
)
func TestCreateSQLCatalogNoDriverDialect(t *testing.T) {
- _, err := catalog.Load("sql", iceberg.Properties{})
+ _, err := catalog.Load(context.Background(), "sql",
iceberg.Properties{})
assert.Error(t, err)
- _, err = catalog.Load("sql", iceberg.Properties{sqlcat.DriverKey:
"sqlite"})
+ _, err = catalog.Load(context.Background(), "sql",
iceberg.Properties{sqlcat.DriverKey: "sqlite"})
assert.Error(t, err)
}
func TestInvalidDialect(t *testing.T) {
- _, err := catalog.Load("sql", iceberg.Properties{
+ _, err := catalog.Load(context.Background(), "sql", iceberg.Properties{
sqlcat.DriverKey: sqliteshim.ShimName,
sqlcat.DialectKey: "foobar",
})
@@ -193,7 +193,7 @@ func (s *SqliteCatalogTestSuite) confirmTablesExist(db
*sql.DB) {
}
func (s *SqliteCatalogTestSuite) loadCatalogForTableCreation() *sqlcat.Catalog
{
- cat, err := catalog.Load("default", iceberg.Properties{
+ cat, err := catalog.Load(context.Background(), "default",
iceberg.Properties{
"uri": s.catalogUri(),
sqlcat.DriverKey: sqliteshim.ShimName,
sqlcat.DialectKey: string(sqlcat.SQLite),
@@ -217,7 +217,7 @@ func (s *SqliteCatalogTestSuite) getDB() *sql.DB {
}
func (s *SqliteCatalogTestSuite) getCatalogMemory() *sqlcat.Catalog {
- cat, err := catalog.Load("default", iceberg.Properties{
+ cat, err := catalog.Load(context.Background(), "default",
iceberg.Properties{
"uri": ":memory:",
sqlcat.DriverKey: sqliteshim.ShimName,
sqlcat.DialectKey: string(sqlcat.SQLite),
@@ -230,7 +230,7 @@ func (s *SqliteCatalogTestSuite) getCatalogMemory()
*sqlcat.Catalog {
}
func (s *SqliteCatalogTestSuite) getCatalogSqlite() *sqlcat.Catalog {
- cat, err := catalog.Load("default", iceberg.Properties{
+ cat, err := catalog.Load(context.Background(), "default",
iceberg.Properties{
"uri": s.catalogUri(),
sqlcat.DriverKey: sqliteshim.ShimName,
sqlcat.DialectKey: string(sqlcat.SQLite),
diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go
index 7fcac03..a8e1f6b 100644
--- a/cmd/iceberg/main.go
+++ b/cmd/iceberg/main.go
@@ -110,6 +110,7 @@ type Config struct {
}
func main() {
+ ctx := context.Background()
args, err := docopt.ParseArgs(usage, os.Args[1:], iceberg.Version())
if err != nil {
log.Fatal(err)
@@ -148,11 +149,11 @@ func main() {
opts = append(opts,
rest.WithWarehouseLocation(cfg.Warehouse))
}
- if cat, err = rest.NewCatalog("rest", cfg.URI, opts...); err !=
nil {
+ if cat, err = rest.NewCatalog(ctx, "rest", cfg.URI, opts...);
err != nil {
log.Fatal(err)
}
case catalog.Glue:
- awscfg, err := awsconfig.LoadDefaultConfig(context.Background())
+ awscfg, err := awsconfig.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
@@ -166,7 +167,7 @@ func main() {
switch {
case cfg.List:
- list(output, cat, cfg.Parent)
+ list(ctx, output, cat, cfg.Parent)
case cfg.Describe:
entityType := "any"
if cfg.Namespace {
@@ -175,21 +176,21 @@ func main() {
entityType = "tbl"
}
- describe(output, cat, cfg.Ident, entityType)
+ describe(ctx, output, cat, cfg.Ident, entityType)
case cfg.Schema:
- tbl := loadTable(output, cat, cfg.TableID)
+ tbl := loadTable(ctx, output, cat, cfg.TableID)
output.Schema(tbl.Schema())
case cfg.Spec:
- tbl := loadTable(output, cat, cfg.TableID)
+ tbl := loadTable(ctx, output, cat, cfg.TableID)
output.Spec(tbl.Spec())
case cfg.Location:
- tbl := loadTable(output, cat, cfg.TableID)
+ tbl := loadTable(ctx, output, cat, cfg.TableID)
output.Text(tbl.Location())
case cfg.Uuid:
- tbl := loadTable(output, cat, cfg.TableID)
+ tbl := loadTable(ctx, output, cat, cfg.TableID)
output.Uuid(tbl.Metadata().TableUUID())
case cfg.Props:
- properties(output, cat, propCmd{
+ properties(ctx, output, cat, propCmd{
get: cfg.Get, set: cfg.Set, remove: cfg.Remove,
namespace: cfg.Namespace, table: cfg.Table,
identifier: cfg.Ident,
@@ -197,7 +198,7 @@ func main() {
value: cfg.Value,
})
case cfg.Rename:
- _, err := cat.RenameTable(context.Background(),
+ _, err := cat.RenameTable(ctx,
catalog.ToIdentifier(cfg.RenameFrom),
catalog.ToIdentifier(cfg.RenameTo))
if err != nil {
output.Error(err)
@@ -208,13 +209,13 @@ func main() {
case cfg.Drop:
switch {
case cfg.Namespace:
- err := cat.DropNamespace(context.Background(),
catalog.ToIdentifier(cfg.Ident))
+ err := cat.DropNamespace(ctx,
catalog.ToIdentifier(cfg.Ident))
if err != nil {
output.Error(err)
os.Exit(1)
}
case cfg.Table:
- err := cat.DropTable(context.Background(),
catalog.ToIdentifier(cfg.Ident))
+ err := cat.DropTable(ctx,
catalog.ToIdentifier(cfg.Ident))
if err != nil {
output.Error(err)
os.Exit(1)
@@ -233,7 +234,7 @@ func main() {
props["Location"] = cfg.LocationURI
}
- err := cat.CreateNamespace(context.Background(),
catalog.ToIdentifier(cfg.Ident), props)
+ err := cat.CreateNamespace(ctx,
catalog.ToIdentifier(cfg.Ident), props)
if err != nil {
output.Error(err)
os.Exit(1)
@@ -245,22 +246,22 @@ func main() {
os.Exit(1)
}
case cfg.Files:
- tbl := loadTable(output, cat, cfg.TableID)
+ tbl := loadTable(ctx, output, cat, cfg.TableID)
output.Files(tbl, cfg.History)
}
}
-func list(output Output, cat catalog.Catalog, parent string) {
+func list(ctx context.Context, output Output, cat catalog.Catalog, parent
string) {
prnt := catalog.ToIdentifier(parent)
- ids, err := cat.ListNamespaces(context.Background(), prnt)
+ ids, err := cat.ListNamespaces(ctx, prnt)
if err != nil {
output.Error(err)
os.Exit(1)
}
if len(ids) == 0 && parent != "" {
- ids, err = cat.ListTables(context.Background(), prnt)
+ ids, err = cat.ListTables(ctx, prnt)
if err != nil {
output.Error(err)
os.Exit(1)
@@ -269,9 +270,7 @@ func list(output Output, cat catalog.Catalog, parent
string) {
output.Identifiers(ids)
}
-func describe(output Output, cat catalog.Catalog, id string, entityType
string) {
- ctx := context.Background()
-
+func describe(ctx context.Context, output Output, cat catalog.Catalog, id
string, entityType string) {
ident := catalog.ToIdentifier(id)
isNS, isTbl := false, false
@@ -313,8 +312,8 @@ func describe(output Output, cat catalog.Catalog, id
string, entityType string)
}
}
-func loadTable(output Output, cat catalog.Catalog, id string) *table.Table {
- tbl, err := cat.LoadTable(context.Background(),
catalog.ToIdentifier(id), nil)
+func loadTable(ctx context.Context, output Output, cat catalog.Catalog, id
string) *table.Table {
+ tbl, err := cat.LoadTable(ctx, catalog.ToIdentifier(id), nil)
if err != nil {
output.Error(err)
os.Exit(1)
@@ -330,8 +329,8 @@ type propCmd struct {
identifier, propname, value string
}
-func properties(output Output, cat catalog.Catalog, args propCmd) {
- ctx, ident := context.Background(),
catalog.ToIdentifier(args.identifier)
+func properties(ctx context.Context, output Output, cat catalog.Catalog, args
propCmd) {
+ ident := catalog.ToIdentifier(args.identifier)
switch {
case args.get:
@@ -345,7 +344,7 @@ func properties(output Output, cat catalog.Catalog, args
propCmd) {
os.Exit(1)
}
case args.table:
- tbl := loadTable(output, cat, args.identifier)
+ tbl := loadTable(ctx, output, cat, args.identifier)
props = tbl.Metadata().Properties()
}
@@ -372,7 +371,7 @@ func properties(output Output, cat catalog.Catalog, args
propCmd) {
output.Text("updated " + args.propname + " on " +
args.identifier)
case args.table:
- loadTable(output, cat, args.identifier)
+ loadTable(ctx, output, cat, args.identifier)
output.Text("Setting " + args.propname + "=" +
args.value + " on " + args.identifier)
output.Error(errors.New("not implemented: Writing is
WIP"))
}
@@ -388,7 +387,7 @@ func properties(output Output, cat catalog.Catalog, args
propCmd) {
output.Text("removing " + args.propname + " from " +
args.identifier)
case args.table:
- loadTable(output, cat, args.identifier)
+ loadTable(ctx, output, cat, args.identifier)
output.Text("Setting " + args.propname + "=" +
args.value + " on " + args.identifier)
output.Error(errors.New("not implemented: Writing is
WIP"))
}
diff --git a/io/blob.go b/io/blob.go
index 18add29..3cbaee0 100644
--- a/io/blob.go
+++ b/io/blob.go
@@ -38,10 +38,11 @@ type blobOpenFile struct {
name, key string
b *blobFileIO
+ ctx context.Context
}
func (f *blobOpenFile) ReadAt(p []byte, off int64) (int, error) {
- rdr, err := f.b.Bucket.NewRangeReader(context.Background(), f.key, off,
int64(len(p)), nil)
+ rdr, err := f.b.Bucket.NewRangeReader(f.ctx, f.key, off, int64(len(p)),
nil)
if err != nil {
return 0, err
}
@@ -74,6 +75,7 @@ type blobFileIO struct {
*blob.Bucket
bucketName string
+ ctx context.Context
}
func (bfs *blobFileIO) preprocess(key string) string {
@@ -94,20 +96,20 @@ func (bfs *blobFileIO) Open(path string) (fs.File, error) {
key, name = path, filepath.Base(path)
)
- r, err := bfs.NewReader(context.Background(), key, nil)
+ r, err := bfs.NewReader(bfs.ctx, key, nil)
if err != nil {
return nil, err
}
- return &blobOpenFile{Reader: r, name: name, key: key, b: bfs}, nil
+ return &blobOpenFile{Reader: r, name: name, key: key, b: bfs, ctx:
bfs.ctx}, nil
}
func (bfs *blobFileIO) Remove(name string) error {
- return bfs.Bucket.Delete(context.Background(), name)
+ return bfs.Bucket.Delete(bfs.ctx, name)
}
func (bfs *blobFileIO) Create(name string) (FileWriter, error) {
- return bfs.NewWriter(name, true, nil)
+ return bfs.NewWriter(bfs.ctx, name, true, nil)
}
// NewWriter returns a Writer that writes to the blob stored at path.
@@ -118,13 +120,12 @@ func (bfs *blobFileIO) Create(name string) (FileWriter,
error) {
//
// The caller must call Close on the returned Writer, even if the write is
// aborted.
-func (io *blobFileIO) NewWriter(path string, overwrite bool, opts
*blob.WriterOptions) (w *blobWriteFile, err error) {
+func (io *blobFileIO) NewWriter(ctx context.Context, path string, overwrite
bool, opts *blob.WriterOptions) (w *blobWriteFile, err error) {
if !fs.ValidPath(path) {
return nil, &fs.PathError{Op: "new writer", Path: path, Err:
fs.ErrInvalid}
}
path = io.preprocess(path)
- ctx := context.Background()
if !overwrite {
if exists, err := io.Bucket.Exists(ctx, path); exists {
if err != nil {
@@ -143,8 +144,8 @@ func (io *blobFileIO) NewWriter(path string, overwrite
bool, opts *blob.WriterOp
nil
}
-func createBlobFS(bucket *blob.Bucket, bucketName string) IO {
- iofs := &blobFileIO{Bucket: bucket, bucketName: bucketName}
+func createBlobFS(ctx context.Context, bucket *blob.Bucket, bucketName string)
IO {
+ iofs := &blobFileIO{Bucket: bucket, bucketName: bucketName, ctx: ctx}
return FSPreProcName(iofs, iofs.preprocess)
}
diff --git a/io/io.go b/io/io.go
index 16df880..82d2c75 100644
--- a/io/io.go
+++ b/io/io.go
@@ -229,13 +229,12 @@ func (f ioFile) ReadDir(count int) ([]fs.DirEntry, error)
{
return d.ReadDir(count)
}
-func inferFileIOFromSchema(path string, props map[string]string) (IO, error) {
+func inferFileIOFromSchema(ctx context.Context, path string, props
map[string]string) (IO, error) {
parsed, err := url.Parse(path)
if err != nil {
return nil, err
}
var bucket *blob.Bucket
- ctx := context.Background()
switch parsed.Scheme {
case "s3", "s3a", "s3n":
@@ -256,7 +255,7 @@ func inferFileIOFromSchema(path string, props
map[string]string) (IO, error) {
default:
return nil, fmt.Errorf("IO for file '%s' not implemented", path)
}
- return createBlobFS(bucket, parsed.Host), nil
+ return createBlobFS(ctx, bucket, parsed.Host), nil
}
// LoadFS takes a map of properties and an optional URI location
@@ -267,12 +266,12 @@ func inferFileIOFromSchema(path string, props
map[string]string) (IO, error) {
// does not yet have an implementation here.
//
// Currently local, S3, GCS, and In-Memory FSs are implemented.
-func LoadFS(props map[string]string, location string) (IO, error) {
+func LoadFS(ctx context.Context, props map[string]string, location string)
(IO, error) {
if location == "" {
location = props["warehouse"]
}
- iofs, err := inferFileIOFromSchema(location, props)
+ iofs, err := inferFileIOFromSchema(ctx, location, props)
if err != nil {
return nil, err
}
diff --git a/io/s3.go b/io/s3.go
index 430ac2f..de23a4b 100644
--- a/io/s3.go
+++ b/io/s3.go
@@ -55,7 +55,7 @@ var unsupportedS3Props = []string{
}
// ParseAWSConfig parses S3 properties and returns a configuration.
-func ParseAWSConfig(props map[string]string) (*aws.Config, error) {
+func ParseAWSConfig(ctx context.Context, props map[string]string)
(*aws.Config, error) {
// If any unsupported properties are set, return an error.
for k := range props {
if slices.Contains(unsupportedS3Props, k) {
@@ -98,7 +98,7 @@ func ParseAWSConfig(props map[string]string) (*aws.Config,
error) {
awscfg := new(aws.Config)
var err error
- *awscfg, err = config.LoadDefaultConfig(context.Background(), opts...)
+ *awscfg, err = config.LoadDefaultConfig(ctx, opts...)
if err != nil {
return nil, err
}
@@ -107,7 +107,7 @@ func ParseAWSConfig(props map[string]string) (*aws.Config,
error) {
}
func createS3Bucket(ctx context.Context, parsed *url.URL, props
map[string]string) (*blob.Bucket, error) {
- awscfg, err := ParseAWSConfig(props)
+ awscfg, err := ParseAWSConfig(ctx, props)
if err != nil {
return nil, err
}
diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go
index 5b2bcab..f87514a 100644
--- a/table/arrow_scanner.go
+++ b/table/arrow_scanner.go
@@ -428,7 +428,7 @@ func (as *arrowScan) recordsFromTask(ctx context.Context,
task internal.Enumerat
pipeline = append(pipeline, func(r arrow.Record) (arrow.Record, error) {
defer r.Release()
- return ToRequestedSchema(as.projectedSchema, iceSchema, r,
false, false, as.useLargeTypes)
+ return ToRequestedSchema(ctx, as.projectedSchema, iceSchema, r,
false, false, as.useLargeTypes)
})
err = as.processRecords(ctx, task, iceSchema, rdr, colIndices,
pipeline, out)
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 310fbfb..2bfe881 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -840,13 +840,13 @@ func (a *arrowProjectionVisitor) Primitive(_
iceberg.PrimitiveType, arr arrow.Ar
// ToRequestedSchema will construct a new record batch matching the requested
iceberg schema
// casting columns if necessary as appropriate.
-func ToRequestedSchema(requested, fileSchema *iceberg.Schema, batch
arrow.Record, downcastTimestamp, includeFieldIDs, useLargeTypes bool)
(arrow.Record, error) {
+func ToRequestedSchema(ctx context.Context, requested, fileSchema
*iceberg.Schema, batch arrow.Record, downcastTimestamp, includeFieldIDs,
useLargeTypes bool) (arrow.Record, error) {
st := array.RecordToStructArray(batch)
defer st.Release()
result, err := iceberg.VisitSchemaWithPartner[arrow.Array,
arrow.Array](requested, st,
&arrowProjectionVisitor{
- ctx: context.Background(),
+ ctx: ctx,
fileSchema: fileSchema,
includeFieldIDs: includeFieldIDs,
downcastNsTimestamp: downcastTimestamp,
diff --git a/table/arrow_utils_test.go b/table/arrow_utils_test.go
index 4dc12f2..134d34d 100644
--- a/table/arrow_utils_test.go
+++ b/table/arrow_utils_test.go
@@ -481,6 +481,7 @@ func ArrowRecordWithAllTimestampPrec(mem memory.Allocator)
arrow.Record {
}
func TestToRequestedSchemaTimestamps(t *testing.T) {
+ ctx := context.Background()
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
@@ -490,7 +491,7 @@ func TestToRequestedSchemaTimestamps(t *testing.T) {
requestedSchema := TableSchemaWithAllMicrosecondsTimestampPrec
fileSchema := requestedSchema
- converted, err := table.ToRequestedSchema(requestedSchema, fileSchema,
batch, true, false, false)
+ converted, err := table.ToRequestedSchema(ctx, requestedSchema,
fileSchema, batch, true, false, false)
require.NoError(t, err)
defer converted.Release()
diff --git a/table/scanner_test.go b/table/scanner_test.go
index 9775ca9..761d941 100644
--- a/table/scanner_test.go
+++ b/table/scanner_test.go
@@ -52,7 +52,7 @@ type ScannerSuite struct {
func (s *ScannerSuite) SetupTest() {
s.ctx = context.Background()
- cat, err := rest.NewCatalog("rest", "http://localhost:8181")
+ cat, err := rest.NewCatalog(s.ctx, "rest", "http://localhost:8181")
s.Require().NoError(err)
s.cat = cat