This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 55492586 feat(catalog): initial support for Hive Catalog (#678)
55492586 is described below
commit 55492586ff935c0404a261235649c95fc4d56c03
Author: Kanthi <[email protected]>
AuthorDate: Tue Feb 10 11:38:18 2026 -0600
feat(catalog): initial support for Hive Catalog (#678)
closes: #616
---
.github/workflows/go-integration.yml | 2 +-
catalog/hive/client.go | 144 +++++++
catalog/hive/hive.go | 571 +++++++++++++++++++++++++
catalog/hive/hive_integration_test.go | 320 ++++++++++++++
catalog/hive/hive_test.go | 758 ++++++++++++++++++++++++++++++++++
catalog/hive/lock.go | 123 ++++++
catalog/hive/lock_test.go | 294 +++++++++++++
catalog/hive/options.go | 119 ++++++
catalog/hive/schema.go | 164 ++++++++
cmd/iceberg/main.go | 11 +
go.mod | 4 +
go.sum | 8 +
internal/recipe/docker-compose.yml | 37 ++
13 files changed, 2554 insertions(+), 1 deletion(-)
diff --git a/.github/workflows/go-integration.yml
b/.github/workflows/go-integration.yml
index d8d94427..76941e45 100644
--- a/.github/workflows/go-integration.yml
+++ b/.github/workflows/go-integration.yml
@@ -69,7 +69,7 @@ jobs:
go test -tags integration -v -run="^TestScanner" ./table
go test -tags integration -v ./io
go test -tags integration -v -run="^TestRestIntegration$"
./catalog/rest
-
+ go test -tags=integration -v ./catalog/hive/...
- name: Run spark integration tests
env:
AWS_S3_ENDPOINT: "${{ env.AWS_S3_ENDPOINT }}"
diff --git a/catalog/hive/client.go b/catalog/hive/client.go
new file mode 100644
index 00000000..06a696d0
--- /dev/null
+++ b/catalog/hive/client.go
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package hive
+
+import (
+ "context"
+ "fmt"
+ "net/url"
+ "strconv"
+
+ "github.com/beltran/gohive"
+ "github.com/beltran/gohive/hive_metastore"
+)
+
// HiveClient abstracts the subset of the Hive Metastore Thrift API that the
// catalog needs. It exists primarily so tests can substitute a mock for the
// real gohive-backed client.
type HiveClient interface {
	// Close releases the underlying connection.
	Close() error

	// Database (namespace) operations.
	GetDatabase(ctx context.Context, name string) (*hive_metastore.Database, error)
	CreateDatabase(ctx context.Context, database *hive_metastore.Database) error
	AlterDatabase(ctx context.Context, dbname string, db *hive_metastore.Database) error
	DropDatabase(ctx context.Context, name string, deleteData, cascade bool) error
	GetAllDatabases(ctx context.Context) ([]string, error)

	// Table operations.
	GetTable(ctx context.Context, dbName, tableName string) (*hive_metastore.Table, error)
	CreateTable(ctx context.Context, tbl *hive_metastore.Table) error
	AlterTable(ctx context.Context, dbName, tableName string, newTbl *hive_metastore.Table) error
	DropTable(ctx context.Context, dbName, tableName string, deleteData bool) error
	GetTables(ctx context.Context, dbName, pattern string) ([]string, error)

	// Metastore lock operations, used to serialize table commits.
	Lock(ctx context.Context, request *hive_metastore.LockRequest) (*hive_metastore.LockResponse, error)
	CheckLock(ctx context.Context, lockId int64) (*hive_metastore.LockResponse, error)
	Unlock(ctx context.Context, lockId int64) error
}
+
// thriftClient is the production HiveClient implementation, delegating every
// call to an embedded gohive metastore client.
type thriftClient struct {
	*gohive.HiveMetastoreClient
}
+
+func newHiveClient(uri string, opts *HiveOptions) (HiveClient, error) {
+ parsed, err := url.Parse(uri)
+ if err != nil {
+ return nil, fmt.Errorf("invalid URI: %w", err)
+ }
+
+ host := parsed.Hostname()
+ portStr := parsed.Port()
+ if portStr == "" {
+ portStr = "9083"
+ }
+ port, err := strconv.Atoi(portStr)
+ if err != nil {
+ return nil, fmt.Errorf("invalid port: %w", err)
+ }
+
+ config := gohive.NewMetastoreConnectConfiguration()
+ config.TransportMode = "binary"
+
+ client, err := gohive.ConnectToMetastore(host, port, "NOSASL", config)
+ if err != nil {
+ return nil, fmt.Errorf("failed to connect to metastore: %w",
err)
+ }
+
+ return &thriftClient{HiveMetastoreClient: client}, nil
+}
+
+func (c *thriftClient) Close() error {
+ if c.HiveMetastoreClient != nil {
+ c.HiveMetastoreClient.Close()
+ c.HiveMetastoreClient = nil
+ }
+
+ return nil
+}
+
// GetDatabase fetches a database definition by name.
func (c *thriftClient) GetDatabase(ctx context.Context, name string) (*hive_metastore.Database, error) {
	return c.Client.GetDatabase(ctx, name)
}

// CreateDatabase creates a new database in the metastore.
func (c *thriftClient) CreateDatabase(ctx context.Context, database *hive_metastore.Database) error {
	return c.Client.CreateDatabase(ctx, database)
}

// AlterDatabase replaces the definition of an existing database.
func (c *thriftClient) AlterDatabase(ctx context.Context, dbname string, db *hive_metastore.Database) error {
	return c.Client.AlterDatabase(ctx, dbname, db)
}

// DropDatabase removes a database; deleteData and cascade are forwarded
// unchanged to the metastore.
func (c *thriftClient) DropDatabase(ctx context.Context, name string, deleteData, cascade bool) error {
	return c.Client.DropDatabase(ctx, name, deleteData, cascade)
}

// GetAllDatabases lists the names of every database in the metastore.
func (c *thriftClient) GetAllDatabases(ctx context.Context) ([]string, error) {
	return c.Client.GetAllDatabases(ctx)
}

// GetTable fetches a table definition by database and table name.
func (c *thriftClient) GetTable(ctx context.Context, dbName, tableName string) (*hive_metastore.Table, error) {
	return c.Client.GetTable(ctx, dbName, tableName)
}

// CreateTable creates a new table in the metastore.
func (c *thriftClient) CreateTable(ctx context.Context, tbl *hive_metastore.Table) error {
	return c.Client.CreateTable(ctx, tbl)
}

// AlterTable replaces the definition of an existing table.
func (c *thriftClient) AlterTable(ctx context.Context, dbName, tableName string, newTbl *hive_metastore.Table) error {
	return c.Client.AlterTable(ctx, dbName, tableName, newTbl)
}

// DropTable removes a table; deleteData is forwarded unchanged.
func (c *thriftClient) DropTable(ctx context.Context, dbName, tableName string, deleteData bool) error {
	return c.Client.DropTable(ctx, dbName, tableName, deleteData)
}

// GetTables lists table names in dbName matching the metastore pattern.
func (c *thriftClient) GetTables(ctx context.Context, dbName, pattern string) ([]string, error) {
	return c.Client.GetTables(ctx, dbName, pattern)
}

// Lock forwards a metastore lock request.
func (c *thriftClient) Lock(ctx context.Context, request *hive_metastore.LockRequest) (*hive_metastore.LockResponse, error) {
	return c.Client.Lock(ctx, request)
}

// CheckLock polls the state of a previously requested lock, wrapping the
// id in the request struct the Thrift API expects.
func (c *thriftClient) CheckLock(ctx context.Context, lockId int64) (*hive_metastore.LockResponse, error) {
	return c.Client.CheckLock(ctx, &hive_metastore.CheckLockRequest{
		Lockid: lockId,
	})
}

// Unlock releases a previously acquired metastore lock.
func (c *thriftClient) Unlock(ctx context.Context, lockId int64) error {
	return c.Client.Unlock(ctx, &hive_metastore.UnlockRequest{
		Lockid: lockId,
	})
}
diff --git a/catalog/hive/hive.go b/catalog/hive/hive.go
new file mode 100644
index 00000000..2236ab56
--- /dev/null
+++ b/catalog/hive/hive.go
@@ -0,0 +1,571 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package hive
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "iter"
+ "maps"
+ "strings"
+ _ "unsafe"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/catalog"
+ "github.com/apache/iceberg-go/catalog/internal"
+ "github.com/apache/iceberg-go/io"
+ "github.com/apache/iceberg-go/table"
+ "github.com/beltran/gohive/hive_metastore"
+)
+
// Keys in Hive database parameters that map to well-known namespace
// properties rather than free-form key/value parameters.
const (
	locationKey    = "location"
	commentKey     = "comment"
	descriptionKey = "description"
)

// Compile-time check that *Catalog satisfies the catalog.Catalog interface.
var _ catalog.Catalog = (*Catalog)(nil)
+
// init registers the "hive" catalog type so callers can construct a
// Hive-backed catalog through the generic catalog registry.
func init() {
	catalog.Register("hive", catalog.RegistrarFunc(func(ctx context.Context, _ string, props iceberg.Properties) (catalog.Catalog, error) {
		return NewCatalog(props)
	}))
}
+
// Catalog is an Iceberg catalog implementation backed by a Hive Metastore.
// All metadata operations are performed through the wrapped HiveClient.
type Catalog struct {
	client HiveClient
	opts   *HiveOptions
}
+
// NewCatalog creates a Hive catalog from the given properties, applying any
// functional options on top. The URI option is required; connection setup
// fails immediately rather than lazily on first use.
func NewCatalog(props iceberg.Properties, opts ...Option) (*Catalog, error) {
	o := NewHiveOptions()
	o.ApplyProperties(props)

	// Functional options override property-derived settings.
	for _, opt := range opts {
		opt(o)
	}

	if o.URI == "" {
		return nil, errors.New("hive.uri is required")
	}

	client, err := newHiveClient(o.URI, o)
	if err != nil {
		return nil, fmt.Errorf("failed to create Hive client: %w", err)
	}

	return &Catalog{
		client: client,
		opts:   o,
	}, nil
}
+
+func NewCatalogWithClient(client HiveClient, props iceberg.Properties)
*Catalog {
+ o := NewHiveOptions()
+ o.ApplyProperties(props)
+
+ return &Catalog{
+ client: client,
+ opts: o,
+ }
+}
+
// CatalogType identifies this implementation as a Hive catalog.
func (c *Catalog) CatalogType() catalog.Type {
	return catalog.Hive
}
+
// Close releases the underlying metastore connection.
func (c *Catalog) Close() error {
	return c.client.Close()
}
+
// ListTables returns a list of table identifiers in the given namespace.
// Only tables recognized as Iceberg tables (per isIcebergTable) are yielded;
// plain Hive tables are filtered out. Listing is best-effort: a table that
// cannot be fetched is skipped rather than aborting the iteration.
func (c *Catalog) ListTables(ctx context.Context, namespace table.Identifier) iter.Seq2[table.Identifier, error] {
	return func(yield func(table.Identifier, error) bool) {
		database, err := identifierToDatabase(namespace)
		if err != nil {
			yield(nil, err)

			return
		}

		// "*" is the metastore wildcard matching every table name.
		tableNames, err := c.client.GetTables(ctx, database, "*")
		if err != nil {
			yield(nil, fmt.Errorf("failed to list tables in %s: %w", database, err))

			return
		}

		if len(tableNames) == 0 {
			return
		}

		for _, tableName := range tableNames {
			tbl, err := c.client.GetTable(ctx, database, tableName)
			if err != nil {
				// Best-effort: skip tables that disappeared or
				// failed to load between listing and fetching.
				continue
			}
			if isIcebergTable(tbl) {
				if !yield(TableIdentifier(database, tableName), nil) {
					return
				}
			}
		}
	}
}
+
// LoadTable loads an Iceberg table by its [database, table] identifier,
// resolving the current metadata file location from the Hive table's
// parameters and reading the metadata through the configured FileIO.
func (c *Catalog) LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) {
	database, tableName, err := identifierToTableName(identifier)
	if err != nil {
		return nil, err
	}

	// Rejects plain Hive tables as well as missing ones.
	hiveTbl, err := c.getIcebergTable(ctx, database, tableName)
	if err != nil {
		return nil, err
	}

	metadataLocation, err := getMetadataLocation(hiveTbl)
	if err != nil {
		return nil, fmt.Errorf("failed to get metadata location: %w", err)
	}

	return table.NewFromLocation(
		ctx,
		identifier,
		metadataLocation,
		io.LoadFSFunc(c.opts.props, metadataLocation),
		c,
	)
}
+
// CreateTable creates a new Iceberg table: it stages the table metadata,
// writes the first metadata file to the table location, then registers the
// table with the Hive Metastore, and finally reloads it through LoadTable.
//
// NOTE(review): if the metastore CreateTable call fails after the metadata
// file has been written, the file is not cleaned up — presumably an
// acceptable orphan; confirm whether cleanup is desired.
func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) {
	staged, err := internal.CreateStagedTable(ctx, c.opts.props, c.LoadNamespaceProperties, identifier, schema, opts...)
	if err != nil {
		return nil, err
	}

	database, tableName, err := identifierToTableName(identifier)
	if err != nil {
		return nil, err
	}

	afs, err := staged.FS(ctx)
	if err != nil {
		return nil, err
	}
	// Metadata must be written out, so the FileIO has to support writes.
	wfs, ok := afs.(io.WriteFileIO)
	if !ok {
		return nil, errors.New("loaded filesystem IO does not support writing")
	}

	compression := staged.Table.Properties().Get(table.MetadataCompressionKey, table.MetadataCompressionDefault)
	if err := internal.WriteTableMetadata(staged.Metadata(), wfs, staged.MetadataLocation(), compression); err != nil {
		return nil, err
	}

	hiveTbl := constructHiveTable(database, tableName, staged.Location(), staged.MetadataLocation(), schema, staged.Properties())

	if err := c.client.CreateTable(ctx, hiveTbl); err != nil {
		if isAlreadyExistsError(err) {
			return nil, fmt.Errorf("%w: %s.%s", catalog.ErrTableAlreadyExists, database, tableName)
		}

		return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err)
	}

	return c.LoadTable(ctx, identifier)
}
+
+func (c *Catalog) DropTable(ctx context.Context, identifier table.Identifier)
error {
+ database, tableName, err := identifierToTableName(identifier)
+ if err != nil {
+ return err
+ }
+
+ if _, err := c.getIcebergTable(ctx, database, tableName); err != nil {
+ return err
+ }
+
+ if err := c.client.DropTable(ctx, database, tableName, false); err !=
nil {
+ return fmt.Errorf("failed to drop table %s.%s: %w", database,
tableName, err)
+ }
+
+ return nil
+}
+
// RenameTable renames (and possibly moves across databases) an Iceberg
// table by altering its metastore entry, then loads and returns the table
// under its new identifier. The destination namespace must already exist.
func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) {
	fromDB, fromTable, err := identifierToTableName(from)
	if err != nil {
		return nil, err
	}

	toDB, toTable, err := identifierToTableName(to)
	if err != nil {
		return nil, err
	}

	exists, err := c.CheckNamespaceExists(ctx, DatabaseIdentifier(toDB))
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, toDB)
	}

	// Also validates that the source exists and is an Iceberg table.
	hiveTbl, err := c.getIcebergTable(ctx, fromDB, fromTable)
	if err != nil {
		return nil, err
	}

	// The metastore performs the rename when AlterTable is given the old
	// coordinates and a definition carrying the new name/database.
	hiveTbl.TableName = toTable
	hiveTbl.DbName = toDB

	if err := c.client.AlterTable(ctx, fromDB, fromTable, hiveTbl); err != nil {
		return nil, fmt.Errorf("failed to rename table %s.%s to %s.%s: %w", fromDB, fromTable, toDB, toTable, err)
	}

	return c.LoadTable(ctx, to)
}
+
// CommitTable applies the given requirements and updates to a table,
// writing a new metadata file and pointing the Hive table entry at it.
// A metastore lock is held for the duration of the commit to serialize
// concurrent writers against the same table.
//
// If the table does not exist yet (or exists but is not an Iceberg table),
// the commit stages metadata from scratch and creates the table entry.
// Returns the committed metadata and its location.
func (c *Catalog) CommitTable(ctx context.Context, identifier table.Identifier, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
	database, tableName, err := identifierToTableName(identifier)
	if err != nil {
		return nil, "", err
	}

	lock, err := acquireLock(ctx, c.client, database, tableName, c.opts)
	if err != nil {
		return nil, "", fmt.Errorf("failed to acquire lock for %s.%s: %w", database, tableName, err)
	}
	// Best-effort release; a failed unlock should not mask the commit result.
	defer func() {
		_ = lock.Release(ctx)
	}()

	currentHiveTbl, err := c.client.GetTable(ctx, database, tableName)
	if err != nil && !isNoSuchObjectError(err) {
		return nil, "", err
	}

	// Load the current table state only when an Iceberg table already exists;
	// otherwise current stays nil and the commit acts as a create.
	var current *table.Table
	if currentHiveTbl != nil && isIcebergTable(currentHiveTbl) {
		metadataLoc, err := getMetadataLocation(currentHiveTbl)
		if err != nil {
			return nil, "", err
		}
		current, err = table.NewFromLocation(ctx, identifier, metadataLoc, io.LoadFSFunc(c.opts.props, metadataLoc), c)
		if err != nil {
			return nil, "", err
		}
	}

	staged, err := internal.UpdateAndStageTable(ctx, current, identifier, requirements, updates, c)
	if err != nil {
		return nil, "", err
	}

	// No-op commit: nothing changed, so skip writing a new metadata file.
	if current != nil && staged.Metadata().Equals(current.Metadata()) {
		return current.Metadata(), current.MetadataLocation(), nil
	}

	if err := internal.WriteMetadata(ctx, staged.Metadata(), staged.MetadataLocation(), staged.Properties()); err != nil {
		return nil, "", err
	}

	if current != nil {
		// Existing table: swap the metadata pointer via AlterTable.
		updatedHiveTbl := updateHiveTableForCommit(currentHiveTbl, staged.MetadataLocation())

		if err := c.client.AlterTable(ctx, database, tableName, updatedHiveTbl); err != nil {
			return nil, "", fmt.Errorf("failed to commit table %s.%s: %w", database, tableName, err)
		}
	} else {
		// New table: register a fresh metastore entry.
		hiveTbl := constructHiveTable(database, tableName, staged.Location(), staged.MetadataLocation(), staged.Metadata().CurrentSchema(), staged.Properties())
		if err := c.client.CreateTable(ctx, hiveTbl); err != nil {
			return nil, "", fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err)
		}
	}

	return staged.Metadata(), staged.MetadataLocation(), nil
}
+
+// CheckTableExists checks if a table exists in the catalog.
+func (c *Catalog) CheckTableExists(ctx context.Context, identifier
table.Identifier) (bool, error) {
+ database, tableName, err := identifierToTableName(identifier)
+ if err != nil {
+ return false, err
+ }
+
+ hiveTbl, err := c.client.GetTable(ctx, database, tableName)
+ if err != nil {
+ if isNoSuchObjectError(err) {
+ return false, nil
+ }
+
+ return false, err
+ }
+
+ return isIcebergTable(hiveTbl), nil
+}
+
+func (c *Catalog) ListNamespaces(ctx context.Context, parent table.Identifier)
([]table.Identifier, error) {
+ if len(parent) > 0 {
+ return nil, errors.New("hierarchical namespace is not
supported")
+ }
+
+ databases, err := c.client.GetAllDatabases(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to list namespaces: %w", err)
+ }
+
+ namespaces := make([]table.Identifier, len(databases))
+ for i, db := range databases {
+ namespaces[i] = DatabaseIdentifier(db)
+ }
+
+ return namespaces, nil
+}
+
// CreateNamespace creates a new namespace in the catalog. Well-known
// property keys are mapped onto the corresponding Hive database fields
// (location -> LocationUri, comment/description -> Description); everything
// else is stored as a free-form database parameter.
func (c *Catalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error {
	database, err := identifierToDatabase(namespace)
	if err != nil {
		return err
	}

	db := &hive_metastore.Database{
		Name:       database,
		Parameters: make(map[string]string),
	}

	for k, v := range props {
		switch k {
		case locationKey, "Location":
			db.LocationUri = v
		case commentKey, descriptionKey, "Description":
			db.Description = v
		default:
			db.Parameters[k] = v
		}
	}

	if err := c.client.CreateDatabase(ctx, db); err != nil {
		if isAlreadyExistsError(err) {
			return fmt.Errorf("%w: %s", catalog.ErrNamespaceAlreadyExists, database)
		}

		return fmt.Errorf("failed to create namespace %s: %w", database, err)
	}

	return nil
}
+
// DropNamespace drops a namespace from the catalog. The database must exist
// and must be empty: the drop is issued with deleteData=false and
// cascade=false, so a non-empty database surfaces as ErrNamespaceNotEmpty.
func (c *Catalog) DropNamespace(ctx context.Context, namespace table.Identifier) error {
	database, err := identifierToDatabase(namespace)
	if err != nil {
		return err
	}

	// Probe first so a missing namespace maps to the canonical sentinel.
	_, err = c.client.GetDatabase(ctx, database)
	if err != nil {
		if isNoSuchObjectError(err) {
			return fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, database)
		}

		return err
	}

	if err := c.client.DropDatabase(ctx, database, false, false); err != nil {
		if isInvalidOperationError(err) {
			return fmt.Errorf("%w: %s", catalog.ErrNamespaceNotEmpty, database)
		}

		return fmt.Errorf("failed to drop namespace %s: %w", database, err)
	}

	return nil
}
+
+// CheckNamespaceExists checks if a namespace exists in the catalog.
+func (c *Catalog) CheckNamespaceExists(ctx context.Context, namespace
table.Identifier) (bool, error) {
+ database, err := identifierToDatabase(namespace)
+ if err != nil {
+ return false, err
+ }
+
+ _, err = c.client.GetDatabase(ctx, database)
+ if err != nil {
+ if isNoSuchObjectError(err) {
+ return false, nil
+ }
+
+ return false, err
+ }
+
+ return true, nil
+}
+
// LoadNamespaceProperties loads the properties for a namespace, merging the
// database's free-form parameters with its LocationUri (as "location") and
// Description (as "comment"). The field-backed values overwrite any
// identically named parameters.
func (c *Catalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) {
	database, err := identifierToDatabase(namespace)
	if err != nil {
		return nil, err
	}

	db, err := c.client.GetDatabase(ctx, database)
	if err != nil {
		if isNoSuchObjectError(err) {
			return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, database)
		}

		return nil, fmt.Errorf("failed to get namespace %s: %w", database, err)
	}

	props := make(iceberg.Properties)
	if db.Parameters != nil {
		maps.Copy(props, db.Parameters)
	}
	if db.LocationUri != "" {
		props[locationKey] = db.LocationUri
	}
	if db.Description != "" {
		props[commentKey] = db.Description
	}

	return props, nil
}
+
// getUpdatedPropsAndUpdateSummary reuses the unexported property-diff helper
// from the catalog package via go:linkname instead of duplicating its
// removal/update/summary logic here.
//
//go:linkname getUpdatedPropsAndUpdateSummary github.com/apache/iceberg-go/catalog.getUpdatedPropsAndUpdateSummary
func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals []string, updates iceberg.Properties) (iceberg.Properties, catalog.PropertiesUpdateSummary, error)
+
+// UpdateNamespaceProperties updates the properties for a namespace.
+func (c *Catalog) UpdateNamespaceProperties(ctx context.Context, namespace
table.Identifier,
+ removals []string, updates iceberg.Properties,
+) (catalog.PropertiesUpdateSummary, error) {
+ currentProps, err := c.LoadNamespaceProperties(ctx, namespace)
+ if err != nil {
+ return catalog.PropertiesUpdateSummary{}, err
+ }
+
+ updatedProperties, propertiesUpdateSummary, err :=
getUpdatedPropsAndUpdateSummary(currentProps, removals, updates)
+ if err != nil {
+ return catalog.PropertiesUpdateSummary{}, err
+ }
+
+ database, _ := identifierToDatabase(namespace)
+
+ db := &hive_metastore.Database{
+ Name: database,
+ Parameters: make(map[string]string),
+ }
+
+ for k, v := range updatedProperties {
+ switch k {
+ case locationKey, "Location":
+ db.LocationUri = v
+ case commentKey, descriptionKey, "Description":
+ db.Description = v
+ default:
+ db.Parameters[k] = v
+ }
+ }
+
+ if err := c.client.AlterDatabase(ctx, database, db); err != nil {
+ return catalog.PropertiesUpdateSummary{}, fmt.Errorf("failed to
update namespace properties %s: %w", database, err)
+ }
+
+ return propertiesUpdateSummary, nil
+}
+
// getIcebergTable retrieves a table and validates it's an Iceberg table.
// A missing table maps to catalog.ErrNoSuchTable; a table that exists but
// is not an Iceberg table yields a plain error.
func (c *Catalog) getIcebergTable(ctx context.Context, database, tableName string) (*hive_metastore.Table, error) {
	hiveTbl, err := c.client.GetTable(ctx, database, tableName)
	if err != nil {
		if isNoSuchObjectError(err) {
			return nil, fmt.Errorf("%w: %s.%s", catalog.ErrNoSuchTable, database, tableName)
		}

		return nil, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err)
	}

	if !isIcebergTable(hiveTbl) {
		return nil, fmt.Errorf("table %s.%s is not an Iceberg table", database, tableName)
	}

	return hiveTbl, nil
}
+
+func identifierToTableName(identifier table.Identifier) (string, string,
error) {
+ if len(identifier) != 2 {
+ return "", "", fmt.Errorf("invalid identifier, expected
[database, table]: %v", identifier)
+ }
+
+ return identifier[0], identifier[1], nil
+}
+
+func identifierToDatabase(identifier table.Identifier) (string, error) {
+ if len(identifier) != 1 {
+ return "", fmt.Errorf("invalid identifier, expected [database]:
%v", identifier)
+ }
+
+ return identifier[0], nil
+}
+
// TableIdentifier returns a table identifier for a Hive table, in the
// canonical [database, table] form.
func TableIdentifier(database, tableName string) table.Identifier {
	return []string{database, tableName}
}
+
// DatabaseIdentifier returns a database identifier for a Hive database, in
// the canonical single-element [database] form.
func DatabaseIdentifier(database string) table.Identifier {
	return []string{database}
}
+
+func isNoSuchObjectError(err error) bool {
+ if err == nil {
+ return false
+ }
+
+ errStr := err.Error()
+
+ return strings.Contains(errStr, "NoSuchObjectException") ||
+ strings.Contains(errStr, "not found") ||
+ strings.Contains(errStr, "does not exist")
+}
+
+func isAlreadyExistsError(err error) bool {
+ if err == nil {
+ return false
+ }
+
+ errStr := err.Error()
+
+ return strings.Contains(errStr, "AlreadyExistsException") ||
+ strings.Contains(errStr, "already exists")
+}
+
+func isInvalidOperationError(err error) bool {
+ if err == nil {
+ return false
+ }
+
+ errStr := err.Error()
+
+ return strings.Contains(errStr, "InvalidOperationException") ||
+ strings.Contains(errStr, "is not empty")
+}
diff --git a/catalog/hive/hive_integration_test.go
b/catalog/hive/hive_integration_test.go
new file mode 100644
index 00000000..cc0e6f1f
--- /dev/null
+++ b/catalog/hive/hive_integration_test.go
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build integration
+
+package hive
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/catalog"
+ "github.com/stretchr/testify/require"
+)
+
+// Integration tests for the Hive Metastore catalog.
+// These tests require a running Hive Metastore instance.
+//
+// To run these tests:
+// 1. Start the Hive Metastore using Docker:
+// cd internal/recipe && docker-compose up -d hive-metastore
+//
+// 2. Set the required environment variables:
+// export TEST_HIVE_URI=thrift://localhost:9083
+// export TEST_HIVE_DATABASE=test_db
+// export TEST_TABLE_LOCATION=/tmp/iceberg/warehouse
+//
+// 3. Run the tests:
+// go test -tags=integration -v ./catalog/hive/...
+
+func getTestHiveURI() string {
+ uri := os.Getenv("TEST_HIVE_URI")
+ if uri == "" {
+ return "thrift://localhost:9083"
+ }
+ return uri
+}
+
+func getTestDatabase() string {
+ db := os.Getenv("TEST_HIVE_DATABASE")
+ if db == "" {
+ return "test_iceberg_db"
+ }
+ return db
+}
+
+func getTestTableLocation() string {
+ loc := os.Getenv("TEST_TABLE_LOCATION")
+ if loc == "" {
+ return "/tmp/iceberg/warehouse"
+ }
+ return loc
+}
+
// createTestCatalog connects to the metastore configured via the TEST_*
// environment variables (see the file header) and fails the test
// immediately if the connection cannot be established.
func createTestCatalog(t *testing.T) *Catalog {
	t.Helper()

	props := iceberg.Properties{
		URI:       getTestHiveURI(),
		Warehouse: getTestTableLocation(),
	}

	cat, err := NewCatalog(props)
	require.NoError(t, err)

	return cat
}
+
// TestHiveIntegrationListNamespaces verifies that namespaces can be listed
// against a live metastore. It only asserts the call succeeds; the actual
// contents are environment-dependent and are just logged.
func TestHiveIntegrationListNamespaces(t *testing.T) {
	assert := require.New(t)

	cat := createTestCatalog(t)
	defer cat.Close()

	namespaces, err := cat.ListNamespaces(context.TODO(), nil)
	assert.NoError(err)
	assert.NotNil(namespaces)

	t.Logf("Found %d namespaces", len(namespaces))
	for _, ns := range namespaces {
		t.Logf(" - %v", ns)
	}
}
+
// TestHiveIntegrationCreateAndDropNamespace exercises the full namespace
// lifecycle against a live metastore: create, existence check, property
// load, drop, and a final not-exists check. A timestamped name keeps runs
// isolated from each other.
func TestHiveIntegrationCreateAndDropNamespace(t *testing.T) {
	assert := require.New(t)

	cat := createTestCatalog(t)
	defer cat.Close()

	dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())

	// Create namespace
	props := iceberg.Properties{
		"comment":  "Test database for integration tests",
		"location": getTestTableLocation() + "/" + dbName,
	}

	err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), props)
	assert.NoError(err)

	// Check it exists
	exists, err := cat.CheckNamespaceExists(context.TODO(), DatabaseIdentifier(dbName))
	assert.NoError(err)
	assert.True(exists)

	// Load properties
	loadedProps, err := cat.LoadNamespaceProperties(context.TODO(), DatabaseIdentifier(dbName))
	assert.NoError(err)
	assert.Equal("Test database for integration tests", loadedProps["comment"])

	// Drop namespace
	err = cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))
	assert.NoError(err)

	// Verify it's gone
	exists, err = cat.CheckNamespaceExists(context.TODO(), DatabaseIdentifier(dbName))
	assert.NoError(err)
	assert.False(exists)
}
+
// TestHiveIntegrationUpdateNamespaceProperties verifies property removal and
// update semantics: key1 is removed, key2 overwritten, key3 added, and the
// returned summary plus a fresh LoadNamespaceProperties reflect all three.
func TestHiveIntegrationUpdateNamespaceProperties(t *testing.T) {
	assert := require.New(t)

	cat := createTestCatalog(t)
	defer cat.Close()

	dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())

	// Create namespace with initial properties
	initialProps := iceberg.Properties{
		"location": fmt.Sprintf("/tmp/iceberg-warehouse/%s", dbName),
		"key1":     "value1",
		"key2":     "value2",
	}

	err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), initialProps)
	assert.NoError(err)
	defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))

	// Update properties
	updates := iceberg.Properties{
		"key2": "updated_value2",
		"key3": "value3",
	}
	removals := []string{"key1"}

	summary, err := cat.UpdateNamespaceProperties(context.TODO(), DatabaseIdentifier(dbName), removals, updates)
	assert.NoError(err)
	assert.Contains(summary.Removed, "key1")
	assert.Contains(summary.Updated, "key2")
	assert.Contains(summary.Updated, "key3")

	// Verify updates
	props, err := cat.LoadNamespaceProperties(context.TODO(), DatabaseIdentifier(dbName))
	assert.NoError(err)
	assert.Equal("updated_value2", props["key2"])
	assert.Equal("value3", props["key3"])
	_, exists := props["key1"]
	assert.False(exists)
}
+
// TestHiveIntegrationCreateAndListTables creates a table in a fresh
// namespace, then verifies existence, listing (exactly one identifier),
// and that loading the table round-trips the schema.
func TestHiveIntegrationCreateAndListTables(t *testing.T) {
	assert := require.New(t)

	cat := createTestCatalog(t)
	defer cat.Close()

	dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
	tableName := "test_table"

	// Create namespace
	err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), iceberg.Properties{
		"location": getTestTableLocation() + "/" + dbName,
	})
	assert.NoError(err)
	defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))

	// Create table
	schema := iceberg.NewSchemaWithIdentifiers(0, []int{1},
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
		iceberg.NestedField{ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String},
	)

	tableLocation := getTestTableLocation() + "/" + dbName + "/" + tableName
	tbl, err := cat.CreateTable(context.TODO(), TableIdentifier(dbName, tableName), schema,
		catalog.WithLocation(tableLocation),
	)
	assert.NoError(err)
	assert.NotNil(tbl)
	defer cat.DropTable(context.TODO(), TableIdentifier(dbName, tableName))

	// Verify table exists
	exists, err := cat.CheckTableExists(context.TODO(), TableIdentifier(dbName, tableName))
	assert.NoError(err)
	assert.True(exists)

	// List tables
	tables := make([][]string, 0)
	for tblIdent, err := range cat.ListTables(context.TODO(), DatabaseIdentifier(dbName)) {
		assert.NoError(err)
		tables = append(tables, tblIdent)
	}
	assert.Len(tables, 1)
	assert.Equal([]string{dbName, tableName}, tables[0])

	// Load table
	loadedTable, err := cat.LoadTable(context.TODO(), TableIdentifier(dbName, tableName))
	assert.NoError(err)
	assert.NotNil(loadedTable)
	assert.True(schema.Equals(loadedTable.Schema()))
}
+
// TestHiveIntegrationRenameTable renames a table within one namespace and
// verifies the old name no longer resolves while the new name does.
func TestHiveIntegrationRenameTable(t *testing.T) {
	assert := require.New(t)

	cat := createTestCatalog(t)
	defer cat.Close()

	dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
	oldTableName := "old_table"
	newTableName := "new_table"

	// Create namespace
	err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), iceberg.Properties{
		"location": getTestTableLocation() + "/" + dbName,
	})
	assert.NoError(err)
	defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))

	// Create table
	schema := iceberg.NewSchemaWithIdentifiers(0, []int{1},
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
	)

	tableLocation := getTestTableLocation() + "/" + dbName + "/" + oldTableName
	_, err = cat.CreateTable(context.TODO(), TableIdentifier(dbName, oldTableName), schema,
		catalog.WithLocation(tableLocation),
	)
	assert.NoError(err)

	// Rename table
	renamedTable, err := cat.RenameTable(context.TODO(),
		TableIdentifier(dbName, oldTableName),
		TableIdentifier(dbName, newTableName),
	)
	assert.NoError(err)
	assert.NotNil(renamedTable)
	defer cat.DropTable(context.TODO(), TableIdentifier(dbName, newTableName))

	// Verify old table doesn't exist
	exists, err := cat.CheckTableExists(context.TODO(), TableIdentifier(dbName, oldTableName))
	assert.NoError(err)
	assert.False(exists)

	// Verify new table exists
	exists, err = cat.CheckTableExists(context.TODO(), TableIdentifier(dbName, newTableName))
	assert.NoError(err)
	assert.True(exists)
}
+
// TestHiveIntegrationDropTable creates a table, drops it, and verifies the
// existence check flips from true to false.
func TestHiveIntegrationDropTable(t *testing.T) {
	assert := require.New(t)

	cat := createTestCatalog(t)
	defer cat.Close()

	dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
	tableName := "table_to_drop"

	// Create namespace
	err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), iceberg.Properties{
		"location": getTestTableLocation() + "/" + dbName,
	})
	assert.NoError(err)
	defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))

	// Create table
	schema := iceberg.NewSchemaWithIdentifiers(0, []int{1},
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
	)

	tableLocation := getTestTableLocation() + "/" + dbName + "/" + tableName
	_, err = cat.CreateTable(context.TODO(), TableIdentifier(dbName, tableName), schema,
		catalog.WithLocation(tableLocation),
	)
	assert.NoError(err)

	// Verify table exists
	exists, err := cat.CheckTableExists(context.TODO(), TableIdentifier(dbName, tableName))
	assert.NoError(err)
	assert.True(exists)

	// Drop table
	err = cat.DropTable(context.TODO(), TableIdentifier(dbName, tableName))
	assert.NoError(err)

	// Verify table is gone
	exists, err = cat.CheckTableExists(context.TODO(), TableIdentifier(dbName, tableName))
	assert.NoError(err)
	assert.False(exists)
}
diff --git a/catalog/hive/hive_test.go b/catalog/hive/hive_test.go
new file mode 100644
index 00000000..4f37307a
--- /dev/null
+++ b/catalog/hive/hive_test.go
@@ -0,0 +1,758 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package hive
+
+import (
+ "context"
+ "errors"
+ "testing"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/catalog"
+ "github.com/apache/iceberg-go/table"
+ "github.com/beltran/gohive/hive_metastore"
+ "github.com/stretchr/testify/mock"
+ "github.com/stretchr/testify/require"
+)
+
// mockHiveClient is a testify-based mock of the HiveClient interface used by
// the unit tests in this package. Each method forwards its arguments to
// m.Called and returns whatever canned values the test configured with
// m.On(...). Methods that return a pointer check for a nil first return value
// so tests can simulate "not found" responses without a typed-nil assertion
// panic.
type mockHiveClient struct {
	mock.Mock
}

// GetDatabase returns the configured database, or (nil, err) when the
// expectation's first return value was nil.
func (m *mockHiveClient) GetDatabase(ctx context.Context, name string) (*hive_metastore.Database, error) {
	args := m.Called(ctx, name)
	if args.Get(0) == nil {
		return nil, args.Error(1)
	}

	return args.Get(0).(*hive_metastore.Database), args.Error(1)
}

// GetAllDatabases returns the configured database-name list.
func (m *mockHiveClient) GetAllDatabases(ctx context.Context) ([]string, error) {
	args := m.Called(ctx)

	return args.Get(0).([]string), args.Error(1)
}

// CreateDatabase records the call and returns the configured error.
func (m *mockHiveClient) CreateDatabase(ctx context.Context, database *hive_metastore.Database) error {
	args := m.Called(ctx, database)

	return args.Error(0)
}

// DropDatabase records the call (including deleteData/cascade flags) and
// returns the configured error.
func (m *mockHiveClient) DropDatabase(ctx context.Context, name string, deleteData, cascade bool) error {
	args := m.Called(ctx, name, deleteData, cascade)

	return args.Error(0)
}

// AlterDatabase records the call and returns the configured error.
func (m *mockHiveClient) AlterDatabase(ctx context.Context, name string, database *hive_metastore.Database) error {
	args := m.Called(ctx, name, database)

	return args.Error(0)
}

// GetTable returns the configured table, or (nil, err) when the expectation's
// first return value was nil.
func (m *mockHiveClient) GetTable(ctx context.Context, dbName, tableName string) (*hive_metastore.Table, error) {
	args := m.Called(ctx, dbName, tableName)
	if args.Get(0) == nil {
		return nil, args.Error(1)
	}

	return args.Get(0).(*hive_metastore.Table), args.Error(1)
}

// GetTables returns the configured table-name list for a db/pattern pair.
func (m *mockHiveClient) GetTables(ctx context.Context, dbName, pattern string) ([]string, error) {
	args := m.Called(ctx, dbName, pattern)

	return args.Get(0).([]string), args.Error(1)
}

// CreateTable records the call and returns the configured error.
func (m *mockHiveClient) CreateTable(ctx context.Context, tbl *hive_metastore.Table) error {
	args := m.Called(ctx, tbl)

	return args.Error(0)
}

// DropTable records the call and returns the configured error.
func (m *mockHiveClient) DropTable(ctx context.Context, dbName, tableName string, deleteData bool) error {
	args := m.Called(ctx, dbName, tableName, deleteData)

	return args.Error(0)
}

// AlterTable records the call and returns the configured error.
func (m *mockHiveClient) AlterTable(ctx context.Context, dbName, tableName string, newTable *hive_metastore.Table) error {
	args := m.Called(ctx, dbName, tableName, newTable)

	return args.Error(0)
}

// Lock returns the configured lock response, or (nil, err) when the
// expectation's first return value was nil.
func (m *mockHiveClient) Lock(ctx context.Context, request *hive_metastore.LockRequest) (*hive_metastore.LockResponse, error) {
	args := m.Called(ctx, request)
	if args.Get(0) == nil {
		return nil, args.Error(1)
	}

	return args.Get(0).(*hive_metastore.LockResponse), args.Error(1)
}

// CheckLock returns the configured lock-state response, or (nil, err) when
// the expectation's first return value was nil.
func (m *mockHiveClient) CheckLock(ctx context.Context, lockId int64) (*hive_metastore.LockResponse, error) {
	args := m.Called(ctx, lockId)
	if args.Get(0) == nil {
		return nil, args.Error(1)
	}

	return args.Get(0).(*hive_metastore.LockResponse), args.Error(1)
}

// Unlock records the call and returns the configured error.
func (m *mockHiveClient) Unlock(ctx context.Context, lockId int64) error {
	args := m.Called(ctx, lockId)

	return args.Error(0)
}

// Close records the call and returns the configured error.
func (m *mockHiveClient) Close() error {
	args := m.Called()

	return args.Error(0)
}
+
// Test data shared by the unit tests below.

// testIcebergHiveTable1 models an HMS entry the catalog should recognize as
// an Iceberg table: table_type=ICEBERG plus a metadata_location parameter.
var testIcebergHiveTable1 = &hive_metastore.Table{
	TableName: "test_table",
	DbName:    "test_database",
	TableType: TableTypeExternalTable,
	Parameters: map[string]string{
		TableTypeKey:        TableTypeIceberg,
		MetadataLocationKey: "s3://test-bucket/test_table/metadata/abc123-123.metadata.json",
	},
	Sd: &hive_metastore.StorageDescriptor{
		Location: "s3://test-bucket/test_table",
	},
}

// testIcebergHiveTable2 is a second Iceberg table in the same database, used
// by listing tests.
var testIcebergHiveTable2 = &hive_metastore.Table{
	TableName: "test_table2",
	DbName:    "test_database",
	TableType: TableTypeExternalTable,
	Parameters: map[string]string{
		TableTypeKey:        TableTypeIceberg,
		MetadataLocationKey: "s3://test-bucket/test_table2/metadata/abc456-456.metadata.json",
	},
	Sd: &hive_metastore.StorageDescriptor{
		Location: "s3://test-bucket/test_table2",
	},
}

// testNonIcebergHiveTable lacks the table_type=ICEBERG parameter and must be
// filtered out / rejected by Iceberg-only operations.
var testNonIcebergHiveTable = &hive_metastore.Table{
	TableName: "other_table",
	DbName:    "test_database",
	TableType: TableTypeExternalTable,
	Parameters: map[string]string{
		"some_param": "some_value",
	},
}

// testSchema is a three-column schema (optional string, required int,
// optional bool) with no identifier fields.
var testSchema = iceberg.NewSchemaWithIdentifiers(0, []int{},
	iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String},
	iceberg.NestedField{ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true},
	iceberg.NestedField{ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Bool})

// Error helpers mimicking the exception strings Thrift surfaces from the
// metastore; the catalog maps these onto the catalog.Err* sentinels.
var (
	errNoSuchObject     = errors.New("NoSuchObjectException: object not found")
	errAlreadyExists    = errors.New("AlreadyExistsException: object already exists")
	errInvalidOperation = errors.New("InvalidOperationException: Database is not empty")
)
+
+// Tests
+
// TestHiveListTables verifies that ListTables fetches each table returned by
// GetTables and yields only those marked as Iceberg tables, skipping the
// non-Iceberg one. The .Once() expectations also pin that each table is
// fetched exactly one time.
func TestHiveListTables(t *testing.T) {
	assert := require.New(t)

	mockClient := &mockHiveClient{}

	mockClient.On("GetTables", mock.Anything, "test_database", "*").
		Return([]string{"test_table", "test_table2", "other_table"}, nil).Once()

	// Mock individual GetTable calls for each table
	mockClient.On("GetTable", mock.Anything, "test_database", "test_table").
		Return(testIcebergHiveTable1, nil).Once()
	mockClient.On("GetTable", mock.Anything, "test_database", "test_table2").
		Return(testIcebergHiveTable2, nil).Once()
	mockClient.On("GetTable", mock.Anything, "test_database", "other_table").
		Return(testNonIcebergHiveTable, nil).Once()

	hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})

	var lastErr error
	tbls := make([]table.Identifier, 0)
	// ListTables returns an iterator (range-over-func); drain it, stopping at
	// the first error.
	iter := hiveCatalog.ListTables(context.TODO(), DatabaseIdentifier("test_database"))

	for tbl, err := range iter {
		if err != nil {
			lastErr = err

			break
		}
		tbls = append(tbls, tbl)
	}

	assert.NoError(lastErr)
	assert.Len(tbls, 2) // Only Iceberg tables
	assert.Contains(tbls, table.Identifier{"test_database", "test_table"})
	assert.Contains(tbls, table.Identifier{"test_database", "test_table2"})

	mockClient.AssertExpectations(t)
}
+
+func TestHiveListTablesEmpty(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetTables", mock.Anything, "empty_database", "*").
+ Return([]string{}, nil).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ var lastErr error
+ tbls := make([]table.Identifier, 0)
+ iter := hiveCatalog.ListTables(context.TODO(),
DatabaseIdentifier("empty_database"))
+
+ for tbl, err := range iter {
+ if err != nil {
+ lastErr = err
+
+ break
+ }
+ tbls = append(tbls, tbl)
+ }
+
+ assert.NoError(lastErr)
+ assert.Len(tbls, 0)
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveListNamespaces(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetAllDatabases", mock.Anything).
+ Return([]string{"database1", "database2", "database3"},
nil).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ namespaces, err := hiveCatalog.ListNamespaces(context.TODO(), nil)
+ assert.NoError(err)
+ assert.Len(namespaces, 3)
+ assert.Equal([]string{"database1"}, namespaces[0])
+ assert.Equal([]string{"database2"}, namespaces[1])
+ assert.Equal([]string{"database3"}, namespaces[2])
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveListNamespacesHierarchicalError(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ _, err := hiveCatalog.ListNamespaces(context.TODO(), []string{"parent"})
+ assert.Error(err)
+ assert.Contains(err.Error(), "hierarchical namespace is not supported")
+}
+
+func TestHiveCreateNamespace(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("CreateDatabase", mock.Anything, mock.MatchedBy(func(db
*hive_metastore.Database) bool {
+ return db.Name == "new_database" &&
+ db.Description == "Test Description" &&
+ db.LocationUri == "s3://test-location"
+ })).Return(nil).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ props := map[string]string{
+ "comment": "Test Description",
+ "location": "s3://test-location",
+ }
+
+ err := hiveCatalog.CreateNamespace(context.TODO(),
DatabaseIdentifier("new_database"), props)
+ assert.NoError(err)
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveCreateNamespaceAlreadyExists(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("CreateDatabase", mock.Anything, mock.Anything).
+ Return(errAlreadyExists).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ err := hiveCatalog.CreateNamespace(context.TODO(),
DatabaseIdentifier("existing_database"), nil)
+ assert.Error(err)
+ assert.True(errors.Is(err, catalog.ErrNamespaceAlreadyExists))
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveDropNamespace(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetDatabase", mock.Anything, "test_namespace").
+ Return(&hive_metastore.Database{Name: "test_namespace"},
nil).Once()
+
+ mockClient.On("DropDatabase", mock.Anything, "test_namespace", false,
false).
+ Return(nil).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ err := hiveCatalog.DropNamespace(context.TODO(),
DatabaseIdentifier("test_namespace"))
+ assert.NoError(err)
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveDropNamespaceNotExists(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetDatabase", mock.Anything, "nonexistent").
+ Return(nil, errNoSuchObject).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ err := hiveCatalog.DropNamespace(context.TODO(),
DatabaseIdentifier("nonexistent"))
+ assert.Error(err)
+ assert.True(errors.Is(err, catalog.ErrNoSuchNamespace))
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveDropNamespaceNotEmpty(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetDatabase", mock.Anything, "nonempty_db").
+ Return(&hive_metastore.Database{Name: "nonempty_db"},
nil).Once()
+
+ mockClient.On("DropDatabase", mock.Anything, "nonempty_db", false,
false).
+ Return(errInvalidOperation).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ err := hiveCatalog.DropNamespace(context.TODO(),
DatabaseIdentifier("nonempty_db"))
+ assert.Error(err)
+ assert.True(errors.Is(err, catalog.ErrNamespaceNotEmpty))
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveCheckNamespaceExists(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetDatabase", mock.Anything, "existing_db").
+ Return(&hive_metastore.Database{Name: "existing_db"},
nil).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ exists, err := hiveCatalog.CheckNamespaceExists(context.TODO(),
DatabaseIdentifier("existing_db"))
+ assert.NoError(err)
+ assert.True(exists)
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveCheckNamespaceNotExists(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetDatabase", mock.Anything, "nonexistent_db").
+ Return(nil, errNoSuchObject).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ exists, err := hiveCatalog.CheckNamespaceExists(context.TODO(),
DatabaseIdentifier("nonexistent_db"))
+ assert.NoError(err)
+ assert.False(exists)
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveLoadNamespaceProperties(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetDatabase", mock.Anything, "test_db").
+ Return(&hive_metastore.Database{
+ Name: "test_db",
+ Description: "Test database",
+ LocationUri: "s3://test-bucket/test_db",
+ Parameters: map[string]string{
+ "custom_param": "custom_value",
+ },
+ }, nil).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ props, err := hiveCatalog.LoadNamespaceProperties(context.TODO(),
DatabaseIdentifier("test_db"))
+ assert.NoError(err)
+ assert.Equal("s3://test-bucket/test_db", props["location"])
+ assert.Equal("Test database", props["comment"])
+ assert.Equal("custom_value", props["custom_param"])
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveCheckTableExists(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetTable", mock.Anything, "test_database", "test_table").
+ Return(testIcebergHiveTable1, nil).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ exists, err := hiveCatalog.CheckTableExists(context.TODO(),
TableIdentifier("test_database", "test_table"))
+ assert.NoError(err)
+ assert.True(exists)
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveCheckTableNotExists(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetTable", mock.Anything, "test_database",
"nonexistent").
+ Return(nil, errNoSuchObject).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ exists, err := hiveCatalog.CheckTableExists(context.TODO(),
TableIdentifier("test_database", "nonexistent"))
+ assert.NoError(err)
+ assert.False(exists)
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveCheckTableExistsNonIceberg(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetTable", mock.Anything, "test_database",
"other_table").
+ Return(testNonIcebergHiveTable, nil).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ exists, err := hiveCatalog.CheckTableExists(context.TODO(),
TableIdentifier("test_database", "other_table"))
+ assert.NoError(err)
+ assert.False(exists) // Non-Iceberg table should return false
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveDropTable(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetTable", mock.Anything, "test_database", "test_table").
+ Return(testIcebergHiveTable1, nil).Once()
+
+ mockClient.On("DropTable", mock.Anything, "test_database",
"test_table", false).
+ Return(nil).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ err := hiveCatalog.DropTable(context.TODO(),
TableIdentifier("test_database", "test_table"))
+ assert.NoError(err)
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveDropTableNotExists(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetTable", mock.Anything, "test_database",
"nonexistent").
+ Return(nil, errNoSuchObject).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ err := hiveCatalog.DropTable(context.TODO(),
TableIdentifier("test_database", "nonexistent"))
+ assert.Error(err)
+ assert.True(errors.Is(err, catalog.ErrNoSuchTable))
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveDropTableNonIceberg(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetTable", mock.Anything, "test_database",
"other_table").
+ Return(testNonIcebergHiveTable, nil).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ err := hiveCatalog.DropTable(context.TODO(),
TableIdentifier("test_database", "other_table"))
+ assert.Error(err)
+ assert.Contains(err.Error(), "is not an Iceberg table")
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveCatalogType(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ assert.Equal(catalog.Hive, hiveCatalog.CatalogType())
+}
+
+func TestIsIcebergTable(t *testing.T) {
+ tests := []struct {
+ name string
+ table *hive_metastore.Table
+ expected bool
+ }{
+ {
+ name: "iceberg table uppercase",
+ table: testIcebergHiveTable1,
+ expected: true,
+ },
+ {
+ name: "iceberg table lowercase",
+ table: &hive_metastore.Table{
+ Parameters: map[string]string{TableTypeKey:
"iceberg"},
+ },
+ expected: true,
+ },
+ {
+ name: "iceberg table mixed case",
+ table: &hive_metastore.Table{
+ Parameters: map[string]string{TableTypeKey:
"IcEbErG"},
+ },
+ expected: true,
+ },
+ {
+ name: "non-iceberg table",
+ table: testNonIcebergHiveTable,
+ expected: false,
+ },
+ {
+ name: "nil table",
+ table: nil,
+ expected: false,
+ },
+ {
+ name: "table without parameters",
+ table: &hive_metastore.Table{},
+ expected: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ assert := require.New(t)
+ assert.Equal(tt.expected, isIcebergTable(tt.table))
+ })
+ }
+}
+
+func TestIcebergTypeToHiveType(t *testing.T) {
+ tests := []struct {
+ name string
+ icebergType iceberg.Type
+ expectedHive string
+ }{
+ {"boolean", iceberg.PrimitiveTypes.Bool, "boolean"},
+ {"int32", iceberg.PrimitiveTypes.Int32, "int"},
+ {"int64", iceberg.PrimitiveTypes.Int64, "bigint"},
+ {"float32", iceberg.PrimitiveTypes.Float32, "float"},
+ {"float64", iceberg.PrimitiveTypes.Float64, "double"},
+ {"date", iceberg.PrimitiveTypes.Date, "date"},
+ {"time", iceberg.PrimitiveTypes.Time, "string"},
+ {"timestamp", iceberg.PrimitiveTypes.Timestamp, "timestamp"},
+ {"timestamptz", iceberg.PrimitiveTypes.TimestampTz,
"timestamp"},
+ {"string", iceberg.PrimitiveTypes.String, "string"},
+ {"uuid", iceberg.PrimitiveTypes.UUID, "string"},
+ {"binary", iceberg.PrimitiveTypes.Binary, "binary"},
+ {"decimal", iceberg.DecimalTypeOf(10, 2), "decimal(10,2)"},
+ {"fixed", iceberg.FixedTypeOf(16), "binary(16)"},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ assert := require.New(t)
+ assert.Equal(tt.expectedHive,
icebergTypeToHiveType(tt.icebergType))
+ })
+ }
+}
+
+func TestSchemaToHiveColumns(t *testing.T) {
+ assert := require.New(t)
+
+ columns := schemaToHiveColumns(testSchema)
+ assert.Len(columns, 3)
+
+ // Check first column
+ assert.Equal("foo", columns[0].Name)
+ assert.Equal("string", columns[0].Type)
+
+ // Check second column
+ assert.Equal("bar", columns[1].Name)
+ assert.Equal("int", columns[1].Type)
+
+ // Check third column
+ assert.Equal("baz", columns[2].Name)
+ assert.Equal("boolean", columns[2].Type)
+}
+
+func TestUpdateNamespaceProperties(t *testing.T) {
+ tests := []struct {
+ name string
+ initial map[string]string
+ updates map[string]string
+ removals []string
+ expected catalog.PropertiesUpdateSummary
+ shouldError bool
+ }{
+ {
+ name: "Overlapping removals and updates",
+ initial: map[string]string{
+ "key1": "value1",
+ "key2": "value2",
+ },
+ updates: map[string]string{
+ "key1": "new_value1",
+ },
+ removals: []string{"key1"},
+ shouldError: true,
+ },
+ {
+ name: "Happy path with updates and removals",
+ initial: map[string]string{
+ "key1": "value1",
+ "key2": "value2",
+ "key4": "value4",
+ },
+ updates: map[string]string{
+ "key2": "new_value2",
+ },
+ removals: []string{"key4"},
+ expected: catalog.PropertiesUpdateSummary{
+ Removed: []string{"key4"},
+ Updated: []string{"key2"},
+ Missing: []string{},
+ },
+ shouldError: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetDatabase", mock.Anything,
"test_namespace").
+ Return(&hive_metastore.Database{
+ Name: "test_namespace",
+ Parameters: tt.initial,
+ }, nil).Once()
+
+ if !tt.shouldError {
+ mockClient.On("AlterDatabase", mock.Anything,
"test_namespace", mock.Anything).
+ Return(nil).Once()
+ }
+
+ hiveCatalog := NewCatalogWithClient(mockClient,
iceberg.Properties{})
+
+ summary, err :=
hiveCatalog.UpdateNamespaceProperties(context.TODO(),
DatabaseIdentifier("test_namespace"), tt.removals, tt.updates)
+ if tt.shouldError {
+ assert.Error(err)
+ } else {
+ assert.NoError(err)
+ assert.ElementsMatch(tt.expected.Removed,
summary.Removed)
+ assert.ElementsMatch(tt.expected.Updated,
summary.Updated)
+ assert.ElementsMatch(tt.expected.Missing,
summary.Missing)
+ }
+ })
+ }
+}
+
+func TestIdentifierValidation(t *testing.T) {
+ t.Run("valid table identifier", func(t *testing.T) {
+ assert := require.New(t)
+ db, tbl, err := identifierToTableName([]string{"database",
"table"})
+ assert.NoError(err)
+ assert.Equal("database", db)
+ assert.Equal("table", tbl)
+ })
+
+ t.Run("invalid table identifier - too short", func(t *testing.T) {
+ assert := require.New(t)
+ _, _, err := identifierToTableName([]string{"database"})
+ assert.Error(err)
+ })
+
+ t.Run("invalid table identifier - too long", func(t *testing.T) {
+ assert := require.New(t)
+ _, _, err := identifierToTableName([]string{"a", "b", "c"})
+ assert.Error(err)
+ })
+
+ t.Run("valid database identifier", func(t *testing.T) {
+ assert := require.New(t)
+ db, err := identifierToDatabase([]string{"database"})
+ assert.NoError(err)
+ assert.Equal("database", db)
+ })
+
+ t.Run("invalid database identifier", func(t *testing.T) {
+ assert := require.New(t)
+ _, err := identifierToDatabase([]string{"a", "b"})
+ assert.Error(err)
+ })
+}
diff --git a/catalog/hive/lock.go b/catalog/hive/lock.go
new file mode 100644
index 00000000..51f56505
--- /dev/null
+++ b/catalog/hive/lock.go
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package hive
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "math"
+ "time"
+
+ "github.com/beltran/gohive/hive_metastore"
+)
+
// ErrLockAcquisitionFailed is returned when a lock cannot be acquired after all retries.
var ErrLockAcquisitionFailed = errors.New("failed to acquire lock")

// HiveLock represents an acquired Hive Metastore table lock. It pairs the
// metastore-assigned lock id with the client needed to check or release it.
type HiveLock struct {
	client HiveClient // connection used to Unlock this lock
	lockId int64      // id assigned by the metastore's Lock RPC
}
+
// acquireLock obtains an EXCLUSIVE, table-level Hive Metastore lock on
// database.tableName (used to serialize commits).
//
// Flow: issue one Lock request; if the metastore grants it immediately,
// return. Otherwise poll CheckLock up to opts.LockRetries times, sleeping an
// exponentially growing interval (opts.LockMinWaitTime doubled per attempt,
// capped at opts.LockMaxWaitTime) before each check. Every failure path
// issues a best-effort Unlock so a WAITING lock entry is not leaked in the
// metastore. On success the caller owns the returned *HiveLock and must call
// Release.
//
// NOTE(review): an initial response state other than ACQUIRED or WAITING
// (e.g. NOT_ACQUIRED) still enters the wait/poll loop rather than failing
// fast — presumably the metastore only ever answers ACQUIRED or WAITING here;
// confirm against the Thrift API.
func acquireLock(ctx context.Context, client HiveClient, database, tableName string, opts *HiveOptions) (*HiveLock, error) {
	lockReq := &hive_metastore.LockRequest{
		Component: []*hive_metastore.LockComponent{{
			Type:      hive_metastore.LockType_EXCLUSIVE,
			Level:     hive_metastore.LockLevel_TABLE,
			Dbname:    database,
			Tablename: &tableName,
		}},
	}

	lockResp, err := client.Lock(ctx, lockReq)
	if err != nil {
		return nil, fmt.Errorf("failed to request lock: %w", err)
	}

	// Fast path: lock granted on the first request.
	if lockResp.State == hive_metastore.LockState_ACQUIRED {
		return &HiveLock{
			client: client,
			lockId: lockResp.Lockid,
		}, nil
	}

	// If not acquired immediately, wait and retry
	for attempt := 0; attempt < opts.LockRetries; attempt++ {
		// Wait before checking again; backoff grows exponentially per attempt.
		waitTime := calculateBackoff(attempt, opts.LockMinWaitTime, opts.LockMaxWaitTime)

		select {
		case <-ctx.Done():
			// Context cancelled while waiting: release the pending lock
			// request (best effort) and surface the cancellation.
			_ = client.Unlock(ctx, lockResp.Lockid)

			return nil, ctx.Err()
		case <-time.After(waitTime):
		}

		// Check lock state
		checkResp, err := client.CheckLock(ctx, lockResp.Lockid)
		if err != nil {
			_ = client.Unlock(ctx, lockResp.Lockid)

			return nil, fmt.Errorf("failed to check lock status: %w", err)
		}

		switch checkResp.State {
		case hive_metastore.LockState_ACQUIRED:
			return &HiveLock{
				client: client,
				lockId: lockResp.Lockid,
			}, nil
		case hive_metastore.LockState_WAITING:
			// Continue waiting
			continue
		case hive_metastore.LockState_ABORT:
			// Metastore aborted the lock (e.g. timeout); no Unlock needed.
			return nil, fmt.Errorf("%w: lock was aborted", ErrLockAcquisitionFailed)
		case hive_metastore.LockState_NOT_ACQUIRED:
			return nil, fmt.Errorf("%w: lock not acquired", ErrLockAcquisitionFailed)
		default:
			return nil, fmt.Errorf("%w: unexpected lock state: %v", ErrLockAcquisitionFailed, checkResp.State)
		}
	}

	// Retries exhausted while still WAITING: release the pending request.
	_ = client.Unlock(ctx, lockResp.Lockid)

	return nil, fmt.Errorf("%w: exhausted %d retries for table %s.%s", ErrLockAcquisitionFailed, opts.LockRetries, database, tableName)
}
+
+func calculateBackoff(attempt int, minWait, maxWait time.Duration)
time.Duration {
+ wait := time.Duration(float64(minWait) * math.Pow(2, float64(attempt)))
+ if wait > maxWait {
+ wait = maxWait
+ }
+
+ return wait
+}
+
// Release unlocks the held metastore lock so other writers may proceed.
// Callers that obtained the lock from acquireLock must call (or defer) this.
func (l *HiveLock) Release(ctx context.Context) error {
	return l.client.Unlock(ctx, l.lockId)
}
+
+func (l *HiveLock) LockID() int64 {
+ if l == nil {
+ return 0
+ }
+
+ return l.lockId
+}
diff --git a/catalog/hive/lock_test.go b/catalog/hive/lock_test.go
new file mode 100644
index 00000000..f61a8079
--- /dev/null
+++ b/catalog/hive/lock_test.go
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package hive
+
+import (
+ "context"
+ "errors"
+ "testing"
+ "time"
+
+ "github.com/beltran/gohive/hive_metastore"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "github.com/stretchr/testify/require"
+)
+
+func TestAcquireLockImmediateSuccess(t *testing.T) {
+ mockClient := new(mockHiveClient)
+ ctx := context.Background()
+ opts := NewHiveOptions()
+
+ mockClient.On("Lock", ctx,
mock.AnythingOfType("*hive_metastore.LockRequest")).
+ Return(&hive_metastore.LockResponse{
+ Lockid: 123,
+ State: hive_metastore.LockState_ACQUIRED,
+ }, nil)
+
+ lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts)
+
+ require.NoError(t, err)
+ require.NotNil(t, lock)
+ assert.Equal(t, int64(123), lock.LockID())
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestAcquireLockWithRetry(t *testing.T) {
+ mockClient := new(mockHiveClient)
+ ctx := context.Background()
+ opts := NewHiveOptions()
+ opts.LockMinWaitTime = 1 * time.Millisecond // Fast retries for testing
+ opts.LockMaxWaitTime = 10 * time.Millisecond
+ opts.LockRetries = 3
+
+ // Lock request returns WAITING initially
+ mockClient.On("Lock", ctx,
mock.AnythingOfType("*hive_metastore.LockRequest")).
+ Return(&hive_metastore.LockResponse{
+ Lockid: 456,
+ State: hive_metastore.LockState_WAITING,
+ }, nil)
+
+ // First CheckLock returns WAITING
+ mockClient.On("CheckLock", ctx, int64(456)).
+ Return(&hive_metastore.LockResponse{
+ Lockid: 456,
+ State: hive_metastore.LockState_WAITING,
+ }, nil).Once()
+
+ // Second CheckLock returns ACQUIRED
+ mockClient.On("CheckLock", ctx, int64(456)).
+ Return(&hive_metastore.LockResponse{
+ Lockid: 456,
+ State: hive_metastore.LockState_ACQUIRED,
+ }, nil).Once()
+
+ lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts)
+
+ require.NoError(t, err)
+ require.NotNil(t, lock)
+ assert.Equal(t, int64(456), lock.LockID())
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestAcquireLockExhaustsRetries(t *testing.T) {
+ mockClient := new(mockHiveClient)
+ ctx := context.Background()
+ opts := NewHiveOptions()
+ opts.LockMinWaitTime = 1 * time.Millisecond
+ opts.LockMaxWaitTime = 10 * time.Millisecond
+ opts.LockRetries = 2
+
+ // Lock request returns WAITING
+ mockClient.On("Lock", ctx,
mock.AnythingOfType("*hive_metastore.LockRequest")).
+ Return(&hive_metastore.LockResponse{
+ Lockid: 789,
+ State: hive_metastore.LockState_WAITING,
+ }, nil)
+
+ // All CheckLock calls return WAITING
+ mockClient.On("CheckLock", ctx, int64(789)).
+ Return(&hive_metastore.LockResponse{
+ Lockid: 789,
+ State: hive_metastore.LockState_WAITING,
+ }, nil)
+
+ mockClient.On("Unlock", ctx, int64(789)).Return(nil)
+
+ lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts)
+
+ require.Error(t, err)
+ require.Nil(t, lock)
+ assert.ErrorIs(t, err, ErrLockAcquisitionFailed)
+ assert.Contains(t, err.Error(), "exhausted 2 retries")
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestAcquireLockAborted(t *testing.T) {
+ mockClient := new(mockHiveClient)
+ ctx := context.Background()
+ opts := NewHiveOptions()
+ opts.LockMinWaitTime = 1 * time.Millisecond
+
+ // Lock request returns WAITING
+ mockClient.On("Lock", ctx,
mock.AnythingOfType("*hive_metastore.LockRequest")).
+ Return(&hive_metastore.LockResponse{
+ Lockid: 111,
+ State: hive_metastore.LockState_WAITING,
+ }, nil)
+
+ // CheckLock returns ABORT
+ mockClient.On("CheckLock", ctx, int64(111)).
+ Return(&hive_metastore.LockResponse{
+ Lockid: 111,
+ State: hive_metastore.LockState_ABORT,
+ }, nil)
+
+ lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts)
+
+ require.Error(t, err)
+ require.Nil(t, lock)
+ assert.ErrorIs(t, err, ErrLockAcquisitionFailed)
+ assert.Contains(t, err.Error(), "aborted")
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestAcquireLockRequestFails(t *testing.T) {
+ mockClient := new(mockHiveClient)
+ ctx := context.Background()
+ opts := NewHiveOptions()
+
+ // Lock request fails
+ mockClient.On("Lock", ctx,
mock.AnythingOfType("*hive_metastore.LockRequest")).
+ Return(nil, errors.New("connection failed"))
+
+ lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts)
+
+ require.Error(t, err)
+ require.Nil(t, lock)
+ assert.Contains(t, err.Error(), "failed to request lock")
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestAcquireLockCheckFails(t *testing.T) {
+ mockClient := new(mockHiveClient)
+ ctx := context.Background()
+ opts := NewHiveOptions()
+ opts.LockMinWaitTime = 1 * time.Millisecond
+
+ // Lock request returns WAITING
+ mockClient.On("Lock", ctx,
mock.AnythingOfType("*hive_metastore.LockRequest")).
+ Return(&hive_metastore.LockResponse{
+ Lockid: 222,
+ State: hive_metastore.LockState_WAITING,
+ }, nil)
+
+ // CheckLock fails
+ mockClient.On("CheckLock", ctx, int64(222)).
+ Return(nil, errors.New("check failed"))
+
+ // Lock should be released on error
+ mockClient.On("Unlock", ctx, int64(222)).Return(nil)
+
+ lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts)
+
+ require.Error(t, err)
+ require.Nil(t, lock)
+ assert.Contains(t, err.Error(), "failed to check lock status")
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestAcquireLockContextCancelled(t *testing.T) {
+ mockClient := new(mockHiveClient)
+ ctx, cancel := context.WithCancel(context.Background())
+ opts := NewHiveOptions()
+ opts.LockMinWaitTime = 100 * time.Millisecond // Longer wait so we can
cancel
+
+ // Lock request returns WAITING
+ mockClient.On("Lock", ctx,
mock.AnythingOfType("*hive_metastore.LockRequest")).
+ Return(&hive_metastore.LockResponse{
+ Lockid: 333,
+ State: hive_metastore.LockState_WAITING,
+ }, nil)
+
+ // Lock should be released when context is cancelled
+ mockClient.On("Unlock", ctx, int64(333)).Return(nil)
+
+ // Cancel context before the wait completes
+ go func() {
+ time.Sleep(10 * time.Millisecond)
+ cancel()
+ }()
+
+ lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts)
+
+ require.Error(t, err)
+ require.Nil(t, lock)
+ assert.ErrorIs(t, err, context.Canceled)
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestReleaseLock(t *testing.T) {
+ mockClient := new(mockHiveClient)
+ ctx := context.Background()
+
+ lock := &HiveLock{
+ client: mockClient,
+ lockId: 999,
+ }
+
+ mockClient.On("Unlock", ctx, int64(999)).Return(nil)
+
+ err := lock.Release(ctx)
+
+ require.NoError(t, err)
+ mockClient.AssertExpectations(t)
+}
+
+func TestCalculateBackoff(t *testing.T) {
+ minWait := 100 * time.Millisecond
+ maxWait := 1 * time.Second
+
+ tests := []struct {
+ attempt int
+ expected time.Duration
+ }{
+ {0, 100 * time.Millisecond}, // 100ms * 2^0 = 100ms
+ {1, 200 * time.Millisecond}, // 100ms * 2^1 = 200ms
+ {2, 400 * time.Millisecond}, // 100ms * 2^2 = 400ms
+ {3, 800 * time.Millisecond}, // 100ms * 2^3 = 800ms
+ {4, 1 * time.Second}, // 100ms * 2^4 = 1.6s, capped at 1s
+ {10, 1 * time.Second}, // Capped at maxWait
+ }
+
+ for _, tt := range tests {
+ t.Run("", func(t *testing.T) {
+ result := calculateBackoff(tt.attempt, minWait, maxWait)
+ assert.Equal(t, tt.expected, result)
+ })
+ }
+}
+
// TestLockConfigurationParsing verifies that ApplyProperties parses the
// lock-check-* catalog properties (durations via Go duration syntax, retries
// as an integer) into the corresponding HiveOptions fields.
func TestLockConfigurationParsing(t *testing.T) {
	props := map[string]string{
		LockCheckMinWaitTime: "200ms",
		LockCheckMaxWaitTime: "30s",
		LockCheckRetries:     "5",
	}

	opts := NewHiveOptions()
	opts.ApplyProperties(props)

	assert.Equal(t, 200*time.Millisecond, opts.LockMinWaitTime)
	assert.Equal(t, 30*time.Second, opts.LockMaxWaitTime)
	assert.Equal(t, 5, opts.LockRetries)
}
+
// TestLockConfigurationDefaults verifies that NewHiveOptions starts from the
// package-level default lock tuning values when no properties are applied.
func TestLockConfigurationDefaults(t *testing.T) {
	opts := NewHiveOptions()

	assert.Equal(t, DefaultLockCheckMinWaitTime, opts.LockMinWaitTime)
	assert.Equal(t, DefaultLockCheckMaxWaitTime, opts.LockMaxWaitTime)
	assert.Equal(t, DefaultLockCheckRetries, opts.LockRetries)
}
diff --git a/catalog/hive/options.go b/catalog/hive/options.go
new file mode 100644
index 00000000..6681676a
--- /dev/null
+++ b/catalog/hive/options.go
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package hive
+
+import (
+ "strconv"
+ "time"
+
+ "github.com/apache/iceberg-go"
+)
+
const (
	// URI is the Thrift URI for the Hive Metastore (e.g., "thrift://localhost:9083").
	URI = "uri"

	// Warehouse is the default warehouse location for tables.
	Warehouse = "warehouse"

	// Hive table-parameter keys and values used to mark a Hive table as an
	// Iceberg table and to track its current/previous metadata file.
	TableTypeKey                = "table_type"
	TableTypeIceberg            = "ICEBERG"
	TableTypeExternalTable      = "EXTERNAL_TABLE"
	MetadataLocationKey         = "metadata_location"
	PreviousMetadataLocationKey = "previous_metadata_location"
	ExternalKey                 = "EXTERNAL"

	// Lock configuration property keys. Duration values are parsed with
	// time.ParseDuration; the retries value with strconv.Atoi (see
	// ApplyProperties).
	LockCheckMinWaitTime = "lock-check-min-wait-time"
	LockCheckMaxWaitTime = "lock-check-max-wait-time"
	LockCheckRetries     = "lock-check-retries"

	// Default lock configuration values used when no property overrides them.
	DefaultLockCheckMinWaitTime = 100 * time.Millisecond
	DefaultLockCheckMaxWaitTime = 60 * time.Second // 1 minute
	DefaultLockCheckRetries     = 4
)
+
// HiveOptions holds the configuration for a Hive Metastore catalog.
type HiveOptions struct {
	URI       string             // Thrift URI of the Hive Metastore
	Warehouse string             // default warehouse location for tables
	props     iceberg.Properties // raw catalog properties as supplied by the caller

	// Lock configuration for atomic commits
	LockMinWaitTime time.Duration // base wait between lock checks (doubled per attempt)
	LockMaxWaitTime time.Duration // upper bound on the backoff wait
	LockRetries     int           // number of lock-check retries
}
+
+func NewHiveOptions() *HiveOptions {
+ return &HiveOptions{
+ props: iceberg.Properties{},
+ LockMinWaitTime: DefaultLockCheckMinWaitTime,
+ LockMaxWaitTime: DefaultLockCheckMaxWaitTime,
+ LockRetries: DefaultLockCheckRetries,
+ }
+}
+
+func (o *HiveOptions) ApplyProperties(props iceberg.Properties) {
+ o.props = props
+
+ if uri, ok := props[URI]; ok {
+ o.URI = uri
+ }
+ if warehouse, ok := props[Warehouse]; ok {
+ o.Warehouse = warehouse
+ }
+
+ // Parse lock configuration
+ if val, ok := props[LockCheckMinWaitTime]; ok {
+ if d, err := time.ParseDuration(val); err == nil {
+ o.LockMinWaitTime = d
+ }
+ }
+ if val, ok := props[LockCheckMaxWaitTime]; ok {
+ if d, err := time.ParseDuration(val); err == nil {
+ o.LockMaxWaitTime = d
+ }
+ }
+ if val, ok := props[LockCheckRetries]; ok {
+ if i, err := strconv.Atoi(val); err == nil {
+ o.LockRetries = i
+ }
+ }
+}
+
// Option is a functional option that mutates a HiveOptions value during
// catalog construction.
type Option func(*HiveOptions)

// WithURI sets the Thrift URI for the Hive Metastore.
func WithURI(uri string) Option {
	return func(o *HiveOptions) {
		o.URI = uri
	}
}
+
// WithWarehouse sets the default warehouse location for tables.
func WithWarehouse(warehouse string) Option {
	return func(o *HiveOptions) {
		o.Warehouse = warehouse
	}
}
+
// WithProperties sets the catalog properties. Note that it replaces any
// previously stored properties wholesale rather than merging with them.
func WithProperties(props iceberg.Properties) Option {
	return func(o *HiveOptions) {
		o.props = props
	}
}
diff --git a/catalog/hive/schema.go b/catalog/hive/schema.go
new file mode 100644
index 00000000..9d519596
--- /dev/null
+++ b/catalog/hive/schema.go
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package hive
+
+import (
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/apache/iceberg-go"
+ "github.com/beltran/gohive/hive_metastore"
+)
+
+func schemaToHiveColumns(schema *iceberg.Schema) []*hive_metastore.FieldSchema
{
+ columns := make([]*hive_metastore.FieldSchema, 0, len(schema.Fields()))
+ for _, field := range schema.Fields() {
+ columns = append(columns, fieldToHiveColumn(field))
+ }
+
+ return columns
+}
+
+func fieldToHiveColumn(field iceberg.NestedField) *hive_metastore.FieldSchema {
+ return &hive_metastore.FieldSchema{
+ Name: field.Name,
+ Type: icebergTypeToHiveType(field.Type),
+ Comment: field.Doc,
+ }
+}
+
+// icebergTypeToHiveType converts an Iceberg type to a Hive type string.
+// Reference:
https://cwiki.apache.org/confluence/display/hive/languagemanual+types
+func icebergTypeToHiveType(typ iceberg.Type) string {
+ switch t := typ.(type) {
+ case iceberg.Int64Type:
+ return "bigint"
+ case iceberg.TimeType:
+ // Hive doesn't have a native time type, use string
+ return "string"
+ case iceberg.TimestampTzType:
+ return "timestamp"
+ case iceberg.UUIDType:
+ // Represent UUID as string
+ return "string"
+ case iceberg.DecimalType:
+ return fmt.Sprintf("decimal(%d,%d)", t.Precision(), t.Scale())
+ case iceberg.FixedType:
+ return fmt.Sprintf("binary(%d)", t.Len())
+ case *iceberg.StructType:
+ var fieldStrings []string
+ for _, field := range t.Fields() {
+ fieldStrings = append(fieldStrings,
+ fmt.Sprintf("%s:%s", field.Name,
icebergTypeToHiveType(field.Type)))
+ }
+
+ return fmt.Sprintf("struct<%s>", strings.Join(fieldStrings,
","))
+ case *iceberg.ListType:
+ return fmt.Sprintf("array<%s>",
icebergTypeToHiveType(t.ElementField().Type))
+ case *iceberg.MapType:
+ keyField := t.KeyField()
+ valueField := t.ValueField()
+
+ return fmt.Sprintf("map<%s,%s>",
+ icebergTypeToHiveType(keyField.Type),
+ icebergTypeToHiveType(valueField.Type))
+ default:
+ return typ.String()
+ }
+}
+
+func constructHiveTable(dbName, tableName, location, metadataLocation string,
schema *iceberg.Schema, props map[string]string) *hive_metastore.Table {
+ parameters := make(map[string]string)
+
+ // Set Iceberg-specific parameters
+ parameters[TableTypeKey] = TableTypeIceberg
+ parameters[MetadataLocationKey] = metadataLocation
+ parameters[ExternalKey] = "TRUE"
+
+ // Set storage handler - required for Hive to query Iceberg tables
+ parameters["storage_handler"] =
"org.apache.iceberg.mr.hive.HiveIcebergStorageHandler"
+
+ // Copy additional properties
+ for k, v := range props {
+ parameters[k] = v
+ }
+
+ return &hive_metastore.Table{
+ TableName: tableName,
+ DbName: dbName,
+ TableType: TableTypeExternalTable,
+ Sd: &hive_metastore.StorageDescriptor{
+ Cols: schemaToHiveColumns(schema),
+ Location: location,
+ InputFormat:
"org.apache.iceberg.mr.hive.HiveIcebergInputFormat",
+ OutputFormat:
"org.apache.iceberg.mr.hive.HiveIcebergOutputFormat",
+ SerdeInfo: &hive_metastore.SerDeInfo{
+ SerializationLib:
"org.apache.iceberg.mr.hive.HiveIcebergSerDe",
+ },
+ },
+ Parameters: parameters,
+ }
+}
+
+func updateHiveTableForCommit(existing *hive_metastore.Table,
newMetadataLocation string) *hive_metastore.Table {
+ // Copy the existing table
+ updated := *existing
+
+ // Update parameters
+ if updated.Parameters == nil {
+ updated.Parameters = make(map[string]string)
+ }
+
+ // Store previous metadata location
+ if oldLocation, ok := updated.Parameters[MetadataLocationKey]; ok {
+ updated.Parameters[PreviousMetadataLocationKey] = oldLocation
+ }
+
+ // Set new metadata location
+ updated.Parameters[MetadataLocationKey] = newMetadataLocation
+
+ return &updated
+}
+
+// isIcebergTable checks if a Hive table is an Iceberg table.
+func isIcebergTable(tbl *hive_metastore.Table) bool {
+ if tbl == nil || tbl.Parameters == nil {
+ return false
+ }
+
+ tableType, ok := tbl.Parameters[TableTypeKey]
+ if !ok {
+ return false
+ }
+
+ return strings.EqualFold(tableType, TableTypeIceberg)
+}
+
+func getMetadataLocation(tbl *hive_metastore.Table) (string, error) {
+ if tbl == nil || tbl.Parameters == nil {
+ return "", errors.New("table has no parameters")
+ }
+
+ location, ok := tbl.Parameters[MetadataLocationKey]
+ if !ok {
+ return "", fmt.Errorf("table does not have %s parameter",
MetadataLocationKey)
+ }
+
+ return location, nil
+}
diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go
index 6d92ca0e..12838591 100644
--- a/cmd/iceberg/main.go
+++ b/cmd/iceberg/main.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
"github.com/apache/iceberg-go/catalog/glue"
+ "github.com/apache/iceberg-go/catalog/hive"
"github.com/apache/iceberg-go/catalog/rest"
"github.com/apache/iceberg-go/config"
"github.com/apache/iceberg-go/table"
@@ -196,6 +197,16 @@ func main() {
glue.WithAwsConfig(awscfg),
}
cat = glue.NewCatalog(opts...)
+ case catalog.Hive:
+ props := iceberg.Properties{
+ hive.URI: cfg.URI,
+ }
+ if len(cfg.Warehouse) > 0 {
+ props[hive.Warehouse] = cfg.Warehouse
+ }
+ if cat, err = hive.NewCatalog(props); err != nil {
+ log.Fatal(err)
+ }
default:
log.Fatal("unrecognized catalog type")
}
diff --git a/go.mod b/go.mod
index 00876bf8..cfaab8ce 100644
--- a/go.mod
+++ b/go.mod
@@ -31,6 +31,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.96.0
github.com/aws/smithy-go v1.24.0
github.com/awsdocs/aws-doc-sdk-examples/gov2/testtools
v0.0.0-20250407191926-092f3e54b837
+ github.com/beltran/gohive v1.8.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/docker/docker v28.5.2+incompatible
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815
@@ -100,6 +101,8 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.30.9 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.13 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 // indirect
+ github.com/beltran/gosasl v1.0.0 // indirect
+ github.com/beltran/gssapi v0.0.0-20200324152954-d86554db4bab // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/buger/goterm v1.0.4 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
@@ -150,6 +153,7 @@ require (
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
+ github.com/go-zookeeper/zk v1.0.4 // indirect
github.com/goccy/go-json v0.10.5 // indirect
github.com/goccy/go-yaml v1.17.1 // indirect
github.com/gofrs/flock v0.12.1 // indirect
diff --git a/go.sum b/go.sum
index 25e8af56..92dee50b 100644
--- a/go.sum
+++ b/go.sum
@@ -148,6 +148,12 @@ github.com/aws/smithy-go v1.24.0
h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk=
github.com/aws/smithy-go v1.24.0/go.mod
h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0=
github.com/awsdocs/aws-doc-sdk-examples/gov2/testtools
v0.0.0-20250407191926-092f3e54b837
h1:8eMceEa0ib+nqJuGsyowuZaVBVAr685oK6WrNIit+0g=
github.com/awsdocs/aws-doc-sdk-examples/gov2/testtools
v0.0.0-20250407191926-092f3e54b837/go.mod
h1:9Oj/8PZn3D5Ftp/Z1QWrIEFE0daERMqfJawL9duHRfc=
+github.com/beltran/gohive v1.8.1
h1:qlygmroy3mKtKIQSpV/FqXJHty1LsPxF+JTQA5mbjwU=
+github.com/beltran/gohive v1.8.1/go.mod
h1:BCgNAhr/wnbyXfp2yN9ZY4pVrGrtVqG4hhNDDXIal1U=
+github.com/beltran/gosasl v1.0.0
h1:iiRtLxkvKhrNv3Ohh/n2NiyyfwIo/UbMzy/dZWiUHXE=
+github.com/beltran/gosasl v1.0.0/go.mod
h1:Qx8cW6jkI8riyzmklj80kAIkv+iezFUTBiGU0qHhHes=
+github.com/beltran/gssapi v0.0.0-20200324152954-d86554db4bab
h1:ayfcn60tXOSYy5zUN1AMSTQo4nJCf7hrdzAVchpPst4=
+github.com/beltran/gssapi v0.0.0-20200324152954-d86554db4bab/go.mod
h1:GLe4UoSyvJ3cVG+DVtKen5eAiaD8mAJFuV5PT3Eeg9Q=
github.com/beorn7/perks v0.0.0-20150223135152-b965b613227f/go.mod
h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod
h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod
h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@@ -315,6 +321,8 @@ github.com/go-task/slim-sprig/v3 v3.0.0
h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod
h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/go-viper/mapstructure/v2 v2.4.0
h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs=
github.com/go-viper/mapstructure/v2 v2.4.0/go.mod
h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
+github.com/go-zookeeper/zk v1.0.4
h1:DPzxraQx7OrPyXq2phlGlNSIyWEsAox0RJmjTseMV6I=
+github.com/go-zookeeper/zk v1.0.4/go.mod
h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/goccy/go-json v0.10.5
h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
github.com/goccy/go-json v0.10.5/go.mod
h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/goccy/go-yaml v1.17.1
h1:LI34wktB2xEE3ONG/2Ar54+/HJVBriAGJ55PHls4YuY=
diff --git a/internal/recipe/docker-compose.yml
b/internal/recipe/docker-compose.yml
index f205db42..367ee666 100644
--- a/internal/recipe/docker-compose.yml
+++ b/internal/recipe/docker-compose.yml
@@ -118,5 +118,42 @@ services:
- 4443:4443
command: ["-scheme", "http", "-port", "4443", "-backend", "memory",
"-public-host", "fake-gcs-server:4443"]
+ hive-metastore:
+ image: apache/hive:4.0.0
+ container_name: hive-metastore
+ user: root
+ networks:
+ iceberg_net:
+ ports:
+ - 9083:9083
+ environment:
+ - SERVICE_NAME=metastore
+ - DB_DRIVER=derby
+ volumes:
+ - hive-metastore-data:/opt/hive/data
+ - /tmp/iceberg-warehouse:/tmp/iceberg-warehouse
+
+ hiveserver2:
+ image: apache/hive:4.0.0
+ container_name: hiveserver2
+ user: root
+ networks:
+ iceberg_net:
+ depends_on:
+ - hive-metastore
+ ports:
+ - "10003:10000" # Adjusted to avoid conflict with spark-iceberg
+ - "10002:10002" # Web UI
+ environment:
+ - SERVICE_NAME=hiveserver2
+ - SERVICE_OPTS=-Dhive.metastore.uris=thrift://hive-metastore:9083
+ - IS_RESUME=true
+ volumes:
+ - hive-metastore-data:/opt/hive/data
+ - /tmp/iceberg-warehouse:/tmp/iceberg-warehouse
+
networks:
iceberg_net:
+
+volumes:
+ hive-metastore-data: