This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 55492586 feat(catalog): initial support for Hive Catalog (#678)
55492586 is described below

commit 55492586ff935c0404a261235649c95fc4d56c03
Author: Kanthi <[email protected]>
AuthorDate: Tue Feb 10 11:38:18 2026 -0600

    feat(catalog): initial support for Hive Catalog (#678)
    
    closes: #616
---
 .github/workflows/go-integration.yml  |   2 +-
 catalog/hive/client.go                | 144 +++++++
 catalog/hive/hive.go                  | 571 +++++++++++++++++++++++++
 catalog/hive/hive_integration_test.go | 320 ++++++++++++++
 catalog/hive/hive_test.go             | 758 ++++++++++++++++++++++++++++++++++
 catalog/hive/lock.go                  | 123 ++++++
 catalog/hive/lock_test.go             | 294 +++++++++++++
 catalog/hive/options.go               | 119 ++++++
 catalog/hive/schema.go                | 164 ++++++++
 cmd/iceberg/main.go                   |  11 +
 go.mod                                |   4 +
 go.sum                                |   8 +
 internal/recipe/docker-compose.yml    |  37 ++
 13 files changed, 2554 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/go-integration.yml 
b/.github/workflows/go-integration.yml
index d8d94427..76941e45 100644
--- a/.github/workflows/go-integration.yml
+++ b/.github/workflows/go-integration.yml
@@ -69,7 +69,7 @@ jobs:
           go test -tags integration -v -run="^TestScanner" ./table
           go test -tags integration -v ./io
           go test -tags integration -v -run="^TestRestIntegration$" 
./catalog/rest
-
+          go test -tags=integration -v ./catalog/hive/...
       - name: Run spark integration tests
         env:
           AWS_S3_ENDPOINT: "${{ env.AWS_S3_ENDPOINT }}"
diff --git a/catalog/hive/client.go b/catalog/hive/client.go
new file mode 100644
index 00000000..06a696d0
--- /dev/null
+++ b/catalog/hive/client.go
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package hive
+
+import (
+       "context"
+       "fmt"
+       "net/url"
+       "strconv"
+
+       "github.com/beltran/gohive"
+       "github.com/beltran/gohive/hive_metastore"
+)
+
+type HiveClient interface {
+       Close() error
+
+       GetDatabase(ctx context.Context, name string) 
(*hive_metastore.Database, error)
+       CreateDatabase(ctx context.Context, database *hive_metastore.Database) 
error
+       AlterDatabase(ctx context.Context, dbname string, db 
*hive_metastore.Database) error
+       DropDatabase(ctx context.Context, name string, deleteData, cascade 
bool) error
+       GetAllDatabases(ctx context.Context) ([]string, error)
+
+       GetTable(ctx context.Context, dbName, tableName string) 
(*hive_metastore.Table, error)
+       CreateTable(ctx context.Context, tbl *hive_metastore.Table) error
+       AlterTable(ctx context.Context, dbName, tableName string, newTbl 
*hive_metastore.Table) error
+       DropTable(ctx context.Context, dbName, tableName string, deleteData 
bool) error
+       GetTables(ctx context.Context, dbName, pattern string) ([]string, error)
+
+       Lock(ctx context.Context, request *hive_metastore.LockRequest) 
(*hive_metastore.LockResponse, error)
+       CheckLock(ctx context.Context, lockId int64) 
(*hive_metastore.LockResponse, error)
+       Unlock(ctx context.Context, lockId int64) error
+}
+
+type thriftClient struct {
+       *gohive.HiveMetastoreClient
+}
+
+func newHiveClient(uri string, opts *HiveOptions) (HiveClient, error) {
+       parsed, err := url.Parse(uri)
+       if err != nil {
+               return nil, fmt.Errorf("invalid URI: %w", err)
+       }
+
+       host := parsed.Hostname()
+       portStr := parsed.Port()
+       if portStr == "" {
+               portStr = "9083"
+       }
+       port, err := strconv.Atoi(portStr)
+       if err != nil {
+               return nil, fmt.Errorf("invalid port: %w", err)
+       }
+
+       config := gohive.NewMetastoreConnectConfiguration()
+       config.TransportMode = "binary"
+
+       client, err := gohive.ConnectToMetastore(host, port, "NOSASL", config)
+       if err != nil {
+               return nil, fmt.Errorf("failed to connect to metastore: %w", 
err)
+       }
+
+       return &thriftClient{HiveMetastoreClient: client}, nil
+}
+
+func (c *thriftClient) Close() error {
+       if c.HiveMetastoreClient != nil {
+               c.HiveMetastoreClient.Close()
+               c.HiveMetastoreClient = nil
+       }
+
+       return nil
+}
+
+func (c *thriftClient) GetDatabase(ctx context.Context, name string) 
(*hive_metastore.Database, error) {
+       return c.Client.GetDatabase(ctx, name)
+}
+
+func (c *thriftClient) CreateDatabase(ctx context.Context, database 
*hive_metastore.Database) error {
+       return c.Client.CreateDatabase(ctx, database)
+}
+
+func (c *thriftClient) AlterDatabase(ctx context.Context, dbname string, db 
*hive_metastore.Database) error {
+       return c.Client.AlterDatabase(ctx, dbname, db)
+}
+
+func (c *thriftClient) DropDatabase(ctx context.Context, name string, 
deleteData, cascade bool) error {
+       return c.Client.DropDatabase(ctx, name, deleteData, cascade)
+}
+
+func (c *thriftClient) GetAllDatabases(ctx context.Context) ([]string, error) {
+       return c.Client.GetAllDatabases(ctx)
+}
+
+func (c *thriftClient) GetTable(ctx context.Context, dbName, tableName string) 
(*hive_metastore.Table, error) {
+       return c.Client.GetTable(ctx, dbName, tableName)
+}
+
+func (c *thriftClient) CreateTable(ctx context.Context, tbl 
*hive_metastore.Table) error {
+       return c.Client.CreateTable(ctx, tbl)
+}
+
+func (c *thriftClient) AlterTable(ctx context.Context, dbName, tableName 
string, newTbl *hive_metastore.Table) error {
+       return c.Client.AlterTable(ctx, dbName, tableName, newTbl)
+}
+
+func (c *thriftClient) DropTable(ctx context.Context, dbName, tableName 
string, deleteData bool) error {
+       return c.Client.DropTable(ctx, dbName, tableName, deleteData)
+}
+
+func (c *thriftClient) GetTables(ctx context.Context, dbName, pattern string) 
([]string, error) {
+       return c.Client.GetTables(ctx, dbName, pattern)
+}
+
+func (c *thriftClient) Lock(ctx context.Context, request 
*hive_metastore.LockRequest) (*hive_metastore.LockResponse, error) {
+       return c.Client.Lock(ctx, request)
+}
+
+func (c *thriftClient) CheckLock(ctx context.Context, lockId int64) 
(*hive_metastore.LockResponse, error) {
+       return c.Client.CheckLock(ctx, &hive_metastore.CheckLockRequest{
+               Lockid: lockId,
+       })
+}
+
+func (c *thriftClient) Unlock(ctx context.Context, lockId int64) error {
+       return c.Client.Unlock(ctx, &hive_metastore.UnlockRequest{
+               Lockid: lockId,
+       })
+}
diff --git a/catalog/hive/hive.go b/catalog/hive/hive.go
new file mode 100644
index 00000000..2236ab56
--- /dev/null
+++ b/catalog/hive/hive.go
@@ -0,0 +1,571 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package hive
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "iter"
+       "maps"
+       "strings"
+       _ "unsafe"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/catalog"
+       "github.com/apache/iceberg-go/catalog/internal"
+       "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/beltran/gohive/hive_metastore"
+)
+
+const (
+       locationKey    = "location"
+       commentKey     = "comment"
+       descriptionKey = "description"
+)
+
+var _ catalog.Catalog = (*Catalog)(nil)
+
+func init() {
+       catalog.Register("hive", catalog.RegistrarFunc(func(ctx 
context.Context, _ string, props iceberg.Properties) (catalog.Catalog, error) {
+               return NewCatalog(props)
+       }))
+}
+
+type Catalog struct {
+       client HiveClient
+       opts   *HiveOptions
+}
+
+func NewCatalog(props iceberg.Properties, opts ...Option) (*Catalog, error) {
+       o := NewHiveOptions()
+       o.ApplyProperties(props)
+
+       for _, opt := range opts {
+               opt(o)
+       }
+
+       if o.URI == "" {
+               return nil, errors.New("hive.uri is required")
+       }
+
+       client, err := newHiveClient(o.URI, o)
+       if err != nil {
+               return nil, fmt.Errorf("failed to create Hive client: %w", err)
+       }
+
+       return &Catalog{
+               client: client,
+               opts:   o,
+       }, nil
+}
+
+func NewCatalogWithClient(client HiveClient, props iceberg.Properties) 
*Catalog {
+       o := NewHiveOptions()
+       o.ApplyProperties(props)
+
+       return &Catalog{
+               client: client,
+               opts:   o,
+       }
+}
+
+func (c *Catalog) CatalogType() catalog.Type {
+       return catalog.Hive
+}
+
+func (c *Catalog) Close() error {
+       return c.client.Close()
+}
+
+// ListTables returns a list of table identifiers in the given namespace.
+func (c *Catalog) ListTables(ctx context.Context, namespace table.Identifier) 
iter.Seq2[table.Identifier, error] {
+       return func(yield func(table.Identifier, error) bool) {
+               database, err := identifierToDatabase(namespace)
+               if err != nil {
+                       yield(nil, err)
+
+                       return
+               }
+
+               tableNames, err := c.client.GetTables(ctx, database, "*")
+               if err != nil {
+                       yield(nil, fmt.Errorf("failed to list tables in %s: 
%w", database, err))
+
+                       return
+               }
+
+               if len(tableNames) == 0 {
+                       return
+               }
+
+               for _, tableName := range tableNames {
+                       tbl, err := c.client.GetTable(ctx, database, tableName)
+                       if err != nil {
+                               continue
+                       }
+                       if isIcebergTable(tbl) {
+                               if !yield(TableIdentifier(database, tableName), 
nil) {
+                                       return
+                               }
+                       }
+               }
+       }
+}
+
+func (c *Catalog) LoadTable(ctx context.Context, identifier table.Identifier) 
(*table.Table, error) {
+       database, tableName, err := identifierToTableName(identifier)
+       if err != nil {
+               return nil, err
+       }
+
+       hiveTbl, err := c.getIcebergTable(ctx, database, tableName)
+       if err != nil {
+               return nil, err
+       }
+
+       metadataLocation, err := getMetadataLocation(hiveTbl)
+       if err != nil {
+               return nil, fmt.Errorf("failed to get metadata location: %w", 
err)
+       }
+
+       return table.NewFromLocation(
+               ctx,
+               identifier,
+               metadataLocation,
+               io.LoadFSFunc(c.opts.props, metadataLocation),
+               c,
+       )
+}
+
+func (c *Catalog) CreateTable(ctx context.Context, identifier 
table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) 
(*table.Table, error) {
+       staged, err := internal.CreateStagedTable(ctx, c.opts.props, 
c.LoadNamespaceProperties, identifier, schema, opts...)
+       if err != nil {
+               return nil, err
+       }
+
+       database, tableName, err := identifierToTableName(identifier)
+       if err != nil {
+               return nil, err
+       }
+
+       afs, err := staged.FS(ctx)
+       if err != nil {
+               return nil, err
+       }
+       wfs, ok := afs.(io.WriteFileIO)
+       if !ok {
+               return nil, errors.New("loaded filesystem IO does not support 
writing")
+       }
+
+       compression := 
staged.Table.Properties().Get(table.MetadataCompressionKey, 
table.MetadataCompressionDefault)
+       if err := internal.WriteTableMetadata(staged.Metadata(), wfs, 
staged.MetadataLocation(), compression); err != nil {
+               return nil, err
+       }
+
+       hiveTbl := constructHiveTable(database, tableName, staged.Location(), 
staged.MetadataLocation(), schema, staged.Properties())
+
+       if err := c.client.CreateTable(ctx, hiveTbl); err != nil {
+               if isAlreadyExistsError(err) {
+                       return nil, fmt.Errorf("%w: %s.%s", 
catalog.ErrTableAlreadyExists, database, tableName)
+               }
+
+               return nil, fmt.Errorf("failed to create table %s.%s: %w", 
database, tableName, err)
+       }
+
+       return c.LoadTable(ctx, identifier)
+}
+
+func (c *Catalog) DropTable(ctx context.Context, identifier table.Identifier) 
error {
+       database, tableName, err := identifierToTableName(identifier)
+       if err != nil {
+               return err
+       }
+
+       if _, err := c.getIcebergTable(ctx, database, tableName); err != nil {
+               return err
+       }
+
+       if err := c.client.DropTable(ctx, database, tableName, false); err != 
nil {
+               return fmt.Errorf("failed to drop table %s.%s: %w", database, 
tableName, err)
+       }
+
+       return nil
+}
+
+func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) 
(*table.Table, error) {
+       fromDB, fromTable, err := identifierToTableName(from)
+       if err != nil {
+               return nil, err
+       }
+
+       toDB, toTable, err := identifierToTableName(to)
+       if err != nil {
+               return nil, err
+       }
+
+       exists, err := c.CheckNamespaceExists(ctx, DatabaseIdentifier(toDB))
+       if err != nil {
+               return nil, err
+       }
+       if !exists {
+               return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, 
toDB)
+       }
+
+       hiveTbl, err := c.getIcebergTable(ctx, fromDB, fromTable)
+       if err != nil {
+               return nil, err
+       }
+
+       hiveTbl.TableName = toTable
+       hiveTbl.DbName = toDB
+
+       if err := c.client.AlterTable(ctx, fromDB, fromTable, hiveTbl); err != 
nil {
+               return nil, fmt.Errorf("failed to rename table %s.%s to %s.%s: 
%w", fromDB, fromTable, toDB, toTable, err)
+       }
+
+       return c.LoadTable(ctx, to)
+}
+
+func (c *Catalog) CommitTable(ctx context.Context, identifier 
table.Identifier, requirements []table.Requirement, updates []table.Update) 
(table.Metadata, string, error) {
+       database, tableName, err := identifierToTableName(identifier)
+       if err != nil {
+               return nil, "", err
+       }
+
+       lock, err := acquireLock(ctx, c.client, database, tableName, c.opts)
+       if err != nil {
+               return nil, "", fmt.Errorf("failed to acquire lock for %s.%s: 
%w", database, tableName, err)
+       }
+       defer func() {
+               _ = lock.Release(ctx)
+       }()
+
+       currentHiveTbl, err := c.client.GetTable(ctx, database, tableName)
+       if err != nil && !isNoSuchObjectError(err) {
+               return nil, "", err
+       }
+
+       var current *table.Table
+       if currentHiveTbl != nil && isIcebergTable(currentHiveTbl) {
+               metadataLoc, err := getMetadataLocation(currentHiveTbl)
+               if err != nil {
+                       return nil, "", err
+               }
+               current, err = table.NewFromLocation(ctx, identifier, 
metadataLoc, io.LoadFSFunc(c.opts.props, metadataLoc), c)
+               if err != nil {
+                       return nil, "", err
+               }
+       }
+
+       staged, err := internal.UpdateAndStageTable(ctx, current, identifier, 
requirements, updates, c)
+       if err != nil {
+               return nil, "", err
+       }
+
+       if current != nil && staged.Metadata().Equals(current.Metadata()) {
+               return current.Metadata(), current.MetadataLocation(), nil
+       }
+
+       if err := internal.WriteMetadata(ctx, staged.Metadata(), 
staged.MetadataLocation(), staged.Properties()); err != nil {
+               return nil, "", err
+       }
+
+       if current != nil {
+               updatedHiveTbl := updateHiveTableForCommit(currentHiveTbl, 
staged.MetadataLocation())
+
+               if err := c.client.AlterTable(ctx, database, tableName, 
updatedHiveTbl); err != nil {
+                       return nil, "", fmt.Errorf("failed to commit table 
%s.%s: %w", database, tableName, err)
+               }
+       } else {
+               hiveTbl := constructHiveTable(database, tableName, 
staged.Location(), staged.MetadataLocation(), 
staged.Metadata().CurrentSchema(), staged.Properties())
+               if err := c.client.CreateTable(ctx, hiveTbl); err != nil {
+                       return nil, "", fmt.Errorf("failed to create table 
%s.%s: %w", database, tableName, err)
+               }
+       }
+
+       return staged.Metadata(), staged.MetadataLocation(), nil
+}
+
+// CheckTableExists checks if a table exists in the catalog.
+func (c *Catalog) CheckTableExists(ctx context.Context, identifier 
table.Identifier) (bool, error) {
+       database, tableName, err := identifierToTableName(identifier)
+       if err != nil {
+               return false, err
+       }
+
+       hiveTbl, err := c.client.GetTable(ctx, database, tableName)
+       if err != nil {
+               if isNoSuchObjectError(err) {
+                       return false, nil
+               }
+
+               return false, err
+       }
+
+       return isIcebergTable(hiveTbl), nil
+}
+
+func (c *Catalog) ListNamespaces(ctx context.Context, parent table.Identifier) 
([]table.Identifier, error) {
+       if len(parent) > 0 {
+               return nil, errors.New("hierarchical namespace is not 
supported")
+       }
+
+       databases, err := c.client.GetAllDatabases(ctx)
+       if err != nil {
+               return nil, fmt.Errorf("failed to list namespaces: %w", err)
+       }
+
+       namespaces := make([]table.Identifier, len(databases))
+       for i, db := range databases {
+               namespaces[i] = DatabaseIdentifier(db)
+       }
+
+       return namespaces, nil
+}
+
+// CreateNamespace creates a new namespace in the catalog.
+func (c *Catalog) CreateNamespace(ctx context.Context, namespace 
table.Identifier, props iceberg.Properties) error {
+       database, err := identifierToDatabase(namespace)
+       if err != nil {
+               return err
+       }
+
+       db := &hive_metastore.Database{
+               Name:       database,
+               Parameters: make(map[string]string),
+       }
+
+       for k, v := range props {
+               switch k {
+               case locationKey, "Location":
+                       db.LocationUri = v
+               case commentKey, descriptionKey, "Description":
+                       db.Description = v
+               default:
+                       db.Parameters[k] = v
+               }
+       }
+
+       if err := c.client.CreateDatabase(ctx, db); err != nil {
+               if isAlreadyExistsError(err) {
+                       return fmt.Errorf("%w: %s", 
catalog.ErrNamespaceAlreadyExists, database)
+               }
+
+               return fmt.Errorf("failed to create namespace %s: %w", 
database, err)
+       }
+
+       return nil
+}
+
+// DropNamespace drops a namespace from the catalog.
+func (c *Catalog) DropNamespace(ctx context.Context, namespace 
table.Identifier) error {
+       database, err := identifierToDatabase(namespace)
+       if err != nil {
+               return err
+       }
+
+       _, err = c.client.GetDatabase(ctx, database)
+       if err != nil {
+               if isNoSuchObjectError(err) {
+                       return fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, 
database)
+               }
+
+               return err
+       }
+
+       if err := c.client.DropDatabase(ctx, database, false, false); err != 
nil {
+               if isInvalidOperationError(err) {
+                       return fmt.Errorf("%w: %s", 
catalog.ErrNamespaceNotEmpty, database)
+               }
+
+               return fmt.Errorf("failed to drop namespace %s: %w", database, 
err)
+       }
+
+       return nil
+}
+
+// CheckNamespaceExists checks if a namespace exists in the catalog.
+func (c *Catalog) CheckNamespaceExists(ctx context.Context, namespace 
table.Identifier) (bool, error) {
+       database, err := identifierToDatabase(namespace)
+       if err != nil {
+               return false, err
+       }
+
+       _, err = c.client.GetDatabase(ctx, database)
+       if err != nil {
+               if isNoSuchObjectError(err) {
+                       return false, nil
+               }
+
+               return false, err
+       }
+
+       return true, nil
+}
+
+// LoadNamespaceProperties loads the properties for a namespace.
+func (c *Catalog) LoadNamespaceProperties(ctx context.Context, namespace 
table.Identifier) (iceberg.Properties, error) {
+       database, err := identifierToDatabase(namespace)
+       if err != nil {
+               return nil, err
+       }
+
+       db, err := c.client.GetDatabase(ctx, database)
+       if err != nil {
+               if isNoSuchObjectError(err) {
+                       return nil, fmt.Errorf("%w: %s", 
catalog.ErrNoSuchNamespace, database)
+               }
+
+               return nil, fmt.Errorf("failed to get namespace %s: %w", 
database, err)
+       }
+
+       props := make(iceberg.Properties)
+       if db.Parameters != nil {
+               maps.Copy(props, db.Parameters)
+       }
+       if db.LocationUri != "" {
+               props[locationKey] = db.LocationUri
+       }
+       if db.Description != "" {
+               props[commentKey] = db.Description
+       }
+
+       return props, nil
+}
+
+//go:linkname getUpdatedPropsAndUpdateSummary 
github.com/apache/iceberg-go/catalog.getUpdatedPropsAndUpdateSummary
+func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals 
[]string, updates iceberg.Properties) (iceberg.Properties, 
catalog.PropertiesUpdateSummary, error)
+
+// UpdateNamespaceProperties updates the properties for a namespace.
+func (c *Catalog) UpdateNamespaceProperties(ctx context.Context, namespace 
table.Identifier,
+       removals []string, updates iceberg.Properties,
+) (catalog.PropertiesUpdateSummary, error) {
+       currentProps, err := c.LoadNamespaceProperties(ctx, namespace)
+       if err != nil {
+               return catalog.PropertiesUpdateSummary{}, err
+       }
+
+       updatedProperties, propertiesUpdateSummary, err := 
getUpdatedPropsAndUpdateSummary(currentProps, removals, updates)
+       if err != nil {
+               return catalog.PropertiesUpdateSummary{}, err
+       }
+
+       database, _ := identifierToDatabase(namespace)
+
+       db := &hive_metastore.Database{
+               Name:       database,
+               Parameters: make(map[string]string),
+       }
+
+       for k, v := range updatedProperties {
+               switch k {
+               case locationKey, "Location":
+                       db.LocationUri = v
+               case commentKey, descriptionKey, "Description":
+                       db.Description = v
+               default:
+                       db.Parameters[k] = v
+               }
+       }
+
+       if err := c.client.AlterDatabase(ctx, database, db); err != nil {
+               return catalog.PropertiesUpdateSummary{}, fmt.Errorf("failed to 
update namespace properties %s: %w", database, err)
+       }
+
+       return propertiesUpdateSummary, nil
+}
+
+// getIcebergTable retrieves a table and validates it's an Iceberg table.
+func (c *Catalog) getIcebergTable(ctx context.Context, database, tableName 
string) (*hive_metastore.Table, error) {
+       hiveTbl, err := c.client.GetTable(ctx, database, tableName)
+       if err != nil {
+               if isNoSuchObjectError(err) {
+                       return nil, fmt.Errorf("%w: %s.%s", 
catalog.ErrNoSuchTable, database, tableName)
+               }
+
+               return nil, fmt.Errorf("failed to get table %s.%s: %w", 
database, tableName, err)
+       }
+
+       if !isIcebergTable(hiveTbl) {
+               return nil, fmt.Errorf("table %s.%s is not an Iceberg table", 
database, tableName)
+       }
+
+       return hiveTbl, nil
+}
+
+func identifierToTableName(identifier table.Identifier) (string, string, 
error) {
+       if len(identifier) != 2 {
+               return "", "", fmt.Errorf("invalid identifier, expected 
[database, table]: %v", identifier)
+       }
+
+       return identifier[0], identifier[1], nil
+}
+
+func identifierToDatabase(identifier table.Identifier) (string, error) {
+       if len(identifier) != 1 {
+               return "", fmt.Errorf("invalid identifier, expected [database]: 
%v", identifier)
+       }
+
+       return identifier[0], nil
+}
+
+// TableIdentifier returns a table identifier for a Hive table.
+func TableIdentifier(database, tableName string) table.Identifier {
+       return []string{database, tableName}
+}
+
+// DatabaseIdentifier returns a database identifier for a Hive database.
+func DatabaseIdentifier(database string) table.Identifier {
+       return []string{database}
+}
+
+func isNoSuchObjectError(err error) bool {
+       if err == nil {
+               return false
+       }
+
+       errStr := err.Error()
+
+       return strings.Contains(errStr, "NoSuchObjectException") ||
+               strings.Contains(errStr, "not found") ||
+               strings.Contains(errStr, "does not exist")
+}
+
+func isAlreadyExistsError(err error) bool {
+       if err == nil {
+               return false
+       }
+
+       errStr := err.Error()
+
+       return strings.Contains(errStr, "AlreadyExistsException") ||
+               strings.Contains(errStr, "already exists")
+}
+
+func isInvalidOperationError(err error) bool {
+       if err == nil {
+               return false
+       }
+
+       errStr := err.Error()
+
+       return strings.Contains(errStr, "InvalidOperationException") ||
+               strings.Contains(errStr, "is not empty")
+}
diff --git a/catalog/hive/hive_integration_test.go 
b/catalog/hive/hive_integration_test.go
new file mode 100644
index 00000000..cc0e6f1f
--- /dev/null
+++ b/catalog/hive/hive_integration_test.go
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build integration
+
+package hive
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "testing"
+       "time"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/catalog"
+       "github.com/stretchr/testify/require"
+)
+
+// Integration tests for the Hive Metastore catalog.
+// These tests require a running Hive Metastore instance.
+//
+// To run these tests:
+// 1. Start the Hive Metastore using Docker:
+//    cd internal/recipe && docker-compose up -d hive-metastore
+//
+// 2. Set the required environment variables:
+//    export TEST_HIVE_URI=thrift://localhost:9083
+//    export TEST_HIVE_DATABASE=test_db
+//    export TEST_TABLE_LOCATION=/tmp/iceberg/warehouse
+//
+// 3. Run the tests:
+//    go test -tags=integration -v ./catalog/hive/...
+
+func getTestHiveURI() string {
+       uri := os.Getenv("TEST_HIVE_URI")
+       if uri == "" {
+               return "thrift://localhost:9083"
+       }
+       return uri
+}
+
+func getTestDatabase() string {
+       db := os.Getenv("TEST_HIVE_DATABASE")
+       if db == "" {
+               return "test_iceberg_db"
+       }
+       return db
+}
+
+func getTestTableLocation() string {
+       loc := os.Getenv("TEST_TABLE_LOCATION")
+       if loc == "" {
+               return "/tmp/iceberg/warehouse"
+       }
+       return loc
+}
+
+func createTestCatalog(t *testing.T) *Catalog {
+       t.Helper()
+
+       props := iceberg.Properties{
+               URI:       getTestHiveURI(),
+               Warehouse: getTestTableLocation(),
+       }
+
+       cat, err := NewCatalog(props)
+       require.NoError(t, err)
+
+       return cat
+}
+
+func TestHiveIntegrationListNamespaces(t *testing.T) {
+       assert := require.New(t)
+
+       cat := createTestCatalog(t)
+       defer cat.Close()
+
+       namespaces, err := cat.ListNamespaces(context.TODO(), nil)
+       assert.NoError(err)
+       assert.NotNil(namespaces)
+
+       t.Logf("Found %d namespaces", len(namespaces))
+       for _, ns := range namespaces {
+               t.Logf("  - %v", ns)
+       }
+}
+
+func TestHiveIntegrationCreateAndDropNamespace(t *testing.T) {
+       assert := require.New(t)
+
+       cat := createTestCatalog(t)
+       defer cat.Close()
+
+       dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
+
+       // Create namespace
+       props := iceberg.Properties{
+               "comment":  "Test database for integration tests",
+               "location": getTestTableLocation() + "/" + dbName,
+       }
+
+       err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), 
props)
+       assert.NoError(err)
+
+       // Check it exists
+       exists, err := cat.CheckNamespaceExists(context.TODO(), 
DatabaseIdentifier(dbName))
+       assert.NoError(err)
+       assert.True(exists)
+
+       // Load properties
+       loadedProps, err := cat.LoadNamespaceProperties(context.TODO(), 
DatabaseIdentifier(dbName))
+       assert.NoError(err)
+       assert.Equal("Test database for integration tests", 
loadedProps["comment"])
+
+       // Drop namespace
+       err = cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))
+       assert.NoError(err)
+
+       // Verify it's gone
+       exists, err = cat.CheckNamespaceExists(context.TODO(), 
DatabaseIdentifier(dbName))
+       assert.NoError(err)
+       assert.False(exists)
+}
+
+func TestHiveIntegrationUpdateNamespaceProperties(t *testing.T) {
+       assert := require.New(t)
+
+       cat := createTestCatalog(t)
+       defer cat.Close()
+
+       dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
+
+       // Create namespace with initial properties
+       initialProps := iceberg.Properties{
+               "location": fmt.Sprintf("/tmp/iceberg-warehouse/%s", dbName),
+               "key1":     "value1",
+               "key2":     "value2",
+       }
+
+       err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), 
initialProps)
+       assert.NoError(err)
+       defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))
+
+       // Update properties
+       updates := iceberg.Properties{
+               "key2": "updated_value2",
+               "key3": "value3",
+       }
+       removals := []string{"key1"}
+
+       summary, err := cat.UpdateNamespaceProperties(context.TODO(), 
DatabaseIdentifier(dbName), removals, updates)
+       assert.NoError(err)
+       assert.Contains(summary.Removed, "key1")
+       assert.Contains(summary.Updated, "key2")
+       assert.Contains(summary.Updated, "key3")
+
+       // Verify updates
+       props, err := cat.LoadNamespaceProperties(context.TODO(), 
DatabaseIdentifier(dbName))
+       assert.NoError(err)
+       assert.Equal("updated_value2", props["key2"])
+       assert.Equal("value3", props["key3"])
+       _, exists := props["key1"]
+       assert.False(exists)
+}
+
+func TestHiveIntegrationCreateAndListTables(t *testing.T) {
+       assert := require.New(t)
+
+       cat := createTestCatalog(t)
+       defer cat.Close()
+
+       dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
+       tableName := "test_table"
+
+       // Create namespace
+       err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), 
iceberg.Properties{
+               "location": getTestTableLocation() + "/" + dbName,
+       })
+       assert.NoError(err)
+       defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))
+
+       // Create table
+       schema := iceberg.NewSchemaWithIdentifiers(0, []int{1},
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "name", Type: 
iceberg.PrimitiveTypes.String},
+       )
+
+       tableLocation := getTestTableLocation() + "/" + dbName + "/" + tableName
+       tbl, err := cat.CreateTable(context.TODO(), TableIdentifier(dbName, 
tableName), schema,
+               catalog.WithLocation(tableLocation),
+       )
+       assert.NoError(err)
+       assert.NotNil(tbl)
+       defer cat.DropTable(context.TODO(), TableIdentifier(dbName, tableName))
+
+       // Verify table exists
+       exists, err := cat.CheckTableExists(context.TODO(), 
TableIdentifier(dbName, tableName))
+       assert.NoError(err)
+       assert.True(exists)
+
+       // List tables
+       tables := make([][]string, 0)
+       for tblIdent, err := range cat.ListTables(context.TODO(), 
DatabaseIdentifier(dbName)) {
+               assert.NoError(err)
+               tables = append(tables, tblIdent)
+       }
+       assert.Len(tables, 1)
+       assert.Equal([]string{dbName, tableName}, tables[0])
+
+       // Load table
+       loadedTable, err := cat.LoadTable(context.TODO(), 
TableIdentifier(dbName, tableName))
+       assert.NoError(err)
+       assert.NotNil(loadedTable)
+       assert.True(schema.Equals(loadedTable.Schema()))
+}
+
+func TestHiveIntegrationRenameTable(t *testing.T) {
+       assert := require.New(t)
+
+       cat := createTestCatalog(t)
+       defer cat.Close()
+
+       dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
+       oldTableName := "old_table"
+       newTableName := "new_table"
+
+       // Create namespace
+       err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), 
iceberg.Properties{
+               "location": getTestTableLocation() + "/" + dbName,
+       })
+       assert.NoError(err)
+       defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))
+
+       // Create table
+       schema := iceberg.NewSchemaWithIdentifiers(0, []int{1},
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+
+       tableLocation := getTestTableLocation() + "/" + dbName + "/" + 
oldTableName
+       _, err = cat.CreateTable(context.TODO(), TableIdentifier(dbName, 
oldTableName), schema,
+               catalog.WithLocation(tableLocation),
+       )
+       assert.NoError(err)
+
+       // Rename table
+       renamedTable, err := cat.RenameTable(context.TODO(),
+               TableIdentifier(dbName, oldTableName),
+               TableIdentifier(dbName, newTableName),
+       )
+       assert.NoError(err)
+       assert.NotNil(renamedTable)
+       defer cat.DropTable(context.TODO(), TableIdentifier(dbName, 
newTableName))
+
+       // Verify old table doesn't exist
+       exists, err := cat.CheckTableExists(context.TODO(), 
TableIdentifier(dbName, oldTableName))
+       assert.NoError(err)
+       assert.False(exists)
+
+       // Verify new table exists
+       exists, err = cat.CheckTableExists(context.TODO(), 
TableIdentifier(dbName, newTableName))
+       assert.NoError(err)
+       assert.True(exists)
+}
+
+func TestHiveIntegrationDropTable(t *testing.T) {
+       assert := require.New(t)
+
+       cat := createTestCatalog(t)
+       defer cat.Close()
+
+       dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
+       tableName := "table_to_drop"
+
+       // Create namespace
+       err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), 
iceberg.Properties{
+               "location": getTestTableLocation() + "/" + dbName,
+       })
+       assert.NoError(err)
+       defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))
+
+       // Create table
+       schema := iceberg.NewSchemaWithIdentifiers(0, []int{1},
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+
+       tableLocation := getTestTableLocation() + "/" + dbName + "/" + tableName
+       _, err = cat.CreateTable(context.TODO(), TableIdentifier(dbName, 
tableName), schema,
+               catalog.WithLocation(tableLocation),
+       )
+       assert.NoError(err)
+
+       // Verify table exists
+       exists, err := cat.CheckTableExists(context.TODO(), 
TableIdentifier(dbName, tableName))
+       assert.NoError(err)
+       assert.True(exists)
+
+       // Drop table
+       err = cat.DropTable(context.TODO(), TableIdentifier(dbName, tableName))
+       assert.NoError(err)
+
+       // Verify table is gone
+       exists, err = cat.CheckTableExists(context.TODO(), 
TableIdentifier(dbName, tableName))
+       assert.NoError(err)
+       assert.False(exists)
+}
diff --git a/catalog/hive/hive_test.go b/catalog/hive/hive_test.go
new file mode 100644
index 00000000..4f37307a
--- /dev/null
+++ b/catalog/hive/hive_test.go
@@ -0,0 +1,758 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package hive
+
+import (
+       "context"
+       "errors"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/catalog"
+       "github.com/apache/iceberg-go/table"
+       "github.com/beltran/gohive/hive_metastore"
+       "github.com/stretchr/testify/mock"
+       "github.com/stretchr/testify/require"
+)
+
+type mockHiveClient struct {
+       mock.Mock
+}
+
+func (m *mockHiveClient) GetDatabase(ctx context.Context, name string) 
(*hive_metastore.Database, error) {
+       args := m.Called(ctx, name)
+       if args.Get(0) == nil {
+               return nil, args.Error(1)
+       }
+
+       return args.Get(0).(*hive_metastore.Database), args.Error(1)
+}
+
+func (m *mockHiveClient) GetAllDatabases(ctx context.Context) ([]string, 
error) {
+       args := m.Called(ctx)
+
+       return args.Get(0).([]string), args.Error(1)
+}
+
+func (m *mockHiveClient) CreateDatabase(ctx context.Context, database 
*hive_metastore.Database) error {
+       args := m.Called(ctx, database)
+
+       return args.Error(0)
+}
+
+func (m *mockHiveClient) DropDatabase(ctx context.Context, name string, 
deleteData, cascade bool) error {
+       args := m.Called(ctx, name, deleteData, cascade)
+
+       return args.Error(0)
+}
+
+func (m *mockHiveClient) AlterDatabase(ctx context.Context, name string, 
database *hive_metastore.Database) error {
+       args := m.Called(ctx, name, database)
+
+       return args.Error(0)
+}
+
+func (m *mockHiveClient) GetTable(ctx context.Context, dbName, tableName 
string) (*hive_metastore.Table, error) {
+       args := m.Called(ctx, dbName, tableName)
+       if args.Get(0) == nil {
+               return nil, args.Error(1)
+       }
+
+       return args.Get(0).(*hive_metastore.Table), args.Error(1)
+}
+
+func (m *mockHiveClient) GetTables(ctx context.Context, dbName, pattern 
string) ([]string, error) {
+       args := m.Called(ctx, dbName, pattern)
+
+       return args.Get(0).([]string), args.Error(1)
+}
+
+func (m *mockHiveClient) CreateTable(ctx context.Context, tbl 
*hive_metastore.Table) error {
+       args := m.Called(ctx, tbl)
+
+       return args.Error(0)
+}
+
+func (m *mockHiveClient) DropTable(ctx context.Context, dbName, tableName 
string, deleteData bool) error {
+       args := m.Called(ctx, dbName, tableName, deleteData)
+
+       return args.Error(0)
+}
+
+func (m *mockHiveClient) AlterTable(ctx context.Context, dbName, tableName 
string, newTable *hive_metastore.Table) error {
+       args := m.Called(ctx, dbName, tableName, newTable)
+
+       return args.Error(0)
+}
+
+func (m *mockHiveClient) Lock(ctx context.Context, request 
*hive_metastore.LockRequest) (*hive_metastore.LockResponse, error) {
+       args := m.Called(ctx, request)
+       if args.Get(0) == nil {
+               return nil, args.Error(1)
+       }
+
+       return args.Get(0).(*hive_metastore.LockResponse), args.Error(1)
+}
+
+func (m *mockHiveClient) CheckLock(ctx context.Context, lockId int64) 
(*hive_metastore.LockResponse, error) {
+       args := m.Called(ctx, lockId)
+       if args.Get(0) == nil {
+               return nil, args.Error(1)
+       }
+
+       return args.Get(0).(*hive_metastore.LockResponse), args.Error(1)
+}
+
+func (m *mockHiveClient) Unlock(ctx context.Context, lockId int64) error {
+       args := m.Called(ctx, lockId)
+
+       return args.Error(0)
+}
+
+func (m *mockHiveClient) Close() error {
+       args := m.Called()
+
+       return args.Error(0)
+}
+
+// Test data
+
+var testIcebergHiveTable1 = &hive_metastore.Table{
+       TableName: "test_table",
+       DbName:    "test_database",
+       TableType: TableTypeExternalTable,
+       Parameters: map[string]string{
+               TableTypeKey:        TableTypeIceberg,
+               MetadataLocationKey: 
"s3://test-bucket/test_table/metadata/abc123-123.metadata.json",
+       },
+       Sd: &hive_metastore.StorageDescriptor{
+               Location: "s3://test-bucket/test_table",
+       },
+}
+
+var testIcebergHiveTable2 = &hive_metastore.Table{
+       TableName: "test_table2",
+       DbName:    "test_database",
+       TableType: TableTypeExternalTable,
+       Parameters: map[string]string{
+               TableTypeKey:        TableTypeIceberg,
+               MetadataLocationKey: 
"s3://test-bucket/test_table2/metadata/abc456-456.metadata.json",
+       },
+       Sd: &hive_metastore.StorageDescriptor{
+               Location: "s3://test-bucket/test_table2",
+       },
+}
+
+var testNonIcebergHiveTable = &hive_metastore.Table{
+       TableName: "other_table",
+       DbName:    "test_database",
+       TableType: TableTypeExternalTable,
+       Parameters: map[string]string{
+               "some_param": "some_value",
+       },
+}
+
+var testSchema = iceberg.NewSchemaWithIdentifiers(0, []int{},
+       iceberg.NestedField{ID: 1, Name: "foo", Type: 
iceberg.PrimitiveTypes.String},
+       iceberg.NestedField{ID: 2, Name: "bar", Type: 
iceberg.PrimitiveTypes.Int32, Required: true},
+       iceberg.NestedField{ID: 3, Name: "baz", Type: 
iceberg.PrimitiveTypes.Bool})
+
+// Error helpers for mocking
+var (
+       errNoSuchObject     = errors.New("NoSuchObjectException: object not 
found")
+       errAlreadyExists    = errors.New("AlreadyExistsException: object 
already exists")
+       errInvalidOperation = errors.New("InvalidOperationException: Database 
is not empty")
+)
+
+// Tests
+
+func TestHiveListTables(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("GetTables", mock.Anything, "test_database", "*").
+               Return([]string{"test_table", "test_table2", "other_table"}, 
nil).Once()
+
+       // Mock individual GetTable calls for each table
+       mockClient.On("GetTable", mock.Anything, "test_database", "test_table").
+               Return(testIcebergHiveTable1, nil).Once()
+       mockClient.On("GetTable", mock.Anything, "test_database", 
"test_table2").
+               Return(testIcebergHiveTable2, nil).Once()
+       mockClient.On("GetTable", mock.Anything, "test_database", 
"other_table").
+               Return(testNonIcebergHiveTable, nil).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       var lastErr error
+       tbls := make([]table.Identifier, 0)
+       iter := hiveCatalog.ListTables(context.TODO(), 
DatabaseIdentifier("test_database"))
+
+       for tbl, err := range iter {
+               if err != nil {
+                       lastErr = err
+
+                       break
+               }
+               tbls = append(tbls, tbl)
+       }
+
+       assert.NoError(lastErr)
+       assert.Len(tbls, 2) // Only Iceberg tables
+       assert.Contains(tbls, table.Identifier{"test_database", "test_table"})
+       assert.Contains(tbls, table.Identifier{"test_database", "test_table2"})
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveListTablesEmpty(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("GetTables", mock.Anything, "empty_database", "*").
+               Return([]string{}, nil).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       var lastErr error
+       tbls := make([]table.Identifier, 0)
+       iter := hiveCatalog.ListTables(context.TODO(), 
DatabaseIdentifier("empty_database"))
+
+       for tbl, err := range iter {
+               if err != nil {
+                       lastErr = err
+
+                       break
+               }
+               tbls = append(tbls, tbl)
+       }
+
+       assert.NoError(lastErr)
+       assert.Len(tbls, 0)
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveListNamespaces(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("GetAllDatabases", mock.Anything).
+               Return([]string{"database1", "database2", "database3"}, 
nil).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       namespaces, err := hiveCatalog.ListNamespaces(context.TODO(), nil)
+       assert.NoError(err)
+       assert.Len(namespaces, 3)
+       assert.Equal([]string{"database1"}, namespaces[0])
+       assert.Equal([]string{"database2"}, namespaces[1])
+       assert.Equal([]string{"database3"}, namespaces[2])
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveListNamespacesHierarchicalError(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       _, err := hiveCatalog.ListNamespaces(context.TODO(), []string{"parent"})
+       assert.Error(err)
+       assert.Contains(err.Error(), "hierarchical namespace is not supported")
+}
+
+func TestHiveCreateNamespace(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("CreateDatabase", mock.Anything, mock.MatchedBy(func(db 
*hive_metastore.Database) bool {
+               return db.Name == "new_database" &&
+                       db.Description == "Test Description" &&
+                       db.LocationUri == "s3://test-location"
+       })).Return(nil).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       props := map[string]string{
+               "comment":  "Test Description",
+               "location": "s3://test-location",
+       }
+
+       err := hiveCatalog.CreateNamespace(context.TODO(), 
DatabaseIdentifier("new_database"), props)
+       assert.NoError(err)
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveCreateNamespaceAlreadyExists(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("CreateDatabase", mock.Anything, mock.Anything).
+               Return(errAlreadyExists).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       err := hiveCatalog.CreateNamespace(context.TODO(), 
DatabaseIdentifier("existing_database"), nil)
+       assert.Error(err)
+       assert.True(errors.Is(err, catalog.ErrNamespaceAlreadyExists))
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveDropNamespace(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("GetDatabase", mock.Anything, "test_namespace").
+               Return(&hive_metastore.Database{Name: "test_namespace"}, 
nil).Once()
+
+       mockClient.On("DropDatabase", mock.Anything, "test_namespace", false, 
false).
+               Return(nil).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       err := hiveCatalog.DropNamespace(context.TODO(), 
DatabaseIdentifier("test_namespace"))
+       assert.NoError(err)
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveDropNamespaceNotExists(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("GetDatabase", mock.Anything, "nonexistent").
+               Return(nil, errNoSuchObject).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       err := hiveCatalog.DropNamespace(context.TODO(), 
DatabaseIdentifier("nonexistent"))
+       assert.Error(err)
+       assert.True(errors.Is(err, catalog.ErrNoSuchNamespace))
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveDropNamespaceNotEmpty(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("GetDatabase", mock.Anything, "nonempty_db").
+               Return(&hive_metastore.Database{Name: "nonempty_db"}, 
nil).Once()
+
+       mockClient.On("DropDatabase", mock.Anything, "nonempty_db", false, 
false).
+               Return(errInvalidOperation).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       err := hiveCatalog.DropNamespace(context.TODO(), 
DatabaseIdentifier("nonempty_db"))
+       assert.Error(err)
+       assert.True(errors.Is(err, catalog.ErrNamespaceNotEmpty))
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveCheckNamespaceExists(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("GetDatabase", mock.Anything, "existing_db").
+               Return(&hive_metastore.Database{Name: "existing_db"}, 
nil).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       exists, err := hiveCatalog.CheckNamespaceExists(context.TODO(), 
DatabaseIdentifier("existing_db"))
+       assert.NoError(err)
+       assert.True(exists)
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveCheckNamespaceNotExists(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("GetDatabase", mock.Anything, "nonexistent_db").
+               Return(nil, errNoSuchObject).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       exists, err := hiveCatalog.CheckNamespaceExists(context.TODO(), 
DatabaseIdentifier("nonexistent_db"))
+       assert.NoError(err)
+       assert.False(exists)
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveLoadNamespaceProperties(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("GetDatabase", mock.Anything, "test_db").
+               Return(&hive_metastore.Database{
+                       Name:        "test_db",
+                       Description: "Test database",
+                       LocationUri: "s3://test-bucket/test_db",
+                       Parameters: map[string]string{
+                               "custom_param": "custom_value",
+                       },
+               }, nil).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       props, err := hiveCatalog.LoadNamespaceProperties(context.TODO(), 
DatabaseIdentifier("test_db"))
+       assert.NoError(err)
+       assert.Equal("s3://test-bucket/test_db", props["location"])
+       assert.Equal("Test database", props["comment"])
+       assert.Equal("custom_value", props["custom_param"])
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveCheckTableExists(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("GetTable", mock.Anything, "test_database", "test_table").
+               Return(testIcebergHiveTable1, nil).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       exists, err := hiveCatalog.CheckTableExists(context.TODO(), 
TableIdentifier("test_database", "test_table"))
+       assert.NoError(err)
+       assert.True(exists)
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveCheckTableNotExists(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("GetTable", mock.Anything, "test_database", 
"nonexistent").
+               Return(nil, errNoSuchObject).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       exists, err := hiveCatalog.CheckTableExists(context.TODO(), 
TableIdentifier("test_database", "nonexistent"))
+       assert.NoError(err)
+       assert.False(exists)
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveCheckTableExistsNonIceberg(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("GetTable", mock.Anything, "test_database", 
"other_table").
+               Return(testNonIcebergHiveTable, nil).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       exists, err := hiveCatalog.CheckTableExists(context.TODO(), 
TableIdentifier("test_database", "other_table"))
+       assert.NoError(err)
+       assert.False(exists) // Non-Iceberg table should return false
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveDropTable(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("GetTable", mock.Anything, "test_database", "test_table").
+               Return(testIcebergHiveTable1, nil).Once()
+
+       mockClient.On("DropTable", mock.Anything, "test_database", 
"test_table", false).
+               Return(nil).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       err := hiveCatalog.DropTable(context.TODO(), 
TableIdentifier("test_database", "test_table"))
+       assert.NoError(err)
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveDropTableNotExists(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("GetTable", mock.Anything, "test_database", 
"nonexistent").
+               Return(nil, errNoSuchObject).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       err := hiveCatalog.DropTable(context.TODO(), 
TableIdentifier("test_database", "nonexistent"))
+       assert.Error(err)
+       assert.True(errors.Is(err, catalog.ErrNoSuchTable))
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveDropTableNonIceberg(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+
+       mockClient.On("GetTable", mock.Anything, "test_database", 
"other_table").
+               Return(testNonIcebergHiveTable, nil).Once()
+
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       err := hiveCatalog.DropTable(context.TODO(), 
TableIdentifier("test_database", "other_table"))
+       assert.Error(err)
+       assert.Contains(err.Error(), "is not an Iceberg table")
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestHiveCatalogType(t *testing.T) {
+       assert := require.New(t)
+
+       mockClient := &mockHiveClient{}
+       hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+       assert.Equal(catalog.Hive, hiveCatalog.CatalogType())
+}
+
+func TestIsIcebergTable(t *testing.T) {
+       tests := []struct {
+               name     string
+               table    *hive_metastore.Table
+               expected bool
+       }{
+               {
+                       name:     "iceberg table uppercase",
+                       table:    testIcebergHiveTable1,
+                       expected: true,
+               },
+               {
+                       name: "iceberg table lowercase",
+                       table: &hive_metastore.Table{
+                               Parameters: map[string]string{TableTypeKey: 
"iceberg"},
+                       },
+                       expected: true,
+               },
+               {
+                       name: "iceberg table mixed case",
+                       table: &hive_metastore.Table{
+                               Parameters: map[string]string{TableTypeKey: 
"IcEbErG"},
+                       },
+                       expected: true,
+               },
+               {
+                       name:     "non-iceberg table",
+                       table:    testNonIcebergHiveTable,
+                       expected: false,
+               },
+               {
+                       name:     "nil table",
+                       table:    nil,
+                       expected: false,
+               },
+               {
+                       name:     "table without parameters",
+                       table:    &hive_metastore.Table{},
+                       expected: false,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       assert := require.New(t)
+                       assert.Equal(tt.expected, isIcebergTable(tt.table))
+               })
+       }
+}
+
+func TestIcebergTypeToHiveType(t *testing.T) {
+       tests := []struct {
+               name         string
+               icebergType  iceberg.Type
+               expectedHive string
+       }{
+               {"boolean", iceberg.PrimitiveTypes.Bool, "boolean"},
+               {"int32", iceberg.PrimitiveTypes.Int32, "int"},
+               {"int64", iceberg.PrimitiveTypes.Int64, "bigint"},
+               {"float32", iceberg.PrimitiveTypes.Float32, "float"},
+               {"float64", iceberg.PrimitiveTypes.Float64, "double"},
+               {"date", iceberg.PrimitiveTypes.Date, "date"},
+               {"time", iceberg.PrimitiveTypes.Time, "string"},
+               {"timestamp", iceberg.PrimitiveTypes.Timestamp, "timestamp"},
+               {"timestamptz", iceberg.PrimitiveTypes.TimestampTz, 
"timestamp"},
+               {"string", iceberg.PrimitiveTypes.String, "string"},
+               {"uuid", iceberg.PrimitiveTypes.UUID, "string"},
+               {"binary", iceberg.PrimitiveTypes.Binary, "binary"},
+               {"decimal", iceberg.DecimalTypeOf(10, 2), "decimal(10,2)"},
+               {"fixed", iceberg.FixedTypeOf(16), "binary(16)"},
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       assert := require.New(t)
+                       assert.Equal(tt.expectedHive, 
icebergTypeToHiveType(tt.icebergType))
+               })
+       }
+}
+
+func TestSchemaToHiveColumns(t *testing.T) {
+       assert := require.New(t)
+
+       columns := schemaToHiveColumns(testSchema)
+       assert.Len(columns, 3)
+
+       // Check first column
+       assert.Equal("foo", columns[0].Name)
+       assert.Equal("string", columns[0].Type)
+
+       // Check second column
+       assert.Equal("bar", columns[1].Name)
+       assert.Equal("int", columns[1].Type)
+
+       // Check third column
+       assert.Equal("baz", columns[2].Name)
+       assert.Equal("boolean", columns[2].Type)
+}
+
+func TestUpdateNamespaceProperties(t *testing.T) {
+       tests := []struct {
+               name        string
+               initial     map[string]string
+               updates     map[string]string
+               removals    []string
+               expected    catalog.PropertiesUpdateSummary
+               shouldError bool
+       }{
+               {
+                       name: "Overlapping removals and updates",
+                       initial: map[string]string{
+                               "key1": "value1",
+                               "key2": "value2",
+                       },
+                       updates: map[string]string{
+                               "key1": "new_value1",
+                       },
+                       removals:    []string{"key1"},
+                       shouldError: true,
+               },
+               {
+                       name: "Happy path with updates and removals",
+                       initial: map[string]string{
+                               "key1": "value1",
+                               "key2": "value2",
+                               "key4": "value4",
+                       },
+                       updates: map[string]string{
+                               "key2": "new_value2",
+                       },
+                       removals: []string{"key4"},
+                       expected: catalog.PropertiesUpdateSummary{
+                               Removed: []string{"key4"},
+                               Updated: []string{"key2"},
+                               Missing: []string{},
+                       },
+                       shouldError: false,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       assert := require.New(t)
+
+                       mockClient := &mockHiveClient{}
+
+                       mockClient.On("GetDatabase", mock.Anything, 
"test_namespace").
+                               Return(&hive_metastore.Database{
+                                       Name:       "test_namespace",
+                                       Parameters: tt.initial,
+                               }, nil).Once()
+
+                       if !tt.shouldError {
+                               mockClient.On("AlterDatabase", mock.Anything, 
"test_namespace", mock.Anything).
+                                       Return(nil).Once()
+                       }
+
+                       hiveCatalog := NewCatalogWithClient(mockClient, 
iceberg.Properties{})
+
+                       summary, err := 
hiveCatalog.UpdateNamespaceProperties(context.TODO(), 
DatabaseIdentifier("test_namespace"), tt.removals, tt.updates)
+                       if tt.shouldError {
+                               assert.Error(err)
+                       } else {
+                               assert.NoError(err)
+                               assert.ElementsMatch(tt.expected.Removed, 
summary.Removed)
+                               assert.ElementsMatch(tt.expected.Updated, 
summary.Updated)
+                               assert.ElementsMatch(tt.expected.Missing, 
summary.Missing)
+                       }
+               })
+       }
+}
+
+func TestIdentifierValidation(t *testing.T) {
+       t.Run("valid table identifier", func(t *testing.T) {
+               assert := require.New(t)
+               db, tbl, err := identifierToTableName([]string{"database", 
"table"})
+               assert.NoError(err)
+               assert.Equal("database", db)
+               assert.Equal("table", tbl)
+       })
+
+       t.Run("invalid table identifier - too short", func(t *testing.T) {
+               assert := require.New(t)
+               _, _, err := identifierToTableName([]string{"database"})
+               assert.Error(err)
+       })
+
+       t.Run("invalid table identifier - too long", func(t *testing.T) {
+               assert := require.New(t)
+               _, _, err := identifierToTableName([]string{"a", "b", "c"})
+               assert.Error(err)
+       })
+
+       t.Run("valid database identifier", func(t *testing.T) {
+               assert := require.New(t)
+               db, err := identifierToDatabase([]string{"database"})
+               assert.NoError(err)
+               assert.Equal("database", db)
+       })
+
+       t.Run("invalid database identifier", func(t *testing.T) {
+               assert := require.New(t)
+               _, err := identifierToDatabase([]string{"a", "b"})
+               assert.Error(err)
+       })
+}
diff --git a/catalog/hive/lock.go b/catalog/hive/lock.go
new file mode 100644
index 00000000..51f56505
--- /dev/null
+++ b/catalog/hive/lock.go
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package hive
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "math"
+       "time"
+
+       "github.com/beltran/gohive/hive_metastore"
+)
+
+// ErrLockAcquisitionFailed is returned when a lock cannot be acquired after 
all retries.
+var ErrLockAcquisitionFailed = errors.New("failed to acquire lock")
+
+type HiveLock struct {
+       client HiveClient
+       lockId int64
+}
+
+func acquireLock(ctx context.Context, client HiveClient, database, tableName 
string, opts *HiveOptions) (*HiveLock, error) {
+       lockReq := &hive_metastore.LockRequest{
+               Component: []*hive_metastore.LockComponent{{
+                       Type:      hive_metastore.LockType_EXCLUSIVE,
+                       Level:     hive_metastore.LockLevel_TABLE,
+                       Dbname:    database,
+                       Tablename: &tableName,
+               }},
+       }
+
+       lockResp, err := client.Lock(ctx, lockReq)
+       if err != nil {
+               return nil, fmt.Errorf("failed to request lock: %w", err)
+       }
+
+       if lockResp.State == hive_metastore.LockState_ACQUIRED {
+               return &HiveLock{
+                       client: client,
+                       lockId: lockResp.Lockid,
+               }, nil
+       }
+
+       // If not acquired immediately, wait and retry
+       for attempt := 0; attempt < opts.LockRetries; attempt++ {
+               // Wait before checking again
+               waitTime := calculateBackoff(attempt, opts.LockMinWaitTime, 
opts.LockMaxWaitTime)
+
+               select {
+               case <-ctx.Done():
+                       _ = client.Unlock(ctx, lockResp.Lockid)
+
+                       return nil, ctx.Err()
+               case <-time.After(waitTime):
+               }
+
+               // Check lock state
+               checkResp, err := client.CheckLock(ctx, lockResp.Lockid)
+               if err != nil {
+                       _ = client.Unlock(ctx, lockResp.Lockid)
+
+                       return nil, fmt.Errorf("failed to check lock status: 
%w", err)
+               }
+
+               switch checkResp.State {
+               case hive_metastore.LockState_ACQUIRED:
+                       return &HiveLock{
+                               client: client,
+                               lockId: lockResp.Lockid,
+                       }, nil
+               case hive_metastore.LockState_WAITING:
+                       // Continue waiting
+                       continue
+               case hive_metastore.LockState_ABORT:
+                       return nil, fmt.Errorf("%w: lock was aborted", 
ErrLockAcquisitionFailed)
+               case hive_metastore.LockState_NOT_ACQUIRED:
+                       return nil, fmt.Errorf("%w: lock not acquired", 
ErrLockAcquisitionFailed)
+               default:
+                       return nil, fmt.Errorf("%w: unexpected lock state: %v", 
ErrLockAcquisitionFailed, checkResp.State)
+               }
+       }
+
+       _ = client.Unlock(ctx, lockResp.Lockid)
+
+       return nil, fmt.Errorf("%w: exhausted %d retries for table %s.%s", 
ErrLockAcquisitionFailed, opts.LockRetries, database, tableName)
+}
+
+func calculateBackoff(attempt int, minWait, maxWait time.Duration) 
time.Duration {
+       wait := time.Duration(float64(minWait) * math.Pow(2, float64(attempt)))
+       if wait > maxWait {
+               wait = maxWait
+       }
+
+       return wait
+}
+
+func (l *HiveLock) Release(ctx context.Context) error {
+       return l.client.Unlock(ctx, l.lockId)
+}
+
+func (l *HiveLock) LockID() int64 {
+       if l == nil {
+               return 0
+       }
+
+       return l.lockId
+}
diff --git a/catalog/hive/lock_test.go b/catalog/hive/lock_test.go
new file mode 100644
index 00000000..f61a8079
--- /dev/null
+++ b/catalog/hive/lock_test.go
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package hive
+
+import (
+       "context"
+       "errors"
+       "testing"
+       "time"
+
+       "github.com/beltran/gohive/hive_metastore"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/mock"
+       "github.com/stretchr/testify/require"
+)
+
+func TestAcquireLockImmediateSuccess(t *testing.T) {
+       mockClient := new(mockHiveClient)
+       ctx := context.Background()
+       opts := NewHiveOptions()
+
+       mockClient.On("Lock", ctx, 
mock.AnythingOfType("*hive_metastore.LockRequest")).
+               Return(&hive_metastore.LockResponse{
+                       Lockid: 123,
+                       State:  hive_metastore.LockState_ACQUIRED,
+               }, nil)
+
+       lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts)
+
+       require.NoError(t, err)
+       require.NotNil(t, lock)
+       assert.Equal(t, int64(123), lock.LockID())
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestAcquireLockWithRetry(t *testing.T) {
+       mockClient := new(mockHiveClient)
+       ctx := context.Background()
+       opts := NewHiveOptions()
+       opts.LockMinWaitTime = 1 * time.Millisecond // Fast retries for testing
+       opts.LockMaxWaitTime = 10 * time.Millisecond
+       opts.LockRetries = 3
+
+       // Lock request returns WAITING initially
+       mockClient.On("Lock", ctx, 
mock.AnythingOfType("*hive_metastore.LockRequest")).
+               Return(&hive_metastore.LockResponse{
+                       Lockid: 456,
+                       State:  hive_metastore.LockState_WAITING,
+               }, nil)
+
+       // First CheckLock returns WAITING
+       mockClient.On("CheckLock", ctx, int64(456)).
+               Return(&hive_metastore.LockResponse{
+                       Lockid: 456,
+                       State:  hive_metastore.LockState_WAITING,
+               }, nil).Once()
+
+       // Second CheckLock returns ACQUIRED
+       mockClient.On("CheckLock", ctx, int64(456)).
+               Return(&hive_metastore.LockResponse{
+                       Lockid: 456,
+                       State:  hive_metastore.LockState_ACQUIRED,
+               }, nil).Once()
+
+       lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts)
+
+       require.NoError(t, err)
+       require.NotNil(t, lock)
+       assert.Equal(t, int64(456), lock.LockID())
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestAcquireLockExhaustsRetries(t *testing.T) {
+       mockClient := new(mockHiveClient)
+       ctx := context.Background()
+       opts := NewHiveOptions()
+       opts.LockMinWaitTime = 1 * time.Millisecond
+       opts.LockMaxWaitTime = 10 * time.Millisecond
+       opts.LockRetries = 2
+
+       // Lock request returns WAITING
+       mockClient.On("Lock", ctx, 
mock.AnythingOfType("*hive_metastore.LockRequest")).
+               Return(&hive_metastore.LockResponse{
+                       Lockid: 789,
+                       State:  hive_metastore.LockState_WAITING,
+               }, nil)
+
+       // All CheckLock calls return WAITING
+       mockClient.On("CheckLock", ctx, int64(789)).
+               Return(&hive_metastore.LockResponse{
+                       Lockid: 789,
+                       State:  hive_metastore.LockState_WAITING,
+               }, nil)
+
+       mockClient.On("Unlock", ctx, int64(789)).Return(nil)
+
+       lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts)
+
+       require.Error(t, err)
+       require.Nil(t, lock)
+       assert.ErrorIs(t, err, ErrLockAcquisitionFailed)
+       assert.Contains(t, err.Error(), "exhausted 2 retries")
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestAcquireLockAborted(t *testing.T) {
+       mockClient := new(mockHiveClient)
+       ctx := context.Background()
+       opts := NewHiveOptions()
+       opts.LockMinWaitTime = 1 * time.Millisecond
+
+       // Lock request returns WAITING
+       mockClient.On("Lock", ctx, 
mock.AnythingOfType("*hive_metastore.LockRequest")).
+               Return(&hive_metastore.LockResponse{
+                       Lockid: 111,
+                       State:  hive_metastore.LockState_WAITING,
+               }, nil)
+
+       // CheckLock returns ABORT
+       mockClient.On("CheckLock", ctx, int64(111)).
+               Return(&hive_metastore.LockResponse{
+                       Lockid: 111,
+                       State:  hive_metastore.LockState_ABORT,
+               }, nil)
+
+       lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts)
+
+       require.Error(t, err)
+       require.Nil(t, lock)
+       assert.ErrorIs(t, err, ErrLockAcquisitionFailed)
+       assert.Contains(t, err.Error(), "aborted")
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestAcquireLockRequestFails(t *testing.T) {
+       mockClient := new(mockHiveClient)
+       ctx := context.Background()
+       opts := NewHiveOptions()
+
+       // Lock request fails
+       mockClient.On("Lock", ctx, 
mock.AnythingOfType("*hive_metastore.LockRequest")).
+               Return(nil, errors.New("connection failed"))
+
+       lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts)
+
+       require.Error(t, err)
+       require.Nil(t, lock)
+       assert.Contains(t, err.Error(), "failed to request lock")
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestAcquireLockCheckFails(t *testing.T) {
+       mockClient := new(mockHiveClient)
+       ctx := context.Background()
+       opts := NewHiveOptions()
+       opts.LockMinWaitTime = 1 * time.Millisecond
+
+       // Lock request returns WAITING
+       mockClient.On("Lock", ctx, 
mock.AnythingOfType("*hive_metastore.LockRequest")).
+               Return(&hive_metastore.LockResponse{
+                       Lockid: 222,
+                       State:  hive_metastore.LockState_WAITING,
+               }, nil)
+
+       // CheckLock fails
+       mockClient.On("CheckLock", ctx, int64(222)).
+               Return(nil, errors.New("check failed"))
+
+       // Lock should be released on error
+       mockClient.On("Unlock", ctx, int64(222)).Return(nil)
+
+       lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts)
+
+       require.Error(t, err)
+       require.Nil(t, lock)
+       assert.Contains(t, err.Error(), "failed to check lock status")
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestAcquireLockContextCancelled(t *testing.T) {
+       mockClient := new(mockHiveClient)
+       ctx, cancel := context.WithCancel(context.Background())
+       opts := NewHiveOptions()
+       opts.LockMinWaitTime = 100 * time.Millisecond // Longer wait so we can 
cancel
+
+       // Lock request returns WAITING
+       mockClient.On("Lock", ctx, 
mock.AnythingOfType("*hive_metastore.LockRequest")).
+               Return(&hive_metastore.LockResponse{
+                       Lockid: 333,
+                       State:  hive_metastore.LockState_WAITING,
+               }, nil)
+
+       // Lock should be released when context is cancelled
+       mockClient.On("Unlock", ctx, int64(333)).Return(nil)
+
+       // Cancel context before the wait completes
+       go func() {
+               time.Sleep(10 * time.Millisecond)
+               cancel()
+       }()
+
+       lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts)
+
+       require.Error(t, err)
+       require.Nil(t, lock)
+       assert.ErrorIs(t, err, context.Canceled)
+
+       mockClient.AssertExpectations(t)
+}
+
+func TestReleaseLock(t *testing.T) {
+       mockClient := new(mockHiveClient)
+       ctx := context.Background()
+
+       lock := &HiveLock{
+               client: mockClient,
+               lockId: 999,
+       }
+
+       mockClient.On("Unlock", ctx, int64(999)).Return(nil)
+
+       err := lock.Release(ctx)
+
+       require.NoError(t, err)
+       mockClient.AssertExpectations(t)
+}
+
+func TestCalculateBackoff(t *testing.T) {
+       minWait := 100 * time.Millisecond
+       maxWait := 1 * time.Second
+
+       tests := []struct {
+               attempt  int
+               expected time.Duration
+       }{
+               {0, 100 * time.Millisecond}, // 100ms * 2^0 = 100ms
+               {1, 200 * time.Millisecond}, // 100ms * 2^1 = 200ms
+               {2, 400 * time.Millisecond}, // 100ms * 2^2 = 400ms
+               {3, 800 * time.Millisecond}, // 100ms * 2^3 = 800ms
+               {4, 1 * time.Second},        // 100ms * 2^4 = 1.6s, capped at 1s
+               {10, 1 * time.Second},       // Capped at maxWait
+       }
+
+       for _, tt := range tests {
+               t.Run("", func(t *testing.T) {
+                       result := calculateBackoff(tt.attempt, minWait, maxWait)
+                       assert.Equal(t, tt.expected, result)
+               })
+       }
+}
+
+func TestLockConfigurationParsing(t *testing.T) {
+       props := map[string]string{
+               LockCheckMinWaitTime: "200ms",
+               LockCheckMaxWaitTime: "30s",
+               LockCheckRetries:     "5",
+       }
+
+       opts := NewHiveOptions()
+       opts.ApplyProperties(props)
+
+       assert.Equal(t, 200*time.Millisecond, opts.LockMinWaitTime)
+       assert.Equal(t, 30*time.Second, opts.LockMaxWaitTime)
+       assert.Equal(t, 5, opts.LockRetries)
+}
+
+func TestLockConfigurationDefaults(t *testing.T) {
+       opts := NewHiveOptions()
+
+       assert.Equal(t, DefaultLockCheckMinWaitTime, opts.LockMinWaitTime)
+       assert.Equal(t, DefaultLockCheckMaxWaitTime, opts.LockMaxWaitTime)
+       assert.Equal(t, DefaultLockCheckRetries, opts.LockRetries)
+}
diff --git a/catalog/hive/options.go b/catalog/hive/options.go
new file mode 100644
index 00000000..6681676a
--- /dev/null
+++ b/catalog/hive/options.go
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package hive
+
+import (
+       "strconv"
+       "time"
+
+       "github.com/apache/iceberg-go"
+)
+
+const (
+       // URI is the Thrift URI for the Hive Metastore (e.g., 
"thrift://localhost:9083")
+       URI = "uri"
+
+       // Warehouse is the default warehouse location for tables
+       Warehouse = "warehouse"
+
+       TableTypeKey                = "table_type"
+       TableTypeIceberg            = "ICEBERG"
+       TableTypeExternalTable      = "EXTERNAL_TABLE"
+       MetadataLocationKey         = "metadata_location"
+       PreviousMetadataLocationKey = "previous_metadata_location"
+       ExternalKey                 = "EXTERNAL"
+
+       // Lock configuration property keys
+       LockCheckMinWaitTime = "lock-check-min-wait-time"
+       LockCheckMaxWaitTime = "lock-check-max-wait-time"
+       LockCheckRetries     = "lock-check-retries"
+
+       // Default lock configuration values
+       DefaultLockCheckMinWaitTime = 100 * time.Millisecond // 100ms
+       DefaultLockCheckMaxWaitTime = 60 * time.Second       // 1 minute
+       DefaultLockCheckRetries     = 4
+)
+
+type HiveOptions struct {
+       URI       string
+       Warehouse string
+       props     iceberg.Properties
+
+       // Lock configuration for atomic commits
+       LockMinWaitTime time.Duration
+       LockMaxWaitTime time.Duration
+       LockRetries     int
+}
+
+func NewHiveOptions() *HiveOptions {
+       return &HiveOptions{
+               props:           iceberg.Properties{},
+               LockMinWaitTime: DefaultLockCheckMinWaitTime,
+               LockMaxWaitTime: DefaultLockCheckMaxWaitTime,
+               LockRetries:     DefaultLockCheckRetries,
+       }
+}
+
+func (o *HiveOptions) ApplyProperties(props iceberg.Properties) {
+       o.props = props
+
+       if uri, ok := props[URI]; ok {
+               o.URI = uri
+       }
+       if warehouse, ok := props[Warehouse]; ok {
+               o.Warehouse = warehouse
+       }
+
+       // Parse lock configuration
+       if val, ok := props[LockCheckMinWaitTime]; ok {
+               if d, err := time.ParseDuration(val); err == nil {
+                       o.LockMinWaitTime = d
+               }
+       }
+       if val, ok := props[LockCheckMaxWaitTime]; ok {
+               if d, err := time.ParseDuration(val); err == nil {
+                       o.LockMaxWaitTime = d
+               }
+       }
+       if val, ok := props[LockCheckRetries]; ok {
+               if i, err := strconv.Atoi(val); err == nil {
+                       o.LockRetries = i
+               }
+       }
+}
+
+type Option func(*HiveOptions)
+
+// WithURI sets the Thrift URI for the Hive Metastore.
+func WithURI(uri string) Option {
+       return func(o *HiveOptions) {
+               o.URI = uri
+       }
+}
+
+func WithWarehouse(warehouse string) Option {
+       return func(o *HiveOptions) {
+               o.Warehouse = warehouse
+       }
+}
+
+func WithProperties(props iceberg.Properties) Option {
+       return func(o *HiveOptions) {
+               o.props = props
+       }
+}
diff --git a/catalog/hive/schema.go b/catalog/hive/schema.go
new file mode 100644
index 00000000..9d519596
--- /dev/null
+++ b/catalog/hive/schema.go
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package hive
+
+import (
+       "errors"
+       "fmt"
+       "strings"
+
+       "github.com/apache/iceberg-go"
+       "github.com/beltran/gohive/hive_metastore"
+)
+
+func schemaToHiveColumns(schema *iceberg.Schema) []*hive_metastore.FieldSchema 
{
+       columns := make([]*hive_metastore.FieldSchema, 0, len(schema.Fields()))
+       for _, field := range schema.Fields() {
+               columns = append(columns, fieldToHiveColumn(field))
+       }
+
+       return columns
+}
+
+func fieldToHiveColumn(field iceberg.NestedField) *hive_metastore.FieldSchema {
+       return &hive_metastore.FieldSchema{
+               Name:    field.Name,
+               Type:    icebergTypeToHiveType(field.Type),
+               Comment: field.Doc,
+       }
+}
+
+// icebergTypeToHiveType converts an Iceberg type to a Hive type string.
+// Reference: 
https://cwiki.apache.org/confluence/display/hive/languagemanual+types
+func icebergTypeToHiveType(typ iceberg.Type) string {
+       switch t := typ.(type) {
+       case iceberg.Int64Type:
+               return "bigint"
+       case iceberg.TimeType:
+               // Hive doesn't have a native time type, use string
+               return "string"
+       case iceberg.TimestampTzType:
+               return "timestamp"
+       case iceberg.UUIDType:
+               // Represent UUID as string
+               return "string"
+       case iceberg.DecimalType:
+               return fmt.Sprintf("decimal(%d,%d)", t.Precision(), t.Scale())
+       case iceberg.FixedType:
+               return fmt.Sprintf("binary(%d)", t.Len())
+       case *iceberg.StructType:
+               var fieldStrings []string
+               for _, field := range t.Fields() {
+                       fieldStrings = append(fieldStrings,
+                               fmt.Sprintf("%s:%s", field.Name, 
icebergTypeToHiveType(field.Type)))
+               }
+
+               return fmt.Sprintf("struct<%s>", strings.Join(fieldStrings, 
","))
+       case *iceberg.ListType:
+               return fmt.Sprintf("array<%s>", 
icebergTypeToHiveType(t.ElementField().Type))
+       case *iceberg.MapType:
+               keyField := t.KeyField()
+               valueField := t.ValueField()
+
+               return fmt.Sprintf("map<%s,%s>",
+                       icebergTypeToHiveType(keyField.Type),
+                       icebergTypeToHiveType(valueField.Type))
+       default:
+               return typ.String()
+       }
+}
+
+func constructHiveTable(dbName, tableName, location, metadataLocation string, 
schema *iceberg.Schema, props map[string]string) *hive_metastore.Table {
+       parameters := make(map[string]string)
+
+       // Set Iceberg-specific parameters
+       parameters[TableTypeKey] = TableTypeIceberg
+       parameters[MetadataLocationKey] = metadataLocation
+       parameters[ExternalKey] = "TRUE"
+
+       // Set storage handler - required for Hive to query Iceberg tables
+       parameters["storage_handler"] = 
"org.apache.iceberg.mr.hive.HiveIcebergStorageHandler"
+
+       // Copy additional properties
+       for k, v := range props {
+               parameters[k] = v
+       }
+
+       return &hive_metastore.Table{
+               TableName: tableName,
+               DbName:    dbName,
+               TableType: TableTypeExternalTable,
+               Sd: &hive_metastore.StorageDescriptor{
+                       Cols:         schemaToHiveColumns(schema),
+                       Location:     location,
+                       InputFormat:  
"org.apache.iceberg.mr.hive.HiveIcebergInputFormat",
+                       OutputFormat: 
"org.apache.iceberg.mr.hive.HiveIcebergOutputFormat",
+                       SerdeInfo: &hive_metastore.SerDeInfo{
+                               SerializationLib: 
"org.apache.iceberg.mr.hive.HiveIcebergSerDe",
+                       },
+               },
+               Parameters: parameters,
+       }
+}
+
+func updateHiveTableForCommit(existing *hive_metastore.Table, 
newMetadataLocation string) *hive_metastore.Table {
+       // Copy the existing table
+       updated := *existing
+
+       // Update parameters
+       if updated.Parameters == nil {
+               updated.Parameters = make(map[string]string)
+       }
+
+       // Store previous metadata location
+       if oldLocation, ok := updated.Parameters[MetadataLocationKey]; ok {
+               updated.Parameters[PreviousMetadataLocationKey] = oldLocation
+       }
+
+       // Set new metadata location
+       updated.Parameters[MetadataLocationKey] = newMetadataLocation
+
+       return &updated
+}
+
+// isIcebergTable checks if a Hive table is an Iceberg table.
+func isIcebergTable(tbl *hive_metastore.Table) bool {
+       if tbl == nil || tbl.Parameters == nil {
+               return false
+       }
+
+       tableType, ok := tbl.Parameters[TableTypeKey]
+       if !ok {
+               return false
+       }
+
+       return strings.EqualFold(tableType, TableTypeIceberg)
+}
+
+func getMetadataLocation(tbl *hive_metastore.Table) (string, error) {
+       if tbl == nil || tbl.Parameters == nil {
+               return "", errors.New("table has no parameters")
+       }
+
+       location, ok := tbl.Parameters[MetadataLocationKey]
+       if !ok {
+               return "", fmt.Errorf("table does not have %s parameter", 
MetadataLocationKey)
+       }
+
+       return location, nil
+}
diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go
index 6d92ca0e..12838591 100644
--- a/cmd/iceberg/main.go
+++ b/cmd/iceberg/main.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/catalog"
        "github.com/apache/iceberg-go/catalog/glue"
+       "github.com/apache/iceberg-go/catalog/hive"
        "github.com/apache/iceberg-go/catalog/rest"
        "github.com/apache/iceberg-go/config"
        "github.com/apache/iceberg-go/table"
@@ -196,6 +197,16 @@ func main() {
                        glue.WithAwsConfig(awscfg),
                }
                cat = glue.NewCatalog(opts...)
+       case catalog.Hive:
+               props := iceberg.Properties{
+                       hive.URI: cfg.URI,
+               }
+               if len(cfg.Warehouse) > 0 {
+                       props[hive.Warehouse] = cfg.Warehouse
+               }
+               if cat, err = hive.NewCatalog(props); err != nil {
+                       log.Fatal(err)
+               }
        default:
                log.Fatal("unrecognized catalog type")
        }
diff --git a/go.mod b/go.mod
index 00876bf8..cfaab8ce 100644
--- a/go.mod
+++ b/go.mod
@@ -31,6 +31,7 @@ require (
        github.com/aws/aws-sdk-go-v2/service/s3 v1.96.0
        github.com/aws/smithy-go v1.24.0
        github.com/awsdocs/aws-doc-sdk-examples/gov2/testtools 
v0.0.0-20250407191926-092f3e54b837
+       github.com/beltran/gohive v1.8.1
        github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
        github.com/docker/docker v28.5.2+incompatible
        github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815
@@ -100,6 +101,8 @@ require (
        github.com/aws/aws-sdk-go-v2/service/sso v1.30.9 // indirect
        github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.13 // indirect
        github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 // indirect
+       github.com/beltran/gosasl v1.0.0 // indirect
+       github.com/beltran/gssapi v0.0.0-20200324152954-d86554db4bab // indirect
        github.com/beorn7/perks v1.0.1 // indirect
        github.com/buger/goterm v1.0.4 // indirect
        github.com/cenkalti/backoff/v4 v4.3.0 // indirect
@@ -150,6 +153,7 @@ require (
        github.com/go-openapi/jsonreference v0.20.2 // indirect
        github.com/go-openapi/swag v0.23.0 // indirect
        github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
+       github.com/go-zookeeper/zk v1.0.4 // indirect
        github.com/goccy/go-json v0.10.5 // indirect
        github.com/goccy/go-yaml v1.17.1 // indirect
        github.com/gofrs/flock v0.12.1 // indirect
diff --git a/go.sum b/go.sum
index 25e8af56..92dee50b 100644
--- a/go.sum
+++ b/go.sum
@@ -148,6 +148,12 @@ github.com/aws/smithy-go v1.24.0 
h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk=
 github.com/aws/smithy-go v1.24.0/go.mod 
h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0=
 github.com/awsdocs/aws-doc-sdk-examples/gov2/testtools 
v0.0.0-20250407191926-092f3e54b837 
h1:8eMceEa0ib+nqJuGsyowuZaVBVAr685oK6WrNIit+0g=
 github.com/awsdocs/aws-doc-sdk-examples/gov2/testtools 
v0.0.0-20250407191926-092f3e54b837/go.mod 
h1:9Oj/8PZn3D5Ftp/Z1QWrIEFE0daERMqfJawL9duHRfc=
+github.com/beltran/gohive v1.8.1 
h1:qlygmroy3mKtKIQSpV/FqXJHty1LsPxF+JTQA5mbjwU=
+github.com/beltran/gohive v1.8.1/go.mod 
h1:BCgNAhr/wnbyXfp2yN9ZY4pVrGrtVqG4hhNDDXIal1U=
+github.com/beltran/gosasl v1.0.0 
h1:iiRtLxkvKhrNv3Ohh/n2NiyyfwIo/UbMzy/dZWiUHXE=
+github.com/beltran/gosasl v1.0.0/go.mod 
h1:Qx8cW6jkI8riyzmklj80kAIkv+iezFUTBiGU0qHhHes=
+github.com/beltran/gssapi v0.0.0-20200324152954-d86554db4bab 
h1:ayfcn60tXOSYy5zUN1AMSTQo4nJCf7hrdzAVchpPst4=
+github.com/beltran/gssapi v0.0.0-20200324152954-d86554db4bab/go.mod 
h1:GLe4UoSyvJ3cVG+DVtKen5eAiaD8mAJFuV5PT3Eeg9Q=
 github.com/beorn7/perks v0.0.0-20150223135152-b965b613227f/go.mod 
h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod 
h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod 
h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@@ -315,6 +321,8 @@ github.com/go-task/slim-sprig/v3 v3.0.0 
h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
 github.com/go-task/slim-sprig/v3 v3.0.0/go.mod 
h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
 github.com/go-viper/mapstructure/v2 v2.4.0 
h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs=
 github.com/go-viper/mapstructure/v2 v2.4.0/go.mod 
h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
+github.com/go-zookeeper/zk v1.0.4 
h1:DPzxraQx7OrPyXq2phlGlNSIyWEsAox0RJmjTseMV6I=
+github.com/go-zookeeper/zk v1.0.4/go.mod 
h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
 github.com/goccy/go-json v0.10.5 
h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
 github.com/goccy/go-json v0.10.5/go.mod 
h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
 github.com/goccy/go-yaml v1.17.1 
h1:LI34wktB2xEE3ONG/2Ar54+/HJVBriAGJ55PHls4YuY=
diff --git a/internal/recipe/docker-compose.yml 
b/internal/recipe/docker-compose.yml
index f205db42..367ee666 100644
--- a/internal/recipe/docker-compose.yml
+++ b/internal/recipe/docker-compose.yml
@@ -118,5 +118,42 @@ services:
       - 4443:4443
     command: ["-scheme", "http", "-port", "4443", "-backend", "memory", 
"-public-host", "fake-gcs-server:4443"]
 
+  hive-metastore:
+    image: apache/hive:4.0.0
+    container_name: hive-metastore
+    user: root
+    networks:
+      iceberg_net:
+    ports:
+      - 9083:9083
+    environment:
+      - SERVICE_NAME=metastore
+      - DB_DRIVER=derby
+    volumes:
+      - hive-metastore-data:/opt/hive/data
+      - /tmp/iceberg-warehouse:/tmp/iceberg-warehouse
+
+  hiveserver2:
+    image: apache/hive:4.0.0
+    container_name: hiveserver2
+    user: root
+    networks:
+      iceberg_net:
+    depends_on:
+      - hive-metastore
+    ports:
+      - "10003:10000"  # Adjusted to avoid conflict with spark-iceberg
+      - "10002:10002"  # Web UI
+    environment:
+      - SERVICE_NAME=hiveserver2
+      - SERVICE_OPTS=-Dhive.metastore.uris=thrift://hive-metastore:9083
+      - IS_RESUME=true
+    volumes:
+      - hive-metastore-data:/opt/hive/data
+      - /tmp/iceberg-warehouse:/tmp/iceberg-warehouse
+
 networks:
   iceberg_net:
+
+volumes:
+  hive-metastore-data:

Reply via email to