This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 97c04be  feat(catalog/glue): add support for glue catalog namespace operations (#173)
97c04be is described below

commit 97c04bebb158f410b348bf10eb927746ae431b15
Author: natusioe <[email protected]>
AuthorDate: Thu Oct 24 07:41:47 2024 -0700

    feat(catalog/glue): add support for glue catalog namespace operations (#173)
---
 catalog/catalog.go   |  11 +-
 catalog/glue.go      | 320 +++++++++++++++++++++++++++++++++++---
 catalog/glue_test.go | 429 ++++++++++++++++++++++++++++++++++++++++++++++++---
 cmd/iceberg/main.go  |  48 +++++-
 utils.go             |  16 ++
 utils_test.go        |  78 ++++++++++
 6 files changed, 845 insertions(+), 57 deletions(-)

diff --git a/catalog/catalog.go b/catalog/catalog.go
index d6d7f1e..65da7e5 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -30,6 +30,8 @@ import (
 
 type CatalogType string
 
+type AwsProperties map[string]string
+
 const (
        REST     CatalogType = "rest"
        Hive     CatalogType = "hive"
@@ -52,6 +54,12 @@ func WithAwsConfig(cfg aws.Config) Option[GlueCatalog] {
        }
 }
 
+func WithAwsProperties(props AwsProperties) Option[GlueCatalog] {
+       return func(o *options) {
+               o.awsProperties = props
+       }
+}
+
 func WithCredential(cred string) Option[RestCatalog] {
        return func(o *options) {
                o.credential = cred
@@ -117,7 +125,8 @@ func WithPrefix(prefix string) Option[RestCatalog] {
 type Option[T GlueCatalog | RestCatalog] func(*options)
 
 type options struct {
-       awsConfig aws.Config
+       awsConfig     aws.Config
+       awsProperties AwsProperties
 
        tlsConfig         *tls.Config
        credential        string
diff --git a/catalog/glue.go b/catalog/glue.go
index 91b21ff..c970e5a 100644
--- a/catalog/glue.go
+++ b/catalog/glue.go
@@ -30,22 +30,48 @@ import (
        "github.com/aws/aws-sdk-go-v2/service/glue/types"
 )
 
-const glueTypeIceberg = "ICEBERG"
+const (
+       // Use the same conventions as in the pyiceberg project.
+       // See: https://github.com/apache/iceberg-python/blob/main/pyiceberg/catalog/__init__.py#L82-L96
+       glueTypeIceberg      = "ICEBERG"
+       databaseTypePropsKey = "database_type"
+       tableTypePropsKey    = "table_type"
+       descriptionPropsKey  = "Description"
+
+       // Database location.
+       locationPropsKey = "Location"
+
+       // Table metadata location pointer.
+       metadataLocationPropsKey = "metadata_location"
+
+       // The ID of the Glue Data Catalog where the tables reside. If none is provided, Glue
+       // automatically uses the caller's AWS account ID by default.
+       // See: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html
+       glueCatalogIdKey = "glue.id"
+)
 
 var (
        _ Catalog = (*GlueCatalog)(nil)
 )
 
 type glueAPI interface {
+       CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error)
        GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error)
        GetTables(ctx context.Context, params *glue.GetTablesInput, optFns ...func(*glue.Options)) (*glue.GetTablesOutput, error)
+       DeleteTable(ctx context.Context, params *glue.DeleteTableInput, optFns ...func(*glue.Options)) (*glue.DeleteTableOutput, error)
+       GetDatabase(ctx context.Context, params *glue.GetDatabaseInput, optFns ...func(*glue.Options)) (*glue.GetDatabaseOutput, error)
        GetDatabases(ctx context.Context, params *glue.GetDatabasesInput, optFns ...func(*glue.Options)) (*glue.GetDatabasesOutput, error)
+       CreateDatabase(ctx context.Context, params *glue.CreateDatabaseInput, optFns ...func(*glue.Options)) (*glue.CreateDatabaseOutput, error)
+       DeleteDatabase(ctx context.Context, params *glue.DeleteDatabaseInput, optFns ...func(*glue.Options)) (*glue.DeleteDatabaseOutput, error)
+       UpdateDatabase(ctx context.Context, params *glue.UpdateDatabaseInput, optFns ...func(*glue.Options)) (*glue.UpdateDatabaseOutput, error)
 }
 
 type GlueCatalog struct {
-       glueSvc glueAPI
+       glueSvc   glueAPI
+       catalogId *string
 }
 
+// NewGlueCatalog creates a new instance of GlueCatalog with the given options.
 func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog {
        glueOps := &options{}
 
@@ -53,12 +79,20 @@ func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog {
                o(glueOps)
        }
 
+       var catalogId *string
+       if val, ok := glueOps.awsProperties[glueCatalogIdKey]; ok {
+               catalogId = &val
+       } else {
+               catalogId = nil
+       }
+
        return &GlueCatalog{
-               glueSvc: glue.NewFromConfig(glueOps.awsConfig),
+               glueSvc:   glue.NewFromConfig(glueOps.awsConfig),
+               catalogId: catalogId,
        }
 }
 
-// ListTables returns a list of iceberg tables in the given Glue database.
+// ListTables returns a list of Iceberg tables in the given Glue database.
 //
 // The namespace should just contain the Glue database name.
 func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) {
@@ -67,7 +101,7 @@ func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier
                return nil, err
        }
 
-       params := &glue.GetTablesInput{DatabaseName: aws.String(database)}
+       params := &glue.GetTablesInput{CatalogId: c.catalogId, DatabaseName: aws.String(database)}
 
        var icebergTables []table.Identifier
 
@@ -92,7 +126,7 @@ func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier
 
 // LoadTable loads a table from the catalog table details.
 //
-// The identifier should contain the Glue database name, then glue table name.
+// The identifier should contain the Glue database name, then Glue table name.
 func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) {
        database, tableName, err := identifierToGlueTable(identifier)
        if err != nil {
@@ -103,11 +137,16 @@ func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier
                props = map[string]string{}
        }
 
-       location, err := c.getTable(ctx, database, tableName)
+       glueTable, err := c.getTable(ctx, database, tableName)
        if err != nil {
                return nil, err
        }
 
+       location, ok := glueTable.Parameters[metadataLocationPropsKey]
+       if !ok {
+               return nil, fmt.Errorf("missing metadata location for table %s.%s", database, tableName)
+       }
+
        // TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog.
        iofs, err := io.LoadFS(props, location)
        if err != nil {
@@ -126,34 +165,251 @@ func (c *GlueCatalog) CatalogType() CatalogType {
        return Glue
 }
 
+// DropTable deletes an Iceberg table from the Glue catalog.
 func (c *GlueCatalog) DropTable(ctx context.Context, identifier table.Identifier) error {
-       return fmt.Errorf("%w: [Glue Catalog] drop table", iceberg.ErrNotImplemented)
+       database, tableName, err := identifierToGlueTable(identifier)
+       if err != nil {
+               return err
+       }
+
+       // Check if the table exists and is an Iceberg table.
+       _, err = c.getTable(ctx, database, tableName)
+       if err != nil {
+               return err
+       }
+
+       params := &glue.DeleteTableInput{
+               CatalogId:    c.catalogId,
+               DatabaseName: aws.String(database),
+               Name:         aws.String(tableName),
+       }
+       _, err = c.glueSvc.DeleteTable(ctx, params)
+       if err != nil {
+               return fmt.Errorf("failed to drop table %s.%s: %w", database, tableName, err)
+       }
+
+       return nil
 }
 
+// RenameTable renames an Iceberg table in the Glue catalog.
 func (c *GlueCatalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) {
-       return nil, fmt.Errorf("%w: [Glue Catalog] rename table", iceberg.ErrNotImplemented)
+       fromDatabase, fromTable, err := identifierToGlueTable(from)
+       if err != nil {
+               return nil, err
+       }
+
+       toDatabase, toTable, err := identifierToGlueTable(to)
+       if err != nil {
+               return nil, err
+       }
+
+       if fromDatabase != toDatabase {
+               return nil, fmt.Errorf("cannot rename table across namespaces: %s -> %s", fromDatabase, toDatabase)
+       }
+
+       // Fetch the existing Glue table to copy the metadata into the new table.
+       fromGlueTable, err := c.getTable(ctx, fromDatabase, fromTable)
+       if err != nil {
+               return nil, fmt.Errorf("failed to fetch the table %s.%s: %w", fromDatabase, fromTable, err)
+       }
+
+       // Create the new table.
+       _, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{
+               CatalogId:    c.catalogId,
+               DatabaseName: aws.String(toDatabase),
+               TableInput: &types.TableInput{
+                       Name:              aws.String(toTable),
+                       Owner:             fromGlueTable.Owner,
+                       Description:       fromGlueTable.Description,
+                       Parameters:        fromGlueTable.Parameters,
+                       StorageDescriptor: fromGlueTable.StorageDescriptor,
+               },
+       })
+       if err != nil {
+               return nil, fmt.Errorf("failed to create the table %s.%s: %w", fromDatabase, fromTable, err)
+       }
+
+       // Drop the old table.
+       _, err = c.glueSvc.DeleteTable(ctx, &glue.DeleteTableInput{
+               CatalogId:    c.catalogId,
+               DatabaseName: aws.String(fromDatabase),
+               Name:         aws.String(fromTable),
+       })
+       if err != nil {
+               // Best-effort rollback the table creation.
+               _, rollbackErr := c.glueSvc.DeleteTable(ctx, &glue.DeleteTableInput{
+                       CatalogId:    c.catalogId,
+                       DatabaseName: aws.String(toDatabase),
+                       Name:         aws.String(toTable),
+               })
+               if rollbackErr != nil {
+                       fmt.Printf("failed to rollback the new table %s.%s: %v", toDatabase, toTable, rollbackErr)
+               }
+
+               return nil, fmt.Errorf("failed to rename the table %s.%s: %w", fromDatabase, fromTable, err)
+       }
+
+       // Load the new table to return.
+       renamedTable, err := c.LoadTable(ctx, GlueTableIdentifier(toDatabase, toTable), nil)
+       if err != nil {
+               return nil, fmt.Errorf("failed to load renamed table %s.%s: %w", toDatabase, toTable, err)
+       }
+
+       return renamedTable, nil
 }
 
+// CreateNamespace creates a new Iceberg namespace in the Glue catalog.
 func (c *GlueCatalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error {
-       return fmt.Errorf("%w: [Glue Catalog] create namespace", iceberg.ErrNotImplemented)
+       database, err := identifierToGlueDatabase(namespace)
+       if err != nil {
+               return err
+       }
+
+       databaseParameters := map[string]string{
+               databaseTypePropsKey: glueTypeIceberg,
+       }
+
+       description := props[descriptionPropsKey]
+       locationURI := props[locationPropsKey]
+
+       if description != "" {
+               databaseParameters[descriptionPropsKey] = description
+       }
+       if locationURI != "" {
+               databaseParameters[locationPropsKey] = locationURI
+       }
+
+       databaseInput := &types.DatabaseInput{
+               Name:       aws.String(database),
+               Parameters: databaseParameters,
+       }
+
+       params := &glue.CreateDatabaseInput{CatalogId: c.catalogId, DatabaseInput: databaseInput}
+       _, err = c.glueSvc.CreateDatabase(ctx, params)
+
+       if err != nil {
+               return fmt.Errorf("failed to create database %s: %w", database, err)
+       }
+
+       return nil
 }
 
+// DropNamespace deletes an Iceberg namespace from the Glue catalog.
 func (c *GlueCatalog) DropNamespace(ctx context.Context, namespace table.Identifier) error {
-       return fmt.Errorf("%w: [Glue Catalog] drop namespace", iceberg.ErrNotImplemented)
+       databaseName, err := identifierToGlueDatabase(namespace)
+       if err != nil {
+               return err
+       }
+
+       // Check if the database exists and is an iceberg database.
+       _, err = c.getDatabase(ctx, databaseName)
+       if err != nil {
+               return err
+       }
+
+       params := &glue.DeleteDatabaseInput{CatalogId: c.catalogId, Name: aws.String(databaseName)}
+       _, err = c.glueSvc.DeleteDatabase(ctx, params)
+       if err != nil {
+               return fmt.Errorf("failed to drop namespace %s: %w", databaseName, err)
+       }
+
+       return nil
 }
 
+// LoadNamespaceProperties loads the properties of an Iceberg namespace from the Glue catalog.
 func (c *GlueCatalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) {
-       return nil, fmt.Errorf("%w: [Glue Catalog] load namespace properties", iceberg.ErrNotImplemented)
+       databaseName, err := identifierToGlueDatabase(namespace)
+       if err != nil {
+               return nil, err
+       }
+
+       database, err := c.getDatabase(ctx, databaseName)
+       if err != nil {
+               return nil, err
+       }
+
+       props := make(map[string]string)
+       if database.Parameters != nil {
+               for k, v := range database.Parameters {
+                       props[k] = v
+               }
+       }
+
+       return props, nil
 }
 
+// UpdateNamespaceProperties updates the properties of an Iceberg namespace in the Glue catalog.
+// The removals list contains the keys to remove, and the updates map contains the keys and values to update.
 func (c *GlueCatalog) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
        removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error) {
-       return PropertiesUpdateSummary{}, fmt.Errorf("%w: [Glue Catalog] update namespace properties", iceberg.ErrNotImplemented)
+
+       databaseName, err := identifierToGlueDatabase(namespace)
+       if err != nil {
+               return PropertiesUpdateSummary{}, err
+       }
+
+       database, err := c.getDatabase(ctx, databaseName)
+       if err != nil {
+               return PropertiesUpdateSummary{}, err
+       }
+
+       overlap := []string{}
+       for _, key := range removals {
+               if _, exists := updates[key]; exists {
+                       overlap = append(overlap, key)
+               }
+       }
+       if len(overlap) > 0 {
+               return PropertiesUpdateSummary{}, fmt.Errorf("conflict between removals and updates for keys: %v", overlap)
+       }
+
+       updatedProperties := make(map[string]string)
+       if database.Parameters != nil {
+               for k, v := range database.Parameters {
+                       updatedProperties[k] = v
+               }
+       }
+
+       // Removals.
+       removed := []string{}
+       for _, key := range removals {
+               if _, exists := updatedProperties[key]; exists {
+                       delete(updatedProperties, key)
+                       removed = append(removed, key)
+               }
+       }
+
+       // Updates.
+       updated := []string{}
+       for key, value := range updates {
+               if updatedProperties[key] != value {
+                       updatedProperties[key] = value
+                       updated = append(updated, key)
+               }
+       }
+
+       _, err = c.glueSvc.UpdateDatabase(ctx, &glue.UpdateDatabaseInput{CatalogId: c.catalogId, Name: aws.String(databaseName), DatabaseInput: &types.DatabaseInput{
+               Name:       aws.String(databaseName),
+               Parameters: updatedProperties,
+       }})
+       if err != nil {
+               return PropertiesUpdateSummary{}, fmt.Errorf("failed to update namespace properties %s: %w", databaseName, err)
+       }
+
+       propertiesUpdateSummary := PropertiesUpdateSummary{
+               Removed: removed,
+               Updated: updated,
+               Missing: iceberg.Difference(removals, removed),
+       }
+
+       return propertiesUpdateSummary, nil
 }
 
 // ListNamespaces returns a list of Iceberg namespaces from the given Glue catalog.
 func (c *GlueCatalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) {
-       params := &glue.GetDatabasesInput{}
+       params := &glue.GetDatabasesInput{
+               CatalogId: c.catalogId,
+       }
 
        if parent != nil {
                return nil, fmt.Errorf("hierarchical namespace is not supported")
@@ -181,25 +437,43 @@ func (c *GlueCatalog) ListNamespaces(ctx context.Context, parent table.Identifie
 }
 
 // GetTable loads a table from the Glue Catalog using the given database and table name.
-func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) (string, error) {
+func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) (*types.Table, error) {
        tblRes, err := c.glueSvc.GetTable(ctx,
                &glue.GetTableInput{
+                       CatalogId:    c.catalogId,
                        DatabaseName: aws.String(database),
                        Name:         aws.String(tableName),
                },
        )
        if err != nil {
                if errors.Is(err, &types.EntityNotFoundException{}) {
-                       return "", fmt.Errorf("failed to get table %s.%s: %w", database, tableName, ErrNoSuchTable)
+                       return nil, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, ErrNoSuchTable)
+               }
+               return nil, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err)
+       }
+
+       if tblRes.Table.Parameters[tableTypePropsKey] != glueTypeIceberg {
+               return nil, fmt.Errorf("table %s.%s is not an iceberg table", database, tableName)
+       }
+
+       return tblRes.Table, nil
+}
+
+// GetDatabase loads a database from the Glue Catalog using the given database name.
+func (c *GlueCatalog) getDatabase(ctx context.Context, databaseName string) (*types.Database, error) {
+       database, err := c.glueSvc.GetDatabase(ctx, &glue.GetDatabaseInput{CatalogId: c.catalogId, Name: aws.String(databaseName)})
+       if err != nil {
+               if errors.Is(err, &types.EntityNotFoundException{}) {
+                       return nil, fmt.Errorf("failed to get namespace %s: %w", databaseName, ErrNoSuchNamespace)
                }
-               return "", fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err)
+               return nil, fmt.Errorf("failed to get namespace %s: %w", databaseName, err)
        }
 
-       if tblRes.Table.Parameters["table_type"] != "ICEBERG" {
-               return "", errors.New("table is not an iceberg table")
+       if database.Database.Parameters[databaseTypePropsKey] != glueTypeIceberg {
+               return nil, fmt.Errorf("namespace %s is not an iceberg namespace", databaseName)
        }
 
-       return tblRes.Table.Parameters["metadata_location"], nil
+       return database.Database, nil
 }
 
 func identifierToGlueTable(identifier table.Identifier) (string, string, error) {
@@ -218,7 +492,7 @@ func identifierToGlueDatabase(identifier table.Identifier) (string, error) {
        return identifier[0], nil
 }
 
-// GlueTableIdentifier returns a glue table identifier for an iceberg table in the format [database, table].
+// GlueTableIdentifier returns a glue table identifier for an Iceberg table in the format [database, table].
 func GlueTableIdentifier(database string, tableName string) table.Identifier {
        return []string{database, tableName}
 }
@@ -232,7 +506,7 @@ func filterTableListByType(database string, tableList []types.Table, tableType s
        var filtered []table.Identifier
 
        for _, tbl := range tableList {
-               if tbl.Parameters["table_type"] != tableType {
+               if tbl.Parameters[tableTypePropsKey] != tableType {
                        continue
                }
                filtered = append(filtered, GlueTableIdentifier(database, aws.ToString(tbl.Name)))
@@ -245,7 +519,7 @@ func filterDatabaseListByType(databases []types.Database, databaseType string) [
        var filtered []table.Identifier
 
        for _, database := range databases {
-               if database.Parameters["database_type"] != databaseType {
+               if database.Parameters[databaseTypePropsKey] != databaseType {
                        continue
                }
                filtered = append(filtered, GlueDatabaseIdentifier(aws.ToString(database.Name)))
diff --git a/catalog/glue_test.go b/catalog/glue_test.go
index 5889537..c08f4cc 100644
--- a/catalog/glue_test.go
+++ b/catalog/glue_test.go
@@ -19,6 +19,7 @@ package catalog
 
 import (
        "context"
+       "errors"
        "os"
        "testing"
 
@@ -34,6 +35,11 @@ type mockGlueClient struct {
        mock.Mock
 }
 
+func (m *mockGlueClient) CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error) {
+       args := m.Called(ctx, params, optFns)
+       return args.Get(0).(*glue.CreateTableOutput), args.Error(1)
+}
+
 func (m *mockGlueClient) GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error) {
        args := m.Called(ctx, params, optFns)
        return args.Get(0).(*glue.GetTableOutput), args.Error(1)
@@ -44,11 +50,51 @@ func (m *mockGlueClient) GetTables(ctx context.Context, params *glue.GetTablesIn
        return args.Get(0).(*glue.GetTablesOutput), args.Error(1)
 }
 
+func (m *mockGlueClient) DeleteTable(ctx context.Context, params *glue.DeleteTableInput, optFns ...func(*glue.Options)) (*glue.DeleteTableOutput, error) {
+       args := m.Called(ctx, params, optFns)
+       return args.Get(0).(*glue.DeleteTableOutput), args.Error(1)
+}
+
+func (m *mockGlueClient) GetDatabase(ctx context.Context, params *glue.GetDatabaseInput, optFns ...func(*glue.Options)) (*glue.GetDatabaseOutput, error) {
+       args := m.Called(ctx, params, optFns)
+       return args.Get(0).(*glue.GetDatabaseOutput), args.Error(1)
+}
+
 func (m *mockGlueClient) GetDatabases(ctx context.Context, params *glue.GetDatabasesInput, optFns ...func(*glue.Options)) (*glue.GetDatabasesOutput, error) {
        args := m.Called(ctx, params, optFns)
        return args.Get(0).(*glue.GetDatabasesOutput), args.Error(1)
 }
 
+func (m *mockGlueClient) CreateDatabase(ctx context.Context, params *glue.CreateDatabaseInput, optFns ...func(*glue.Options)) (*glue.CreateDatabaseOutput, error) {
+       args := m.Called(ctx, params, optFns)
+       return args.Get(0).(*glue.CreateDatabaseOutput), args.Error(1)
+}
+
+func (m *mockGlueClient) DeleteDatabase(ctx context.Context, params *glue.DeleteDatabaseInput, optFns ...func(*glue.Options)) (*glue.DeleteDatabaseOutput, error) {
+       args := m.Called(ctx, params, optFns)
+       return args.Get(0).(*glue.DeleteDatabaseOutput), args.Error(1)
+}
+
+func (m *mockGlueClient) UpdateDatabase(ctx context.Context, params *glue.UpdateDatabaseInput, optFns ...func(*glue.Options)) (*glue.UpdateDatabaseOutput, error) {
+       args := m.Called(ctx, params, optFns)
+       return args.Get(0).(*glue.UpdateDatabaseOutput), args.Error(1)
+}
+
+var testIcebergGlueTable = types.Table{
+       Name: aws.String("test_table"),
+       Parameters: map[string]string{
+               tableTypePropsKey:        "ICEBERG",
+               metadataLocationPropsKey: "s3://test-bucket/test_table/metadata/abc123-123.metadata.json",
+       },
+}
+
+var testNonIcebergGlueTable = types.Table{
+       Name: aws.String("other_table"),
+       Parameters: map[string]string{
+               metadataLocationPropsKey: "s3://test-bucket/other_table/",
+       },
+}
+
 func TestGlueGetTable(t *testing.T) {
        assert := require.New(t)
 
@@ -57,22 +103,15 @@ func TestGlueGetTable(t *testing.T) {
        mockGlueSvc.On("GetTable", mock.Anything, &glue.GetTableInput{
                DatabaseName: aws.String("test_database"),
                Name:         aws.String("test_table"),
-       }, mock.Anything).Return(&glue.GetTableOutput{
-               Table: &types.Table{
-                       Parameters: map[string]string{
-                               "table_type":        "ICEBERG",
-                               "metadata_location": "s3://test-bucket/test_table/metadata/abc123-123.metadata.json",
-                       },
-               },
-       }, nil)
+       }, mock.Anything).Return(&glue.GetTableOutput{Table: &testIcebergGlueTable}, nil)
 
        glueCatalog := &GlueCatalog{
                glueSvc: mockGlueSvc,
        }
 
-       location, err := glueCatalog.getTable(context.TODO(), "test_database", "test_table")
+       table, err := glueCatalog.getTable(context.TODO(), "test_database", "test_table")
        assert.NoError(err)
-       assert.Equal("s3://test-bucket/test_table/metadata/abc123-123.metadata.json", location)
+       assert.Equal("s3://test-bucket/test_table/metadata/abc123-123.metadata.json", table.Parameters[metadataLocationPropsKey])
 }
 
 func TestGlueListTables(t *testing.T) {
@@ -83,21 +122,7 @@ func TestGlueListTables(t *testing.T) {
        mockGlueSvc.On("GetTables", mock.Anything, &glue.GetTablesInput{
                DatabaseName: aws.String("test_database"),
        }, mock.Anything).Return(&glue.GetTablesOutput{
-               TableList: []types.Table{
-                       {
-                               Name: aws.String("test_table"),
-                               Parameters: map[string]string{
-                                       "table_type":        "ICEBERG",
-                                       "metadata_location": "s3://test-bucket/test_table/metadata/abc123-123.metadata.json",
-                               },
-                       },
-                       {
-                               Name: aws.String("other_table"),
-                               Parameters: map[string]string{
-                                       "metadata_location": "s3://test-bucket/other_table/",
-                               },
-                       },
-               },
+               TableList: []types.Table{testIcebergGlueTable, testNonIcebergGlueTable},
        }, nil).Once()
 
        glueCatalog := &GlueCatalog{
@@ -140,6 +165,360 @@ func TestGlueListNamespaces(t *testing.T) {
        assert.Equal([]string{"test_database"}, databases[0])
 }
 
+// TestGlueDropTable verifies that DropTable issues a Glue GetTable and a
+// DeleteTable request for the identified Iceberg table.
+func TestGlueDropTable(t *testing.T) {
+       assert := require.New(t)
+
+       mockGlueSvc := &mockGlueClient{}
+
+       mockGlueSvc.On("GetTable", mock.Anything, &glue.GetTableInput{
+               DatabaseName: aws.String("test_database"),
+               Name:         aws.String("test_table"),
+       }, mock.Anything).Return(&glue.GetTableOutput{
+               Table: &testIcebergGlueTable,
+       }, nil).Once()
+
+       mockGlueSvc.On("DeleteTable", mock.Anything, &glue.DeleteTableInput{
+               DatabaseName: aws.String("test_database"),
+               Name:         aws.String("test_table"),
+       }, mock.Anything).Return(&glue.DeleteTableOutput{}, nil).Once()
+
+       glueCatalog := &GlueCatalog{
+               glueSvc: mockGlueSvc,
+       }
+
+       err := glueCatalog.DropTable(context.TODO(), GlueTableIdentifier("test_database", "test_table"))
+       assert.NoError(err)
+
+       // Registering expectations alone does not require them to be met; without
+       // this the test would pass even if DeleteTable were never called.
+       mockGlueSvc.AssertExpectations(t)
+}
+
+// TestGlueCreateNamespace verifies that CreateNamespace sends a Glue
+// CreateDatabase request whose parameters contain the caller's description and
+// location plus the Iceberg database-type marker.
+func TestGlueCreateNamespace(t *testing.T) {
+       assert := require.New(t)
+
+       mockGlueSvc := &mockGlueClient{}
+
+       mockGlueSvc.On("CreateDatabase", mock.Anything, &glue.CreateDatabaseInput{
+               DatabaseInput: &types.DatabaseInput{
+                       Name: aws.String("test_namespace"),
+                       Parameters: map[string]string{
+                               databaseTypePropsKey: glueTypeIceberg,
+                               descriptionPropsKey:  "Test Description",
+                               locationPropsKey:     "s3://test-location",
+                       },
+               },
+       }, mock.Anything).Return(&glue.CreateDatabaseOutput{}, nil).Once()
+
+       glueCatalog := &GlueCatalog{
+               glueSvc: mockGlueSvc,
+       }
+
+       props := map[string]string{
+               descriptionPropsKey: "Test Description",
+               locationPropsKey:    "s3://test-location",
+       }
+
+       err := glueCatalog.CreateNamespace(context.TODO(), GlueDatabaseIdentifier("test_namespace"), props)
+       assert.NoError(err)
+
+       // Fail unless CreateDatabase was actually called with the expected input.
+       mockGlueSvc.AssertExpectations(t)
+}
+
+// TestGlueDropNamespace verifies that DropNamespace issues a Glue GetDatabase
+// and a DeleteDatabase request for a database marked as Iceberg.
+func TestGlueDropNamespace(t *testing.T) {
+       assert := require.New(t)
+
+       mockGlueSvc := &mockGlueClient{}
+
+       // Mark the database as Iceberg through the same property-key constants the
+       // other namespace tests use, instead of repeating their literal values.
+       mockGlueSvc.On("GetDatabase", mock.Anything, &glue.GetDatabaseInput{
+               Name: aws.String("test_namespace"),
+       }, mock.Anything).Return(&glue.GetDatabaseOutput{
+               Database: &types.Database{
+                       Name: aws.String("test_namespace"),
+                       Parameters: map[string]string{
+                               databaseTypePropsKey: glueTypeIceberg,
+                       },
+               },
+       }, nil).Once()
+
+       mockGlueSvc.On("DeleteDatabase", mock.Anything, &glue.DeleteDatabaseInput{
+               Name: aws.String("test_namespace"),
+       }, mock.Anything).Return(&glue.DeleteDatabaseOutput{}, nil).Once()
+
+       glueCatalog := &GlueCatalog{
+               glueSvc: mockGlueSvc,
+       }
+
+       err := glueCatalog.DropNamespace(context.TODO(), GlueDatabaseIdentifier("test_namespace"))
+       assert.NoError(err)
+
+       // Fail unless DeleteDatabase (and the lookup) were actually issued.
+       mockGlueSvc.AssertExpectations(t)
+}
+
+// TestGlueUpdateNamespaceProperties drives UpdateNamespaceProperties through a
+// table of initial properties, updates and removals. Overlapping update and
+// removal keys must produce an error. Otherwise the returned summary must list
+// the removed, updated and missing keys.
+func TestGlueUpdateNamespaceProperties(t *testing.T) {
+       tests := []struct {
+               name        string
+               initial     map[string]string
+               updates     map[string]string
+               removals    []string
+               expected    PropertiesUpdateSummary
+               shouldError bool
+       }{
+               {
+                       name: "Overlapping removals and updates",
+                       initial: map[string]string{
+                               "key1": "value1",
+                               "key2": "value2",
+                       },
+                       updates: map[string]string{
+                               "key1": "new_value1",
+                               "key3": "value3",
+                       },
+                       removals:    []string{"key1"},
+                       shouldError: true,
+               },
+               {
+                       name: "Some keys in removals are missing",
+                       initial: map[string]string{
+                               "key1": "value1",
+                               "key2": "value2",
+                       },
+                       updates: map[string]string{
+                               "key3": "value3",
+                       },
+                       removals: []string{"key4"},
+                       expected: PropertiesUpdateSummary{
+                               Removed: []string{},
+                               Updated: []string{"key3"},
+                               Missing: []string{"key4"},
+                       },
+                       shouldError: false,
+               },
+               {
+                       name: "No changes to some properties",
+                       initial: map[string]string{
+                               "key1": "value1",
+                               "key2": "value2",
+                       },
+                       updates: map[string]string{
+                               "key1": "value1",
+                               "key3": "value3",
+                       },
+                       removals: []string{},
+                       expected: PropertiesUpdateSummary{
+                               Removed: []string{},
+                               Updated: []string{"key3"},
+                               Missing: []string{},
+                       },
+                       shouldError: false,
+               },
+               {
+                       name: "Happy path with updates and removals",
+                       initial: map[string]string{
+                               "key1": "value1",
+                               "key2": "value2",
+                               "key4": "value4",
+                       },
+                       updates: map[string]string{
+                               "key2": "new_value2",
+                       },
+                       removals: []string{"key4"},
+                       expected: PropertiesUpdateSummary{
+                               Removed: []string{"key4"},
+                               Updated: []string{"key2"},
+                               Missing: []string{},
+                       },
+                       shouldError: false,
+               },
+               {
+                       name: "Happy path with only updates",
+                       initial: map[string]string{
+                               "key1": "value1",
+                               "key2": "value2",
+                       },
+                       updates: map[string]string{
+                               "key2": "new_value2",
+                       },
+                       removals: []string{},
+                       expected: PropertiesUpdateSummary{
+                               Removed: []string{},
+                               Updated: []string{"key2"},
+                               Missing: []string{},
+                       },
+                       shouldError: false,
+               },
+               {
+                       name: "Happy path with only removals",
+                       initial: map[string]string{
+                               "key1": "value1",
+                               "key2": "value2",
+                               "key3": "value3",
+                       },
+                       updates:  map[string]string{},
+                       removals: []string{"key2", "key3"},
+                       expected: PropertiesUpdateSummary{
+                               Removed: []string{"key2", "key3"},
+                               Updated: []string{},
+                               Missing: []string{},
+                       },
+                       shouldError: false,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       assert := require.New(t)
+
+                       mockGlueSvc := &mockGlueClient{}
+
+                       // Build the stored parameters from a copy, so that adding the
+                       // Iceberg database-type marker does not mutate the shared
+                       // test-case map.
+                       initial := make(map[string]string, len(tt.initial)+1)
+                       for k, v := range tt.initial {
+                               initial[k] = v
+                       }
+                       initial[databaseTypePropsKey] = glueTypeIceberg
+
+                       mockGlueSvc.On("GetDatabase", mock.Anything, &glue.GetDatabaseInput{
+                               Name: aws.String("test_namespace"),
+                       }, mock.Anything).Return(&glue.GetDatabaseOutput{
+                               Database: &types.Database{
+                                       Name:       aws.String("test_namespace"),
+                                       Parameters: initial,
+                               },
+                       }, nil).Once()
+
+                       if !tt.shouldError {
+                               mockGlueSvc.On("UpdateDatabase", mock.Anything, mock.Anything, mock.Anything).Return(&glue.UpdateDatabaseOutput{}, nil).Once()
+                       }
+
+                       glueCatalog := &GlueCatalog{
+                               glueSvc: mockGlueSvc,
+                       }
+
+                       summary, err := glueCatalog.UpdateNamespaceProperties(context.TODO(), GlueDatabaseIdentifier("test_namespace"), tt.removals, tt.updates)
+                       if tt.shouldError {
+                               assert.Error(err)
+                               return
+                       }
+
+                       assert.NoError(err)
+                       assert.EqualValues(tt.expected.Removed, summary.Removed)
+                       assert.EqualValues(tt.expected.Updated, summary.Updated)
+                       assert.EqualValues(tt.expected.Missing, summary.Missing)
+
+                       // Enforced only on success. A rejected request may fail before
+                       // any Glue call is made, so its GetDatabase expectation can
+                       // legitimately go unused.
+                       mockGlueSvc.AssertExpectations(t)
+               })
+       }
+}
+
+// TestGlueRenameTable covers the successful RenameTable path. The mocks below
+// describe the Glue traffic the test expects:
+//   - the source table is read;
+//   - it is recreated under the new name;
+//   - the source is deleted;
+//   - the renamed table is read back.
+//
+// The test is skipped for now. Per the skip message, LoadTable cannot be
+// exercised here because it depends on the IO layer.
+func TestGlueRenameTable(t *testing.T) {
+       t.Skip("Skipping this test temporarily because LoadTable is not testable due to the dependency on the IO.")
+
+       const (
+               dbName  = "test_database"
+               oldName = "test_table"
+               newName = "new_test_table"
+       )
+
+       assert := require.New(t)
+       glueMock := &mockGlueClient{}
+
+       // Source table as stored before the rename.
+       glueMock.On("GetTable", mock.Anything, &glue.GetTableInput{
+               DatabaseName: aws.String(dbName),
+               Name:         aws.String(oldName),
+       }, mock.Anything).Return(&glue.GetTableOutput{
+               Table: &types.Table{
+                       Name:              aws.String(oldName),
+                       Parameters:        map[string]string{tableTypePropsKey: glueTypeIceberg},
+                       Owner:             aws.String("owner"),
+                       Description:       aws.String("description"),
+                       StorageDescriptor: &types.StorageDescriptor{},
+               },
+       }, nil).Once()
+
+       // Renamed table as read back afterwards. It carries a metadata location,
+       // which loading the table presumably needs.
+       glueMock.On("GetTable", mock.Anything, &glue.GetTableInput{
+               DatabaseName: aws.String(dbName),
+               Name:         aws.String(newName),
+       }, mock.Anything).Return(&glue.GetTableOutput{
+               Table: &types.Table{
+                       Name: aws.String(newName),
+                       Parameters: map[string]string{
+                               tableTypePropsKey:        glueTypeIceberg,
+                               metadataLocationPropsKey: "s3://test-bucket/new_test_table/metadata/abc123-123.metadata.json",
+                       },
+                       Owner:             aws.String("owner"),
+                       Description:       aws.String("description"),
+                       StorageDescriptor: &types.StorageDescriptor{},
+               },
+       }, nil).Once()
+
+       // Copy of the source table under the new name.
+       glueMock.On("CreateTable", mock.Anything, &glue.CreateTableInput{
+               DatabaseName: aws.String(dbName),
+               TableInput: &types.TableInput{
+                       Name:              aws.String(newName),
+                       Owner:             aws.String("owner"),
+                       Description:       aws.String("description"),
+                       Parameters:        map[string]string{tableTypePropsKey: glueTypeIceberg},
+                       StorageDescriptor: &types.StorageDescriptor{},
+               },
+       }, mock.Anything).Return(&glue.CreateTableOutput{}, nil).Once()
+
+       // Removal of the source table.
+       glueMock.On("DeleteTable", mock.Anything, &glue.DeleteTableInput{
+               DatabaseName: aws.String(dbName),
+               Name:         aws.String(oldName),
+       }, mock.Anything).Return(&glue.DeleteTableOutput{}, nil).Once()
+
+       cat := &GlueCatalog{glueSvc: glueMock}
+
+       renamed, err := cat.RenameTable(context.TODO(), GlueTableIdentifier(dbName, oldName), GlueTableIdentifier(dbName, newName))
+       assert.NoError(err)
+       assert.Equal(newName, renamed.Identifier()[1])
+}
+
+func TestGlueRenameTable_DeleteTableFailureRollback(t *testing.T) {
+       assert := require.New(t)
+
+       mockGlueSvc := &mockGlueClient{}
+
+       // Mock GetTable response
+       mockGlueSvc.On("GetTable", mock.Anything, &glue.GetTableInput{
+               DatabaseName: aws.String("test_database"),
+               Name:         aws.String("test_table"),
+       }, mock.Anything).Return(&glue.GetTableOutput{
+               Table: &types.Table{
+                       Name: aws.String("test_table"),
+                       Parameters: map[string]string{
+                               tableTypePropsKey: glueTypeIceberg,
+                       },
+                       Owner:             aws.String("owner"),
+                       Description:       aws.String("description"),
+                       StorageDescriptor: &types.StorageDescriptor{},
+               },
+       }, nil).Once()
+
+       // Mock CreateTable response
+       mockGlueSvc.On("CreateTable", mock.Anything, &glue.CreateTableInput{
+               DatabaseName: aws.String("test_database"),
+               TableInput: &types.TableInput{
+                       Name:              aws.String("new_test_table"),
+                       Owner:             aws.String("owner"),
+                       Description:       aws.String("description"),
+                       Parameters:        map[string]string{tableTypePropsKey: 
glueTypeIceberg},
+                       StorageDescriptor: &types.StorageDescriptor{},
+               },
+       }, mock.Anything).Return(&glue.CreateTableOutput{}, nil).Once()
+
+       // Mock DeleteTable response for old table (fail)
+       mockGlueSvc.On("DeleteTable", mock.Anything, &glue.DeleteTableInput{
+               DatabaseName: aws.String("test_database"),
+               Name:         aws.String("test_table"),
+       }, mock.Anything).Return(&glue.DeleteTableOutput{}, errors.New("delete 
table failed")).Once()
+
+       // Mock DeleteTable response for rollback (new table)
+       mockGlueSvc.On("DeleteTable", mock.Anything, &glue.DeleteTableInput{
+               DatabaseName: aws.String("test_database"),
+               Name:         aws.String("new_test_table"),
+       }, mock.Anything).Return(&glue.DeleteTableOutput{}, nil).Once()
+
+       glueCatalog := &GlueCatalog{
+               glueSvc: mockGlueSvc,
+       }
+
+       renamedTable, err := glueCatalog.RenameTable(context.TODO(), 
GlueTableIdentifier("test_database", "test_table"), 
GlueTableIdentifier("test_database", "new_test_table"))
+       assert.Error(err)
+       assert.Nil(renamedTable)
+       mockGlueSvc.AssertCalled(t, "DeleteTable", mock.Anything, 
&glue.DeleteTableInput{
+               DatabaseName: aws.String("test_database"),
+               Name:         aws.String("new_test_table"),
+       }, mock.Anything)
+}
+
 func TestGlueListTablesIntegration(t *testing.T) {
        if os.Getenv("TEST_DATABASE_NAME") == "" {
                t.Skip()
diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go
index 7ec4a8e..a44ffb6 100644
--- a/cmd/iceberg/main.go
+++ b/cmd/iceberg/main.go
@@ -38,6 +38,8 @@ Usage:
   iceberg list [options] [PARENT]
   iceberg describe [options] [namespace | table] IDENTIFIER
   iceberg (schema | spec | uuid | location) [options] TABLE_ID
+  iceberg create [options] (namespace | table) IDENTIFIER
+  iceberg create [options] (namespace | table) IDENTIFIER
   iceberg drop [options] (namespace | table) IDENTIFIER
   iceberg files [options] TABLE_ID [--history]
   iceberg rename [options] <from> <to>
@@ -60,7 +62,9 @@ Options:
   --output TYPE      output type (json/text) [default: text]
   --credential TEXT  specify credentials for the catalog
   --warehouse TEXT   specify the warehouse to use
-  --config TEXT      specify the path to the configuration file`
+  --config TEXT      specify the path to the configuration file
+  --description TEXT   specify a description for the namespace
+  --location-uri TEXT          specify a location URI for the namespace`
 
 type Config struct {
        List     bool `docopt:"list"`
@@ -70,6 +74,7 @@ type Config struct {
        Uuid     bool `docopt:"uuid"`
        Location bool `docopt:"location"`
        Props    bool `docopt:"properties"`
+       Create   bool `docopt:"create"`
        Drop     bool `docopt:"drop"`
        Files    bool `docopt:"files"`
        Rename   bool `docopt:"rename"`
@@ -90,13 +95,15 @@ type Config struct {
        PropName string `docopt:"PROPNAME"`
        Value    string `docopt:"VALUE"`
 
-       Catalog   string `docopt:"--catalog"`
-       URI       string `docopt:"--uri"`
-       Output    string `docopt:"--output"`
-       History   bool   `docopt:"--history"`
-       Cred      string `docopt:"--credential"`
-       Warehouse string `docopt:"--warehouse"`
-       Config    string `docopt:"--config"`
+       Catalog     string `docopt:"--catalog"`
+       URI         string `docopt:"--uri"`
+       Output      string `docopt:"--output"`
+       History     bool   `docopt:"--history"`
+       Cred        string `docopt:"--credential"`
+       Warehouse   string `docopt:"--warehouse"`
+       Config      string `docopt:"--config"`
+       Description string `docopt:"--description"`
+       LocationURI string `docopt:"--location-uri"`
 }
 
 func main() {
@@ -141,6 +148,9 @@ func main() {
                if cat, err = catalog.NewRestCatalog("rest", cfg.URI, opts...); 
err != nil {
                        log.Fatal(err)
                }
+       case catalog.Glue:
+               opts := []catalog.Option[catalog.GlueCatalog]{}
+               cat = catalog.NewGlueCatalog(opts...)
        default:
                log.Fatal("unrecognized catalog type")
        }
@@ -201,6 +211,28 @@ func main() {
                                os.Exit(1)
                        }
                }
+
+       case cfg.Create:
+               switch {
+               case cfg.Namespace:
+                       props := iceberg.Properties{}
+                       if cfg.Description != "" {
+                               props["Description"] = cfg.Description
+                       }
+
+                       if cfg.LocationURI != "" {
+                               props["Location"] = cfg.LocationURI
+                       }
+
+                       err := cat.CreateNamespace(context.Background(), 
catalog.ToRestIdentifier(cfg.Ident), props)
+                       if err != nil {
+                               output.Error(err)
+                               os.Exit(1)
+                       }
+               default:
+                       output.Error(errors.New("not implemented"))
+                       os.Exit(1)
+               }
        case cfg.Files:
                tbl := loadTable(output, cat, cfg.TableID)
                output.Files(tbl, cfg.History)
diff --git a/utils.go b/utils.go
index c0a00fe..7807acb 100644
--- a/utils.go
+++ b/utils.go
@@ -196,3 +196,19 @@ func (l literalSet) All(fn func(Literal) bool) bool {
        }
        return true
 }
+
+// Difference returns the elements of a that do not appear in b (a - b),
+// keeping the order in which they occur in a. Duplicates in a are kept.
+// The result is always a non-nil slice; it is empty when every element of a
+// is present in b.
+func Difference(a, b []string) []string {
+       // A zero-width-value set: membership is all that matters.
+       exclude := make(map[string]struct{}, len(b))
+       for _, item := range b {
+               exclude[item] = struct{}{}
+       }
+
+       // Pre-size to the upper bound so appends never reallocate.
+       diff := make([]string, 0, len(a))
+       for _, item := range a {
+               if _, found := exclude[item]; !found {
+                       diff = append(diff, item)
+               }
+       }
+       return diff
+}
diff --git a/utils_test.go b/utils_test.go
new file mode 100644
index 0000000..9f4f888
--- /dev/null
+++ b/utils_test.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/require"
+)
+
+// TestDifference checks Difference on disjoint, overlapping, identical, empty,
+// nil and duplicate-containing inputs. It compares with Equal rather than
+// ElementsMatch, which pins the order of a and requires an empty result to be a
+// non-nil empty slice.
+func TestDifference(t *testing.T) {
+       tests := []struct {
+               name     string
+               a        []string
+               b        []string
+               expected []string
+       }{
+               {
+                       name:     "No elements in common",
+                       a:        []string{"a", "b", "c"},
+                       b:        []string{"d", "e", "f"},
+                       expected: []string{"a", "b", "c"},
+               },
+               {
+                       name:     "Some elements in common",
+                       a:        []string{"a", "b", "c"},
+                       b:        []string{"b"},
+                       expected: []string{"a", "c"},
+               },
+               {
+                       name:     "All elements in common",
+                       a:        []string{"a", "b", "c"},
+                       b:        []string{"a", "b", "c"},
+                       expected: []string{},
+               },
+               {
+                       name:     "Empty slice a",
+                       a:        []string{},
+                       b:        []string{"a", "b", "c"},
+                       expected: []string{},
+               },
+               {
+                       name:     "Empty slice b",
+                       a:        []string{"a", "b", "c"},
+                       b:        []string{},
+                       expected: []string{"a", "b", "c"},
+               },
+               {
+                       name:     "No elements in slice b present in slice a",
+                       a:        []string{"a", "b", "c"},
+                       b:        []string{"x", "y", "z"},
+                       expected: []string{"a", "b", "c"},
+               },
+               {
+                       name:     "Order of a is preserved",
+                       a:        []string{"c", "b", "a"},
+                       b:        []string{"b"},
+                       expected: []string{"c", "a"},
+               },
+               {
+                       name:     "Duplicates in a are kept",
+                       a:        []string{"a", "b", "a", "c"},
+                       b:        []string{"c"},
+                       expected: []string{"a", "b", "a"},
+               },
+               {
+                       name:     "Duplicates in b",
+                       a:        []string{"a", "b"},
+                       b:        []string{"b", "b"},
+                       expected: []string{"a"},
+               },
+               {
+                       name:     "Nil slices",
+                       a:        nil,
+                       b:        nil,
+                       expected: []string{},
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       assert := require.New(t)
+                       result := Difference(tt.a, tt.b)
+                       assert.Equal(tt.expected, result)
+               })
+       }
+}


Reply via email to