This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 18961c76 feat(hive): Add support for view operations in hive catalog
(#742)
18961c76 is described below
commit 18961c76a0d114156a5a669dfc6db937796c3fbf
Author: Dao Thanh Tung <[email protected]>
AuthorDate: Mon Feb 23 21:42:44 2026 +0000
feat(hive): Add support for view operations in hive catalog (#742)
Related to https://github.com/apache/iceberg-go/issues/739
I have left out the create view operation for now and will work on it in the next PR
---------
Signed-off-by: dttung2905 <[email protected]>
---
README.md | 8 +-
catalog/hive/hive.go | 156 +++++++++++++++++++++++++++-
catalog/hive/hive_integration_test.go | 74 ++++++++++++++
catalog/hive/hive_test.go | 187 ++++++++++++++++++++++++++++++++++
catalog/hive/options.go | 10 +-
catalog/hive/schema.go | 25 +++++
6 files changed, 451 insertions(+), 9 deletions(-)
diff --git a/README.md b/README.md
index 974626ae..a714bfb2 100644
--- a/README.md
+++ b/README.md
@@ -129,10 +129,10 @@ make lint-install
| Drop Namespace | X | X | X | X |
| Update Namespace Properties | X | X | X | X |
| Create View | X | | | X |
-| Load View | | | | X |
-| List View | X | | | X |
-| Drop View | X | | | X |
-| Check View Exists | X | | | X |
+| Load View | | X | | X |
+| List View | X | X | | X |
+| Drop View | X | X | | X |
+| Check View Exists | X | X | | X |
### Read/Write Data Support
diff --git a/catalog/hive/hive.go b/catalog/hive/hive.go
index 2236ab56..bde836e0 100644
--- a/catalog/hive/hive.go
+++ b/catalog/hive/hive.go
@@ -31,6 +31,7 @@ import (
"github.com/apache/iceberg-go/catalog/internal"
"github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table"
+ "github.com/apache/iceberg-go/view"
"github.com/beltran/gohive/hive_metastore"
)
@@ -155,12 +156,21 @@ func (c *Catalog) LoadTable(ctx context.Context,
identifier table.Identifier) (*
}
func (c *Catalog) CreateTable(ctx context.Context, identifier
table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt)
(*table.Table, error) {
- staged, err := internal.CreateStagedTable(ctx, c.opts.props,
c.LoadNamespaceProperties, identifier, schema, opts...)
+ database, tableName, err := identifierToTableName(identifier)
if err != nil {
return nil, err
}
- database, tableName, err := identifierToTableName(identifier)
+ // Ensure there is no view with the same identifier.
+ viewExists, err := c.CheckViewExists(ctx, identifier)
+ if err != nil {
+ return nil, err
+ }
+ if viewExists {
+ return nil, fmt.Errorf("%w: %s.%s",
catalog.ErrViewAlreadyExists, database, tableName)
+ }
+
+ staged, err := internal.CreateStagedTable(ctx, c.opts.props,
c.LoadNamespaceProperties, identifier, schema, opts...)
if err != nil {
return nil, err
}
@@ -322,6 +332,148 @@ func (c *Catalog) CheckTableExists(ctx context.Context,
identifier table.Identif
return isIcebergTable(hiveTbl), nil
}
+// ListViews returns a list of view identifiers in the given namespace.
+func (c *Catalog) ListViews(ctx context.Context, namespace table.Identifier)
iter.Seq2[table.Identifier, error] {
+ return func(yield func(table.Identifier, error) bool) {
+ database, err := identifierToDatabase(namespace)
+ if err != nil {
+ yield(nil, err)
+
+ return
+ }
+
+ exists, err := c.CheckNamespaceExists(ctx,
DatabaseIdentifier(database))
+ if err != nil {
+ yield(nil, err)
+
+ return
+ }
+ if !exists {
+ yield(nil, fmt.Errorf("%w: %s",
catalog.ErrNoSuchNamespace, database))
+
+ return
+ }
+
+ viewNames, err := c.client.GetTables(ctx, database, "*")
+ if err != nil {
+ yield(nil, fmt.Errorf("failed to list views in %s: %w",
database, err))
+
+ return
+ }
+
+ if len(viewNames) == 0 {
+ return
+ }
+
+ for _, viewName := range viewNames {
+ tbl, err := c.client.GetTable(ctx, database, viewName)
+ if err != nil {
+ // Skip tables we fail to load, mirroring table
listing behavior.
+ continue
+ }
+ if isIcebergView(tbl) {
+ if !yield(TableIdentifier(database, viewName),
nil) {
+ return
+ }
+ }
+ }
+ }
+}
+
+// LoadView loads a view from the catalog.
+func (c *Catalog) LoadView(ctx context.Context, identifier table.Identifier)
(*view.View, error) {
+ database, viewName, err := identifierToTableName(identifier)
+ if err != nil {
+ return nil, err
+ }
+
+ hiveTbl, err := c.client.GetTable(ctx, database, viewName)
+ if err != nil {
+ if isNoSuchObjectError(err) {
+ return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchView,
identifier)
+ }
+
+ return nil, fmt.Errorf("failed to get view %s.%s: %w",
database, viewName, err)
+ }
+
+ // If the object is an Iceberg table, do not treat it as a view.
+ if isIcebergTable(hiveTbl) || !isIcebergView(hiveTbl) {
+ return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchView,
identifier)
+ }
+
+ metadataLocation, err := getViewMetadataLocation(hiveTbl)
+ if err != nil {
+ return nil, fmt.Errorf("%w: %s, metadata location is missing",
catalog.ErrNoSuchView, identifier)
+ }
+
+ return view.NewFromLocation(
+ ctx,
+ identifier,
+ metadataLocation,
+ io.LoadFSFunc(c.opts.props, metadataLocation),
+ )
+}
+
+// DropView drops a view from the catalog.
+func (c *Catalog) DropView(ctx context.Context, identifier table.Identifier)
error {
+ database, viewName, err := identifierToTableName(identifier)
+ if err != nil {
+ return err
+ }
+
+ hiveTbl, err := c.client.GetTable(ctx, database, viewName)
+ if err != nil {
+ if isNoSuchObjectError(err) {
+ return fmt.Errorf("%w: %s", catalog.ErrNoSuchView,
identifier)
+ }
+
+ return fmt.Errorf("failed to get view %s.%s: %w", database,
viewName, err)
+ }
+
+ if !isIcebergView(hiveTbl) {
+ return fmt.Errorf("%w: %s", catalog.ErrNoSuchView, identifier)
+ }
+
+ metadataLocation, err := getViewMetadataLocation(hiveTbl)
+ if err != nil {
+ return err
+ }
+
+ if err := c.client.DropTable(ctx, database, viewName, false); err !=
nil {
+ return fmt.Errorf("failed to drop view %s.%s: %w", database,
viewName, err)
+ }
+
+ fs, err := io.LoadFS(ctx, c.opts.props, metadataLocation)
+ if err != nil {
+ return fmt.Errorf("failed to load filesystem for view metadata:
%w", err)
+ }
+
+ if err := fs.Remove(metadataLocation); err != nil {
+ return fmt.Errorf("failed to remove view metadata file at %s:
%w", metadataLocation, err)
+ }
+
+ return nil
+}
+
+// CheckViewExists checks if a view exists in the catalog.
+func (c *Catalog) CheckViewExists(ctx context.Context, identifier
table.Identifier) (bool, error) {
+ database, viewName, err := identifierToTableName(identifier)
+ if err != nil {
+ return false, err
+ }
+
+ hiveTbl, err := c.client.GetTable(ctx, database, viewName)
+ if err != nil {
+ if isNoSuchObjectError(err) {
+ return false, nil
+ }
+
+ return false, err
+ }
+
+ return isIcebergView(hiveTbl), nil
+}
+
func (c *Catalog) ListNamespaces(ctx context.Context, parent table.Identifier)
([]table.Identifier, error) {
if len(parent) > 0 {
return nil, errors.New("hierarchical namespace is not
supported")
diff --git a/catalog/hive/hive_integration_test.go
b/catalog/hive/hive_integration_test.go
index cc0e6f1f..138d6605 100644
--- a/catalog/hive/hive_integration_test.go
+++ b/catalog/hive/hive_integration_test.go
@@ -21,6 +21,7 @@ package hive
import (
"context"
+ "errors"
"fmt"
"os"
"testing"
@@ -318,3 +319,76 @@ func TestHiveIntegrationDropTable(t *testing.T) {
assert.NoError(err)
assert.False(exists)
}
+
+func TestHiveIntegrationCheckViewExists(t *testing.T) {
+ assert := require.New(t)
+
+ cat := createTestCatalog(t)
+ defer cat.Close()
+
+ dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
+
+ err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName),
iceberg.Properties{
+ "location": getTestTableLocation() + "/" + dbName,
+ })
+ assert.NoError(err)
+ defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))
+
+ // Non-existent view returns false
+ exists, err := cat.CheckViewExists(context.TODO(),
TableIdentifier(dbName, "nonexistent_view"))
+ assert.NoError(err)
+ assert.False(exists)
+
+ // Create a table and ensure CheckViewExists returns false for that
name (it's a table, not a view)
+ schema := iceberg.NewSchemaWithIdentifiers(0, []int{1},
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ )
+ tableLocation := getTestTableLocation() + "/" + dbName + "/some_table"
+ _, err = cat.CreateTable(context.TODO(), TableIdentifier(dbName,
"some_table"), schema,
+ catalog.WithLocation(tableLocation),
+ )
+ assert.NoError(err)
+ defer cat.DropTable(context.TODO(), TableIdentifier(dbName,
"some_table"))
+
+ exists, err = cat.CheckViewExists(context.TODO(),
TableIdentifier(dbName, "some_table"))
+ assert.NoError(err)
+ assert.False(exists)
+}
+
+func TestHiveIntegrationLoadViewNoSuchView(t *testing.T) {
+ assert := require.New(t)
+
+ cat := createTestCatalog(t)
+ defer cat.Close()
+
+ dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
+
+ err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName),
iceberg.Properties{
+ "location": getTestTableLocation() + "/" + dbName,
+ })
+ assert.NoError(err)
+ defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))
+
+ _, err = cat.LoadView(context.TODO(), TableIdentifier(dbName,
"nonexistent_view"))
+ assert.Error(err)
+ assert.True(errors.Is(err, catalog.ErrNoSuchView))
+}
+
+func TestHiveIntegrationDropViewNoSuchView(t *testing.T) {
+ assert := require.New(t)
+
+ cat := createTestCatalog(t)
+ defer cat.Close()
+
+ dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
+
+ err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName),
iceberg.Properties{
+ "location": getTestTableLocation() + "/" + dbName,
+ })
+ assert.NoError(err)
+ defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))
+
+ err = cat.DropView(context.TODO(), TableIdentifier(dbName,
"nonexistent_view"))
+ assert.Error(err)
+ assert.True(errors.Is(err, catalog.ErrNoSuchView))
+}
diff --git a/catalog/hive/hive_test.go b/catalog/hive/hive_test.go
index 4f37307a..05a7b8bd 100644
--- a/catalog/hive/hive_test.go
+++ b/catalog/hive/hive_test.go
@@ -172,6 +172,19 @@ var testSchema = iceberg.NewSchemaWithIdentifiers(0,
[]int{},
iceberg.NestedField{ID: 2, Name: "bar", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
iceberg.NestedField{ID: 3, Name: "baz", Type:
iceberg.PrimitiveTypes.Bool})
+var testIcebergHiveView = &hive_metastore.Table{
+ TableName: "test_view",
+ DbName: "test_database",
+ TableType: TableTypeVirtualView,
+ Parameters: map[string]string{
+ TableTypeKey: TableTypeIcebergView,
+ MetadataLocationKey:
"s3://test-bucket/test_view/metadata/view-1234.metadata.json",
+ },
+ Sd: &hive_metastore.StorageDescriptor{
+ Location: "s3://test-bucket/test_view",
+ },
+}
+
// Error helpers for mocking
var (
errNoSuchObject = errors.New("NoSuchObjectException: object not
found")
@@ -550,6 +563,125 @@ func TestHiveCatalogType(t *testing.T) {
assert.Equal(catalog.Hive, hiveCatalog.CatalogType())
}
+func TestHiveCreateTableConflictsWithView(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ // A view already exists with the same identifier.
+ mockClient.On("GetTable", mock.Anything, "test_database", "test_table").
+ Return(testIcebergHiveView, nil).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ _, err := hiveCatalog.CreateTable(
+ context.TODO(),
+ TableIdentifier("test_database", "test_table"),
+ testSchema,
+ )
+ assert.Error(err)
+ assert.True(errors.Is(err, catalog.ErrViewAlreadyExists))
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveCheckViewExists(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetTable", mock.Anything, "test_database", "test_view").
+ Return(testIcebergHiveView, nil).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ exists, err := hiveCatalog.CheckViewExists(context.TODO(),
TableIdentifier("test_database", "test_view"))
+ assert.NoError(err)
+ assert.True(exists)
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveCheckViewNotExists(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetTable", mock.Anything, "test_database",
"nonexistent_view").
+ Return(nil, errNoSuchObject).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ exists, err := hiveCatalog.CheckViewExists(context.TODO(),
TableIdentifier("test_database", "nonexistent_view"))
+ assert.NoError(err)
+ assert.False(exists)
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveListViews(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ // One Iceberg view and one non-Iceberg table.
+ mockClient.On("GetTables", mock.Anything, "test_database", "*").
+ Return([]string{"test_view", "other_table"}, nil).Once()
+
+ mockClient.On("GetDatabase", mock.Anything, "test_database").
+ Return(&hive_metastore.Database{Name: "test_database"},
nil).Once()
+
+ mockClient.On("GetTable", mock.Anything, "test_database", "test_view").
+ Return(testIcebergHiveView, nil).Once()
+ mockClient.On("GetTable", mock.Anything, "test_database",
"other_table").
+ Return(testNonIcebergHiveTable, nil).Once()
+
+ hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ iter := hiveCatalog.ListViews(context.TODO(),
DatabaseIdentifier("test_database"))
+
+ var (
+ lastErr error
+ views []table.Identifier
+ )
+
+ for v, err := range iter {
+ if err != nil {
+ lastErr = err
+
+ break
+ }
+ views = append(views, v)
+ }
+
+ assert.NoError(lastErr)
+ assert.Len(views, 1)
+ assert.Equal(TableIdentifier("test_database", "test_view"), views[0])
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestHiveLoadView(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+
+ mockClient.On("GetTable", mock.Anything, "test_database", "test_view").
+ Return(testIcebergHiveView, nil).Once()
+
+ // Metadata location in the fixture is s3://... which we cannot load in
unit tests.
+ // We only assert that LoadView returns an error when the metadata is
unreachable.
+ props := iceberg.Properties{
+ "warehouse": "file:///tmp",
+ }
+ hiveCatalog := NewCatalogWithClient(mockClient, props)
+
+ _, err := hiveCatalog.LoadView(context.TODO(),
TableIdentifier("test_database", "test_view"))
+ assert.Error(err)
+
+ mockClient.AssertExpectations(t)
+}
+
func TestIsIcebergTable(t *testing.T) {
tests := []struct {
name string
@@ -600,6 +732,61 @@ func TestIsIcebergTable(t *testing.T) {
}
}
+func TestIsIcebergView(t *testing.T) {
+ tests := []struct {
+ name string
+ table *hive_metastore.Table
+ expected bool
+ }{
+ {
+ name: "iceberg view",
+ table: testIcebergHiveView,
+ expected: true,
+ },
+ {
+ name: "iceberg view mixed case",
+ table: &hive_metastore.Table{
+ TableName: "test_view_mixed",
+ DbName: "test_database",
+ TableType: "virtual_view",
+ Parameters: map[string]string{
+ TableTypeKey: "IcEbErG_ViEw",
+ },
+ },
+ expected: true,
+ },
+ {
+ name: "non-virtual-table-type",
+ table: &hive_metastore.Table{
+ TableName: "not_view",
+ DbName: "test_database",
+ TableType: TableTypeExternalTable,
+ Parameters: map[string]string{
+ TableTypeKey: TableTypeIcebergView,
+ },
+ },
+ expected: false,
+ },
+ {
+ name: "nil table",
+ table: nil,
+ expected: false,
+ },
+ {
+ name: "no parameters",
+ table: &hive_metastore.Table{TableType:
TableTypeVirtualView},
+ expected: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ assert := require.New(t)
+ assert.Equal(tt.expected, isIcebergView(tt.table))
+ })
+ }
+}
+
func TestIcebergTypeToHiveType(t *testing.T) {
tests := []struct {
name string
diff --git a/catalog/hive/options.go b/catalog/hive/options.go
index 6681676a..e3864338 100644
--- a/catalog/hive/options.go
+++ b/catalog/hive/options.go
@@ -31,9 +31,13 @@ const (
// Warehouse is the default warehouse location for tables
Warehouse = "warehouse"
- TableTypeKey = "table_type"
- TableTypeIceberg = "ICEBERG"
- TableTypeExternalTable = "EXTERNAL_TABLE"
+ TableTypeKey = "table_type"
+ TableTypeIceberg = "ICEBERG"
+ TableTypeExternalTable = "EXTERNAL_TABLE"
+ // Ref:
https://github.com/apache/hive/blob/7060d94843fdbc548445db6aac84dd60b44641ee/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/TableType.java#L27
+ TableTypeVirtualView = "VIRTUAL_VIEW"
+ // Ref:
https://github.com/apache/iceberg/blob/2f170322d425a4c6267a9033efa2107c9bfc53db/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java#L57
+ TableTypeIcebergView = "ICEBERG_VIEW"
MetadataLocationKey = "metadata_location"
PreviousMetadataLocationKey = "previous_metadata_location"
ExternalKey = "EXTERNAL"
diff --git a/catalog/hive/schema.go b/catalog/hive/schema.go
index 9d519596..3667c870 100644
--- a/catalog/hive/schema.go
+++ b/catalog/hive/schema.go
@@ -162,3 +162,28 @@ func getMetadataLocation(tbl *hive_metastore.Table)
(string, error) {
return location, nil
}
+
+// isIcebergView checks if a Hive table is an Iceberg view.
+func isIcebergView(tbl *hive_metastore.Table) bool {
+ if tbl == nil || tbl.Parameters == nil {
+ return false
+ }
+
+ // Ensure the HMS table type is a virtual view
+ if !strings.EqualFold(tbl.TableType, TableTypeVirtualView) {
+ return false
+ }
+
+ tableType, ok := tbl.Parameters[TableTypeKey]
+ if !ok {
+ return false
+ }
+
+ return strings.EqualFold(tableType, TableTypeIcebergView)
+}
+
+// getViewMetadataLocation returns the metadata location for an Iceberg view.
+// Views use the same metadata_location parameter as tables.
+func getViewMetadataLocation(tbl *hive_metastore.Table) (string, error) {
+ return getMetadataLocation(tbl)
+}