This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 666b036  refactor(catalog): restructure catalog package (#266)
666b036 is described below

commit 666b0367b940adf3f2ff6d7b78816bf78691cd0e
Author: Matt Topol <[email protected]>
AuthorDate: Sun Jan 26 14:28:48 2025 -0500

    refactor(catalog): restructure catalog package (#266)
    
    Refactoring and restructuring the `catalog` package based on discussions
    with others in the Go community. This achieves the following benefits:
    
    * Improved, and shorter, naming of types and methods. Things that are
    specific to the REST catalog or Glue catalog can be put directly into
    those packages allowing for a public API such as `rest.NewCatalog`
    instead of `catalog.NewRestCatalog`. Options can be made specific to
    catalog implementations without weird generic handling, etc.
    * Using the Registry allows consumers to only import the catalog
    implementations they care about, allowing any others to be pruned and
    removed by the compiler as unused and unreachable if not imported. This
    will enable smaller binaries if not all catalog implementations are
    needed.
---
 catalog/catalog.go                       | 228 ++++++-------------------------
 catalog/{ => glue}/glue.go               | 151 +++++++++++++-------
 catalog/{ => glue}/glue_test.go          |  55 ++++----
 catalog/glue/options.go                  |  44 ++++++
 catalog/internal/utils.go                |  30 ++++
 catalog/registry_test.go                 |  14 +-
 catalog/rest/options.go                  | 116 ++++++++++++++++
 catalog/{ => rest}/rest.go               | 153 ++++++++++-----------
 catalog/{ => rest}/rest_internal_test.go |   6 +-
 catalog/{ => rest}/rest_test.go          | 135 +++++++++---------
 cmd/iceberg/main.go                      |  34 ++---
 table/scanner_test.go                    |  29 ++--
 12 files changed, 546 insertions(+), 449 deletions(-)

diff --git a/catalog/catalog.go b/catalog/catalog.go
index bb843b1..22cce9a 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -15,32 +15,36 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package catalog provides an interface for Catalog implementations along with
+// a registry for registering catalog implementations.
+//
+// Subpackages of this package provide some default implementations for select
+// catalog types which will register themselves if imported. For instance,
+// adding the following import:
+//
+//     import _ "github.com/apache/iceberg-go/catalog/rest"
+//
+// Will register the REST catalog implementation.
 package catalog
 
 import (
        "context"
-       "crypto/tls"
        "errors"
-       "fmt"
-       "maps"
-       "net/url"
        "strings"
 
        "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/catalog/internal"
        "github.com/apache/iceberg-go/table"
-       "github.com/aws/aws-sdk-go-v2/aws"
 )
 
-type CatalogType string
-
-type AwsProperties map[string]string
+type Type string
 
 const (
-       REST     CatalogType = "rest"
-       Hive     CatalogType = "hive"
-       Glue     CatalogType = "glue"
-       DynamoDB CatalogType = "dynamodb"
-       SQL      CatalogType = "sql"
+       REST     Type = "rest"
+       Hive     Type = "hive"
+       Glue     Type = "glue"
+       DynamoDB Type = "dynamodb"
+       SQL      Type = "sql"
 )
 
 var (
@@ -53,106 +57,6 @@ var (
        ErrNamespaceNotEmpty      = errors.New("namespace is not empty")
 )
 
-// WithAwsConfig sets the AWS configuration for the catalog.
-func WithAwsConfig(cfg aws.Config) Option[GlueCatalog] {
-       return func(o *options) {
-               o.awsConfig = cfg
-       }
-}
-
-func WithAwsProperties(props AwsProperties) Option[GlueCatalog] {
-       return func(o *options) {
-               o.awsProperties = props
-       }
-}
-
-func WithCredential(cred string) Option[RestCatalog] {
-       return func(o *options) {
-               o.credential = cred
-       }
-}
-
-func WithOAuthToken(token string) Option[RestCatalog] {
-       return func(o *options) {
-               o.oauthToken = token
-       }
-}
-
-func WithTLSConfig(config *tls.Config) Option[RestCatalog] {
-       return func(o *options) {
-               o.tlsConfig = config
-       }
-}
-
-func WithWarehouseLocation(loc string) Option[RestCatalog] {
-       return func(o *options) {
-               o.warehouseLocation = loc
-       }
-}
-
-func WithMetadataLocation(loc string) Option[RestCatalog] {
-       return func(o *options) {
-               o.metadataLocation = loc
-       }
-}
-
-func WithSigV4() Option[RestCatalog] {
-       return func(o *options) {
-               o.enableSigv4 = true
-               o.sigv4Service = "execute-api"
-       }
-}
-
-func WithSigV4RegionSvc(region, service string) Option[RestCatalog] {
-       return func(o *options) {
-               o.enableSigv4 = true
-               o.sigv4Region = region
-
-               if service == "" {
-                       o.sigv4Service = "execute-api"
-               } else {
-                       o.sigv4Service = service
-               }
-       }
-}
-
-func WithAuthURI(uri *url.URL) Option[RestCatalog] {
-       return func(o *options) {
-               o.authUri = uri
-       }
-}
-
-func WithPrefix(prefix string) Option[RestCatalog] {
-       return func(o *options) {
-               o.prefix = prefix
-       }
-}
-
-func WithScope(scope string) Option[RestCatalog] {
-       return func(o *options) {
-               o.scope = scope
-       }
-}
-
-type Option[T GlueCatalog | RestCatalog] func(*options)
-
-type options struct {
-       awsConfig     aws.Config
-       awsProperties AwsProperties
-
-       tlsConfig         *tls.Config
-       credential        string
-       oauthToken        string
-       warehouseLocation string
-       metadataLocation  string
-       enableSigv4       bool
-       sigv4Region       string
-       sigv4Service      string
-       prefix            string
-       authUri           *url.URL
-       scope             string
-}
-
 type PropertiesUpdateSummary struct {
        Removed []string `json:"removed"`
        Updated []string `json:"updated"`
@@ -162,12 +66,12 @@ type PropertiesUpdateSummary struct {
 // Catalog for iceberg table operations like create, drop, load, list and others.
 type Catalog interface {
        // CatalogType returns the type of the catalog.
-       CatalogType() CatalogType
+       CatalogType() Type
 
        // CreateTable creates a new iceberg table in the catalog using the provided identifier
        // and schema. Options can be used to optionally provide location, partition spec, sort order,
        // and custom properties.
-       CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...createTableOpt) (*table.Table, error)
+       CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...CreateTableOpt) (*table.Table, error)
        // CommitTable commits the table metadata and updates to the catalog, returning the new metadata
        CommitTable(context.Context, *table.Table, []table.Requirement, []table.Update) (table.Metadata, string, error)
        // ListTables returns a list of table identifiers in the catalog, with the returned
@@ -195,12 +99,16 @@ type Catalog interface {
                removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error)
 }
 
-const (
-       keyOauthToken        = "token"
-       keyWarehouseLocation = "warehouse"
-       keyMetadataLocation  = "metadata_location"
-       keyOauthCredential   = "credential"
-)
+func ToIdentifier(ident ...string) table.Identifier {
+       if len(ident) == 1 {
+               if ident[0] == "" {
+                       return nil
+               }
+               return table.Identifier(strings.Split(ident[0], "."))
+       }
+
+       return table.Identifier(ident)
+}
 
 func TableNameFromIdent(ident table.Identifier) string {
        if len(ident) == 0 {
@@ -214,80 +122,28 @@ func NamespaceFromIdent(ident table.Identifier) table.Identifier {
        return ident[:len(ident)-1]
 }
 
-func checkForOverlap(removals []string, updates iceberg.Properties) error {
-       overlap := []string{}
-       for _, key := range removals {
-               if _, ok := updates[key]; ok {
-                       overlap = append(overlap, key)
-               }
-       }
-       if len(overlap) > 0 {
-               return fmt.Errorf("conflict between removals and updates for keys: %v", overlap)
-       }
-       return nil
-}
-
-func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals []string, updates iceberg.Properties) (iceberg.Properties, PropertiesUpdateSummary, error) {
-       if err := checkForOverlap(removals, updates); err != nil {
-               return nil, PropertiesUpdateSummary{}, err
-       }
-       var (
-               updatedProps = maps.Clone(currentProps)
-               removed      = make([]string, 0, len(removals))
-               updated      = make([]string, 0, len(updates))
-       )
-
-       for _, key := range removals {
-               if _, exists := updatedProps[key]; exists {
-                       delete(updatedProps, key)
-                       removed = append(removed, key)
-               }
-       }
-
-       for key, value := range updates {
-               if updatedProps[key] != value {
-                       updated = append(updated, key)
-                       updatedProps[key] = value
-               }
-       }
-
-       summary := PropertiesUpdateSummary{
-               Removed: removed,
-               Updated: updated,
-               Missing: iceberg.Difference(removals, removed),
-       }
-       return updatedProps, summary, nil
-}
-
-type createTableOpt func(*createTableCfg)
-
-type createTableCfg struct {
-       location      string
-       partitionSpec *iceberg.PartitionSpec
-       sortOrder     table.SortOrder
-       properties    iceberg.Properties
-}
+type CreateTableOpt func(*internal.CreateTableCfg)
 
-func WithLocation(location string) createTableOpt {
-       return func(cfg *createTableCfg) {
-               cfg.location = strings.TrimRight(location, "/")
+func WithLocation(location string) CreateTableOpt {
+       return func(cfg *internal.CreateTableCfg) {
+               cfg.Location = strings.TrimRight(location, "/")
        }
 }
 
-func WithPartitionSpec(spec *iceberg.PartitionSpec) createTableOpt {
-       return func(cfg *createTableCfg) {
-               cfg.partitionSpec = spec
+func WithPartitionSpec(spec *iceberg.PartitionSpec) CreateTableOpt {
+       return func(cfg *internal.CreateTableCfg) {
+               cfg.PartitionSpec = spec
        }
 }
 
-func WithSortOrder(order table.SortOrder) createTableOpt {
-       return func(cfg *createTableCfg) {
-               cfg.sortOrder = order
+func WithSortOrder(order table.SortOrder) CreateTableOpt {
+       return func(cfg *internal.CreateTableCfg) {
+               cfg.SortOrder = order
        }
 }
 
-func WithProperties(props iceberg.Properties) createTableOpt {
-       return func(cfg *createTableCfg) {
-               cfg.properties = props
+func WithProperties(props iceberg.Properties) CreateTableOpt {
+       return func(cfg *internal.CreateTableCfg) {
+               cfg.Properties = props
        }
 }
diff --git a/catalog/glue.go b/catalog/glue/glue.go
similarity index 75%
rename from catalog/glue.go
rename to catalog/glue/glue.go
index d661a44..1d62ee6 100644
--- a/catalog/glue.go
+++ b/catalog/glue/glue.go
@@ -15,15 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package catalog
+package glue
 
 import (
        "context"
        "errors"
        "fmt"
+       "maps"
        "strconv"
 
        "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/catalog"
        "github.com/apache/iceberg-go/io"
        "github.com/apache/iceberg-go/table"
        "github.com/aws/aws-sdk-go-v2/aws"
@@ -50,29 +52,29 @@ const (
        // The ID of the Glue Data Catalog where the tables reside. If none is provided, Glue
        // automatically uses the caller's AWS account ID by default.
        // See: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html
-       GlueCatalogIdKey = "glue.id"
-
-       GlueAccessKeyID     = "glue.access-key-id"
-       GlueSecretAccessKey = "glue.secret-access-key"
-       GlueSessionToken    = "glue.session-token"
-       GlueRegion          = "glue.region"
-       GlueEndpoint        = "glue.endpoint"
-       GlueMaxRetries      = "glue.max-retries"
-       GlueRetryMode       = "glue.retry-mode"
+       CatalogIdKey = "glue.id"
+
+       AccessKeyID     = "glue.access-key-id"
+       SecretAccessKey = "glue.secret-access-key"
+       SessionToken    = "glue.session-token"
+       Region          = "glue.region"
+       Endpoint        = "glue.endpoint"
+       MaxRetries      = "glue.max-retries"
+       RetryMode       = "glue.retry-mode"
 )
 
 var (
-       _ Catalog = (*GlueCatalog)(nil)
+       _ catalog.Catalog = (*Catalog)(nil)
 )
 
 func init() {
-       Register("glue", RegistrarFunc(func(_ string, props iceberg.Properties) (Catalog, error) {
+       catalog.Register("glue", catalog.RegistrarFunc(func(_ string, props iceberg.Properties) (catalog.Catalog, error) {
                awsConfig, err := toAwsConfig(props)
                if err != nil {
                        return nil, err
                }
 
-               return NewGlueCatalog(WithAwsConfig(awsConfig), WithAwsProperties(AwsProperties(props))), nil
+               return NewCatalog(WithAwsConfig(awsConfig), WithAwsProperties(AwsProperties(props))), nil
        }))
 }
 
@@ -81,17 +83,17 @@ func toAwsConfig(p iceberg.Properties) (aws.Config, error) {
 
        for k, v := range p {
                switch k {
-               case GlueRegion:
+               case Region:
                        opts = append(opts, config.WithRegion(v))
-               case GlueEndpoint:
+               case Endpoint:
                        opts = append(opts, config.WithBaseEndpoint(v))
-               case GlueMaxRetries:
+               case MaxRetries:
                        maxRetry, err := strconv.Atoi(v)
                        if err != nil {
                                return aws.Config{}, err
                        }
                        opts = append(opts, config.WithRetryMaxAttempts(maxRetry))
-               case GlueRetryMode:
+               case RetryMode:
                        m, err := aws.ParseRetryMode(v)
                        if err != nil {
                                return aws.Config{}, err
@@ -100,7 +102,7 @@ func toAwsConfig(p iceberg.Properties) (aws.Config, error) {
                }
        }
 
-       key, secret, token := p[GlueAccessKeyID], p[GlueSecretAccessKey], p[GlueSessionToken]
+       key, secret, token := p[AccessKeyID], p[SecretAccessKey], p[SessionToken]
        if key != "" || secret != "" || token != "" {
                opts = append(opts, config.WithCredentialsProvider(
                        credentials.NewStaticCredentialsProvider(key, secret, token)))
@@ -121,13 +123,13 @@ type glueAPI interface {
        UpdateDatabase(ctx context.Context, params *glue.UpdateDatabaseInput, optFns ...func(*glue.Options)) (*glue.UpdateDatabaseOutput, error)
 }
 
-type GlueCatalog struct {
+type Catalog struct {
        glueSvc   glueAPI
        catalogId *string
 }
 
-// NewGlueCatalog creates a new instance of GlueCatalog with the given options.
-func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog {
+// NewCatalog creates a new instance of glue.Catalog with the given options.
+func NewCatalog(opts ...Option) *Catalog {
        glueOps := &options{}
 
        for _, o := range opts {
@@ -135,13 +137,13 @@ func NewGlueCatalog(opts ...Option[GlueCatalog]) 
*GlueCatalog {
        }
 
        var catalogId *string
-       if val, ok := glueOps.awsProperties[GlueCatalogIdKey]; ok {
+       if val, ok := glueOps.awsProperties[CatalogIdKey]; ok {
                catalogId = &val
        } else {
                catalogId = nil
        }
 
-       return &GlueCatalog{
+       return &Catalog{
                glueSvc:   glue.NewFromConfig(glueOps.awsConfig),
                catalogId: catalogId,
        }
@@ -150,7 +152,7 @@ func NewGlueCatalog(opts ...Option[GlueCatalog]) 
*GlueCatalog {
 // ListTables returns a list of Iceberg tables in the given Glue database.
 //
 // The namespace should just contain the Glue database name.
-func (c *GlueCatalog) ListTables(ctx context.Context, namespace 
table.Identifier) ([]table.Identifier, error) {
+func (c *Catalog) ListTables(ctx context.Context, namespace table.Identifier) 
([]table.Identifier, error) {
        database, err := identifierToGlueDatabase(namespace)
        if err != nil {
                return nil, err
@@ -182,7 +184,7 @@ func (c *GlueCatalog) ListTables(ctx context.Context, 
namespace table.Identifier
 // LoadTable loads a table from the catalog table details.
 //
 // The identifier should contain the Glue database name, then Glue table name.
-func (c *GlueCatalog) LoadTable(ctx context.Context, identifier 
table.Identifier, props iceberg.Properties) (*table.Table, error) {
+func (c *Catalog) LoadTable(ctx context.Context, identifier table.Identifier, 
props iceberg.Properties) (*table.Table, error) {
        database, tableName, err := identifierToGlueTable(identifier)
        if err != nil {
                return nil, err
@@ -216,20 +218,20 @@ func (c *GlueCatalog) LoadTable(ctx context.Context, 
identifier table.Identifier
        return icebergTable, nil
 }
 
-func (c *GlueCatalog) CatalogType() CatalogType {
-       return Glue
+func (c *Catalog) CatalogType() catalog.Type {
+       return catalog.Glue
 }
 
-func (c *GlueCatalog) CreateTable(ctx context.Context, identifier 
table.Identifier, schema *iceberg.Schema, opts ...createTableOpt) 
(*table.Table, error) {
+func (c *Catalog) CreateTable(ctx context.Context, identifier 
table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) 
(*table.Table, error) {
        panic("create table not implemented for Glue Catalog")
 }
 
-func (c *GlueCatalog) CommitTable(context.Context, *table.Table, 
[]table.Requirement, []table.Update) (table.Metadata, string, error) {
+func (c *Catalog) CommitTable(context.Context, *table.Table, 
[]table.Requirement, []table.Update) (table.Metadata, string, error) {
        panic("commit table not implemented for Glue Catalog")
 }
 
 // DropTable deletes an Iceberg table from the Glue catalog.
-func (c *GlueCatalog) DropTable(ctx context.Context, identifier 
table.Identifier) error {
+func (c *Catalog) DropTable(ctx context.Context, identifier table.Identifier) 
error {
        database, tableName, err := identifierToGlueTable(identifier)
        if err != nil {
                return err
@@ -255,7 +257,7 @@ func (c *GlueCatalog) DropTable(ctx context.Context, 
identifier table.Identifier
 }
 
 // RenameTable renames an Iceberg table in the Glue catalog.
-func (c *GlueCatalog) RenameTable(ctx context.Context, from, to 
table.Identifier) (*table.Table, error) {
+func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) 
(*table.Table, error) {
        fromDatabase, fromTable, err := identifierToGlueTable(from)
        if err != nil {
                return nil, err
@@ -313,7 +315,7 @@ func (c *GlueCatalog) RenameTable(ctx context.Context, 
from, to table.Identifier
        }
 
        // Load the new table to return.
-       renamedTable, err := c.LoadTable(ctx, GlueTableIdentifier(toDatabase, 
toTable), nil)
+       renamedTable, err := c.LoadTable(ctx, TableIdentifier(toDatabase, 
toTable), nil)
        if err != nil {
                return nil, fmt.Errorf("failed to load renamed table %s.%s: %w", toDatabase, toTable, err)
        }
@@ -322,7 +324,7 @@ func (c *GlueCatalog) RenameTable(ctx context.Context, 
from, to table.Identifier
 }
 
 // CreateNamespace creates a new Iceberg namespace in the Glue catalog.
-func (c *GlueCatalog) CreateNamespace(ctx context.Context, namespace 
table.Identifier, props iceberg.Properties) error {
+func (c *Catalog) CreateNamespace(ctx context.Context, namespace 
table.Identifier, props iceberg.Properties) error {
        database, err := identifierToGlueDatabase(namespace)
        if err != nil {
                return err
@@ -358,7 +360,7 @@ func (c *GlueCatalog) CreateNamespace(ctx context.Context, 
namespace table.Ident
 }
 
 // DropNamespace deletes an Iceberg namespace from the Glue catalog.
-func (c *GlueCatalog) DropNamespace(ctx context.Context, namespace 
table.Identifier) error {
+func (c *Catalog) DropNamespace(ctx context.Context, namespace 
table.Identifier) error {
        databaseName, err := identifierToGlueDatabase(namespace)
        if err != nil {
                return err
@@ -380,7 +382,7 @@ func (c *GlueCatalog) DropNamespace(ctx context.Context, 
namespace table.Identif
 }
 
 // LoadNamespaceProperties loads the properties of an Iceberg namespace from 
the Glue catalog.
-func (c *GlueCatalog) LoadNamespaceProperties(ctx context.Context, namespace 
table.Identifier) (iceberg.Properties, error) {
+func (c *Catalog) LoadNamespaceProperties(ctx context.Context, namespace 
table.Identifier) (iceberg.Properties, error) {
        databaseName, err := identifierToGlueDatabase(namespace)
        if err != nil {
                return nil, err
@@ -403,22 +405,22 @@ func (c *GlueCatalog) LoadNamespaceProperties(ctx 
context.Context, namespace tab
 
 // UpdateNamespaceProperties updates the properties of an Iceberg namespace in 
the Glue catalog.
 // The removals list contains the keys to remove, and the updates map contains 
the keys and values to update.
-func (c *GlueCatalog) UpdateNamespaceProperties(ctx context.Context, namespace 
table.Identifier,
-       removals []string, updates iceberg.Properties) 
(PropertiesUpdateSummary, error) {
+func (c *Catalog) UpdateNamespaceProperties(ctx context.Context, namespace 
table.Identifier,
+       removals []string, updates iceberg.Properties) 
(catalog.PropertiesUpdateSummary, error) {
 
        databaseName, err := identifierToGlueDatabase(namespace)
        if err != nil {
-               return PropertiesUpdateSummary{}, err
+               return catalog.PropertiesUpdateSummary{}, err
        }
 
        database, err := c.getDatabase(ctx, databaseName)
        if err != nil {
-               return PropertiesUpdateSummary{}, err
+               return catalog.PropertiesUpdateSummary{}, err
        }
 
        updatedProperties, propertiesUpdateSummary, err := 
getUpdatedPropsAndUpdateSummary(database.Parameters, removals, updates)
        if err != nil {
-               return PropertiesUpdateSummary{}, err
+               return catalog.PropertiesUpdateSummary{}, err
        }
 
        _, err = c.glueSvc.UpdateDatabase(ctx, 
&glue.UpdateDatabaseInput{CatalogId: c.catalogId, Name: 
aws.String(databaseName), DatabaseInput: &types.DatabaseInput{
@@ -426,14 +428,14 @@ func (c *GlueCatalog) UpdateNamespaceProperties(ctx 
context.Context, namespace t
                Parameters: updatedProperties,
        }})
        if err != nil {
-               return PropertiesUpdateSummary{}, fmt.Errorf("failed to update namespace properties %s: %w", databaseName, err)
+               return catalog.PropertiesUpdateSummary{}, fmt.Errorf("failed to update namespace properties %s: %w", databaseName, err)
        }
 
        return propertiesUpdateSummary, nil
 }
 
 // ListNamespaces returns a list of Iceberg namespaces from the given Glue 
catalog.
-func (c *GlueCatalog) ListNamespaces(ctx context.Context, parent 
table.Identifier) ([]table.Identifier, error) {
+func (c *Catalog) ListNamespaces(ctx context.Context, parent table.Identifier) 
([]table.Identifier, error) {
        params := &glue.GetDatabasesInput{
                CatalogId: c.catalogId,
        }
@@ -464,7 +466,7 @@ func (c *GlueCatalog) ListNamespaces(ctx context.Context, 
parent table.Identifie
 }
 
 // GetTable loads a table from the Glue Catalog using the given database and 
table name.
-func (c *GlueCatalog) getTable(ctx context.Context, database, tableName 
string) (*types.Table, error) {
+func (c *Catalog) getTable(ctx context.Context, database, tableName string) 
(*types.Table, error) {
        tblRes, err := c.glueSvc.GetTable(ctx,
                &glue.GetTableInput{
                        CatalogId:    c.catalogId,
@@ -474,7 +476,7 @@ func (c *GlueCatalog) getTable(ctx context.Context, 
database, tableName string)
        )
        if err != nil {
                if errors.Is(err, &types.EntityNotFoundException{}) {
-                       return nil, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, ErrNoSuchTable)
+                       return nil, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, catalog.ErrNoSuchTable)
                }
                return nil, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err)
        }
@@ -487,11 +489,11 @@ func (c *GlueCatalog) getTable(ctx context.Context, 
database, tableName string)
 }
 
 // GetDatabase loads a database from the Glue Catalog using the given database 
name.
-func (c *GlueCatalog) getDatabase(ctx context.Context, databaseName string) 
(*types.Database, error) {
+func (c *Catalog) getDatabase(ctx context.Context, databaseName string) 
(*types.Database, error) {
        database, err := c.glueSvc.GetDatabase(ctx, 
&glue.GetDatabaseInput{CatalogId: c.catalogId, Name: aws.String(databaseName)})
        if err != nil {
                if errors.Is(err, &types.EntityNotFoundException{}) {
-                       return nil, fmt.Errorf("failed to get namespace %s: %w", databaseName, ErrNoSuchNamespace)
+                       return nil, fmt.Errorf("failed to get namespace %s: %w", databaseName, catalog.ErrNoSuchNamespace)
                }
                return nil, fmt.Errorf("failed to get namespace %s: %w", databaseName, err)
        }
@@ -519,13 +521,13 @@ func identifierToGlueDatabase(identifier 
table.Identifier) (string, error) {
        return identifier[0], nil
 }
 
-// GlueTableIdentifier returns a glue table identifier for an Iceberg table in 
the format [database, table].
-func GlueTableIdentifier(database string, tableName string) table.Identifier {
+// TableIdentifier returns a glue table identifier for an Iceberg table in the 
format [database, table].
+func TableIdentifier(database string, tableName string) table.Identifier {
        return []string{database, tableName}
 }
 
-// GlueDatabaseIdentifier returns a database identifier for a Glue database in 
the format [database].
-func GlueDatabaseIdentifier(database string) table.Identifier {
+// DatabaseIdentifier returns a database identifier for a Glue database in the 
format [database].
+func DatabaseIdentifier(database string) table.Identifier {
        return []string{database}
 }
 
@@ -536,7 +538,7 @@ func filterTableListByType(database string, tableList 
[]types.Table, tableType s
                if tbl.Parameters[tableTypePropsKey] != tableType {
                        continue
                }
-               filtered = append(filtered, GlueTableIdentifier(database, 
aws.ToString(tbl.Name)))
+               filtered = append(filtered, TableIdentifier(database, 
aws.ToString(tbl.Name)))
        }
 
        return filtered
@@ -549,8 +551,53 @@ func filterDatabaseListByType(databases []types.Database, 
databaseType string) [
                if database.Parameters[databaseTypePropsKey] != databaseType {
                        continue
                }
-               filtered = append(filtered, 
GlueDatabaseIdentifier(aws.ToString(database.Name)))
+               filtered = append(filtered, 
DatabaseIdentifier(aws.ToString(database.Name)))
        }
 
        return filtered
 }
+
+func checkForOverlap(removals []string, updates iceberg.Properties) error {
+       overlap := []string{}
+       for _, key := range removals {
+               if _, ok := updates[key]; ok {
+                       overlap = append(overlap, key)
+               }
+       }
+       if len(overlap) > 0 {
+               return fmt.Errorf("conflict between removals and updates for keys: %v", overlap)
+       }
+       return nil
+}
+
+func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals []string, updates iceberg.Properties) (iceberg.Properties, catalog.PropertiesUpdateSummary, error) {
+       if err := checkForOverlap(removals, updates); err != nil {
+               return nil, catalog.PropertiesUpdateSummary{}, err
+       }
+       var (
+               updatedProps = maps.Clone(currentProps)
+               removed      = make([]string, 0, len(removals))
+               updated      = make([]string, 0, len(updates))
+       )
+
+       for _, key := range removals {
+               if _, exists := updatedProps[key]; exists {
+                       delete(updatedProps, key)
+                       removed = append(removed, key)
+               }
+       }
+
+       for key, value := range updates {
+               if updatedProps[key] != value {
+                       updated = append(updated, key)
+                       updatedProps[key] = value
+               }
+       }
+
+       summary := catalog.PropertiesUpdateSummary{
+               Removed: removed,
+               Updated: updated,
+               Missing: iceberg.Difference(removals, removed),
+       }
+       return updatedProps, summary, nil
+}
diff --git a/catalog/glue_test.go b/catalog/glue/glue_test.go
similarity index 91%
rename from catalog/glue_test.go
rename to catalog/glue/glue_test.go
index c08f4cc..4c8aa48 100644
--- a/catalog/glue_test.go
+++ b/catalog/glue/glue_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package catalog
+package glue
 
 import (
        "context"
@@ -23,6 +23,7 @@ import (
        "os"
        "testing"
 
+       "github.com/apache/iceberg-go/catalog"
        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/config"
        "github.com/aws/aws-sdk-go-v2/service/glue"
@@ -105,7 +106,7 @@ func TestGlueGetTable(t *testing.T) {
                Name:         aws.String("test_table"),
        }, mock.Anything).Return(&glue.GetTableOutput{Table: 
&testIcebergGlueTable}, nil)
 
-       glueCatalog := &GlueCatalog{
+       glueCatalog := &Catalog{
                glueSvc: mockGlueSvc,
        }
 
@@ -125,11 +126,11 @@ func TestGlueListTables(t *testing.T) {
                TableList: []types.Table{testIcebergGlueTable, 
testNonIcebergGlueTable},
        }, nil).Once()
 
-       glueCatalog := &GlueCatalog{
+       glueCatalog := &Catalog{
                glueSvc: mockGlueSvc,
        }
 
-       tables, err := glueCatalog.ListTables(context.TODO(), 
GlueDatabaseIdentifier("test_database"))
+       tables, err := glueCatalog.ListTables(context.TODO(), 
DatabaseIdentifier("test_database"))
        assert.NoError(err)
        assert.Len(tables, 1)
        assert.Equal([]string{"test_database", "test_table"}, tables[0])
@@ -155,7 +156,7 @@ func TestGlueListNamespaces(t *testing.T) {
                },
        }, nil).Once()
 
-       glueCatalog := &GlueCatalog{
+       glueCatalog := &Catalog{
                glueSvc: mockGlueSvc,
        }
 
@@ -182,11 +183,11 @@ func TestGlueDropTable(t *testing.T) {
                Name:         aws.String("test_table"),
        }, mock.Anything).Return(&glue.DeleteTableOutput{}, nil).Once()
 
-       glueCatalog := &GlueCatalog{
+       glueCatalog := &Catalog{
                glueSvc: mockGlueSvc,
        }
 
-       err := glueCatalog.DropTable(context.TODO(), 
GlueTableIdentifier("test_database", "test_table"))
+       err := glueCatalog.DropTable(context.TODO(), 
TableIdentifier("test_database", "test_table"))
        assert.NoError(err)
 }
 
@@ -206,7 +207,7 @@ func TestGlueCreateNamespace(t *testing.T) {
                },
        }, mock.Anything).Return(&glue.CreateDatabaseOutput{}, nil).Once()
 
-       glueCatalog := &GlueCatalog{
+       glueCatalog := &Catalog{
                glueSvc: mockGlueSvc,
        }
 
@@ -215,7 +216,7 @@ func TestGlueCreateNamespace(t *testing.T) {
                locationPropsKey:    "s3://test-location",
        }
 
-       err := glueCatalog.CreateNamespace(context.TODO(), 
GlueDatabaseIdentifier("test_namespace"), props)
+       err := glueCatalog.CreateNamespace(context.TODO(), 
DatabaseIdentifier("test_namespace"), props)
        assert.NoError(err)
 }
 
@@ -239,11 +240,11 @@ func TestGlueDropNamespace(t *testing.T) {
                Name: aws.String("test_namespace"),
        }, mock.Anything).Return(&glue.DeleteDatabaseOutput{}, nil).Once()
 
-       glueCatalog := &GlueCatalog{
+       glueCatalog := &Catalog{
                glueSvc: mockGlueSvc,
        }
 
-       err := glueCatalog.DropNamespace(context.TODO(), 
GlueDatabaseIdentifier("test_namespace"))
+       err := glueCatalog.DropNamespace(context.TODO(), 
DatabaseIdentifier("test_namespace"))
        assert.NoError(err)
 }
 
@@ -253,7 +254,7 @@ func TestGlueUpdateNamespaceProperties(t *testing.T) {
                initial     map[string]string
                updates     map[string]string
                removals    []string
-               expected    PropertiesUpdateSummary
+               expected    catalog.PropertiesUpdateSummary
                shouldError bool
        }{
                {
@@ -279,7 +280,7 @@ func TestGlueUpdateNamespaceProperties(t *testing.T) {
                                "key3": "value3",
                        },
                        removals: []string{"key4"},
-                       expected: PropertiesUpdateSummary{
+                       expected: catalog.PropertiesUpdateSummary{
                                Removed: []string{},
                                Updated: []string{"key3"},
                                Missing: []string{"key4"},
@@ -297,7 +298,7 @@ func TestGlueUpdateNamespaceProperties(t *testing.T) {
                                "key3": "value3",
                        },
                        removals: []string{},
-                       expected: PropertiesUpdateSummary{
+                       expected: catalog.PropertiesUpdateSummary{
                                Removed: []string{},
                                Updated: []string{"key3"},
                                Missing: []string{},
@@ -315,7 +316,7 @@ func TestGlueUpdateNamespaceProperties(t *testing.T) {
                                "key2": "new_value2",
                        },
                        removals: []string{"key4"},
-                       expected: PropertiesUpdateSummary{
+                       expected: catalog.PropertiesUpdateSummary{
                                Removed: []string{"key4"},
                                Updated: []string{"key2"},
                                Missing: []string{},
@@ -332,7 +333,7 @@ func TestGlueUpdateNamespaceProperties(t *testing.T) {
                                "key2": "new_value2",
                        },
                        removals: []string{},
-                       expected: PropertiesUpdateSummary{
+                       expected: catalog.PropertiesUpdateSummary{
                                Removed: []string{},
                                Updated: []string{"key2"},
                                Missing: []string{},
@@ -348,7 +349,7 @@ func TestGlueUpdateNamespaceProperties(t *testing.T) {
                        },
                        updates:  map[string]string{},
                        removals: []string{"key2", "key3"},
-                       expected: PropertiesUpdateSummary{
+                       expected: catalog.PropertiesUpdateSummary{
                                Removed: []string{"key2", "key3"},
                                Updated: []string{},
                                Missing: []string{},
@@ -378,11 +379,11 @@ func TestGlueUpdateNamespaceProperties(t *testing.T) {
                                mockGlueSvc.On("UpdateDatabase", mock.Anything, 
mock.Anything, mock.Anything).Return(&glue.UpdateDatabaseOutput{}, nil).Once()
                        }
 
-                       glueCatalog := &GlueCatalog{
+                       glueCatalog := &Catalog{
                                glueSvc: mockGlueSvc,
                        }
 
-                       summary, err := 
glueCatalog.UpdateNamespaceProperties(context.TODO(), 
GlueDatabaseIdentifier("test_namespace"), tt.removals, tt.updates)
+                       summary, err := 
glueCatalog.UpdateNamespaceProperties(context.TODO(), 
DatabaseIdentifier("test_namespace"), tt.removals, tt.updates)
                        if tt.shouldError {
                                assert.Error(err)
                        } else {
@@ -452,11 +453,11 @@ func TestGlueRenameTable(t *testing.T) {
                Name:         aws.String("test_table"),
        }, mock.Anything).Return(&glue.DeleteTableOutput{}, nil).Once()
 
-       glueCatalog := &GlueCatalog{
+       glueCatalog := &Catalog{
                glueSvc: mockGlueSvc,
        }
 
-       renamedTable, err := glueCatalog.RenameTable(context.TODO(), 
GlueTableIdentifier("test_database", "test_table"), 
GlueTableIdentifier("test_database", "new_test_table"))
+       renamedTable, err := glueCatalog.RenameTable(context.TODO(), 
TableIdentifier("test_database", "test_table"), 
TableIdentifier("test_database", "new_test_table"))
        assert.NoError(err)
        assert.Equal("new_test_table", renamedTable.Identifier()[1])
 }
@@ -506,11 +507,11 @@ func TestGlueRenameTable_DeleteTableFailureRollback(t 
*testing.T) {
                Name:         aws.String("new_test_table"),
        }, mock.Anything).Return(&glue.DeleteTableOutput{}, nil).Once()
 
-       glueCatalog := &GlueCatalog{
+       glueCatalog := &Catalog{
                glueSvc: mockGlueSvc,
        }
 
-       renamedTable, err := glueCatalog.RenameTable(context.TODO(), 
GlueTableIdentifier("test_database", "test_table"), 
GlueTableIdentifier("test_database", "new_test_table"))
+       renamedTable, err := glueCatalog.RenameTable(context.TODO(), 
TableIdentifier("test_database", "test_table"), 
TableIdentifier("test_database", "new_test_table"))
        assert.Error(err)
        assert.Nil(renamedTable)
        mockGlueSvc.AssertCalled(t, "DeleteTable", mock.Anything, 
&glue.DeleteTableInput{
@@ -531,9 +532,9 @@ func TestGlueListTablesIntegration(t *testing.T) {
        awscfg, err := config.LoadDefaultConfig(context.TODO(), 
config.WithClientLogMode(aws.LogRequest|aws.LogResponse))
        assert.NoError(err)
 
-       catalog := NewGlueCatalog(WithAwsConfig(awscfg))
+       catalog := NewCatalog(WithAwsConfig(awscfg))
 
-       tables, err := catalog.ListTables(context.TODO(), 
GlueDatabaseIdentifier(os.Getenv("TEST_DATABASE_NAME")))
+       tables, err := catalog.ListTables(context.TODO(), 
DatabaseIdentifier(os.Getenv("TEST_DATABASE_NAME")))
        assert.NoError(err)
        assert.Equal([]string{os.Getenv("TEST_DATABASE_NAME"), 
os.Getenv("TEST_TABLE_NAME")}, tables[1])
 }
@@ -554,7 +555,7 @@ func TestGlueLoadTableIntegration(t *testing.T) {
        awscfg, err := config.LoadDefaultConfig(context.TODO(), 
config.WithClientLogMode(aws.LogRequest|aws.LogResponse))
        assert.NoError(err)
 
-       catalog := NewGlueCatalog(WithAwsConfig(awscfg))
+       catalog := NewCatalog(WithAwsConfig(awscfg))
 
        table, err := catalog.LoadTable(context.TODO(), 
[]string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")}, nil)
        assert.NoError(err)
@@ -570,7 +571,7 @@ func TestGlueListNamespacesIntegration(t *testing.T) {
        awscfg, err := config.LoadDefaultConfig(context.TODO(), 
config.WithClientLogMode(aws.LogRequest|aws.LogResponse))
        assert.NoError(err)
 
-       catalog := NewGlueCatalog(WithAwsConfig(awscfg))
+       catalog := NewCatalog(WithAwsConfig(awscfg))
 
        namespaces, err := catalog.ListNamespaces(context.TODO(), nil)
        assert.NoError(err)
diff --git a/catalog/glue/options.go b/catalog/glue/options.go
new file mode 100644
index 0000000..5d4d6d0
--- /dev/null
+++ b/catalog/glue/options.go
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package glue
+
+import (
+       "github.com/aws/aws-sdk-go-v2/aws"
+)
+
+type AwsProperties map[string]string
+
+type Option func(*options)
+
+// WithAwsConfig sets the AWS configuration for the catalog.
+func WithAwsConfig(cfg aws.Config) Option {
+       return func(o *options) {
+               o.awsConfig = cfg
+       }
+}
+
+func WithAwsProperties(props AwsProperties) Option {
+       return func(o *options) {
+               o.awsProperties = props
+       }
+}
+
+type options struct {
+       awsConfig     aws.Config
+       awsProperties AwsProperties
+}
diff --git a/catalog/internal/utils.go b/catalog/internal/utils.go
new file mode 100644
index 0000000..1ad2f97
--- /dev/null
+++ b/catalog/internal/utils.go
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/table"
+)
+
+type CreateTableCfg struct {
+       Location      string
+       PartitionSpec *iceberg.PartitionSpec
+       SortOrder     table.SortOrder
+       Properties    iceberg.Properties
+}
diff --git a/catalog/registry_test.go b/catalog/registry_test.go
index 9bace89..1bc9271 100644
--- a/catalog/registry_test.go
+++ b/catalog/registry_test.go
@@ -26,6 +26,8 @@ import (
 
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/catalog"
+       _ "github.com/apache/iceberg-go/catalog/glue"
+       "github.com/apache/iceberg-go/catalog/rest"
        "github.com/apache/iceberg-go/config"
        "github.com/stretchr/testify/assert"
 )
@@ -178,14 +180,14 @@ func TestRegistryFromConfig(t *testing.T) {
 
        c, err := catalog.Load("foobar", nil)
        assert.NoError(t, err)
-       assert.IsType(t, &catalog.RestCatalog{}, c)
-       assert.Equal(t, "foobar", c.(*catalog.RestCatalog).Name())
+       assert.IsType(t, &rest.Catalog{}, c)
+       assert.Equal(t, "foobar", c.(*rest.Catalog).Name())
        assert.Equal(t, "catalog_name", params.Get("warehouse"))
 
        c, err = catalog.Load("foobar", iceberg.Properties{"warehouse": 
"overriden"})
        assert.NoError(t, err)
-       assert.IsType(t, &catalog.RestCatalog{}, c)
-       assert.Equal(t, "foobar", c.(*catalog.RestCatalog).Name())
+       assert.IsType(t, &rest.Catalog{}, c)
+       assert.Equal(t, "foobar", c.(*rest.Catalog).Name())
        assert.Equal(t, "overriden", params.Get("warehouse"))
 
        srv.Close()
@@ -195,7 +197,7 @@ func TestRegistryFromConfig(t *testing.T) {
 
        c, err = catalog.Load("foobar", iceberg.Properties{"uri": srv2.URL})
        assert.NoError(t, err)
-       assert.IsType(t, &catalog.RestCatalog{}, c)
-       assert.Equal(t, "foobar", c.(*catalog.RestCatalog).Name())
+       assert.IsType(t, &rest.Catalog{}, c)
+       assert.Equal(t, "foobar", c.(*rest.Catalog).Name())
        assert.Equal(t, "catalog_name", params.Get("warehouse"))
 }
diff --git a/catalog/rest/options.go b/catalog/rest/options.go
new file mode 100644
index 0000000..5aae122
--- /dev/null
+++ b/catalog/rest/options.go
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package rest
+
+import (
+       "crypto/tls"
+       "net/url"
+
+       "github.com/aws/aws-sdk-go-v2/aws"
+)
+
+type Option func(*options)
+
+func WithCredential(cred string) Option {
+       return func(o *options) {
+               o.credential = cred
+       }
+}
+
+func WithOAuthToken(token string) Option {
+       return func(o *options) {
+               o.oauthToken = token
+       }
+}
+
+func WithTLSConfig(config *tls.Config) Option {
+       return func(o *options) {
+               o.tlsConfig = config
+       }
+}
+
+func WithWarehouseLocation(loc string) Option {
+       return func(o *options) {
+               o.warehouseLocation = loc
+       }
+}
+
+func WithMetadataLocation(loc string) Option {
+       return func(o *options) {
+               o.metadataLocation = loc
+       }
+}
+
+func WithSigV4() Option {
+       return func(o *options) {
+               o.enableSigv4 = true
+               o.sigv4Service = "execute-api"
+       }
+}
+
+func WithSigV4RegionSvc(region, service string) Option {
+       return func(o *options) {
+               o.enableSigv4 = true
+               o.sigv4Region = region
+
+               if service == "" {
+                       o.sigv4Service = "execute-api"
+               } else {
+                       o.sigv4Service = service
+               }
+       }
+}
+
+func WithAuthURI(uri *url.URL) Option {
+       return func(o *options) {
+               o.authUri = uri
+       }
+}
+
+func WithPrefix(prefix string) Option {
+       return func(o *options) {
+               o.prefix = prefix
+       }
+}
+
+func WithAwsConfig(cfg aws.Config) Option {
+       return func(o *options) {
+               o.awsConfig = cfg
+       }
+}
+
+func WithScope(scope string) Option {
+       return func(o *options) {
+               o.scope = scope
+       }
+}
+
+type options struct {
+       awsConfig         aws.Config
+       tlsConfig         *tls.Config
+       credential        string
+       oauthToken        string
+       warehouseLocation string
+       metadataLocation  string
+       enableSigv4       bool
+       sigv4Region       string
+       sigv4Service      string
+       prefix            string
+       authUri           *url.URL
+       scope             string
+}
diff --git a/catalog/rest.go b/catalog/rest/rest.go
similarity index 79%
rename from catalog/rest.go
rename to catalog/rest/rest.go
index 3326a3d..5a5f151 100644
--- a/catalog/rest.go
+++ b/catalog/rest/rest.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package catalog
+package rest
 
 import (
        "bytes"
@@ -34,6 +34,8 @@ import (
        "time"
 
        "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/catalog"
+       "github.com/apache/iceberg-go/catalog/internal"
        iceio "github.com/apache/iceberg-go/io"
        "github.com/apache/iceberg-go/table"
        "github.com/aws/aws-sdk-go-v2/aws"
@@ -42,10 +44,15 @@ import (
 )
 
 var (
-       _ Catalog = (*RestCatalog)(nil)
+       _ catalog.Catalog = (*Catalog)(nil)
 )
 
 const (
+       keyOauthToken        = "token"
+       keyWarehouseLocation = "warehouse"
+       keyMetadataLocation  = "metadata_location"
+       keyOauthCredential   = "credential"
+
        authorizationHeader = "Authorization"
        bearerPrefix        = "Bearer"
        namespaceSeparator  = "\x1F"
@@ -74,13 +81,13 @@ var (
 )
 
 func init() {
-       reg := RegistrarFunc(func(name string, p iceberg.Properties) (Catalog, 
error) {
-               return newRestCatalogFromProps(name, p.Get("uri", ""), p)
+       reg := catalog.RegistrarFunc(func(name string, p iceberg.Properties) 
(catalog.Catalog, error) {
+               return newCatalogFromProps(name, p.Get("uri", ""), p)
        })
 
-       Register(string(REST), reg)
-       Register("http", reg)
-       Register("https", reg)
+       catalog.Register(string(catalog.REST), reg)
+       catalog.Register("http", reg)
+       catalog.Register("https", reg)
 }
 
 type errorResponse struct {
@@ -349,17 +356,6 @@ func handleNon200(rsp *http.Response, override 
map[int]error) error {
        return e
 }
 
-func ToRestIdentifier(ident ...string) table.Identifier {
-       if len(ident) == 1 {
-               if ident[0] == "" {
-                       return nil
-               }
-               return table.Identifier(strings.Split(ident[0], "."))
-       }
-
-       return table.Identifier(ident)
-}
-
 func fromProps(props iceberg.Properties) *options {
        o := &options{}
        for k, v := range props {
@@ -426,7 +422,7 @@ func toProps(o *options) iceberg.Properties {
        return props
 }
 
-type RestCatalog struct {
+type Catalog struct {
        baseURI *url.URL
        cl      *http.Client
 
@@ -434,10 +430,10 @@ type RestCatalog struct {
        props iceberg.Properties
 }
 
-func newRestCatalogFromProps(name string, uri string, p iceberg.Properties) 
(*RestCatalog, error) {
+func newCatalogFromProps(name string, uri string, p iceberg.Properties) 
(*Catalog, error) {
        ops := fromProps(p)
 
-       r := &RestCatalog{name: name}
+       r := &Catalog{name: name}
        if err := r.init(ops, uri); err != nil {
                return nil, err
        }
@@ -445,13 +441,13 @@ func newRestCatalogFromProps(name string, uri string, p 
iceberg.Properties) (*Re
        return r, nil
 }
 
-func NewRestCatalog(name, uri string, opts ...Option[RestCatalog]) 
(*RestCatalog, error) {
+func NewCatalog(name, uri string, opts ...Option) (*Catalog, error) {
        ops := &options{}
        for _, o := range opts {
                o(ops)
        }
 
-       r := &RestCatalog{name: name}
+       r := &Catalog{name: name}
        if err := r.init(ops, uri); err != nil {
                return nil, err
        }
@@ -459,7 +455,7 @@ func NewRestCatalog(name, uri string, opts 
...Option[RestCatalog]) (*RestCatalog
        return r, nil
 }
 
-func (r *RestCatalog) init(ops *options, uri string) error {
+func (r *Catalog) init(ops *options, uri string) error {
        baseuri, err := url.Parse(uri)
        if err != nil {
                return err
@@ -477,7 +473,7 @@ func (r *RestCatalog) init(ops *options, uri string) error {
        return nil
 }
 
-func (r *RestCatalog) fetchAccessToken(cl *http.Client, creds string, opts 
*options) (string, error) {
+func (r *Catalog) fetchAccessToken(cl *http.Client, creds string, opts 
*options) (string, error) {
        clientID, clientSecret, hasID := strings.Cut(creds, ":")
        if !hasID {
                clientID, clientSecret = "", clientID
@@ -530,7 +526,7 @@ func (r *RestCatalog) fetchAccessToken(cl *http.Client, 
creds string, opts *opti
        }
 }
 
-func (r *RestCatalog) createSession(opts *options) (*http.Client, error) {
+func (r *Catalog) createSession(opts *options) (*http.Client, error) {
        session := &sessionTransport{
                Transport:      http.Transport{TLSClientConfig: opts.tlsConfig},
                defaultHeaders: http.Header{},
@@ -571,7 +567,7 @@ func (r *RestCatalog) createSession(opts *options) 
(*http.Client, error) {
        return cl, nil
 }
 
-func (r *RestCatalog) fetchConfig(opts *options) (*http.Client, *options, 
error) {
+func (r *Catalog) fetchConfig(opts *options) (*http.Client, *options, error) {
        params := url.Values{}
        if opts.warehouseLocation != "" {
                params.Set(keyWarehouseLocation, opts.warehouseLocation)
@@ -612,17 +608,17 @@ func (r *RestCatalog) fetchConfig(opts *options) 
(*http.Client, *options, error)
        return sess, o, nil
 }
 
-func (r *RestCatalog) Name() string             { return r.name }
-func (r *RestCatalog) CatalogType() CatalogType { return REST }
+func (r *Catalog) Name() string              { return r.name }
+func (r *Catalog) CatalogType() catalog.Type { return catalog.REST }
 
 func checkValidNamespace(ident table.Identifier) error {
        if len(ident) < 1 {
-               return fmt.Errorf("%w: empty namespace identifier", 
ErrNoSuchNamespace)
+               return fmt.Errorf("%w: empty namespace identifier", 
catalog.ErrNoSuchNamespace)
        }
        return nil
 }
 
-func (r *RestCatalog) tableFromResponse(identifier []string, metadata 
table.Metadata, loc string, config iceberg.Properties) (*table.Table, error) {
+func (r *Catalog) tableFromResponse(identifier []string, metadata 
table.Metadata, loc string, config iceberg.Properties) (*table.Table, error) {
        id := identifier
        if r.name != "" {
                id = append([]string{r.name}, identifier...)
@@ -636,7 +632,7 @@ func (r *RestCatalog) tableFromResponse(identifier 
[]string, metadata table.Meta
        return table.New(id, metadata, loc, iofs), nil
 }
 
-func (r *RestCatalog) ListTables(ctx context.Context, namespace 
table.Identifier) ([]table.Identifier, error) {
+func (r *Catalog) ListTables(ctx context.Context, namespace table.Identifier) 
([]table.Identifier, error) {
        if err := checkValidNamespace(namespace); err != nil {
                return nil, err
        }
@@ -647,7 +643,7 @@ func (r *RestCatalog) ListTables(ctx context.Context, 
namespace table.Identifier
        type resp struct {
                Identifiers []identifier `json:"identifiers"`
        }
-       rsp, err := doGet[resp](ctx, r.baseURI, path, r.cl, 
map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
+       rsp, err := doGet[resp](ctx, r.baseURI, path, r.cl, 
map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace})
        if err != nil {
                return nil, err
        }
@@ -662,19 +658,19 @@ func (r *RestCatalog) ListTables(ctx context.Context, 
namespace table.Identifier
 func splitIdentForPath(ident table.Identifier) (string, string, error) {
        if len(ident) < 1 {
                return "", "", fmt.Errorf("%w: missing namespace or invalid 
identifier %v",
-                       ErrNoSuchTable, strings.Join(ident, "."))
+                       catalog.ErrNoSuchTable, strings.Join(ident, "."))
        }
 
-       return strings.Join(NamespaceFromIdent(ident), namespaceSeparator), 
TableNameFromIdent(ident), nil
+       return strings.Join(catalog.NamespaceFromIdent(ident), 
namespaceSeparator), catalog.TableNameFromIdent(ident), nil
 }
 
-func (r *RestCatalog) CreateTable(ctx context.Context, identifier 
table.Identifier, schema *iceberg.Schema, opts ...createTableOpt) 
(*table.Table, error) {
+func (r *Catalog) CreateTable(ctx context.Context, identifier 
table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) 
(*table.Table, error) {
        ns, tbl, err := splitIdentForPath(identifier)
        if err != nil {
                return nil, err
        }
 
-       var cfg createTableCfg
+       var cfg internal.CreateTableCfg
        for _, o := range opts {
                o(&cfg)
        }
@@ -684,12 +680,12 @@ func (r *RestCatalog) CreateTable(ctx context.Context, 
identifier table.Identifi
                return nil, err
        }
 
-       freshPartitionSpec, err := 
iceberg.AssignFreshPartitionSpecIDs(cfg.partitionSpec, schema, freshSchema)
+       freshPartitionSpec, err := 
iceberg.AssignFreshPartitionSpecIDs(cfg.PartitionSpec, schema, freshSchema)
        if err != nil {
                return nil, err
        }
 
-       freshSortOrder, err := table.AssignFreshSortOrderIDs(cfg.sortOrder, 
schema, freshSchema)
+       freshSortOrder, err := table.AssignFreshSortOrderIDs(cfg.SortOrder, 
schema, freshSchema)
        if err != nil {
                return nil, err
        }
@@ -697,15 +693,15 @@ func (r *RestCatalog) CreateTable(ctx context.Context, 
identifier table.Identifi
        payload := createTableRequest{
                Name:          tbl,
                Schema:        freshSchema,
-               Location:      cfg.location,
+               Location:      cfg.Location,
                PartitionSpec: &freshPartitionSpec,
                WriteOrder:    &freshSortOrder,
                StageCreate:   false,
-               Props:         cfg.properties,
+               Props:         cfg.Properties,
        }
 
        ret, err := doPost[createTableRequest, loadTableResponse](ctx, 
r.baseURI, []string{"namespaces", ns, "tables"}, payload,
-               r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace, 
http.StatusConflict: ErrTableAlreadyExists})
+               r.cl, map[int]error{http.StatusNotFound: 
catalog.ErrNoSuchNamespace, http.StatusConflict: catalog.ErrTableAlreadyExists})
        if err != nil {
                return nil, err
        }
@@ -717,7 +713,7 @@ func (r *RestCatalog) CreateTable(ctx context.Context, 
identifier table.Identifi
        return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, 
config)
 }
 
-func (r *RestCatalog) CommitTable(ctx context.Context, tbl *table.Table, 
requirements []table.Requirement, updates []table.Update) (table.Metadata, 
string, error) {
+func (r *Catalog) CommitTable(ctx context.Context, tbl *table.Table, 
requirements []table.Requirement, updates []table.Update) (table.Metadata, 
string, error) {
        ident := tbl.Identifier()
 
        ns, tblName, err := splitIdentForPath(ident)
@@ -726,7 +722,7 @@ func (r *RestCatalog) CommitTable(ctx context.Context, tbl 
*table.Table, require
        }
 
        restIdentifier := identifier{
-               Namespace: NamespaceFromIdent(ident),
+               Namespace: catalog.NamespaceFromIdent(ident),
                Name:      tblName,
        }
 
@@ -738,7 +734,7 @@ func (r *RestCatalog) CommitTable(ctx context.Context, tbl 
*table.Table, require
 
        ret, err := doPost[payload, commitTableResponse](ctx, r.baseURI, 
[]string{"namespaces", ns, "tables", tblName},
                payload{Identifier: restIdentifier, Requirements: requirements, 
Updates: updates}, r.cl,
-               map[int]error{http.StatusNotFound: ErrNoSuchTable, 
http.StatusConflict: ErrCommitFailed})
+               map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable, 
http.StatusConflict: ErrCommitFailed})
        if err != nil {
                return nil, "", err
        }
@@ -748,7 +744,7 @@ func (r *RestCatalog) CommitTable(ctx context.Context, tbl 
*table.Table, require
        return ret.Metadata, ret.MetadataLoc, nil
 }
 
-func (r *RestCatalog) RegisterTable(ctx context.Context, identifier 
table.Identifier, metadataLoc string) (*table.Table, error) {
+func (r *Catalog) RegisterTable(ctx context.Context, identifier 
table.Identifier, metadataLoc string) (*table.Table, error) {
        ns, tbl, err := splitIdentForPath(identifier)
        if err != nil {
                return nil, err
@@ -760,7 +756,8 @@ func (r *RestCatalog) RegisterTable(ctx context.Context, 
identifier table.Identi
        }
 
        ret, err := doPost[payload, loadTableResponse](ctx, r.baseURI, 
[]string{"namespaces", ns, "tables", tbl},
-               payload{Name: tbl, MetadataLoc: metadataLoc}, r.cl, 
map[int]error{http.StatusNotFound: ErrNoSuchNamespace, http.StatusConflict: 
ErrTableAlreadyExists})
+               payload{Name: tbl, MetadataLoc: metadataLoc}, r.cl, 
map[int]error{
+                       http.StatusNotFound: catalog.ErrNoSuchNamespace, 
http.StatusConflict: catalog.ErrTableAlreadyExists})
        if err != nil {
                return nil, err
        }
@@ -771,14 +768,14 @@ func (r *RestCatalog) RegisterTable(ctx context.Context, 
identifier table.Identi
        return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, 
config)
 }
 
-func (r *RestCatalog) LoadTable(ctx context.Context, identifier 
table.Identifier, props iceberg.Properties) (*table.Table, error) {
+func (r *Catalog) LoadTable(ctx context.Context, identifier table.Identifier, 
props iceberg.Properties) (*table.Table, error) {
        ns, tbl, err := splitIdentForPath(identifier)
        if err != nil {
                return nil, err
        }
 
        ret, err := doGet[loadTableResponse](ctx, r.baseURI, 
[]string{"namespaces", ns, "tables", tbl},
-               r.cl, map[int]error{http.StatusNotFound: ErrNoSuchTable})
+               r.cl, map[int]error{http.StatusNotFound: 
catalog.ErrNoSuchTable})
        if err != nil {
                return nil, err
        }
@@ -793,14 +790,14 @@ func (r *RestCatalog) LoadTable(ctx context.Context, 
identifier table.Identifier
        return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, 
config)
 }
 
-func (r *RestCatalog) UpdateTable(ctx context.Context, ident table.Identifier, 
requirements []table.Requirement, updates []table.Update) (*table.Table, error) 
{
+func (r *Catalog) UpdateTable(ctx context.Context, ident table.Identifier, 
requirements []table.Requirement, updates []table.Update) (*table.Table, error) 
{
        ns, tbl, err := splitIdentForPath(ident)
        if err != nil {
                return nil, err
        }
 
        restIdentifier := identifier{
-               Namespace: NamespaceFromIdent(ident),
+               Namespace: catalog.NamespaceFromIdent(ident),
                Name:      tbl,
        }
        type payload struct {
@@ -810,7 +807,7 @@ func (r *RestCatalog) UpdateTable(ctx context.Context, 
ident table.Identifier, r
        }
        ret, err := doPost[payload, commitTableResponse](ctx, r.baseURI, 
[]string{"namespaces", ns, "tables", tbl},
                payload{Identifier: restIdentifier, Requirements: requirements, 
Updates: updates}, r.cl,
-               map[int]error{http.StatusNotFound: ErrNoSuchTable, 
http.StatusConflict: ErrCommitFailed})
+               map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable, 
http.StatusConflict: ErrCommitFailed})
        if err != nil {
                return nil, err
        }
@@ -821,19 +818,19 @@ func (r *RestCatalog) UpdateTable(ctx context.Context, 
ident table.Identifier, r
        return r.tableFromResponse(ident, ret.Metadata, ret.MetadataLoc, config)
 }
 
-func (r *RestCatalog) DropTable(ctx context.Context, identifier 
table.Identifier) error {
+func (r *Catalog) DropTable(ctx context.Context, identifier table.Identifier) 
error {
        ns, tbl, err := splitIdentForPath(identifier)
        if err != nil {
                return err
        }
 
        _, err = doDelete[struct{}](ctx, r.baseURI, []string{"namespaces", ns, 
"tables", tbl}, r.cl,
-               map[int]error{http.StatusNotFound: ErrNoSuchTable})
+               map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable})
 
        return err
 }
 
-func (r *RestCatalog) PurgeTable(ctx context.Context, identifier 
table.Identifier) error {
+func (r *Catalog) PurgeTable(ctx context.Context, identifier table.Identifier) 
error {
        ns, tbl, err := splitIdentForPath(identifier)
        if err != nil {
                return err
@@ -845,27 +842,27 @@ func (r *RestCatalog) PurgeTable(ctx context.Context, 
identifier table.Identifie
        uri.RawQuery = v.Encode()
 
        _, err = doDelete[struct{}](ctx, uri, []string{}, r.cl,
-               map[int]error{http.StatusNotFound: ErrNoSuchTable})
+               map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable})
 
        return err
 }
 
-func (r *RestCatalog) RenameTable(ctx context.Context, from, to 
table.Identifier) (*table.Table, error) {
+func (r *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) 
(*table.Table, error) {
        type payload struct {
                From identifier `json:"from"`
                To   identifier `json:"to"`
        }
        f := identifier{
-               Namespace: NamespaceFromIdent(from),
-               Name:      TableNameFromIdent(from),
+               Namespace: catalog.NamespaceFromIdent(from),
+               Name:      catalog.TableNameFromIdent(from),
        }
        t := identifier{
-               Namespace: NamespaceFromIdent(to),
-               Name:      TableNameFromIdent(to),
+               Namespace: catalog.NamespaceFromIdent(to),
+               Name:      catalog.TableNameFromIdent(to),
        }
 
        _, err := doPost[payload, any](ctx, r.baseURI, []string{"tables", 
"rename"}, payload{From: f, To: t}, r.cl,
-               map[int]error{http.StatusNotFound: ErrNoSuchTable})
+               map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable})
        if err != nil {
                return nil, err
        }
@@ -873,29 +870,29 @@ func (r *RestCatalog) RenameTable(ctx context.Context, 
from, to table.Identifier
        return r.LoadTable(ctx, to, nil)
 }
 
-func (r *RestCatalog) CreateNamespace(ctx context.Context, namespace 
table.Identifier, props iceberg.Properties) error {
+func (r *Catalog) CreateNamespace(ctx context.Context, namespace 
table.Identifier, props iceberg.Properties) error {
        if err := checkValidNamespace(namespace); err != nil {
                return err
        }
 
        _, err := doPost[map[string]any, struct{}](ctx, r.baseURI, 
[]string{"namespaces"},
                map[string]any{"namespace": namespace, "properties": props}, 
r.cl, map[int]error{
-                       http.StatusNotFound: ErrNoSuchNamespace, 
http.StatusConflict: ErrNamespaceAlreadyExists})
+                       http.StatusNotFound: catalog.ErrNoSuchNamespace, 
http.StatusConflict: catalog.ErrNamespaceAlreadyExists})
        return err
 }
 
-func (r *RestCatalog) DropNamespace(ctx context.Context, namespace 
table.Identifier) error {
+func (r *Catalog) DropNamespace(ctx context.Context, namespace 
table.Identifier) error {
        if err := checkValidNamespace(namespace); err != nil {
                return err
        }
 
        _, err := doDelete[struct{}](ctx, r.baseURI, []string{"namespaces", 
strings.Join(namespace, namespaceSeparator)},
-               r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
+               r.cl, map[int]error{http.StatusNotFound: 
catalog.ErrNoSuchNamespace})
 
        return err
 }
 
-func (r *RestCatalog) ListNamespaces(ctx context.Context, parent 
table.Identifier) ([]table.Identifier, error) {
+func (r *Catalog) ListNamespaces(ctx context.Context, parent table.Identifier) 
([]table.Identifier, error) {
        uri := r.baseURI.JoinPath("namespaces")
        if len(parent) != 0 {
                v := url.Values{}
@@ -907,7 +904,7 @@ func (r *RestCatalog) ListNamespaces(ctx context.Context, 
parent table.Identifie
                Namespaces []table.Identifier `json:"namespaces"`
        }
 
-       rsp, err := doGet[rsptype](ctx, uri, []string{}, r.cl, 
map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
+       rsp, err := doGet[rsptype](ctx, uri, []string{}, r.cl, 
map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace})
        if err != nil {
                return nil, err
        }
@@ -915,7 +912,7 @@ func (r *RestCatalog) ListNamespaces(ctx context.Context, 
parent table.Identifie
        return rsp.Namespaces, nil
 }
 
-func (r *RestCatalog) LoadNamespaceProperties(ctx context.Context, namespace 
table.Identifier) (iceberg.Properties, error) {
+func (r *Catalog) LoadNamespaceProperties(ctx context.Context, namespace 
table.Identifier) (iceberg.Properties, error) {
        if err := checkValidNamespace(namespace); err != nil {
                return nil, err
        }
@@ -926,7 +923,7 @@ func (r *RestCatalog) LoadNamespaceProperties(ctx 
context.Context, namespace tab
        }
 
        rsp, err := doGet[nsresponse](ctx, r.baseURI, []string{"namespaces", 
strings.Join(namespace, namespaceSeparator)},
-               r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
+               r.cl, map[int]error{http.StatusNotFound: 
catalog.ErrNoSuchNamespace})
        if err != nil {
                return nil, err
        }
@@ -934,11 +931,11 @@ func (r *RestCatalog) LoadNamespaceProperties(ctx 
context.Context, namespace tab
        return rsp.Props, nil
 }
 
-func (r *RestCatalog) UpdateNamespaceProperties(ctx context.Context, namespace 
table.Identifier,
-       removals []string, updates iceberg.Properties) 
(PropertiesUpdateSummary, error) {
+func (r *Catalog) UpdateNamespaceProperties(ctx context.Context, namespace 
table.Identifier,
+       removals []string, updates iceberg.Properties) 
(catalog.PropertiesUpdateSummary, error) {
 
        if err := checkValidNamespace(namespace); err != nil {
-               return PropertiesUpdateSummary{}, err
+               return catalog.PropertiesUpdateSummary{}, err
        }
 
        type payload struct {
@@ -947,19 +944,19 @@ func (r *RestCatalog) UpdateNamespaceProperties(ctx 
context.Context, namespace t
        }
 
        ns := strings.Join(namespace, namespaceSeparator)
-       return doPost[payload, PropertiesUpdateSummary](ctx, r.baseURI, 
[]string{"namespaces", ns, "properties"},
-               payload{Remove: removals, Updates: updates}, r.cl, 
map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
+       return doPost[payload, catalog.PropertiesUpdateSummary](ctx, r.baseURI, 
[]string{"namespaces", ns, "properties"},
+               payload{Remove: removals, Updates: updates}, r.cl, 
map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace})
 }
 
-func (r *RestCatalog) CheckNamespaceExists(ctx context.Context, namespace 
table.Identifier) (bool, error) {
+func (r *Catalog) CheckNamespaceExists(ctx context.Context, namespace 
table.Identifier) (bool, error) {
        if err := checkValidNamespace(namespace); err != nil {
                return false, err
        }
 
        _, err := doGet[struct{}](ctx, r.baseURI, []string{"namespaces", 
strings.Join(namespace, namespaceSeparator)},
-               r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
+               r.cl, map[int]error{http.StatusNotFound: 
catalog.ErrNoSuchNamespace})
        if err != nil {
-               if errors.Is(err, ErrNoSuchNamespace) {
+               if errors.Is(err, catalog.ErrNoSuchNamespace) {
                        return false, nil
                }
                return false, err
diff --git a/catalog/rest_internal_test.go b/catalog/rest/rest_internal_test.go
similarity index 97%
rename from catalog/rest_internal_test.go
rename to catalog/rest/rest_internal_test.go
index a03e2a5..126b6bd 100644
--- a/catalog/rest_internal_test.go
+++ b/catalog/rest/rest_internal_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package catalog
+package rest
 
 import (
        "encoding/json"
@@ -59,7 +59,7 @@ func TestAuthHeader(t *testing.T) {
                })
        })
 
-       cat, err := NewRestCatalog("rest", srv.URL,
+       cat, err := NewCatalog("rest", srv.URL,
                WithCredential("client:secret"))
        require.NoError(t, err)
        assert.NotNil(t, cat)
@@ -107,7 +107,7 @@ func TestAuthUriHeader(t *testing.T) {
 
        authUri, err := url.Parse(srv.URL)
        require.NoError(t, err)
-       cat, err := NewRestCatalog("rest", srv.URL,
+       cat, err := NewCatalog("rest", srv.URL,
                WithCredential("client:secret"), 
WithAuthURI(authUri.JoinPath("auth-token-url")))
        require.NoError(t, err)
        assert.NotNil(t, cat)
diff --git a/catalog/rest_test.go b/catalog/rest/rest_test.go
similarity index 86%
rename from catalog/rest_test.go
rename to catalog/rest/rest_test.go
index 2d0302c..1a86dea 100644
--- a/catalog/rest_test.go
+++ b/catalog/rest/rest_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package catalog_test
+package rest_test
 
 import (
        "context"
@@ -30,6 +30,7 @@ import (
 
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/catalog"
+       "github.com/apache/iceberg-go/catalog/rest"
        "github.com/apache/iceberg-go/table"
        "github.com/stretchr/testify/suite"
 )
@@ -106,10 +107,10 @@ func (r *RestCatalogSuite) TestToken200() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL,
-               catalog.WithWarehouseLocation("s3://some-bucket"),
-               catalog.WithCredential(TestCreds),
-               catalog.WithScope(scope))
+       cat, err := rest.NewCatalog("rest", r.srv.URL,
+               rest.WithWarehouseLocation("s3://some-bucket"),
+               rest.WithCredential(TestCreds),
+               rest.WithScope(scope))
        r.Require().NoError(err)
 
        r.NotNil(cat)
@@ -164,11 +165,11 @@ func (r *RestCatalogSuite) TestToken400() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithCredential(TestCreds))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithCredential(TestCreds))
        r.Nil(cat)
 
-       r.ErrorIs(err, catalog.ErrRESTError)
-       r.ErrorIs(err, catalog.ErrOAuthError)
+       r.ErrorIs(err, rest.ErrRESTError)
+       r.ErrorIs(err, rest.ErrOAuthError)
        r.ErrorContains(err, "invalid_client: credentials for key invalid_key 
do not match")
 }
 
@@ -197,9 +198,9 @@ func (r *RestCatalogSuite) TestToken200AuthUrl() {
 
        authUri, err := url.Parse(r.srv.URL)
        r.Require().NoError(err)
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL,
-               catalog.WithWarehouseLocation("s3://some-bucket"),
-               catalog.WithCredential(TestCreds), 
catalog.WithAuthURI(authUri.JoinPath("auth-token-url")))
+       cat, err := rest.NewCatalog("rest", r.srv.URL,
+               rest.WithWarehouseLocation("s3://some-bucket"),
+               rest.WithCredential(TestCreds), 
rest.WithAuthURI(authUri.JoinPath("auth-token-url")))
 
        r.Require().NoError(err)
 
@@ -221,11 +222,11 @@ func (r *RestCatalogSuite) TestToken401() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithCredential(TestCreds))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithCredential(TestCreds))
        r.Nil(cat)
 
-       r.ErrorIs(err, catalog.ErrRESTError)
-       r.ErrorIs(err, catalog.ErrOAuthError)
+       r.ErrorIs(err, rest.ErrRESTError)
+       r.ErrorIs(err, rest.ErrOAuthError)
        r.ErrorContains(err, "invalid_client: credentials for key invalid_key 
do not match")
 }
 
@@ -248,10 +249,10 @@ func (r *RestCatalogSuite) TestListTables200() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
-       tables, err := cat.ListTables(context.Background(), 
catalog.ToRestIdentifier(namespace))
+       tables, err := cat.ListTables(context.Background(), 
catalog.ToIdentifier(namespace))
        r.Require().NoError(err)
        r.Equal([]table.Identifier{{"examples", "fooshare"}}, tables)
 }
@@ -297,16 +298,16 @@ func (r *RestCatalogSuite) TestListTablesPrefixed200() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL,
-               catalog.WithPrefix("prefix"),
-               catalog.WithWarehouseLocation("s3://some-bucket"),
-               catalog.WithCredential(TestCreds))
+       cat, err := rest.NewCatalog("rest", r.srv.URL,
+               rest.WithPrefix("prefix"),
+               rest.WithWarehouseLocation("s3://some-bucket"),
+               rest.WithCredential(TestCreds))
        r.Require().NoError(err)
 
        r.NotNil(cat)
        r.Equal(r.configVals.Get("warehouse"), "s3://some-bucket")
 
-       tables, err := cat.ListTables(context.Background(), 
catalog.ToRestIdentifier(namespace))
+       tables, err := cat.ListTables(context.Background(), 
catalog.ToIdentifier(namespace))
        r.Require().NoError(err)
        r.Equal([]table.Identifier{{"examples", "fooshare"}}, tables)
 }
@@ -330,10 +331,10 @@ func (r *RestCatalogSuite) TestListTables404() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
-       _, err = cat.ListTables(context.Background(), 
catalog.ToRestIdentifier(namespace))
+       _, err = cat.ListTables(context.Background(), 
catalog.ToIdentifier(namespace))
        r.ErrorIs(err, catalog.ErrNoSuchNamespace)
        r.ErrorContains(err, "Namespace does not exist: personal in warehouse 
8bcb0838-50fc-472d-9ddb-8feb89ef5f1e")
 }
@@ -353,7 +354,7 @@ func (r *RestCatalogSuite) TestListNamespaces200() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
        results, err := cat.ListNamespaces(context.Background(), nil)
@@ -378,10 +379,10 @@ func (r *RestCatalogSuite) 
TestListNamespaceWithParent200() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
-       results, err := cat.ListNamespaces(context.Background(), 
catalog.ToRestIdentifier("accounting"))
+       results, err := cat.ListNamespaces(context.Background(), 
catalog.ToIdentifier("accounting"))
        r.Require().NoError(err)
 
        r.Equal([]table.Identifier{{"accounting", "tax"}}, results)
@@ -405,10 +406,10 @@ func (r *RestCatalogSuite) TestListNamespaces400() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
-       _, err = cat.ListNamespaces(context.Background(), 
catalog.ToRestIdentifier("accounting"))
+       _, err = cat.ListNamespaces(context.Background(), 
catalog.ToIdentifier("accounting"))
        r.ErrorIs(err, catalog.ErrNoSuchNamespace)
        r.ErrorContains(err, "Namespace does not exist: personal in warehouse 
8bcb0838-50fc-472d-9ddb-8feb89ef5f1e")
 }
@@ -438,10 +439,10 @@ func (r *RestCatalogSuite) TestCreateNamespace200() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
-       r.Require().NoError(cat.CreateNamespace(context.Background(), 
catalog.ToRestIdentifier("leden"), nil))
+       r.Require().NoError(cat.CreateNamespace(context.Background(), 
catalog.ToIdentifier("leden"), nil))
 }
 
 func (r *RestCatalogSuite) TestCreateNamespaceWithProps200() {
@@ -469,10 +470,10 @@ func (r *RestCatalogSuite) 
TestCreateNamespaceWithProps200() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
-       r.Require().NoError(cat.CreateNamespace(context.Background(), 
catalog.ToRestIdentifier("leden"), iceberg.Properties{"foo": "bar", "super": 
"duper"}))
+       r.Require().NoError(cat.CreateNamespace(context.Background(), 
catalog.ToIdentifier("leden"), iceberg.Properties{"foo": "bar", "super": 
"duper"}))
 }
 
 func (r *RestCatalogSuite) TestCreateNamespace409() {
@@ -505,10 +506,10 @@ func (r *RestCatalogSuite) TestCreateNamespace409() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
-       err = cat.CreateNamespace(context.Background(), 
catalog.ToRestIdentifier("fokko"), nil)
+       err = cat.CreateNamespace(context.Background(), 
catalog.ToIdentifier("fokko"), nil)
        r.ErrorIs(err, catalog.ErrNamespaceAlreadyExists)
        r.ErrorContains(err, "fokko in warehouse")
 }
@@ -524,10 +525,10 @@ func (r *RestCatalogSuite) TestDropNamespace204() {
                w.WriteHeader(http.StatusNoContent)
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
-       r.NoError(cat.DropNamespace(context.Background(), 
catalog.ToRestIdentifier("examples")))
+       r.NoError(cat.DropNamespace(context.Background(), 
catalog.ToIdentifier("examples")))
 }
 
 func (r *RestCatalogSuite) TestDropNamespace404() {
@@ -548,10 +549,10 @@ func (r *RestCatalogSuite) TestDropNamespace404() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
-       err = cat.DropNamespace(context.Background(), 
catalog.ToRestIdentifier("examples"))
+       err = cat.DropNamespace(context.Background(), 
catalog.ToIdentifier("examples"))
        r.ErrorIs(err, catalog.ErrNoSuchNamespace)
        r.ErrorContains(err, "examples in warehouse")
 }
@@ -571,10 +572,10 @@ func (r *RestCatalogSuite) TestLoadNamespaceProps200() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
-       props, err := cat.LoadNamespaceProperties(context.Background(), 
catalog.ToRestIdentifier("leden"))
+       props, err := cat.LoadNamespaceProperties(context.Background(), 
catalog.ToIdentifier("leden"))
        r.Require().NoError(err)
        r.Equal(iceberg.Properties{"prop": "yes"}, props)
 }
@@ -597,10 +598,10 @@ func (r *RestCatalogSuite) TestLoadNamespaceProps404() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
-       _, err = cat.LoadNamespaceProperties(context.Background(), 
catalog.ToRestIdentifier("leden"))
+       _, err = cat.LoadNamespaceProperties(context.Background(), 
catalog.ToIdentifier("leden"))
        r.ErrorIs(err, catalog.ErrNoSuchNamespace)
        r.ErrorContains(err, "Namespace does not exist: fokko22 in warehouse")
 }
@@ -620,7 +621,7 @@ func (r *RestCatalogSuite) TestUpdateNamespaceProps200() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
        summary, err := cat.UpdateNamespaceProperties(context.Background(), 
table.Identifier([]string{"fokko"}),
@@ -652,7 +653,7 @@ func (r *RestCatalogSuite) TestUpdateNamespaceProps404() {
                })
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
        _, err = cat.UpdateNamespaceProperties(context.Background(),
@@ -738,17 +739,17 @@ func (r *RestCatalogSuite) TestCreateTable200() {
 
        t := createTableRestExample
        _ = t
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
        tbl, err := cat.CreateTable(
                context.Background(),
-               catalog.ToRestIdentifier("fokko", "fokko2"),
+               catalog.ToIdentifier("fokko", "fokko2"),
                tableSchemaSimple,
        )
        r.Require().NoError(err)
 
-       r.Equal(catalog.ToRestIdentifier("rest", "fokko", "fokko2"), 
tbl.Identifier())
+       r.Equal(catalog.ToIdentifier("rest", "fokko", "fokko2"), 
tbl.Identifier())
        r.Equal("s3://warehouse/database/table/metadata.json", 
tbl.MetadataLocation())
        r.EqualValues(1, tbl.Metadata().Version())
        r.Equal("bf289591-dcc0-4234-ad4f-5c3eed811a29", 
tbl.Metadata().TableUUID().String())
@@ -780,13 +781,13 @@ func (r *RestCatalogSuite) TestCreateTable409() {
                json.NewEncoder(w).Encode(errorResponse)
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
        // Attempt to create table with properties
        _, err = cat.CreateTable(
                context.Background(),
-               catalog.ToRestIdentifier("fokko", "fokko2"),
+               catalog.ToIdentifier("fokko", "fokko2"),
                tableSchemaSimple,
                catalog.WithProperties(map[string]string{"owner": "fokko"}),
        )
@@ -874,13 +875,13 @@ func (r *RestCatalogSuite) TestLoadTable200() {
                }`))
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
-       tbl, err := cat.LoadTable(context.Background(), 
catalog.ToRestIdentifier("fokko", "table"), nil)
+       tbl, err := cat.LoadTable(context.Background(), 
catalog.ToIdentifier("fokko", "table"), nil)
        r.Require().NoError(err)
 
-       r.Equal(catalog.ToRestIdentifier("rest", "fokko", "table"), 
tbl.Identifier())
+       r.Equal(catalog.ToIdentifier("rest", "fokko", "table"), 
tbl.Identifier())
        
r.Equal("s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
 tbl.MetadataLocation())
        r.EqualValues(1, tbl.Metadata().Version())
        r.Equal("b55d9dda-6561-423a-8bfc-787980ce421f", 
tbl.Metadata().TableUUID().String())
@@ -955,16 +956,16 @@ func (r *RestCatalogSuite) TestRenameTable200() {
                w.Write([]byte(createTableRestExample))
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
-       fromIdent := catalog.ToRestIdentifier("fokko", "source")
-       toIdent := catalog.ToRestIdentifier("fokko", "destination")
+       fromIdent := catalog.ToIdentifier("fokko", "source")
+       toIdent := catalog.ToIdentifier("fokko", "destination")
 
        renamedTable, err := cat.RenameTable(context.Background(), fromIdent, 
toIdent)
        r.Require().NoError(err)
 
-       r.Equal(catalog.ToRestIdentifier("rest", "fokko", "destination"), 
renamedTable.Identifier())
+       r.Equal(catalog.ToIdentifier("rest", "fokko", "destination"), 
renamedTable.Identifier())
        r.Equal("s3://warehouse/database/table/metadata.json", 
renamedTable.MetadataLocation())
        r.EqualValues(1, renamedTable.Metadata().Version())
        r.Equal("bf289591-dcc0-4234-ad4f-5c3eed811a29", 
renamedTable.Metadata().TableUUID().String())
@@ -989,10 +990,10 @@ func (r *RestCatalogSuite) TestDropTable204() {
                w.WriteHeader(http.StatusNoContent)
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
-       err = cat.DropTable(context.Background(), 
catalog.ToRestIdentifier("fokko", "table"))
+       err = cat.DropTable(context.Background(), catalog.ToIdentifier("fokko", 
"table"))
        r.NoError(err)
 }
 
@@ -1016,10 +1017,10 @@ func (r *RestCatalogSuite) TestDropTable404() {
                json.NewEncoder(w).Encode(errorResponse)
        })
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Require().NoError(err)
 
-       err = cat.DropTable(context.Background(), 
catalog.ToRestIdentifier("fokko", "table"))
+       err = cat.DropTable(context.Background(), catalog.ToIdentifier("fokko", 
"table"))
        r.Error(err)
        r.ErrorIs(err, catalog.ErrNoSuchTable)
        r.ErrorContains(err, "Table does not exist: fokko.table")
@@ -1058,7 +1059,7 @@ func (r *RestTLSCatalogSuite) TearDownTest() {
 }
 
 func (r *RestTLSCatalogSuite) TestSSLFail() {
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken))
        r.Nil(cat)
 
        r.ErrorContains(err, "tls: failed to verify certificate")
@@ -1078,9 +1079,9 @@ func (r *RestTLSCatalogSuite) 
TestSSLLoadRegisteredCatalog() {
 }
 
 func (r *RestTLSCatalogSuite) TestSSLConfig() {
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken),
-               catalog.WithWarehouseLocation("s3://some-bucket"),
-               catalog.WithTLSConfig(&tls.Config{InsecureSkipVerify: true}))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken),
+               rest.WithWarehouseLocation("s3://some-bucket"),
+               rest.WithTLSConfig(&tls.Config{InsecureSkipVerify: true}))
        r.NoError(err)
 
        r.NotNil(cat)
@@ -1097,9 +1098,9 @@ func (r *RestTLSCatalogSuite) TestSSLCerts() {
                }
        }
 
-       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken),
-               catalog.WithWarehouseLocation("s3://some-bucket"),
-               catalog.WithTLSConfig(&tls.Config{RootCAs: certs}))
+       cat, err := rest.NewCatalog("rest", r.srv.URL, 
rest.WithOAuthToken(TestToken),
+               rest.WithWarehouseLocation("s3://some-bucket"),
+               rest.WithTLSConfig(&tls.Config{RootCAs: certs}))
        r.NoError(err)
 
        r.NotNil(cat)
diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go
index 00ceec9..7fcac03 100644
--- a/cmd/iceberg/main.go
+++ b/cmd/iceberg/main.go
@@ -27,6 +27,8 @@ import (
 
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/catalog"
+       "github.com/apache/iceberg-go/catalog/glue"
+       "github.com/apache/iceberg-go/catalog/rest"
        "github.com/apache/iceberg-go/config"
        "github.com/apache/iceberg-go/table"
 
@@ -135,18 +137,18 @@ func main() {
        }
 
        var cat catalog.Catalog
-       switch catalog.CatalogType(cfg.Catalog) {
+       switch catalog.Type(cfg.Catalog) {
        case catalog.REST:
-               opts := []catalog.Option[catalog.RestCatalog]{}
+               opts := []rest.Option{}
                if len(cfg.Cred) > 0 {
-                       opts = append(opts, catalog.WithCredential(cfg.Cred))
+                       opts = append(opts, rest.WithCredential(cfg.Cred))
                }
 
                if len(cfg.Warehouse) > 0 {
-                       opts = append(opts, 
catalog.WithWarehouseLocation(cfg.Warehouse))
+                       opts = append(opts, 
rest.WithWarehouseLocation(cfg.Warehouse))
                }
 
-               if cat, err = catalog.NewRestCatalog("rest", cfg.URI, opts...); 
err != nil {
+               if cat, err = rest.NewCatalog("rest", cfg.URI, opts...); err != 
nil {
                        log.Fatal(err)
                }
        case catalog.Glue:
@@ -154,10 +156,10 @@ func main() {
                if err != nil {
                        log.Fatal(err)
                }
-               opts := []catalog.Option[catalog.GlueCatalog]{
-                       catalog.WithAwsConfig(awscfg),
+               opts := []glue.Option{
+                       glue.WithAwsConfig(awscfg),
                }
-               cat = catalog.NewGlueCatalog(opts...)
+               cat = glue.NewCatalog(opts...)
        default:
                log.Fatal("unrecognized catalog type")
        }
@@ -196,7 +198,7 @@ func main() {
                })
        case cfg.Rename:
                _, err := cat.RenameTable(context.Background(),
-                       catalog.ToRestIdentifier(cfg.RenameFrom), 
catalog.ToRestIdentifier(cfg.RenameTo))
+                       catalog.ToIdentifier(cfg.RenameFrom), 
catalog.ToIdentifier(cfg.RenameTo))
                if err != nil {
                        output.Error(err)
                        os.Exit(1)
@@ -206,13 +208,13 @@ func main() {
        case cfg.Drop:
                switch {
                case cfg.Namespace:
-                       err := cat.DropNamespace(context.Background(), 
catalog.ToRestIdentifier(cfg.Ident))
+                       err := cat.DropNamespace(context.Background(), 
catalog.ToIdentifier(cfg.Ident))
                        if err != nil {
                                output.Error(err)
                                os.Exit(1)
                        }
                case cfg.Table:
-                       err := cat.DropTable(context.Background(), 
catalog.ToRestIdentifier(cfg.Ident))
+                       err := cat.DropTable(context.Background(), 
catalog.ToIdentifier(cfg.Ident))
                        if err != nil {
                                output.Error(err)
                                os.Exit(1)
@@ -231,7 +233,7 @@ func main() {
                                props["Location"] = cfg.LocationURI
                        }
 
-                       err := cat.CreateNamespace(context.Background(), catalog.ToRestIdentifier(cfg.Ident), props)
+                       err := cat.CreateNamespace(context.Background(), catalog.ToIdentifier(cfg.Ident), props)
                        if err != nil {
                                output.Error(err)
                                os.Exit(1)
@@ -249,7 +251,7 @@ func main() {
 }
 
 func list(output Output, cat catalog.Catalog, parent string) {
-       prnt := catalog.ToRestIdentifier(parent)
+       prnt := catalog.ToIdentifier(parent)
 
        ids, err := cat.ListNamespaces(context.Background(), prnt)
        if err != nil {
@@ -270,7 +272,7 @@ func list(output Output, cat catalog.Catalog, parent string) {
 func describe(output Output, cat catalog.Catalog, id string, entityType string) {
        ctx := context.Background()
 
-       ident := catalog.ToRestIdentifier(id)
+       ident := catalog.ToIdentifier(id)
 
        isNS, isTbl := false, false
        if (entityType == "any" || entityType == "ns") && len(ident) > 0 {
@@ -312,7 +314,7 @@ func describe(output Output, cat catalog.Catalog, id string, entityType string)
 }
 
 func loadTable(output Output, cat catalog.Catalog, id string) *table.Table {
-       tbl, err := cat.LoadTable(context.Background(), catalog.ToRestIdentifier(id), nil)
+       tbl, err := cat.LoadTable(context.Background(), catalog.ToIdentifier(id), nil)
        if err != nil {
                output.Error(err)
                os.Exit(1)
@@ -329,7 +331,7 @@ type propCmd struct {
 }
 
 func properties(output Output, cat catalog.Catalog, args propCmd) {
-       ctx, ident := context.Background(), catalog.ToRestIdentifier(args.identifier)
+       ctx, ident := context.Background(), catalog.ToIdentifier(args.identifier)
 
        switch {
        case args.get:
diff --git a/table/scanner_test.go b/table/scanner_test.go
index dcb7ed1..9775ca9 100644
--- a/table/scanner_test.go
+++ b/table/scanner_test.go
@@ -35,6 +35,7 @@ import (
        "github.com/apache/arrow-go/v18/arrow/memory"
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/catalog"
+       "github.com/apache/iceberg-go/catalog/rest"
        "github.com/apache/iceberg-go/io"
        "github.com/apache/iceberg-go/table"
        "github.com/stretchr/testify/suite"
@@ -51,7 +52,7 @@ type ScannerSuite struct {
 func (s *ScannerSuite) SetupTest() {
        s.ctx = context.Background()
 
-       cat, err := catalog.NewRestCatalog("rest", "http://localhost:8181")
+       cat, err := rest.NewCatalog("rest", "http://localhost:8181")
        s.Require().NoError(err)
 
        s.cat = cat
@@ -86,7 +87,7 @@ func (s *ScannerSuite) TestScanner() {
 
        for _, tt := range tests {
                s.Run(tt.table+" "+tt.expr.String(), func() {
-                       ident := catalog.ToRestIdentifier("default", tt.table)
+                       ident := catalog.ToIdentifier("default", tt.table)
 
                        tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
                        s.Require().NoError(err)
@@ -101,7 +102,7 @@ func (s *ScannerSuite) TestScanner() {
 }
 
 func (s *ScannerSuite) TestScannerWithDeletes() {
-       ident := catalog.ToRestIdentifier("default", "test_positional_mor_deletes")
+       ident := catalog.ToIdentifier("default", "test_positional_mor_deletes")
 
        tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
        s.Require().NoError(err)
@@ -141,7 +142,7 @@ func (s *ScannerSuite) TestArrowNan() {
 
        for _, name := range []string{"test_null_nan", "test_null_nan_rewritten"} {
                s.Run(name, func() {
-                       ident := catalog.ToRestIdentifier("default", name)
+                       ident := catalog.ToIdentifier("default", name)
                        tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
                        s.Require().NoError(err)
 
@@ -164,7 +165,7 @@ func (s *ScannerSuite) TestArrowNotNanCount() {
        mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
        defer mem.AssertSize(s.T(), 0)
 
-       ident := catalog.ToRestIdentifier("default", "test_null_nan")
+       ident := catalog.ToIdentifier("default", "test_null_nan")
        tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
        s.Require().NoError(err)
 
@@ -182,7 +183,7 @@ func (s *ScannerSuite) TestScanWithLimit() {
        mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
        defer mem.AssertSize(s.T(), 0)
 
-       ident := catalog.ToRestIdentifier("default", "test_limit")
+       ident := catalog.ToIdentifier("default", "test_limit")
        tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
        s.Require().NoError(err)
 
@@ -225,7 +226,7 @@ func (s *ScannerSuite) TestScannerRecordsDeletes() {
        //  (10, 'j'),
        //  (11, 'k'),
        //  (12, 'l')
-       ident := catalog.ToRestIdentifier("default", "test_positional_mor_deletes")
+       ident := catalog.ToIdentifier("default", "test_positional_mor_deletes")
 
        tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
        s.Require().NoError(err)
@@ -314,7 +315,7 @@ func (s *ScannerSuite) TestScannerRecordsDoubleDeletes() {
        //  (10, 'j'),
        //  (11, 'k'),
        //  (12, 'l')
-       ident := catalog.ToRestIdentifier("default", "test_positional_mor_double_deletes")
+       ident := catalog.ToIdentifier("default", "test_positional_mor_double_deletes")
 
        tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
        s.Require().NoError(err)
@@ -437,7 +438,7 @@ func (s *ScannerSuite) TestPartitionedTables() {
                        defer scopedMem.CheckSize(s.T())
                        ctx := compute.WithAllocator(s.ctx, mem)
 
-                       ident := catalog.ToRestIdentifier("default", tt.table)
+                       ident := catalog.ToIdentifier("default", tt.table)
 
                        tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
                        s.Require().NoError(err)
@@ -465,7 +466,7 @@ func (s *ScannerSuite) TestUnpartitionedUUIDTable() {
                {Name: "uuid_col", Type: extensions.NewUUIDType(), Nullable: true},
        }, nil)
 
-       ident := catalog.ToRestIdentifier("default", "test_uuid_and_fixed_unpartitioned")
+       ident := catalog.ToIdentifier("default", "test_uuid_and_fixed_unpartitioned")
 
        tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
        s.Require().NoError(err)
@@ -505,7 +506,7 @@ func (s *ScannerSuite) TestUnpartitionedFixedTable() {
        mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
        defer mem.AssertSize(s.T(), 0)
 
-       ident := catalog.ToRestIdentifier("default", "test_uuid_and_fixed_unpartitioned")
+       ident := catalog.ToIdentifier("default", "test_uuid_and_fixed_unpartitioned")
 
        tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
        s.Require().NoError(err)
@@ -544,7 +545,7 @@ func (s *ScannerSuite) TestScanTag() {
        mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
        defer mem.AssertSize(s.T(), 0)
 
-       ident := catalog.ToRestIdentifier("default", "test_positional_mor_deletes")
+       ident := catalog.ToIdentifier("default", "test_positional_mor_deletes")
 
        tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
        s.Require().NoError(err)
@@ -565,7 +566,7 @@ func (s *ScannerSuite) TestScanBranch() {
        mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
        defer mem.AssertSize(s.T(), 0)
 
-       ident := catalog.ToRestIdentifier("default", "test_positional_mor_deletes")
+       ident := catalog.ToIdentifier("default", "test_positional_mor_deletes")
 
        tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
        s.Require().NoError(err)
@@ -586,7 +587,7 @@ func (s *ScannerSuite) TestFilterOnNewColumn() {
        mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
        defer mem.AssertSize(s.T(), 0)
 
-       ident := catalog.ToRestIdentifier("default", "test_table_add_column")
+       ident := catalog.ToIdentifier("default", "test_table_add_column")
 
        tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
        s.Require().NoError(err)


Reply via email to