This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new b58a334 feat(catalog): Add Catalog Registry (#244)
b58a334 is described below
commit b58a334c1592455804392b16bc90bbce1734da19
Author: Matt Topol <[email protected]>
AuthorDate: Fri Jan 10 15:17:42 2025 -0500
feat(catalog): Add Catalog Registry (#244)
* feat(catalog): Add Catalog Registry
* remove doc for SQL catalog since I factored the registry out for a
smaller PR
* fixup tests
* use `glue.access-key-id` for property name
* use constants and allow setting *some* access fields for glue
* Update catalog/registry.go
Co-authored-by: Kevin Liu <[email protected]>
* Update catalog/registry.go
Co-authored-by: Kevin Liu <[email protected]>
---------
Co-authored-by: Kevin Liu <[email protected]>
---
catalog/catalog.go | 51 ++++++++++++++++++
catalog/glue.go | 101 +++++++++++++++++++++--------------
catalog/registry.go | 135 +++++++++++++++++++++++++++++++++++++++++++++++
catalog/registry_test.go | 71 +++++++++++++++++++++++++
catalog/rest.go | 53 ++++++++++++++++---
catalog/rest_test.go | 45 ++++++++++++++++
6 files changed, 407 insertions(+), 49 deletions(-)
diff --git a/catalog/catalog.go b/catalog/catalog.go
index 9b5776d..0bc8f49 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -21,6 +21,8 @@ import (
"context"
"crypto/tls"
"errors"
+ "fmt"
+ "maps"
"net/url"
"github.com/apache/iceberg-go"
@@ -46,6 +48,8 @@ var (
ErrNoSuchNamespace = errors.New("namespace does not exist")
ErrNamespaceAlreadyExists = errors.New("namespace already exists")
ErrTableAlreadyExists = errors.New("table already exists")
+ ErrCatalogNotFound = errors.New("catalog type not registered")
+ ErrNamespaceNotEmpty = errors.New("namespace is not empty")
)
// WithAwsConfig sets the AWS configuration for the catalog.
@@ -195,3 +199,50 @@ func TableNameFromIdent(ident table.Identifier) string {
func NamespaceFromIdent(ident table.Identifier) table.Identifier {
return ident[:len(ident)-1]
}
+
+func checkForOverlap(removals []string, updates iceberg.Properties) error {
+ overlap := []string{}
+ for _, key := range removals {
+ if _, ok := updates[key]; ok {
+ overlap = append(overlap, key)
+ }
+ }
+ if len(overlap) > 0 {
+ return fmt.Errorf("conflict between removals and updates for
keys: %v", overlap)
+ }
+ return nil
+}
+
+func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals
[]string, updates iceberg.Properties) (iceberg.Properties,
PropertiesUpdateSummary, error) {
+ if err := checkForOverlap(removals, updates); err != nil {
+ return nil, PropertiesUpdateSummary{}, err
+ }
+
+ var (
+ updatedProps = maps.Clone(currentProps)
+ removed = make([]string, 0, len(removals))
+ updated = make([]string, 0, len(updates))
+ )
+
+ for _, key := range removals {
+ if _, exists := updatedProps[key]; exists {
+ delete(updatedProps, key)
+ removed = append(removed, key)
+ }
+ }
+
+ for key, value := range updates {
+ if updatedProps[key] != value {
+ updated = append(updated, key)
+ updatedProps[key] = value
+ }
+ }
+
+ summary := PropertiesUpdateSummary{
+ Removed: removed,
+ Updated: updated,
+ Missing: iceberg.Difference(removals, removed),
+ }
+
+ return updatedProps, summary, nil
+}
diff --git a/catalog/glue.go b/catalog/glue.go
index c970e5a..245116b 100644
--- a/catalog/glue.go
+++ b/catalog/glue.go
@@ -21,11 +21,14 @@ import (
"context"
"errors"
"fmt"
+ "strconv"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/config"
+ "github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/glue"
"github.com/aws/aws-sdk-go-v2/service/glue/types"
)
@@ -47,13 +50,65 @@ const (
// The ID of the Glue Data Catalog where the tables reside. If none is
provided, Glue
// automatically uses the caller's AWS account ID by default.
// See:
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html
- glueCatalogIdKey = "glue.id"
+ GlueCatalogIdKey = "glue.id"
+
+ GlueAccessKeyID = "glue.access-key-id"
+ GlueSecretAccessKey = "glue.secret-access-key"
+ GlueSessionToken = "glue.session-token"
+ GlueRegion = "glue.region"
+ GlueEndpoint = "glue.endpoint"
+ GlueMaxRetries = "glue.max-retries"
+ GlueRetryMode = "glue.retry-mode"
)
var (
_ Catalog = (*GlueCatalog)(nil)
)
+func init() {
+ Register("glue", RegistrarFunc(func(_ string, props iceberg.Properties)
(Catalog, error) {
+ awsConfig, err := toAwsConfig(props)
+ if err != nil {
+ return nil, err
+ }
+
+ return NewGlueCatalog(WithAwsConfig(awsConfig),
WithAwsProperties(AwsProperties(props))), nil
+ }))
+}
+
+func toAwsConfig(p iceberg.Properties) (aws.Config, error) {
+ opts := make([]func(*config.LoadOptions) error, 0)
+
+ for k, v := range p {
+ switch k {
+ case GlueRegion:
+ opts = append(opts, config.WithRegion(v))
+ case GlueEndpoint:
+ opts = append(opts, config.WithBaseEndpoint(v))
+ case GlueMaxRetries:
+ maxRetry, err := strconv.Atoi(v)
+ if err != nil {
+ return aws.Config{}, err
+ }
+ opts = append(opts,
config.WithRetryMaxAttempts(maxRetry))
+ case GlueRetryMode:
+ m, err := aws.ParseRetryMode(v)
+ if err != nil {
+ return aws.Config{}, err
+ }
+ opts = append(opts, config.WithRetryMode(m))
+ }
+ }
+
+ key, secret, token := p[GlueAccessKeyID], p[GlueSecretAccessKey],
p[GlueSessionToken]
+ if key != "" || secret != "" || token != "" {
+ opts = append(opts, config.WithCredentialsProvider(
+ credentials.NewStaticCredentialsProvider(key, secret,
token)))
+ }
+
+ return config.LoadDefaultConfig(context.Background(), opts...)
+}
+
type glueAPI interface {
CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns
...func(*glue.Options)) (*glue.CreateTableOutput, error)
GetTable(ctx context.Context, params *glue.GetTableInput, optFns
...func(*glue.Options)) (*glue.GetTableOutput, error)
@@ -80,7 +135,7 @@ func NewGlueCatalog(opts ...Option[GlueCatalog])
*GlueCatalog {
}
var catalogId *string
- if val, ok := glueOps.awsProperties[glueCatalogIdKey]; ok {
+ if val, ok := glueOps.awsProperties[GlueCatalogIdKey]; ok {
catalogId = &val
} else {
catalogId = nil
@@ -353,39 +408,9 @@ func (c *GlueCatalog) UpdateNamespaceProperties(ctx
context.Context, namespace t
return PropertiesUpdateSummary{}, err
}
- overlap := []string{}
- for _, key := range removals {
- if _, exists := updates[key]; exists {
- overlap = append(overlap, key)
- }
- }
- if len(overlap) > 0 {
- return PropertiesUpdateSummary{}, fmt.Errorf("conflict between
removals and updates for keys: %v", overlap)
- }
-
- updatedProperties := make(map[string]string)
- if database.Parameters != nil {
- for k, v := range database.Parameters {
- updatedProperties[k] = v
- }
- }
-
- // Removals.
- removed := []string{}
- for _, key := range removals {
- if _, exists := updatedProperties[key]; exists {
- delete(updatedProperties, key)
- removed = append(removed, key)
- }
- }
-
- // Updates.
- updated := []string{}
- for key, value := range updates {
- if updatedProperties[key] != value {
- updatedProperties[key] = value
- updated = append(updated, key)
- }
+ updatedProperties, propertiesUpdateSummary, err :=
getUpdatedPropsAndUpdateSummary(database.Parameters, removals, updates)
+ if err != nil {
+ return PropertiesUpdateSummary{}, err
}
_, err = c.glueSvc.UpdateDatabase(ctx,
&glue.UpdateDatabaseInput{CatalogId: c.catalogId, Name:
aws.String(databaseName), DatabaseInput: &types.DatabaseInput{
@@ -396,12 +421,6 @@ func (c *GlueCatalog) UpdateNamespaceProperties(ctx
context.Context, namespace t
return PropertiesUpdateSummary{}, fmt.Errorf("failed to update
namespace properties %s: %w", databaseName, err)
}
- propertiesUpdateSummary := PropertiesUpdateSummary{
- Removed: removed,
- Updated: updated,
- Missing: iceberg.Difference(removals, removed),
- }
-
return propertiesUpdateSummary, nil
}
diff --git a/catalog/registry.go b/catalog/registry.go
new file mode 100644
index 0000000..1c88351
--- /dev/null
+++ b/catalog/registry.go
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package catalog
+
+import (
+ "fmt"
+ "maps"
+ "net/url"
+ "slices"
+ "strings"
+ "sync"
+
+ "github.com/apache/iceberg-go"
+)
+
+type registry map[string]Registrar
+
+func (r registry) getKeys() []string {
+ regMutex.Lock()
+ defer regMutex.Unlock()
+ return slices.Collect(maps.Keys(r))
+}
+
+func (r registry) set(catalogType string, reg Registrar) {
+ regMutex.Lock()
+ defer regMutex.Unlock()
+ r[catalogType] = reg
+}
+
+func (r registry) get(catalogType string) (Registrar, bool) {
+ regMutex.Lock()
+ defer regMutex.Unlock()
+ reg, ok := r[catalogType]
+ return reg, ok
+}
+
+func (r registry) remove(catalogType string) {
+ regMutex.Lock()
+ defer regMutex.Unlock()
+ delete(r, catalogType)
+}
+
+var (
+ regMutex sync.Mutex
+ defaultRegistry = registry{}
+)
+
+// Registrar is a factory for creating Catalog instances, used for registering
to use
+// with Load.
+type Registrar interface {
+ GetCatalog(catalogURI string, props iceberg.Properties) (Catalog, error)
+}
+
+type RegistrarFunc func(string, iceberg.Properties) (Catalog, error)
+
+func (f RegistrarFunc) GetCatalog(catalogURI string, props iceberg.Properties)
(Catalog, error) {
+ return f(catalogURI, props)
+}
+
+// Register adds the new catalog type to the registry. If the catalog type is
already registered, it will be replaced.
+func Register(catalogType string, reg Registrar) {
+ if reg == nil {
+ panic("catalog: RegisterCatalog catalog factory is nil")
+ }
+ defaultRegistry.set(catalogType, reg)
+}
+
+// Unregister removes the requested catalog factory from the registry.
+func Unregister(catalogType string) {
+ defaultRegistry.remove(catalogType)
+}
+
+// GetRegisteredCatalogs returns the list of registered catalog names that can
+// be looked up via Load.
+func GetRegisteredCatalogs() []string {
+ return defaultRegistry.getKeys()
+}
+
+// Load allows loading a specific catalog by URI and properties.
+//
+// This is utilized alongside Register/Unregister to not only
allow
+// easier catalog loading but also to allow for custom catalog implementations
to
+// be registered and loaded external to this module.
+//
+// The URI is used to determine the catalog type by first checking if it
contains
+// the string "://" indicating the presence of a scheme. If so, the scheme is
used
+// to lookup the registered catalog. i.e. "glue://..." would return the Glue
catalog
+// implementation, passing the URI and properties to the registered factory. If no
scheme is
+// present, then the URI is used as-is to lookup the catalog factory function.
+//
+// Currently the following catalogs are registered by default:
+//
+// - "glue" for AWS Glue Data Catalog, the rest of the URI is ignored, all
configuration
+// should be provided using the properties. "glue.region", "glue.endpoint",
+// "glue.max-retries", etc. Default AWS credentials are used if found, or
can be
+// overridden by setting "glue.access-key-id", "glue.secret-access-key",
and "glue.session-token".
+//
+// - "rest" for a REST API catalog, if the properties have a "uri" key, then
that will be used
+// as the REST endpoint, otherwise the URI is used as the endpoint. The
REST catalog also
+// registers "http" and "https" so that Load with an http/s URI will
automatically
+// load the REST Catalog.
+func Load(catalogURI string, props iceberg.Properties) (Catalog, error) {
+ var catalogType string
+ if strings.Contains(catalogURI, "://") {
+ parsed, err := url.Parse(catalogURI)
+ if err != nil {
+ return nil, err
+ }
+ catalogType = parsed.Scheme
+ } else {
+ catalogType = catalogURI
+ }
+
+ cat, ok := defaultRegistry.get(catalogType)
+ if !ok {
+ return nil, fmt.Errorf("%w: %s", ErrCatalogNotFound,
catalogType)
+ }
+
+ return cat.GetCatalog(catalogURI, props)
+}
diff --git a/catalog/registry_test.go b/catalog/registry_test.go
new file mode 100644
index 0000000..c132656
--- /dev/null
+++ b/catalog/registry_test.go
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package catalog_test
+
+import (
+ "testing"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/catalog"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestCatalogRegistry(t *testing.T) {
+ assert.ElementsMatch(t, []string{
+ "rest",
+ "http",
+ "https",
+ "glue",
+ }, catalog.GetRegisteredCatalogs())
+
+ catalog.Register("foobar", catalog.RegistrarFunc(func(s string, p
iceberg.Properties) (catalog.Catalog, error) {
+ assert.Equal(t, "foobar", s)
+ assert.Equal(t, "baz", p.Get("foo", ""))
+ return nil, nil
+ }))
+
+ assert.ElementsMatch(t, []string{
+ "rest",
+ "http",
+ "foobar",
+ "https",
+ "glue",
+ }, catalog.GetRegisteredCatalogs())
+
+ c, err := catalog.Load("foobar", iceberg.Properties{"foo": "baz"})
+ assert.Nil(t, c)
+ assert.NoError(t, err)
+
+ catalog.Register("foobar", catalog.RegistrarFunc(func(s string, p
iceberg.Properties) (catalog.Catalog, error) {
+ assert.Equal(t, "foobar://helloworld", s)
+ assert.Equal(t, "baz", p.Get("foo", ""))
+ return nil, nil
+ }))
+
+ c, err = catalog.Load("foobar://helloworld", iceberg.Properties{"foo":
"baz"})
+ assert.Nil(t, c)
+ assert.NoError(t, err)
+
+ catalog.Unregister("foobar")
+ assert.ElementsMatch(t, []string{
+ "rest",
+ "http",
+ "https",
+ "glue",
+ }, catalog.GetRegisteredCatalogs())
+}
diff --git a/catalog/rest.go b/catalog/rest.go
index 47be1eb..d068ae5 100644
--- a/catalog/rest.go
+++ b/catalog/rest.go
@@ -21,6 +21,7 @@ import (
"bytes"
"context"
"crypto/sha256"
+ "crypto/tls"
"encoding/json"
"errors"
"fmt"
@@ -56,6 +57,7 @@ const (
keyRestSigV4Region = "rest.signing-region"
keyRestSigV4Service = "rest.signing-name"
keyAuthUrl = "rest.authorization-url"
+ keyTlsSkipVerify = "rest.tls.skip-verify"
)
var (
@@ -71,6 +73,16 @@ var (
ErrOAuthError = fmt.Errorf("%w: oauth error", ErrRESTError)
)
+func init() {
+ reg := RegistrarFunc(func(endpoint string, p iceberg.Properties)
(Catalog, error) {
+ return newRestCatalogFromProps(endpoint, p.Get("uri",
endpoint), p)
+ })
+
+ Register(string(REST), reg)
+ Register("http", reg)
+ Register("https", reg)
+}
+
type errorResponse struct {
Message string `json:"message"`
Type string `json:"type"`
@@ -408,6 +420,15 @@ func fromProps(props iceberg.Properties) *options {
o.credential = v
case keyPrefix:
o.prefix = v
+ case keyTlsSkipVerify:
+ verify := strings.ToLower(v) == "true"
+ if o.tlsConfig == nil {
+ o.tlsConfig = &tls.Config{
+ InsecureSkipVerify: verify,
+ }
+ } else {
+ o.tlsConfig.InsecureSkipVerify = verify
+ }
}
}
return o
@@ -447,29 +468,45 @@ type RestCatalog struct {
props iceberg.Properties
}
+func newRestCatalogFromProps(name string, uri string, p iceberg.Properties)
(*RestCatalog, error) {
+ ops := fromProps(p)
+
+ r := &RestCatalog{name: name}
+ if err := r.init(ops, uri); err != nil {
+ return nil, err
+ }
+
+ return r, nil
+}
+
func NewRestCatalog(name, uri string, opts ...Option[RestCatalog])
(*RestCatalog, error) {
ops := &options{}
for _, o := range opts {
o(ops)
}
- baseuri, err := url.Parse(uri)
- if err != nil {
+ r := &RestCatalog{name: name}
+ if err := r.init(ops, uri); err != nil {
return nil, err
}
- r := &RestCatalog{
- name: name,
- baseURI: baseuri.JoinPath("v1"),
+ return r, nil
+}
+
+func (r *RestCatalog) init(ops *options, uri string) error {
+ baseuri, err := url.Parse(uri)
+ if err != nil {
+ return err
}
+ r.baseURI = baseuri.JoinPath("v1")
if ops, err = r.fetchConfig(ops); err != nil {
- return nil, err
+ return err
}
cl, err := r.createSession(ops)
if err != nil {
- return nil, err
+ return err
}
r.cl = cl
@@ -477,7 +514,7 @@ func NewRestCatalog(name, uri string, opts
...Option[RestCatalog]) (*RestCatalog
r.baseURI = r.baseURI.JoinPath(ops.prefix)
}
r.props = toProps(ops)
- return r, nil
+ return nil
}
func (r *RestCatalog) fetchAccessToken(cl *http.Client, creds string, opts
*options) (string, error) {
diff --git a/catalog/rest_test.go b/catalog/rest_test.go
index 212d5a8..f290722 100644
--- a/catalog/rest_test.go
+++ b/catalog/rest_test.go
@@ -114,6 +114,39 @@ func (r *RestCatalogSuite) TestToken200() {
r.Equal(r.configVals.Get("warehouse"), "s3://some-bucket")
}
+func (r *RestCatalogSuite) TestLoadRegisteredCatalog() {
+ r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req
*http.Request) {
+ r.Equal(http.MethodPost, req.Method)
+
+ r.Equal(req.Header.Get("Content-Type"),
"application/x-www-form-urlencoded")
+
+ r.Require().NoError(req.ParseForm())
+ values := req.PostForm
+ r.Equal(values.Get("grant_type"), "client_credentials")
+ r.Equal(values.Get("client_id"), "client")
+ r.Equal(values.Get("client_secret"), "secret")
+ r.Equal(values.Get("scope"), "catalog")
+
+ w.WriteHeader(http.StatusOK)
+
+ json.NewEncoder(w).Encode(map[string]any{
+ "access_token": TestToken,
+ "token_type": "Bearer",
+ "expires_in": 86400,
+ "issued_token_type":
"urn:ietf:params:oauth:token-type:access_token",
+ })
+ })
+
+ cat, err := catalog.Load(r.srv.URL, iceberg.Properties{
+ "warehouse": "s3://some-bucket",
+ "credential": TestCreds,
+ })
+ r.NoError(err)
+
+ r.NotNil(cat)
+ r.Equal(r.configVals.Get("warehouse"), "s3://some-bucket")
+}
+
func (r *RestCatalogSuite) TestToken400() {
r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req
*http.Request) {
r.Equal(http.MethodPost, req.Method)
@@ -882,6 +915,18 @@ func (r *RestTLSCatalogSuite) TestSSLFail() {
r.ErrorContains(err, "tls: failed to verify certificate")
}
+func (r *RestTLSCatalogSuite) TestSSLLoadRegisteredCatalog() {
+ cat, err := catalog.Load(r.srv.URL, iceberg.Properties{
+ "warehouse": "s3://some-bucket",
+ "token": TestToken,
+ "rest.tls.skip-verify": "true",
+ })
+ r.NoError(err)
+
+ r.NotNil(cat)
+ r.Equal(r.configVals.Get("warehouse"), "s3://some-bucket")
+}
+
func (r *RestTLSCatalogSuite) TestSSLConfig() {
cat, err := catalog.NewRestCatalog("rest", r.srv.URL,
catalog.WithOAuthToken(TestToken),
catalog.WithWarehouseLocation("s3://some-bucket"),