This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 97c04be feat(catalog/glue): add support for glue catalog namespace
operations (#173)
97c04be is described below
commit 97c04bebb158f410b348bf10eb927746ae431b15
Author: natusioe <[email protected]>
AuthorDate: Thu Oct 24 07:41:47 2024 -0700
feat(catalog/glue): add support for glue catalog namespace operations (#173)
---
catalog/catalog.go | 11 +-
catalog/glue.go | 320 +++++++++++++++++++++++++++++++++++---
catalog/glue_test.go | 429 ++++++++++++++++++++++++++++++++++++++++++++++++---
cmd/iceberg/main.go | 48 +++++-
utils.go | 16 ++
utils_test.go | 78 ++++++++++
6 files changed, 845 insertions(+), 57 deletions(-)
diff --git a/catalog/catalog.go b/catalog/catalog.go
index d6d7f1e..65da7e5 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -30,6 +30,8 @@ import (
type CatalogType string
+type AwsProperties map[string]string
+
const (
REST CatalogType = "rest"
Hive CatalogType = "hive"
@@ -52,6 +54,12 @@ func WithAwsConfig(cfg aws.Config) Option[GlueCatalog] {
}
}
+func WithAwsProperties(props AwsProperties) Option[GlueCatalog] {
+ return func(o *options) {
+ o.awsProperties = props
+ }
+}
+
func WithCredential(cred string) Option[RestCatalog] {
return func(o *options) {
o.credential = cred
@@ -117,7 +125,8 @@ func WithPrefix(prefix string) Option[RestCatalog] {
type Option[T GlueCatalog | RestCatalog] func(*options)
type options struct {
- awsConfig aws.Config
+ awsConfig aws.Config
+ awsProperties AwsProperties
tlsConfig *tls.Config
credential string
diff --git a/catalog/glue.go b/catalog/glue.go
index 91b21ff..c970e5a 100644
--- a/catalog/glue.go
+++ b/catalog/glue.go
@@ -30,22 +30,48 @@ import (
"github.com/aws/aws-sdk-go-v2/service/glue/types"
)
-const glueTypeIceberg = "ICEBERG"
+const (
+ // Use the same conventions as in the pyiceberg project.
+ // See: https://github.com/apache/iceberg-python/blob/main/pyiceberg/catalog/__init__.py#L82-L96
+ glueTypeIceberg = "ICEBERG"
+ databaseTypePropsKey = "database_type"
+ tableTypePropsKey = "table_type"
+ descriptionPropsKey = "Description"
+
+ // Database location.
+ locationPropsKey = "Location"
+
+ // Table metadata location pointer.
+ metadataLocationPropsKey = "metadata_location"
+
+ // The ID of the Glue Data Catalog where the tables reside. If none is provided, Glue
+ // automatically uses the caller's AWS account ID by default.
+ // See: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html
+ glueCatalogIdKey = "glue.id"
+)
var (
_ Catalog = (*GlueCatalog)(nil)
)
type glueAPI interface {
+ CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns
...func(*glue.Options)) (*glue.CreateTableOutput, error)
GetTable(ctx context.Context, params *glue.GetTableInput, optFns
...func(*glue.Options)) (*glue.GetTableOutput, error)
GetTables(ctx context.Context, params *glue.GetTablesInput, optFns
...func(*glue.Options)) (*glue.GetTablesOutput, error)
+ DeleteTable(ctx context.Context, params *glue.DeleteTableInput, optFns
...func(*glue.Options)) (*glue.DeleteTableOutput, error)
+ GetDatabase(ctx context.Context, params *glue.GetDatabaseInput, optFns
...func(*glue.Options)) (*glue.GetDatabaseOutput, error)
GetDatabases(ctx context.Context, params *glue.GetDatabasesInput,
optFns ...func(*glue.Options)) (*glue.GetDatabasesOutput, error)
+ CreateDatabase(ctx context.Context, params *glue.CreateDatabaseInput,
optFns ...func(*glue.Options)) (*glue.CreateDatabaseOutput, error)
+ DeleteDatabase(ctx context.Context, params *glue.DeleteDatabaseInput,
optFns ...func(*glue.Options)) (*glue.DeleteDatabaseOutput, error)
+ UpdateDatabase(ctx context.Context, params *glue.UpdateDatabaseInput,
optFns ...func(*glue.Options)) (*glue.UpdateDatabaseOutput, error)
}
type GlueCatalog struct {
- glueSvc glueAPI
+ glueSvc glueAPI
+ catalogId *string
}
+// NewGlueCatalog creates a new instance of GlueCatalog with the given options.
func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog {
glueOps := &options{}
@@ -53,12 +79,20 @@ func NewGlueCatalog(opts ...Option[GlueCatalog])
*GlueCatalog {
o(glueOps)
}
+ var catalogId *string
+ if val, ok := glueOps.awsProperties[glueCatalogIdKey]; ok {
+ catalogId = &val
+ } else {
+ catalogId = nil
+ }
+
return &GlueCatalog{
- glueSvc: glue.NewFromConfig(glueOps.awsConfig),
+ glueSvc: glue.NewFromConfig(glueOps.awsConfig),
+ catalogId: catalogId,
}
}
-// ListTables returns a list of iceberg tables in the given Glue database.
+// ListTables returns a list of Iceberg tables in the given Glue database.
//
// The namespace should just contain the Glue database name.
func (c *GlueCatalog) ListTables(ctx context.Context, namespace
table.Identifier) ([]table.Identifier, error) {
@@ -67,7 +101,7 @@ func (c *GlueCatalog) ListTables(ctx context.Context,
namespace table.Identifier
return nil, err
}
- params := &glue.GetTablesInput{DatabaseName: aws.String(database)}
+ params := &glue.GetTablesInput{CatalogId: c.catalogId, DatabaseName:
aws.String(database)}
var icebergTables []table.Identifier
@@ -92,7 +126,7 @@ func (c *GlueCatalog) ListTables(ctx context.Context,
namespace table.Identifier
// LoadTable loads a table from the catalog table details.
//
-// The identifier should contain the Glue database name, then glue table name.
+// The identifier should contain the Glue database name, then Glue table name.
func (c *GlueCatalog) LoadTable(ctx context.Context, identifier
table.Identifier, props iceberg.Properties) (*table.Table, error) {
database, tableName, err := identifierToGlueTable(identifier)
if err != nil {
@@ -103,11 +137,16 @@ func (c *GlueCatalog) LoadTable(ctx context.Context,
identifier table.Identifier
props = map[string]string{}
}
- location, err := c.getTable(ctx, database, tableName)
+ glueTable, err := c.getTable(ctx, database, tableName)
if err != nil {
return nil, err
}
+ location, ok := glueTable.Parameters[metadataLocationPropsKey]
+ if !ok {
+ return nil, fmt.Errorf("missing metadata location for table
%s.%s", database, tableName)
+ }
+
// TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog.
iofs, err := io.LoadFS(props, location)
if err != nil {
@@ -126,34 +165,251 @@ func (c *GlueCatalog) CatalogType() CatalogType {
return Glue
}
+// DropTable deletes an Iceberg table from the Glue catalog.
func (c *GlueCatalog) DropTable(ctx context.Context, identifier
table.Identifier) error {
- return fmt.Errorf("%w: [Glue Catalog] drop table",
iceberg.ErrNotImplemented)
+ database, tableName, err := identifierToGlueTable(identifier)
+ if err != nil {
+ return err
+ }
+
+ // Check if the table exists and is an Iceberg table.
+ _, err = c.getTable(ctx, database, tableName)
+ if err != nil {
+ return err
+ }
+
+ params := &glue.DeleteTableInput{
+ CatalogId: c.catalogId,
+ DatabaseName: aws.String(database),
+ Name: aws.String(tableName),
+ }
+ _, err = c.glueSvc.DeleteTable(ctx, params)
+ if err != nil {
+ return fmt.Errorf("failed to drop table %s.%s: %w", database,
tableName, err)
+ }
+
+ return nil
}
+// RenameTable renames an Iceberg table in the Glue catalog.
func (c *GlueCatalog) RenameTable(ctx context.Context, from, to
table.Identifier) (*table.Table, error) {
- return nil, fmt.Errorf("%w: [Glue Catalog] rename table",
iceberg.ErrNotImplemented)
+ fromDatabase, fromTable, err := identifierToGlueTable(from)
+ if err != nil {
+ return nil, err
+ }
+
+ toDatabase, toTable, err := identifierToGlueTable(to)
+ if err != nil {
+ return nil, err
+ }
+
+ if fromDatabase != toDatabase {
+ return nil, fmt.Errorf("cannot rename table across namespaces:
%s -> %s", fromDatabase, toDatabase)
+ }
+
+ // Fetch the existing Glue table to copy the metadata into the new table.
+ fromGlueTable, err := c.getTable(ctx, fromDatabase, fromTable)
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch the table %s.%s: %w",
fromDatabase, fromTable, err)
+ }
+
+ // Create the new table.
+ _, err = c.glueSvc.CreateTable(ctx, &glue.CreateTableInput{
+ CatalogId: c.catalogId,
+ DatabaseName: aws.String(toDatabase),
+ TableInput: &types.TableInput{
+ Name: aws.String(toTable),
+ Owner: fromGlueTable.Owner,
+ Description: fromGlueTable.Description,
+ Parameters: fromGlueTable.Parameters,
+ StorageDescriptor: fromGlueTable.StorageDescriptor,
+ },
+ })
+ if err != nil {
+ return nil, fmt.Errorf("failed to create the table %s.%s: %w",
fromDatabase, fromTable, err)
+ }
+
+ // Drop the old table.
+ _, err = c.glueSvc.DeleteTable(ctx, &glue.DeleteTableInput{
+ CatalogId: c.catalogId,
+ DatabaseName: aws.String(fromDatabase),
+ Name: aws.String(fromTable),
+ })
+ if err != nil {
+ // Best-effort rollback the table creation.
+ _, rollbackErr := c.glueSvc.DeleteTable(ctx,
&glue.DeleteTableInput{
+ CatalogId: c.catalogId,
+ DatabaseName: aws.String(toDatabase),
+ Name: aws.String(toTable),
+ })
+ if rollbackErr != nil {
+ fmt.Printf("failed to rollback the new table %s.%s:
%v", toDatabase, toTable, rollbackErr)
+ }
+
+ return nil, fmt.Errorf("failed to rename the table %s.%s: %w",
fromDatabase, fromTable, err)
+ }
+
+ // Load the new table to return.
+ renamedTable, err := c.LoadTable(ctx, GlueTableIdentifier(toDatabase,
toTable), nil)
+ if err != nil {
+ return nil, fmt.Errorf("failed to load renamed table %s.%s:
%w", toDatabase, toTable, err)
+ }
+
+ return renamedTable, nil
}
+// CreateNamespace creates a new Iceberg namespace in the Glue catalog.
func (c *GlueCatalog) CreateNamespace(ctx context.Context, namespace
table.Identifier, props iceberg.Properties) error {
- return fmt.Errorf("%w: [Glue Catalog] create namespace",
iceberg.ErrNotImplemented)
+ database, err := identifierToGlueDatabase(namespace)
+ if err != nil {
+ return err
+ }
+
+ databaseParameters := map[string]string{
+ databaseTypePropsKey: glueTypeIceberg,
+ }
+
+ description := props[descriptionPropsKey]
+ locationURI := props[locationPropsKey]
+
+ if description != "" {
+ databaseParameters[descriptionPropsKey] = description
+ }
+ if locationURI != "" {
+ databaseParameters[locationPropsKey] = locationURI
+ }
+
+ databaseInput := &types.DatabaseInput{
+ Name: aws.String(database),
+ Parameters: databaseParameters,
+ }
+
+ params := &glue.CreateDatabaseInput{CatalogId: c.catalogId,
DatabaseInput: databaseInput}
+ _, err = c.glueSvc.CreateDatabase(ctx, params)
+
+ if err != nil {
+ return fmt.Errorf("failed to create database %s: %w", database,
err)
+ }
+
+ return nil
}
+// DropNamespace deletes an Iceberg namespace from the Glue catalog.
func (c *GlueCatalog) DropNamespace(ctx context.Context, namespace
table.Identifier) error {
- return fmt.Errorf("%w: [Glue Catalog] drop namespace",
iceberg.ErrNotImplemented)
+ databaseName, err := identifierToGlueDatabase(namespace)
+ if err != nil {
+ return err
+ }
+
+ // Check if the database exists and is an iceberg database.
+ _, err = c.getDatabase(ctx, databaseName)
+ if err != nil {
+ return err
+ }
+
+ params := &glue.DeleteDatabaseInput{CatalogId: c.catalogId, Name:
aws.String(databaseName)}
+ _, err = c.glueSvc.DeleteDatabase(ctx, params)
+ if err != nil {
+ return fmt.Errorf("failed to drop namespace %s: %w",
databaseName, err)
+ }
+
+ return nil
}
+// LoadNamespaceProperties loads the properties of an Iceberg namespace from the Glue catalog.
func (c *GlueCatalog) LoadNamespaceProperties(ctx context.Context, namespace
table.Identifier) (iceberg.Properties, error) {
- return nil, fmt.Errorf("%w: [Glue Catalog] load namespace properties",
iceberg.ErrNotImplemented)
+ databaseName, err := identifierToGlueDatabase(namespace)
+ if err != nil {
+ return nil, err
+ }
+
+ database, err := c.getDatabase(ctx, databaseName)
+ if err != nil {
+ return nil, err
+ }
+
+ props := make(map[string]string)
+ if database.Parameters != nil {
+ for k, v := range database.Parameters {
+ props[k] = v
+ }
+ }
+
+ return props, nil
}
+// UpdateNamespaceProperties updates the properties of an Iceberg namespace in the Glue catalog.
+// The removals list contains the keys to remove, and the updates map contains the keys and values to update.
func (c *GlueCatalog) UpdateNamespaceProperties(ctx context.Context, namespace
table.Identifier,
removals []string, updates iceberg.Properties)
(PropertiesUpdateSummary, error) {
- return PropertiesUpdateSummary{}, fmt.Errorf("%w: [Glue Catalog] update
namespace properties", iceberg.ErrNotImplemented)
+
+ databaseName, err := identifierToGlueDatabase(namespace)
+ if err != nil {
+ return PropertiesUpdateSummary{}, err
+ }
+
+ database, err := c.getDatabase(ctx, databaseName)
+ if err != nil {
+ return PropertiesUpdateSummary{}, err
+ }
+
+ overlap := []string{}
+ for _, key := range removals {
+ if _, exists := updates[key]; exists {
+ overlap = append(overlap, key)
+ }
+ }
+ if len(overlap) > 0 {
+ return PropertiesUpdateSummary{}, fmt.Errorf("conflict between
removals and updates for keys: %v", overlap)
+ }
+
+ updatedProperties := make(map[string]string)
+ if database.Parameters != nil {
+ for k, v := range database.Parameters {
+ updatedProperties[k] = v
+ }
+ }
+
+ // Removals.
+ removed := []string{}
+ for _, key := range removals {
+ if _, exists := updatedProperties[key]; exists {
+ delete(updatedProperties, key)
+ removed = append(removed, key)
+ }
+ }
+
+ // Updates.
+ updated := []string{}
+ for key, value := range updates {
+ if updatedProperties[key] != value {
+ updatedProperties[key] = value
+ updated = append(updated, key)
+ }
+ }
+
+ _, err = c.glueSvc.UpdateDatabase(ctx,
&glue.UpdateDatabaseInput{CatalogId: c.catalogId, Name:
aws.String(databaseName), DatabaseInput: &types.DatabaseInput{
+ Name: aws.String(databaseName),
+ Parameters: updatedProperties,
+ }})
+ if err != nil {
+ return PropertiesUpdateSummary{}, fmt.Errorf("failed to update
namespace properties %s: %w", databaseName, err)
+ }
+
+ propertiesUpdateSummary := PropertiesUpdateSummary{
+ Removed: removed,
+ Updated: updated,
+ Missing: iceberg.Difference(removals, removed),
+ }
+
+ return propertiesUpdateSummary, nil
}
// ListNamespaces returns a list of Iceberg namespaces from the given Glue catalog.
func (c *GlueCatalog) ListNamespaces(ctx context.Context, parent
table.Identifier) ([]table.Identifier, error) {
- params := &glue.GetDatabasesInput{}
+ params := &glue.GetDatabasesInput{
+ CatalogId: c.catalogId,
+ }
if parent != nil {
return nil, fmt.Errorf("hierarchical namespace is not
supported")
@@ -181,25 +437,43 @@ func (c *GlueCatalog) ListNamespaces(ctx context.Context,
parent table.Identifie
}
// GetTable loads a table from the Glue Catalog using the given database and table name.
-func (c *GlueCatalog) getTable(ctx context.Context, database, tableName
string) (string, error) {
+func (c *GlueCatalog) getTable(ctx context.Context, database, tableName
string) (*types.Table, error) {
tblRes, err := c.glueSvc.GetTable(ctx,
&glue.GetTableInput{
+ CatalogId: c.catalogId,
DatabaseName: aws.String(database),
Name: aws.String(tableName),
},
)
if err != nil {
if errors.Is(err, &types.EntityNotFoundException{}) {
- return "", fmt.Errorf("failed to get table %s.%s: %w",
database, tableName, ErrNoSuchTable)
+ return nil, fmt.Errorf("failed to get table %s.%s: %w",
database, tableName, ErrNoSuchTable)
+ }
+ return nil, fmt.Errorf("failed to get table %s.%s: %w",
database, tableName, err)
+ }
+
+ if tblRes.Table.Parameters[tableTypePropsKey] != glueTypeIceberg {
+ return nil, fmt.Errorf("table %s.%s is not an iceberg table",
database, tableName)
+ }
+
+ return tblRes.Table, nil
+}
+
+// GetDatabase loads a database from the Glue Catalog using the given database name.
+func (c *GlueCatalog) getDatabase(ctx context.Context, databaseName string)
(*types.Database, error) {
+ database, err := c.glueSvc.GetDatabase(ctx,
&glue.GetDatabaseInput{CatalogId: c.catalogId, Name: aws.String(databaseName)})
+ if err != nil {
+ if errors.Is(err, &types.EntityNotFoundException{}) {
+ return nil, fmt.Errorf("failed to get namespace %s:
%w", databaseName, ErrNoSuchNamespace)
}
- return "", fmt.Errorf("failed to get table %s.%s: %w",
database, tableName, err)
+ return nil, fmt.Errorf("failed to get namespace %s: %w",
databaseName, err)
}
- if tblRes.Table.Parameters["table_type"] != "ICEBERG" {
- return "", errors.New("table is not an iceberg table")
+ if database.Database.Parameters[databaseTypePropsKey] !=
glueTypeIceberg {
+ return nil, fmt.Errorf("namespace %s is not an iceberg
namespace", databaseName)
}
- return tblRes.Table.Parameters["metadata_location"], nil
+ return database.Database, nil
}
func identifierToGlueTable(identifier table.Identifier) (string, string,
error) {
@@ -218,7 +492,7 @@ func identifierToGlueDatabase(identifier table.Identifier)
(string, error) {
return identifier[0], nil
}
-// GlueTableIdentifier returns a glue table identifier for an iceberg table in the format [database, table].
+// GlueTableIdentifier returns a glue table identifier for an Iceberg table in the format [database, table].
func GlueTableIdentifier(database string, tableName string) table.Identifier {
return []string{database, tableName}
}
@@ -232,7 +506,7 @@ func filterTableListByType(database string, tableList
[]types.Table, tableType s
var filtered []table.Identifier
for _, tbl := range tableList {
- if tbl.Parameters["table_type"] != tableType {
+ if tbl.Parameters[tableTypePropsKey] != tableType {
continue
}
filtered = append(filtered, GlueTableIdentifier(database,
aws.ToString(tbl.Name)))
@@ -245,7 +519,7 @@ func filterDatabaseListByType(databases []types.Database,
databaseType string) [
var filtered []table.Identifier
for _, database := range databases {
- if database.Parameters["database_type"] != databaseType {
+ if database.Parameters[databaseTypePropsKey] != databaseType {
continue
}
filtered = append(filtered,
GlueDatabaseIdentifier(aws.ToString(database.Name)))
diff --git a/catalog/glue_test.go b/catalog/glue_test.go
index 5889537..c08f4cc 100644
--- a/catalog/glue_test.go
+++ b/catalog/glue_test.go
@@ -19,6 +19,7 @@ package catalog
import (
"context"
+ "errors"
"os"
"testing"
@@ -34,6 +35,11 @@ type mockGlueClient struct {
mock.Mock
}
+func (m *mockGlueClient) CreateTable(ctx context.Context, params
*glue.CreateTableInput, optFns ...func(*glue.Options))
(*glue.CreateTableOutput, error) {
+ args := m.Called(ctx, params, optFns)
+ return args.Get(0).(*glue.CreateTableOutput), args.Error(1)
+}
+
func (m *mockGlueClient) GetTable(ctx context.Context, params
*glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput,
error) {
args := m.Called(ctx, params, optFns)
return args.Get(0).(*glue.GetTableOutput), args.Error(1)
@@ -44,11 +50,51 @@ func (m *mockGlueClient) GetTables(ctx context.Context,
params *glue.GetTablesIn
return args.Get(0).(*glue.GetTablesOutput), args.Error(1)
}
+func (m *mockGlueClient) DeleteTable(ctx context.Context, params
*glue.DeleteTableInput, optFns ...func(*glue.Options))
(*glue.DeleteTableOutput, error) {
+ args := m.Called(ctx, params, optFns)
+ return args.Get(0).(*glue.DeleteTableOutput), args.Error(1)
+}
+
+func (m *mockGlueClient) GetDatabase(ctx context.Context, params
*glue.GetDatabaseInput, optFns ...func(*glue.Options))
(*glue.GetDatabaseOutput, error) {
+ args := m.Called(ctx, params, optFns)
+ return args.Get(0).(*glue.GetDatabaseOutput), args.Error(1)
+}
+
func (m *mockGlueClient) GetDatabases(ctx context.Context, params
*glue.GetDatabasesInput, optFns ...func(*glue.Options))
(*glue.GetDatabasesOutput, error) {
args := m.Called(ctx, params, optFns)
return args.Get(0).(*glue.GetDatabasesOutput), args.Error(1)
}
+func (m *mockGlueClient) CreateDatabase(ctx context.Context, params
*glue.CreateDatabaseInput, optFns ...func(*glue.Options))
(*glue.CreateDatabaseOutput, error) {
+ args := m.Called(ctx, params, optFns)
+ return args.Get(0).(*glue.CreateDatabaseOutput), args.Error(1)
+}
+
+func (m *mockGlueClient) DeleteDatabase(ctx context.Context, params
*glue.DeleteDatabaseInput, optFns ...func(*glue.Options))
(*glue.DeleteDatabaseOutput, error) {
+ args := m.Called(ctx, params, optFns)
+ return args.Get(0).(*glue.DeleteDatabaseOutput), args.Error(1)
+}
+
+func (m *mockGlueClient) UpdateDatabase(ctx context.Context, params
*glue.UpdateDatabaseInput, optFns ...func(*glue.Options))
(*glue.UpdateDatabaseOutput, error) {
+ args := m.Called(ctx, params, optFns)
+ return args.Get(0).(*glue.UpdateDatabaseOutput), args.Error(1)
+}
+
+var testIcebergGlueTable = types.Table{
+ Name: aws.String("test_table"),
+ Parameters: map[string]string{
+ tableTypePropsKey: "ICEBERG",
+ metadataLocationPropsKey:
"s3://test-bucket/test_table/metadata/abc123-123.metadata.json",
+ },
+}
+
+var testNonIcebergGlueTable = types.Table{
+ Name: aws.String("other_table"),
+ Parameters: map[string]string{
+ metadataLocationPropsKey: "s3://test-bucket/other_table/",
+ },
+}
+
func TestGlueGetTable(t *testing.T) {
assert := require.New(t)
@@ -57,22 +103,15 @@ func TestGlueGetTable(t *testing.T) {
mockGlueSvc.On("GetTable", mock.Anything, &glue.GetTableInput{
DatabaseName: aws.String("test_database"),
Name: aws.String("test_table"),
- }, mock.Anything).Return(&glue.GetTableOutput{
- Table: &types.Table{
- Parameters: map[string]string{
- "table_type": "ICEBERG",
- "metadata_location":
"s3://test-bucket/test_table/metadata/abc123-123.metadata.json",
- },
- },
- }, nil)
+ }, mock.Anything).Return(&glue.GetTableOutput{Table:
&testIcebergGlueTable}, nil)
glueCatalog := &GlueCatalog{
glueSvc: mockGlueSvc,
}
- location, err := glueCatalog.getTable(context.TODO(), "test_database",
"test_table")
+ table, err := glueCatalog.getTable(context.TODO(), "test_database",
"test_table")
assert.NoError(err)
-
assert.Equal("s3://test-bucket/test_table/metadata/abc123-123.metadata.json",
location)
+
assert.Equal("s3://test-bucket/test_table/metadata/abc123-123.metadata.json",
table.Parameters[metadataLocationPropsKey])
}
func TestGlueListTables(t *testing.T) {
@@ -83,21 +122,7 @@ func TestGlueListTables(t *testing.T) {
mockGlueSvc.On("GetTables", mock.Anything, &glue.GetTablesInput{
DatabaseName: aws.String("test_database"),
}, mock.Anything).Return(&glue.GetTablesOutput{
- TableList: []types.Table{
- {
- Name: aws.String("test_table"),
- Parameters: map[string]string{
- "table_type": "ICEBERG",
- "metadata_location":
"s3://test-bucket/test_table/metadata/abc123-123.metadata.json",
- },
- },
- {
- Name: aws.String("other_table"),
- Parameters: map[string]string{
- "metadata_location":
"s3://test-bucket/other_table/",
- },
- },
- },
+ TableList: []types.Table{testIcebergGlueTable,
testNonIcebergGlueTable},
}, nil).Once()
glueCatalog := &GlueCatalog{
@@ -140,6 +165,360 @@ func TestGlueListNamespaces(t *testing.T) {
assert.Equal([]string{"test_database"}, databases[0])
}
+func TestGlueDropTable(t *testing.T) {
+ assert := require.New(t)
+
+ mockGlueSvc := &mockGlueClient{}
+
+ mockGlueSvc.On("GetTable", mock.Anything, &glue.GetTableInput{
+ DatabaseName: aws.String("test_database"),
+ Name: aws.String("test_table"),
+ }, mock.Anything).Return(&glue.GetTableOutput{
+ Table: &testIcebergGlueTable,
+ }, nil).Once()
+
+ mockGlueSvc.On("DeleteTable", mock.Anything, &glue.DeleteTableInput{
+ DatabaseName: aws.String("test_database"),
+ Name: aws.String("test_table"),
+ }, mock.Anything).Return(&glue.DeleteTableOutput{}, nil).Once()
+
+ glueCatalog := &GlueCatalog{
+ glueSvc: mockGlueSvc,
+ }
+
+ err := glueCatalog.DropTable(context.TODO(),
GlueTableIdentifier("test_database", "test_table"))
+ assert.NoError(err)
+}
+
+func TestGlueCreateNamespace(t *testing.T) {
+ assert := require.New(t)
+
+ mockGlueSvc := &mockGlueClient{}
+
+ mockGlueSvc.On("CreateDatabase", mock.Anything,
&glue.CreateDatabaseInput{
+ DatabaseInput: &types.DatabaseInput{
+ Name: aws.String("test_namespace"),
+ Parameters: map[string]string{
+ databaseTypePropsKey: glueTypeIceberg,
+ descriptionPropsKey: "Test Description",
+ locationPropsKey: "s3://test-location",
+ },
+ },
+ }, mock.Anything).Return(&glue.CreateDatabaseOutput{}, nil).Once()
+
+ glueCatalog := &GlueCatalog{
+ glueSvc: mockGlueSvc,
+ }
+
+ props := map[string]string{
+ descriptionPropsKey: "Test Description",
+ locationPropsKey: "s3://test-location",
+ }
+
+ err := glueCatalog.CreateNamespace(context.TODO(),
GlueDatabaseIdentifier("test_namespace"), props)
+ assert.NoError(err)
+}
+
+func TestGlueDropNamespace(t *testing.T) {
+ assert := require.New(t)
+
+ mockGlueSvc := &mockGlueClient{}
+
+ mockGlueSvc.On("GetDatabase", mock.Anything, &glue.GetDatabaseInput{
+ Name: aws.String("test_namespace"),
+ }, mock.Anything).Return(&glue.GetDatabaseOutput{
+ Database: &types.Database{
+ Name: aws.String("test_namespace"),
+ Parameters: map[string]string{
+ "database_type": "ICEBERG",
+ },
+ },
+ }, nil).Once()
+
+ mockGlueSvc.On("DeleteDatabase", mock.Anything,
&glue.DeleteDatabaseInput{
+ Name: aws.String("test_namespace"),
+ }, mock.Anything).Return(&glue.DeleteDatabaseOutput{}, nil).Once()
+
+ glueCatalog := &GlueCatalog{
+ glueSvc: mockGlueSvc,
+ }
+
+ err := glueCatalog.DropNamespace(context.TODO(),
GlueDatabaseIdentifier("test_namespace"))
+ assert.NoError(err)
+}
+
+func TestGlueUpdateNamespaceProperties(t *testing.T) {
+ tests := []struct {
+ name string
+ initial map[string]string
+ updates map[string]string
+ removals []string
+ expected PropertiesUpdateSummary
+ shouldError bool
+ }{
+ {
+ name: "Overlapping removals and updates",
+ initial: map[string]string{
+ "key1": "value1",
+ "key2": "value2",
+ },
+ updates: map[string]string{
+ "key1": "new_value1",
+ "key3": "value3",
+ },
+ removals: []string{"key1"},
+ shouldError: true,
+ },
+ {
+ name: "Some keys in removals are missing",
+ initial: map[string]string{
+ "key1": "value1",
+ "key2": "value2",
+ },
+ updates: map[string]string{
+ "key3": "value3",
+ },
+ removals: []string{"key4"},
+ expected: PropertiesUpdateSummary{
+ Removed: []string{},
+ Updated: []string{"key3"},
+ Missing: []string{"key4"},
+ },
+ shouldError: false,
+ },
+ {
+ name: "No changes to some properties",
+ initial: map[string]string{
+ "key1": "value1",
+ "key2": "value2",
+ },
+ updates: map[string]string{
+ "key1": "value1",
+ "key3": "value3",
+ },
+ removals: []string{},
+ expected: PropertiesUpdateSummary{
+ Removed: []string{},
+ Updated: []string{"key3"},
+ Missing: []string{},
+ },
+ shouldError: false,
+ },
+ {
+ name: "Happy path with updates and removals",
+ initial: map[string]string{
+ "key1": "value1",
+ "key2": "value2",
+ "key4": "value4",
+ },
+ updates: map[string]string{
+ "key2": "new_value2",
+ },
+ removals: []string{"key4"},
+ expected: PropertiesUpdateSummary{
+ Removed: []string{"key4"},
+ Updated: []string{"key2"},
+ Missing: []string{},
+ },
+ shouldError: false,
+ },
+ {
+ name: "Happy path with only updates",
+ initial: map[string]string{
+ "key1": "value1",
+ "key2": "value2",
+ },
+ updates: map[string]string{
+ "key2": "new_value2",
+ },
+ removals: []string{},
+ expected: PropertiesUpdateSummary{
+ Removed: []string{},
+ Updated: []string{"key2"},
+ Missing: []string{},
+ },
+ shouldError: false,
+ },
+ {
+ name: "Happy path with only removals",
+ initial: map[string]string{
+ "key1": "value1",
+ "key2": "value2",
+ "key3": "value3",
+ },
+ updates: map[string]string{},
+ removals: []string{"key2", "key3"},
+ expected: PropertiesUpdateSummary{
+ Removed: []string{"key2", "key3"},
+ Updated: []string{},
+ Missing: []string{},
+ },
+ shouldError: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ assert := require.New(t)
+
+ mockGlueSvc := &mockGlueClient{}
+
+ tt.initial[databaseTypePropsKey] = glueTypeIceberg
+
+ mockGlueSvc.On("GetDatabase", mock.Anything,
&glue.GetDatabaseInput{
+ Name: aws.String("test_namespace"),
+ }, mock.Anything).Return(&glue.GetDatabaseOutput{
+ Database: &types.Database{
+ Name:
aws.String("test_namespace"),
+ Parameters: tt.initial,
+ },
+ }, nil).Once()
+
+ if !tt.shouldError {
+ mockGlueSvc.On("UpdateDatabase", mock.Anything,
mock.Anything, mock.Anything).Return(&glue.UpdateDatabaseOutput{}, nil).Once()
+ }
+
+ glueCatalog := &GlueCatalog{
+ glueSvc: mockGlueSvc,
+ }
+
+ summary, err :=
glueCatalog.UpdateNamespaceProperties(context.TODO(),
GlueDatabaseIdentifier("test_namespace"), tt.removals, tt.updates)
+ if tt.shouldError {
+ assert.Error(err)
+ } else {
+ assert.NoError(err)
+ assert.EqualValues(tt.expected.Removed,
summary.Removed)
+ assert.EqualValues(tt.expected.Updated,
summary.Updated)
+ assert.EqualValues(tt.expected.Missing,
summary.Missing)
+ }
+ })
+ }
+}
+
+func TestGlueRenameTable(t *testing.T) {
+ t.Skip("Skipping this test temporarily because LoadTable is not
testable due to the dependency on the IO.")
+
+ assert := require.New(t)
+
+ mockGlueSvc := &mockGlueClient{}
+
+ // Mock GetTable response
+ mockGlueSvc.On("GetTable", mock.Anything, &glue.GetTableInput{
+ DatabaseName: aws.String("test_database"),
+ Name: aws.String("test_table"),
+ }, mock.Anything).Return(&glue.GetTableOutput{
+ Table: &types.Table{
+ Name: aws.String("test_table"),
+ Parameters: map[string]string{
+ tableTypePropsKey: glueTypeIceberg,
+ },
+ Owner: aws.String("owner"),
+ Description: aws.String("description"),
+ StorageDescriptor: &types.StorageDescriptor{},
+ },
+ }, nil).Once()
+
+ mockGlueSvc.On("GetTable", mock.Anything, &glue.GetTableInput{
+ DatabaseName: aws.String("test_database"),
+ Name: aws.String("new_test_table"),
+ }, mock.Anything).Return(&glue.GetTableOutput{
+ Table: &types.Table{
+ Name: aws.String("new_test_table"),
+ Parameters: map[string]string{
+ tableTypePropsKey: glueTypeIceberg,
+ metadataLocationPropsKey:
"s3://test-bucket/new_test_table/metadata/abc123-123.metadata.json",
+ },
+ Owner: aws.String("owner"),
+ Description: aws.String("description"),
+ StorageDescriptor: &types.StorageDescriptor{},
+ },
+ }, nil).Once()
+
+ // Mock CreateTable response
+ mockGlueSvc.On("CreateTable", mock.Anything, &glue.CreateTableInput{
+ DatabaseName: aws.String("test_database"),
+ TableInput: &types.TableInput{
+ Name: aws.String("new_test_table"),
+ Owner: aws.String("owner"),
+ Description: aws.String("description"),
+ Parameters: map[string]string{tableTypePropsKey:
glueTypeIceberg},
+ StorageDescriptor: &types.StorageDescriptor{},
+ },
+ }, mock.Anything).Return(&glue.CreateTableOutput{}, nil).Once()
+
+ // Mock DeleteTable response for old table
+ mockGlueSvc.On("DeleteTable", mock.Anything, &glue.DeleteTableInput{
+ DatabaseName: aws.String("test_database"),
+ Name: aws.String("test_table"),
+ }, mock.Anything).Return(&glue.DeleteTableOutput{}, nil).Once()
+
+ glueCatalog := &GlueCatalog{
+ glueSvc: mockGlueSvc,
+ }
+
+ renamedTable, err := glueCatalog.RenameTable(context.TODO(),
GlueTableIdentifier("test_database", "test_table"),
GlueTableIdentifier("test_database", "new_test_table"))
+ assert.NoError(err)
+ assert.Equal("new_test_table", renamedTable.Identifier()[1])
+}
+
+func TestGlueRenameTable_DeleteTableFailureRollback(t *testing.T) {
+ assert := require.New(t)
+
+ mockGlueSvc := &mockGlueClient{}
+
+ // Mock GetTable response
+ mockGlueSvc.On("GetTable", mock.Anything, &glue.GetTableInput{
+ DatabaseName: aws.String("test_database"),
+ Name: aws.String("test_table"),
+ }, mock.Anything).Return(&glue.GetTableOutput{
+ Table: &types.Table{
+ Name: aws.String("test_table"),
+ Parameters: map[string]string{
+ tableTypePropsKey: glueTypeIceberg,
+ },
+ Owner: aws.String("owner"),
+ Description: aws.String("description"),
+ StorageDescriptor: &types.StorageDescriptor{},
+ },
+ }, nil).Once()
+
+ // Mock CreateTable response
+ mockGlueSvc.On("CreateTable", mock.Anything, &glue.CreateTableInput{
+ DatabaseName: aws.String("test_database"),
+ TableInput: &types.TableInput{
+ Name: aws.String("new_test_table"),
+ Owner: aws.String("owner"),
+ Description: aws.String("description"),
+ Parameters: map[string]string{tableTypePropsKey:
glueTypeIceberg},
+ StorageDescriptor: &types.StorageDescriptor{},
+ },
+ }, mock.Anything).Return(&glue.CreateTableOutput{}, nil).Once()
+
+ // Mock DeleteTable response for old table (fail)
+ mockGlueSvc.On("DeleteTable", mock.Anything, &glue.DeleteTableInput{
+ DatabaseName: aws.String("test_database"),
+ Name: aws.String("test_table"),
+ }, mock.Anything).Return(&glue.DeleteTableOutput{}, errors.New("delete
table failed")).Once()
+
+ // Mock DeleteTable response for rollback (new table)
+ mockGlueSvc.On("DeleteTable", mock.Anything, &glue.DeleteTableInput{
+ DatabaseName: aws.String("test_database"),
+ Name: aws.String("new_test_table"),
+ }, mock.Anything).Return(&glue.DeleteTableOutput{}, nil).Once()
+
+ glueCatalog := &GlueCatalog{
+ glueSvc: mockGlueSvc,
+ }
+
+ renamedTable, err := glueCatalog.RenameTable(context.TODO(),
GlueTableIdentifier("test_database", "test_table"),
GlueTableIdentifier("test_database", "new_test_table"))
+ assert.Error(err)
+ assert.Nil(renamedTable)
+ mockGlueSvc.AssertCalled(t, "DeleteTable", mock.Anything,
&glue.DeleteTableInput{
+ DatabaseName: aws.String("test_database"),
+ Name: aws.String("new_test_table"),
+ }, mock.Anything)
+}
+
func TestGlueListTablesIntegration(t *testing.T) {
if os.Getenv("TEST_DATABASE_NAME") == "" {
t.Skip()
diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go
index 7ec4a8e..a44ffb6 100644
--- a/cmd/iceberg/main.go
+++ b/cmd/iceberg/main.go
@@ -38,6 +38,8 @@ Usage:
iceberg list [options] [PARENT]
iceberg describe [options] [namespace | table] IDENTIFIER
iceberg (schema | spec | uuid | location) [options] TABLE_ID
+ iceberg create [options] (namespace | table) IDENTIFIER
+ iceberg create [options] (namespace | table) IDENTIFIER
iceberg drop [options] (namespace | table) IDENTIFIER
iceberg files [options] TABLE_ID [--history]
iceberg rename [options] <from> <to>
@@ -60,7 +62,9 @@ Options:
--output TYPE output type (json/text) [default: text]
--credential TEXT specify credentials for the catalog
--warehouse TEXT specify the warehouse to use
- --config TEXT specify the path to the configuration file`
+ --config TEXT specify the path to the configuration file
+ --description TEXT specify a description for the namespace
+ --location-uri TEXT specify a location URI for the namespace`
type Config struct {
List bool `docopt:"list"`
@@ -70,6 +74,7 @@ type Config struct {
Uuid bool `docopt:"uuid"`
Location bool `docopt:"location"`
Props bool `docopt:"properties"`
+ Create bool `docopt:"create"`
Drop bool `docopt:"drop"`
Files bool `docopt:"files"`
Rename bool `docopt:"rename"`
@@ -90,13 +95,15 @@ type Config struct {
PropName string `docopt:"PROPNAME"`
Value string `docopt:"VALUE"`
- Catalog string `docopt:"--catalog"`
- URI string `docopt:"--uri"`
- Output string `docopt:"--output"`
- History bool `docopt:"--history"`
- Cred string `docopt:"--credential"`
- Warehouse string `docopt:"--warehouse"`
- Config string `docopt:"--config"`
+ Catalog string `docopt:"--catalog"`
+ URI string `docopt:"--uri"`
+ Output string `docopt:"--output"`
+ History bool `docopt:"--history"`
+ Cred string `docopt:"--credential"`
+ Warehouse string `docopt:"--warehouse"`
+ Config string `docopt:"--config"`
+ Description string `docopt:"--description"`
+ LocationURI string `docopt:"--location-uri"`
}
func main() {
@@ -141,6 +148,9 @@ func main() {
if cat, err = catalog.NewRestCatalog("rest", cfg.URI, opts...); err != nil {
log.Fatal(err)
}
+ case catalog.Glue:
+ opts := []catalog.Option[catalog.GlueCatalog]{}
+ cat = catalog.NewGlueCatalog(opts...)
default:
log.Fatal("unrecognized catalog type")
}
@@ -201,6 +211,28 @@ func main() {
os.Exit(1)
}
}
+
+ case cfg.Create:
+ switch {
+ case cfg.Namespace:
+ props := iceberg.Properties{}
+ if cfg.Description != "" {
+ props["Description"] = cfg.Description
+ }
+
+ if cfg.LocationURI != "" {
+ props["Location"] = cfg.LocationURI
+ }
+
+ err := cat.CreateNamespace(context.Background(), catalog.ToRestIdentifier(cfg.Ident), props)
+ if err != nil {
+ output.Error(err)
+ os.Exit(1)
+ }
+ default:
+ output.Error(errors.New("not implemented"))
+ os.Exit(1)
+ }
case cfg.Files:
tbl := loadTable(output, cat, cfg.TableID)
output.Files(tbl, cfg.History)
diff --git a/utils.go b/utils.go
index c0a00fe..7807acb 100644
--- a/utils.go
+++ b/utils.go
@@ -196,3 +196,19 @@ func (l literalSet) All(fn func(Literal) bool) bool {
}
return true
}
+
+// Difference returns the elements of a that do not appear in b (a - b).
+// Elements keep their order from a, duplicates in a are kept, and the
+// result is always a non-nil slice, even when it is empty.
+func Difference(a, b []string) []string {
+ // Record every value of b so membership can be checked by map lookup.
+ m := make(map[string]bool)
+ for _, item := range b {
+ m[item] = true
+ }
+
+ // Start from an empty, non-nil slice and keep the items of a missing from b.
+ diff := make([]string, 0)
+ for _, item := range a {
+ if !m[item] {
+ diff = append(diff, item)
+ }
+ }
+ return diff
+}
diff --git a/utils_test.go b/utils_test.go
new file mode 100644
index 0000000..9f4f888
--- /dev/null
+++ b/utils_test.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+// TestDifference runs Difference over a table of (a, b) input pairs and
+// checks each result against the expected elements, ignoring order.
+func TestDifference(t *testing.T) {
+ tests := []struct {
+ name string
+ a []string
+ b []string
+ expected []string
+ }{
+ {
+ name: "No elements in common",
+ a: []string{"a", "b", "c"},
+ b: []string{"d", "e", "f"},
+ expected: []string{"a", "b", "c"},
+ },
+ {
+ name: "Some elements in common",
+ a: []string{"a", "b", "c"},
+ b: []string{"b"},
+ expected: []string{"a", "c"},
+ },
+ {
+ name: "All elements in common",
+ a: []string{"a", "b", "c"},
+ b: []string{"a", "b", "c"},
+ expected: []string{},
+ },
+ {
+ name: "Empty slice a",
+ a: []string{},
+ b: []string{"a", "b", "c"},
+ expected: []string{},
+ },
+ {
+ name: "Empty slice b",
+ a: []string{"a", "b", "c"},
+ b: []string{},
+ expected: []string{"a", "b", "c"},
+ },
+ {
+ // NOTE(review): a and b are disjoint here too, so this covers the
+ // same scenario as "No elements in common" with different values in b.
+ name: "No elements in slice b present in slice a",
+ a: []string{"a", "b", "c"},
+ b: []string{"x", "y", "z"},
+ expected: []string{"a", "b", "c"},
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ assert := require.New(t)
+ result := Difference(tt.a, tt.b)
+ // ElementsMatch compares contents without regard to element order.
+ assert.ElementsMatch(tt.expected, result)
+ })
+ }
+}