This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new b58a334  feat(catalog): Add Catalog Registry (#244)
b58a334 is described below

commit b58a334c1592455804392b16bc90bbce1734da19
Author: Matt Topol <[email protected]>
AuthorDate: Fri Jan 10 15:17:42 2025 -0500

    feat(catalog): Add Catalog Registry (#244)
    
    * feat(catalog): Add Catalog Registry
    
    * remove doc for SQL catalog since I factored the registry out for a 
smaller PR
    
    * fixup tests
    
    * use `glue.access-key-id` for property name
    
    * use constants and allow setting *some* access fields for glue
    
    * Update catalog/registry.go
    
    Co-authored-by: Kevin Liu <[email protected]>
    
    * Update catalog/registry.go
    
    Co-authored-by: Kevin Liu <[email protected]>
    
    ---------
    
    Co-authored-by: Kevin Liu <[email protected]>
---
 catalog/catalog.go       |  51 ++++++++++++++++++
 catalog/glue.go          | 101 +++++++++++++++++++++--------------
 catalog/registry.go      | 135 +++++++++++++++++++++++++++++++++++++++++++++++
 catalog/registry_test.go |  71 +++++++++++++++++++++++++
 catalog/rest.go          |  53 ++++++++++++++++---
 catalog/rest_test.go     |  45 ++++++++++++++++
 6 files changed, 407 insertions(+), 49 deletions(-)

diff --git a/catalog/catalog.go b/catalog/catalog.go
index 9b5776d..0bc8f49 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -21,6 +21,8 @@ import (
        "context"
        "crypto/tls"
        "errors"
+       "fmt"
+       "maps"
        "net/url"
 
        "github.com/apache/iceberg-go"
@@ -46,6 +48,8 @@ var (
        ErrNoSuchNamespace        = errors.New("namespace does not exist")
        ErrNamespaceAlreadyExists = errors.New("namespace already exists")
        ErrTableAlreadyExists     = errors.New("table already exists")
+       ErrCatalogNotFound        = errors.New("catalog type not registered")
+       ErrNamespaceNotEmpty      = errors.New("namespace is not empty")
 )
 
 // WithAwsConfig sets the AWS configuration for the catalog.
@@ -195,3 +199,50 @@ func TableNameFromIdent(ident table.Identifier) string {
 func NamespaceFromIdent(ident table.Identifier) table.Identifier {
        return ident[:len(ident)-1]
 }
+
+func checkForOverlap(removals []string, updates iceberg.Properties) error {
+       overlap := []string{}
+       for _, key := range removals {
+               if _, ok := updates[key]; ok {
+                       overlap = append(overlap, key)
+               }
+       }
+       if len(overlap) > 0 {
+               return fmt.Errorf("conflict between removals and updates for 
keys: %v", overlap)
+       }
+       return nil
+}
+
+func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals 
[]string, updates iceberg.Properties) (iceberg.Properties, 
PropertiesUpdateSummary, error) {
+       if err := checkForOverlap(removals, updates); err != nil {
+               return nil, PropertiesUpdateSummary{}, err
+       }
+
+       var (
+               updatedProps = maps.Clone(currentProps)
+               removed      = make([]string, 0, len(removals))
+               updated      = make([]string, 0, len(updates))
+       )
+
+       for _, key := range removals {
+               if _, exists := updatedProps[key]; exists {
+                       delete(updatedProps, key)
+                       removed = append(removed, key)
+               }
+       }
+
+       for key, value := range updates {
+               if updatedProps[key] != value {
+                       updated = append(updated, key)
+                       updatedProps[key] = value
+               }
+       }
+
+       summary := PropertiesUpdateSummary{
+               Removed: removed,
+               Updated: updated,
+               Missing: iceberg.Difference(removals, removed),
+       }
+
+       return updatedProps, summary, nil
+}
diff --git a/catalog/glue.go b/catalog/glue.go
index c970e5a..245116b 100644
--- a/catalog/glue.go
+++ b/catalog/glue.go
@@ -21,11 +21,14 @@ import (
        "context"
        "errors"
        "fmt"
+       "strconv"
 
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/io"
        "github.com/apache/iceberg-go/table"
        "github.com/aws/aws-sdk-go-v2/aws"
+       "github.com/aws/aws-sdk-go-v2/config"
+       "github.com/aws/aws-sdk-go-v2/credentials"
        "github.com/aws/aws-sdk-go-v2/service/glue"
        "github.com/aws/aws-sdk-go-v2/service/glue/types"
 )
@@ -47,13 +50,65 @@ const (
        // The ID of the Glue Data Catalog where the tables reside. If none is 
provided, Glue
        // automatically uses the caller's AWS account ID by default.
        // See: 
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html
-       glueCatalogIdKey = "glue.id"
+       GlueCatalogIdKey = "glue.id"
+
+       GlueAccessKeyID     = "glue.access-key-id"
+       GlueSecretAccessKey = "glue.secret-access-key"
+       GlueSessionToken    = "glue.session-token"
+       GlueRegion          = "glue.region"
+       GlueEndpoint        = "glue.endpoint"
+       GlueMaxRetries      = "glue.max-retries"
+       GlueRetryMode       = "glue.retry-mode"
 )
 
 var (
        _ Catalog = (*GlueCatalog)(nil)
 )
 
+func init() {
+       Register("glue", RegistrarFunc(func(_ string, props iceberg.Properties) 
(Catalog, error) {
+               awsConfig, err := toAwsConfig(props)
+               if err != nil {
+                       return nil, err
+               }
+
+               return NewGlueCatalog(WithAwsConfig(awsConfig), 
WithAwsProperties(AwsProperties(props))), nil
+       }))
+}
+
+func toAwsConfig(p iceberg.Properties) (aws.Config, error) {
+       opts := make([]func(*config.LoadOptions) error, 0)
+
+       for k, v := range p {
+               switch k {
+               case GlueRegion:
+                       opts = append(opts, config.WithRegion(v))
+               case GlueEndpoint:
+                       opts = append(opts, config.WithBaseEndpoint(v))
+               case GlueMaxRetries:
+                       maxRetry, err := strconv.Atoi(v)
+                       if err != nil {
+                               return aws.Config{}, err
+                       }
+                       opts = append(opts, 
config.WithRetryMaxAttempts(maxRetry))
+               case GlueRetryMode:
+                       m, err := aws.ParseRetryMode(v)
+                       if err != nil {
+                               return aws.Config{}, err
+                       }
+                       opts = append(opts, config.WithRetryMode(m))
+               }
+       }
+
+       key, secret, token := p[GlueAccessKeyID], p[GlueSecretAccessKey], 
p[GlueSessionToken]
+       if key != "" || secret != "" || token != "" {
+               opts = append(opts, config.WithCredentialsProvider(
+                       credentials.NewStaticCredentialsProvider(key, secret, 
token)))
+       }
+
+       return config.LoadDefaultConfig(context.Background(), opts...)
+}
+
 type glueAPI interface {
        CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns 
...func(*glue.Options)) (*glue.CreateTableOutput, error)
        GetTable(ctx context.Context, params *glue.GetTableInput, optFns 
...func(*glue.Options)) (*glue.GetTableOutput, error)
@@ -80,7 +135,7 @@ func NewGlueCatalog(opts ...Option[GlueCatalog]) 
*GlueCatalog {
        }
 
        var catalogId *string
-       if val, ok := glueOps.awsProperties[glueCatalogIdKey]; ok {
+       if val, ok := glueOps.awsProperties[GlueCatalogIdKey]; ok {
                catalogId = &val
        } else {
                catalogId = nil
@@ -353,39 +408,9 @@ func (c *GlueCatalog) UpdateNamespaceProperties(ctx 
context.Context, namespace t
                return PropertiesUpdateSummary{}, err
        }
 
-       overlap := []string{}
-       for _, key := range removals {
-               if _, exists := updates[key]; exists {
-                       overlap = append(overlap, key)
-               }
-       }
-       if len(overlap) > 0 {
-               return PropertiesUpdateSummary{}, fmt.Errorf("conflict between 
removals and updates for keys: %v", overlap)
-       }
-
-       updatedProperties := make(map[string]string)
-       if database.Parameters != nil {
-               for k, v := range database.Parameters {
-                       updatedProperties[k] = v
-               }
-       }
-
-       // Removals.
-       removed := []string{}
-       for _, key := range removals {
-               if _, exists := updatedProperties[key]; exists {
-                       delete(updatedProperties, key)
-                       removed = append(removed, key)
-               }
-       }
-
-       // Updates.
-       updated := []string{}
-       for key, value := range updates {
-               if updatedProperties[key] != value {
-                       updatedProperties[key] = value
-                       updated = append(updated, key)
-               }
+       updatedProperties, propertiesUpdateSummary, err := 
getUpdatedPropsAndUpdateSummary(database.Parameters, removals, updates)
+       if err != nil {
+               return PropertiesUpdateSummary{}, err
        }
 
        _, err = c.glueSvc.UpdateDatabase(ctx, 
&glue.UpdateDatabaseInput{CatalogId: c.catalogId, Name: 
aws.String(databaseName), DatabaseInput: &types.DatabaseInput{
@@ -396,12 +421,6 @@ func (c *GlueCatalog) UpdateNamespaceProperties(ctx 
context.Context, namespace t
                return PropertiesUpdateSummary{}, fmt.Errorf("failed to update 
namespace properties %s: %w", databaseName, err)
        }
 
-       propertiesUpdateSummary := PropertiesUpdateSummary{
-               Removed: removed,
-               Updated: updated,
-               Missing: iceberg.Difference(removals, removed),
-       }
-
        return propertiesUpdateSummary, nil
 }
 
diff --git a/catalog/registry.go b/catalog/registry.go
new file mode 100644
index 0000000..1c88351
--- /dev/null
+++ b/catalog/registry.go
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package catalog
+
+import (
+       "fmt"
+       "maps"
+       "net/url"
+       "slices"
+       "strings"
+       "sync"
+
+       "github.com/apache/iceberg-go"
+)
+
+type registry map[string]Registrar
+
+func (r registry) getKeys() []string {
+       regMutex.Lock()
+       defer regMutex.Unlock()
+       return slices.Collect(maps.Keys(r))
+}
+
+func (r registry) set(catalogType string, reg Registrar) {
+       regMutex.Lock()
+       defer regMutex.Unlock()
+       r[catalogType] = reg
+}
+
+func (r registry) get(catalogType string) (Registrar, bool) {
+       regMutex.Lock()
+       defer regMutex.Unlock()
+       reg, ok := r[catalogType]
+       return reg, ok
+}
+
+func (r registry) remove(catalogType string) {
+       regMutex.Lock()
+       defer regMutex.Unlock()
+       delete(r, catalogType)
+}
+
+var (
+       regMutex        sync.Mutex
+       defaultRegistry = registry{}
+)
+
+// Registrar is a factory for creating Catalog instances, used for registering to use
+// with Load.
+type Registrar interface {
+       GetCatalog(catalogURI string, props iceberg.Properties) (Catalog, error)
+}
+
+type RegistrarFunc func(string, iceberg.Properties) (Catalog, error)
+
+func (f RegistrarFunc) GetCatalog(catalogURI string, props iceberg.Properties) 
(Catalog, error) {
+       return f(catalogURI, props)
+}
+
+// Register adds the new catalog type to the registry. If the catalog type is 
already registered, it will be replaced.
+func Register(catalogType string, reg Registrar) {
+       if reg == nil {
+               panic("catalog: RegisterCatalog catalog factory is nil")
+       }
+       defaultRegistry.set(catalogType, reg)
+}
+
+// Unregister removes the requested catalog factory from the registry.
+func Unregister(catalogType string) {
+       defaultRegistry.remove(catalogType)
+}
+
+// GetRegisteredCatalogs returns the list of registered catalog names that can
+// be looked up via Load.
+func GetRegisteredCatalogs() []string {
+       return defaultRegistry.getKeys()
+}
+
+// Load allows loading a specific catalog by URI and properties.
+//
+// This is utilized alongside Register/Unregister to not only allow
+// easier catalog loading but also to allow for custom catalog implementations 
to
+// be registered and loaded external to this module.
+//
+// The URI is used to determine the catalog type by first checking if it contains
+// the string "://" indicating the presence of a scheme. If so, the scheme is used
+// to look up the registered catalog, e.g. "glue://..." would return the Glue catalog
+// implementation via its registered factory, which calls NewGlueCatalog. If no scheme
+// is present, then the URI is used as-is to look up the catalog factory function.
+//
+// Currently the following catalogs are registered by default:
+//
+//   - "glue" for AWS Glue Data Catalog, the rest of the URI is ignored, all 
configuration
+//     should be provided using the properties. "glue.region", "glue.endpoint",
+//     "glue.max-retries", etc. Default AWS credentials are used if found, or 
can be
+//     overridden by setting "glue.access-key-id", "glue.secret-access-key", 
and "glue.session-token".
+//
+//   - "rest" for a REST API catalog, if the properties have a "uri" key, then 
that will be used
+//     as the REST endpoint, otherwise the URI is used as the endpoint. The 
REST catalog also
+//     registers "http" and "https" so that Load with an http/s URI will 
automatically
+//     load the REST Catalog.
+func Load(catalogURI string, props iceberg.Properties) (Catalog, error) {
+       var catalogType string
+       if strings.Contains(catalogURI, "://") {
+               parsed, err := url.Parse(catalogURI)
+               if err != nil {
+                       return nil, err
+               }
+               catalogType = parsed.Scheme
+       } else {
+               catalogType = catalogURI
+       }
+
+       cat, ok := defaultRegistry.get(catalogType)
+       if !ok {
+               return nil, fmt.Errorf("%w: %s", ErrCatalogNotFound, 
catalogType)
+       }
+
+       return cat.GetCatalog(catalogURI, props)
+}
diff --git a/catalog/registry_test.go b/catalog/registry_test.go
new file mode 100644
index 0000000..c132656
--- /dev/null
+++ b/catalog/registry_test.go
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package catalog_test
+
+import (
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/catalog"
+       "github.com/stretchr/testify/assert"
+)
+
+func TestCatalogRegistry(t *testing.T) {
+       assert.ElementsMatch(t, []string{
+               "rest",
+               "http",
+               "https",
+               "glue",
+       }, catalog.GetRegisteredCatalogs())
+
+       catalog.Register("foobar", catalog.RegistrarFunc(func(s string, p 
iceberg.Properties) (catalog.Catalog, error) {
+               assert.Equal(t, "foobar", s)
+               assert.Equal(t, "baz", p.Get("foo", ""))
+               return nil, nil
+       }))
+
+       assert.ElementsMatch(t, []string{
+               "rest",
+               "http",
+               "foobar",
+               "https",
+               "glue",
+       }, catalog.GetRegisteredCatalogs())
+
+       c, err := catalog.Load("foobar", iceberg.Properties{"foo": "baz"})
+       assert.Nil(t, c)
+       assert.NoError(t, err)
+
+       catalog.Register("foobar", catalog.RegistrarFunc(func(s string, p 
iceberg.Properties) (catalog.Catalog, error) {
+               assert.Equal(t, "foobar://helloworld", s)
+               assert.Equal(t, "baz", p.Get("foo", ""))
+               return nil, nil
+       }))
+
+       c, err = catalog.Load("foobar://helloworld", iceberg.Properties{"foo": 
"baz"})
+       assert.Nil(t, c)
+       assert.NoError(t, err)
+
+       catalog.Unregister("foobar")
+       assert.ElementsMatch(t, []string{
+               "rest",
+               "http",
+               "https",
+               "glue",
+       }, catalog.GetRegisteredCatalogs())
+}
diff --git a/catalog/rest.go b/catalog/rest.go
index 47be1eb..d068ae5 100644
--- a/catalog/rest.go
+++ b/catalog/rest.go
@@ -21,6 +21,7 @@ import (
        "bytes"
        "context"
        "crypto/sha256"
+       "crypto/tls"
        "encoding/json"
        "errors"
        "fmt"
@@ -56,6 +57,7 @@ const (
        keyRestSigV4Region  = "rest.signing-region"
        keyRestSigV4Service = "rest.signing-name"
        keyAuthUrl          = "rest.authorization-url"
+       keyTlsSkipVerify    = "rest.tls.skip-verify"
 )
 
 var (
@@ -71,6 +73,16 @@ var (
        ErrOAuthError           = fmt.Errorf("%w: oauth error", ErrRESTError)
 )
 
+func init() {
+       reg := RegistrarFunc(func(endpoint string, p iceberg.Properties) 
(Catalog, error) {
+               return newRestCatalogFromProps(endpoint, p.Get("uri", 
endpoint), p)
+       })
+
+       Register(string(REST), reg)
+       Register("http", reg)
+       Register("https", reg)
+}
+
 type errorResponse struct {
        Message string `json:"message"`
        Type    string `json:"type"`
@@ -408,6 +420,15 @@ func fromProps(props iceberg.Properties) *options {
                        o.credential = v
                case keyPrefix:
                        o.prefix = v
+               case keyTlsSkipVerify:
+                       verify := strings.ToLower(v) == "true"
+                       if o.tlsConfig == nil {
+                               o.tlsConfig = &tls.Config{
+                                       InsecureSkipVerify: verify,
+                               }
+                       } else {
+                               o.tlsConfig.InsecureSkipVerify = verify
+                       }
                }
        }
        return o
@@ -447,29 +468,45 @@ type RestCatalog struct {
        props iceberg.Properties
 }
 
+func newRestCatalogFromProps(name string, uri string, p iceberg.Properties) 
(*RestCatalog, error) {
+       ops := fromProps(p)
+
+       r := &RestCatalog{name: name}
+       if err := r.init(ops, uri); err != nil {
+               return nil, err
+       }
+
+       return r, nil
+}
+
 func NewRestCatalog(name, uri string, opts ...Option[RestCatalog]) 
(*RestCatalog, error) {
        ops := &options{}
        for _, o := range opts {
                o(ops)
        }
 
-       baseuri, err := url.Parse(uri)
-       if err != nil {
+       r := &RestCatalog{name: name}
+       if err := r.init(ops, uri); err != nil {
                return nil, err
        }
 
-       r := &RestCatalog{
-               name:    name,
-               baseURI: baseuri.JoinPath("v1"),
+       return r, nil
+}
+
+func (r *RestCatalog) init(ops *options, uri string) error {
+       baseuri, err := url.Parse(uri)
+       if err != nil {
+               return err
        }
 
+       r.baseURI = baseuri.JoinPath("v1")
        if ops, err = r.fetchConfig(ops); err != nil {
-               return nil, err
+               return err
        }
 
        cl, err := r.createSession(ops)
        if err != nil {
-               return nil, err
+               return err
        }
 
        r.cl = cl
@@ -477,7 +514,7 @@ func NewRestCatalog(name, uri string, opts 
...Option[RestCatalog]) (*RestCatalog
                r.baseURI = r.baseURI.JoinPath(ops.prefix)
        }
        r.props = toProps(ops)
-       return r, nil
+       return nil
 }
 
 func (r *RestCatalog) fetchAccessToken(cl *http.Client, creds string, opts 
*options) (string, error) {
diff --git a/catalog/rest_test.go b/catalog/rest_test.go
index 212d5a8..f290722 100644
--- a/catalog/rest_test.go
+++ b/catalog/rest_test.go
@@ -114,6 +114,39 @@ func (r *RestCatalogSuite) TestToken200() {
        r.Equal(r.configVals.Get("warehouse"), "s3://some-bucket")
 }
 
+func (r *RestCatalogSuite) TestLoadRegisteredCatalog() {
+       r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req 
*http.Request) {
+               r.Equal(http.MethodPost, req.Method)
+
+               r.Equal(req.Header.Get("Content-Type"), 
"application/x-www-form-urlencoded")
+
+               r.Require().NoError(req.ParseForm())
+               values := req.PostForm
+               r.Equal(values.Get("grant_type"), "client_credentials")
+               r.Equal(values.Get("client_id"), "client")
+               r.Equal(values.Get("client_secret"), "secret")
+               r.Equal(values.Get("scope"), "catalog")
+
+               w.WriteHeader(http.StatusOK)
+
+               json.NewEncoder(w).Encode(map[string]any{
+                       "access_token":      TestToken,
+                       "token_type":        "Bearer",
+                       "expires_in":        86400,
+                       "issued_token_type": 
"urn:ietf:params:oauth:token-type:access_token",
+               })
+       })
+
+       cat, err := catalog.Load(r.srv.URL, iceberg.Properties{
+               "warehouse":  "s3://some-bucket",
+               "credential": TestCreds,
+       })
+       r.NoError(err)
+
+       r.NotNil(cat)
+       r.Equal(r.configVals.Get("warehouse"), "s3://some-bucket")
+}
+
 func (r *RestCatalogSuite) TestToken400() {
        r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req 
*http.Request) {
                r.Equal(http.MethodPost, req.Method)
@@ -882,6 +915,18 @@ func (r *RestTLSCatalogSuite) TestSSLFail() {
        r.ErrorContains(err, "tls: failed to verify certificate")
 }
 
+func (r *RestTLSCatalogSuite) TestSSLLoadRegisteredCatalog() {
+       cat, err := catalog.Load(r.srv.URL, iceberg.Properties{
+               "warehouse":            "s3://some-bucket",
+               "token":                TestToken,
+               "rest.tls.skip-verify": "true",
+       })
+       r.NoError(err)
+
+       r.NotNil(cat)
+       r.Equal(r.configVals.Get("warehouse"), "s3://some-bucket")
+}
+
 func (r *RestTLSCatalogSuite) TestSSLConfig() {
        cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken),
                catalog.WithWarehouseLocation("s3://some-bucket"),

Reply via email to