This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new e40dec4a feat(hive): Add Create View for Hive catalog (#788)
e40dec4a is described below
commit e40dec4a1d10f73b1689b0aea027dc2c83167752
Author: Dao Thanh Tung <[email protected]>
AuthorDate: Mon Mar 23 21:09:06 2026 +0000
feat(hive): Add Create View for Hive catalog (#788)
Add the missing Create View support for the Hive catalog. Related to
https://github.com/apache/iceberg-go/issues/739
---------
Signed-off-by: dttung2905 <[email protected]>
---
README.md | 2 +-
catalog/hive/hive.go | 117 +++++++++++++++++++++++++++
catalog/hive/hive_integration_test.go | 147 ++++++++++++++++++++++++++++++++++
catalog/hive/hive_test.go | 124 ++++++++++++++++++++++++++++
catalog/hive/schema.go | 45 +++++++++++
5 files changed, 434 insertions(+), 1 deletion(-)
diff --git a/README.md b/README.md
index c02480bb..2e6d6d63 100644
--- a/README.md
+++ b/README.md
@@ -128,7 +128,7 @@ make lint-install
| Check Namespace Exists | X | X | X | X |
| Drop Namespace | X | X | X | X |
| Update Namespace Properties | X | X | X | X |
-| Create View | X | | | X |
+| Create View | X | X | | X |
| Load View | | X | | X |
| List View | X | X | | X |
| Drop View | X | X | | X |
diff --git a/catalog/hive/hive.go b/catalog/hive/hive.go
index bde836e0..e3ac750f 100644
--- a/catalog/hive/hive.go
+++ b/catalog/hive/hive.go
@@ -202,6 +202,123 @@ func (c *Catalog) CreateTable(ctx context.Context,
identifier table.Identifier,
return c.LoadTable(ctx, identifier)
}
+// CreateView creates a new view in the catalog. It uses the same signature as
the REST catalog:
+// identifier, version (with SQL representations), schema, and optional
CreateViewOpt for location and properties.
+// Returns the created *view.View, or an error if the namespace is missing, a
view/table already exists, or creation fails.
+func (c *Catalog) CreateView(ctx context.Context, identifier table.Identifier,
version *view.Version, schema *iceberg.Schema, opts ...catalog.CreateViewOpt)
(*view.View, error) {
+ database, viewName, err := identifierToTableName(identifier)
+ if err != nil {
+ return nil, err
+ }
+
+ cfg := catalog.NewCreateViewCfg()
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ exists, err := c.CheckNamespaceExists(ctx, DatabaseIdentifier(database))
+ if err != nil {
+ return nil, err
+ }
+ if !exists {
+ return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace,
database)
+ }
+
+ viewExists, err := c.CheckViewExists(ctx, identifier)
+ if err != nil {
+ return nil, err
+ }
+ if viewExists {
+ return nil, fmt.Errorf("%w: %s.%s",
catalog.ErrViewAlreadyExists, database, viewName)
+ }
+
+ tableExists, err := c.CheckTableExists(ctx, identifier)
+ if err != nil {
+ return nil, err
+ }
+ if tableExists {
+ return nil, fmt.Errorf("%w: %s.%s",
catalog.ErrTableAlreadyExists, database, viewName)
+ }
+
+ loc := strings.TrimSuffix(cfg.Location, "/")
+ if loc == "" {
+ var err error
+ loc, err = internal.ResolveTableLocation(ctx, "", database,
viewName, c.opts.props, c.LoadNamespaceProperties)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ freshSchema, err := iceberg.AssignFreshSchemaIDs(schema, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ viewSQL, err := sqlFromVersion(version)
+ if err != nil {
+ return nil, err
+ }
+
+ defaultNS := catalog.NamespaceFromIdent(identifier)
+ catalogName := "hive"
+
+ // Merge catalog props with view props so io.LoadFS can resolve and
view metadata gets user properties.
+ props := make(iceberg.Properties)
+ if c.opts.props != nil {
+ maps.Copy(props, c.opts.props)
+ }
+ if cfg.Properties != nil {
+ maps.Copy(props, cfg.Properties)
+ }
+
+ createdView, err := view.CreateView(ctx, catalogName, identifier,
freshSchema, viewSQL, defaultNS, loc, props)
+ if err != nil {
+ return nil, err
+ }
+
+ viewProps := make(map[string]string)
+ if cfg.Properties != nil {
+ for k, v := range cfg.Properties {
+ viewProps[k] = v
+ }
+ }
+
+ hiveTbl := constructHiveViewTable(database, viewName, loc,
createdView.MetadataLocation(), freshSchema, viewSQL, viewProps)
+ if err := c.client.CreateTable(ctx, hiveTbl); err != nil {
+ if isAlreadyExistsError(err) {
+ return nil, fmt.Errorf("%w: %s.%s",
catalog.ErrViewAlreadyExists, database, viewName)
+ }
+
+ return nil, fmt.Errorf("failed to create view %s.%s: %w",
database, viewName, err)
+ }
+
+ return createdView, nil
+}
+
+// sqlFromVersion returns the SQL from the first SQL representation in
version, preferring dialect "hive".
+func sqlFromVersion(v *view.Version) (string, error) {
+ if v == nil || len(v.Representations) == 0 {
+ return "", errors.New("view version has no representations")
+ }
+ var fallback string
+ for _, r := range v.Representations {
+ if r.Type != "sql" {
+ continue
+ }
+ if strings.EqualFold(r.Dialect, "hive") {
+ return r.Sql, nil
+ }
+ if fallback == "" {
+ fallback = r.Sql
+ }
+ }
+ if fallback != "" {
+ return fallback, nil
+ }
+
+ return "", errors.New("view version has no SQL representation")
+}
+
func (c *Catalog) DropTable(ctx context.Context, identifier table.Identifier)
error {
database, tableName, err := identifierToTableName(identifier)
if err != nil {
diff --git a/catalog/hive/hive_integration_test.go
b/catalog/hive/hive_integration_test.go
index 138d6605..3cfcf398 100644
--- a/catalog/hive/hive_integration_test.go
+++ b/catalog/hive/hive_integration_test.go
@@ -29,6 +29,8 @@ import (
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
+ "github.com/apache/iceberg-go/table"
+ "github.com/apache/iceberg-go/view"
"github.com/stretchr/testify/require"
)
@@ -392,3 +394,148 @@ func TestHiveIntegrationDropViewNoSuchView(t *testing.T) {
assert.Error(err)
assert.True(errors.Is(err, catalog.ErrNoSuchView))
}
+
+func TestHiveIntegrationCreateView(t *testing.T) {
+ assert := require.New(t)
+
+ cat := createTestCatalog(t)
+ defer cat.Close()
+
+ dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
+ viewName := "test_view"
+
+ err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName),
iceberg.Properties{
+ "location": getTestTableLocation() + "/" + dbName,
+ })
+ assert.NoError(err)
+ defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))
+
+ schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "col",
Type: iceberg.PrimitiveTypes.Int32, Required: true})
+ viewSQL := "SELECT 1 AS col"
+ ver, err := view.NewVersionFromSQL(1, 0, viewSQL,
table.Identifier{dbName})
+ assert.NoError(err)
+
+ viewLocation := getTestTableLocation() + "/" + dbName + "/" + viewName
+ v, err := cat.CreateView(context.TODO(), TableIdentifier(dbName,
viewName), ver, schema,
+ catalog.WithViewLocation(viewLocation),
+ )
+ assert.NoError(err)
+ assert.NotNil(v)
+ defer cat.DropView(context.TODO(), TableIdentifier(dbName, viewName))
+
+ exists, err := cat.CheckViewExists(context.TODO(),
TableIdentifier(dbName, viewName))
+ assert.NoError(err)
+ assert.True(exists)
+
+ loaded, err := cat.LoadView(context.TODO(), TableIdentifier(dbName,
viewName))
+ assert.NoError(err)
+ assert.NotNil(loaded)
+ assert.True(schema.Equals(loaded.CurrentSchema()))
+ assert.Len(loaded.CurrentVersion().Representations, 1)
+ assert.Equal("sql", loaded.CurrentVersion().Representations[0].Type)
+ assert.Equal(viewSQL, loaded.CurrentVersion().Representations[0].Sql)
+}
+
+func TestHiveIntegrationCreateViewThenDrop(t *testing.T) {
+ assert := require.New(t)
+
+ cat := createTestCatalog(t)
+ defer cat.Close()
+
+ dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
+ viewName := "view_to_drop"
+
+ err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName),
iceberg.Properties{
+ "location": getTestTableLocation() + "/" + dbName,
+ })
+ assert.NoError(err)
+ defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))
+
+ schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "id",
Type: iceberg.PrimitiveTypes.Int64, Required: true})
+ ver, _ := view.NewVersionFromSQL(1, 0, "SELECT 1 AS id",
table.Identifier{dbName})
+
+ viewLocation := getTestTableLocation() + "/" + dbName + "/" + viewName
+ _, err = cat.CreateView(context.TODO(), TableIdentifier(dbName,
viewName), ver, schema,
+ catalog.WithViewLocation(viewLocation),
+ )
+ assert.NoError(err)
+
+ exists, err := cat.CheckViewExists(context.TODO(),
TableIdentifier(dbName, viewName))
+ assert.NoError(err)
+ assert.True(exists)
+
+ err = cat.DropView(context.TODO(), TableIdentifier(dbName, viewName))
+ assert.NoError(err)
+
+ exists, err = cat.CheckViewExists(context.TODO(),
TableIdentifier(dbName, viewName))
+ assert.NoError(err)
+ assert.False(exists)
+
+ _, err = cat.LoadView(context.TODO(), TableIdentifier(dbName, viewName))
+ assert.Error(err)
+ assert.True(errors.Is(err, catalog.ErrNoSuchView))
+}
+
+func TestHiveIntegrationCreateView_TableConflict(t *testing.T) {
+ assert := require.New(t)
+
+ cat := createTestCatalog(t)
+ defer cat.Close()
+
+ dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
+ tableName := "t1"
+
+ err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName),
iceberg.Properties{
+ "location": getTestTableLocation() + "/" + dbName,
+ })
+ assert.NoError(err)
+ defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))
+
+ schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "id",
Type: iceberg.PrimitiveTypes.Int64, Required: true})
+ tableLocation := getTestTableLocation() + "/" + dbName + "/" + tableName
+ _, err = cat.CreateTable(context.TODO(), TableIdentifier(dbName,
tableName), schema,
+ catalog.WithLocation(tableLocation),
+ )
+ assert.NoError(err)
+ defer cat.DropTable(context.TODO(), TableIdentifier(dbName, tableName))
+
+ ver, _ := view.NewVersionFromSQL(1, 0, "SELECT * FROM t1",
table.Identifier{dbName})
+ viewLocation := getTestTableLocation() + "/" + dbName + "/" + tableName
+ "_view"
+ _, err = cat.CreateView(context.TODO(), TableIdentifier(dbName,
tableName), ver, schema,
+ catalog.WithViewLocation(viewLocation),
+ )
+ assert.Error(err)
+ assert.True(errors.Is(err, catalog.ErrTableAlreadyExists))
+}
+
+func TestHiveIntegrationCreateView_ViewConflict(t *testing.T) {
+ assert := require.New(t)
+
+ cat := createTestCatalog(t)
+ defer cat.Close()
+
+ dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
+ viewName := "v1"
+
+ err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName),
iceberg.Properties{
+ "location": getTestTableLocation() + "/" + dbName,
+ })
+ assert.NoError(err)
+ defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))
+
+ schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "col",
Type: iceberg.PrimitiveTypes.Int32, Required: true})
+ ver, _ := view.NewVersionFromSQL(1, 0, "SELECT 1 AS col",
table.Identifier{dbName})
+ viewLocation := getTestTableLocation() + "/" + dbName + "/" + viewName
+
+ _, err = cat.CreateView(context.TODO(), TableIdentifier(dbName,
viewName), ver, schema,
+ catalog.WithViewLocation(viewLocation),
+ )
+ assert.NoError(err)
+ defer cat.DropView(context.TODO(), TableIdentifier(dbName, viewName))
+
+ _, err = cat.CreateView(context.TODO(), TableIdentifier(dbName,
viewName), ver, schema,
+ catalog.WithViewLocation(viewLocation+"/second"),
+ )
+ assert.Error(err)
+ assert.True(errors.Is(err, catalog.ErrViewAlreadyExists))
+}
diff --git a/catalog/hive/hive_test.go b/catalog/hive/hive_test.go
index 05a7b8bd..03140c34 100644
--- a/catalog/hive/hive_test.go
+++ b/catalog/hive/hive_test.go
@@ -20,11 +20,13 @@ package hive
import (
"context"
"errors"
+ "path/filepath"
"testing"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
"github.com/apache/iceberg-go/table"
+ "github.com/apache/iceberg-go/view"
"github.com/beltran/gohive/hive_metastore"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@@ -943,3 +945,125 @@ func TestIdentifierValidation(t *testing.T) {
assert.Error(err)
})
}
+
+func TestCreateView_NamespaceNotFound(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+ mockClient.On("GetDatabase", mock.Anything, "missing_db").
+ Return(nil, errNoSuchObject).Once()
+
+ cat := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ ver, _ := view.NewVersionFromSQL(1, 0, "SELECT 1",
table.Identifier{"missing_db"})
+ schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "col",
Type: iceberg.PrimitiveTypes.Int32, Required: true})
+
+ _, err := cat.CreateView(context.Background(),
TableIdentifier("missing_db", "v1"), ver, schema,
catalog.WithViewLocation("file:///tmp/loc"))
+ assert.Error(err)
+ assert.True(errors.Is(err, catalog.ErrNoSuchNamespace))
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestCreateView_ViewAlreadyExists(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+ mockClient.On("GetDatabase", mock.Anything,
"test_database").Return(&hive_metastore.Database{Name: "test_database"},
nil).Once()
+ mockClient.On("GetTable", mock.Anything, "test_database",
"test_view").Return(testIcebergHiveView, nil).Once()
+
+ cat := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ ver, _ := view.NewVersionFromSQL(1, 0, "SELECT 1",
table.Identifier{"test_database"})
+ schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "col",
Type: iceberg.PrimitiveTypes.Int32, Required: true})
+
+ _, err := cat.CreateView(context.Background(),
TableIdentifier("test_database", "test_view"), ver, schema,
catalog.WithViewLocation("file:///tmp/loc"))
+ assert.Error(err)
+ assert.True(errors.Is(err, catalog.ErrViewAlreadyExists))
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestCreateView_TableAlreadyExists(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+ mockClient.On("GetDatabase", mock.Anything,
"test_database").Return(&hive_metastore.Database{Name: "test_database"},
nil).Once()
+ // Called twice: once for CheckViewExists and once for CheckTableExists.
+ mockClient.On("GetTable", mock.Anything, "test_database",
"test_table").Return(testIcebergHiveTable1, nil).Twice()
+
+ cat := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ ver, _ := view.NewVersionFromSQL(1, 0, "SELECT 1",
table.Identifier{"test_database"})
+ schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "col",
Type: iceberg.PrimitiveTypes.Int32, Required: true})
+
+ _, err := cat.CreateView(context.Background(),
TableIdentifier("test_database", "test_table"), ver, schema,
catalog.WithViewLocation("file:///tmp/loc"))
+ assert.Error(err)
+ assert.True(errors.Is(err, catalog.ErrTableAlreadyExists))
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestCreateView_InvalidIdentifier(t *testing.T) {
+ assert := require.New(t)
+
+ cat := NewCatalogWithClient(&mockHiveClient{}, iceberg.Properties{})
+
+ ver, _ := view.NewVersionFromSQL(1, 0, "SELECT 1",
table.Identifier{"db"})
+ schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "col",
Type: iceberg.PrimitiveTypes.Int32, Required: true})
+
+ _, err := cat.CreateView(context.Background(),
table.Identifier{"only_db"}, ver, schema)
+ assert.Error(err)
+}
+
+func TestCreateView_Success(t *testing.T) {
+ assert := require.New(t)
+
+ dir := t.TempDir()
+ loc := "file://" + filepath.ToSlash(dir) + "/view_loc"
+
+ mockClient := &mockHiveClient{}
+ mockClient.On("GetDatabase", mock.Anything,
"test_database").Return(&hive_metastore.Database{Name: "test_database"},
nil).Once()
+ mockClient.On("GetTable", mock.Anything, "test_database",
"new_view").Return(nil, errNoSuchObject).Twice() // CheckViewExists and
CheckTableExists
+ mockClient.On("CreateTable", mock.Anything, mock.MatchedBy(func(tbl
*hive_metastore.Table) bool {
+ return tbl != nil && tbl.TableType == TableTypeVirtualView &&
+ tbl.Parameters != nil && tbl.Parameters[TableTypeKey]
== TableTypeIcebergView &&
+ tbl.Parameters[MetadataLocationKey] != "" &&
+ tbl.ViewOriginalText == "SELECT 1 AS col" &&
tbl.ViewExpandedText == "SELECT 1 AS col"
+ })).Return(nil).Once()
+
+ cat := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ ver, err := view.NewVersionFromSQL(1, 0, "SELECT 1 AS col",
table.Identifier{"test_database"})
+ assert.NoError(err)
+ schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "col",
Type: iceberg.PrimitiveTypes.Int32, Required: true})
+
+ v, err := cat.CreateView(context.Background(),
TableIdentifier("test_database", "new_view"), ver, schema,
catalog.WithViewLocation(loc))
+ assert.NoError(err)
+ assert.NotNil(v)
+ assert.Equal(table.Identifier{"test_database", "new_view"},
v.Identifier())
+
+ mockClient.AssertExpectations(t)
+}
+
+func TestCreateView_VersionNoSQLRepresentation(t *testing.T) {
+ assert := require.New(t)
+
+ mockClient := &mockHiveClient{}
+ mockClient.On("GetDatabase", mock.Anything,
"test_database").Return(&hive_metastore.Database{Name: "test_database"},
nil).Once()
+ mockClient.On("GetTable", mock.Anything, "test_database",
"v1").Return(nil, errNoSuchObject).Twice()
+
+ cat := NewCatalogWithClient(mockClient, iceberg.Properties{})
+
+ ver, err := view.NewVersionFromSQL(1, 0, "SELECT 1",
table.Identifier{"test_database"})
+ assert.NoError(err)
+ ver.Representations = nil // no SQL representation
+
+ schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "col",
Type: iceberg.PrimitiveTypes.Int32, Required: true})
+
+ _, err = cat.CreateView(context.Background(),
TableIdentifier("test_database", "v1"), ver, schema,
catalog.WithViewLocation("file:///tmp/loc"))
+ assert.Error(err)
+ assert.Contains(err.Error(), "no representations")
+
+ mockClient.AssertExpectations(t)
+}
diff --git a/catalog/hive/schema.go b/catalog/hive/schema.go
index 3667c870..310d7c7f 100644
--- a/catalog/hive/schema.go
+++ b/catalog/hive/schema.go
@@ -83,6 +83,51 @@ func icebergTypeToHiveType(typ iceberg.Type) string {
}
}
// Storage settings for the HMS entries of Iceberg views. Views use Hive's
// generic input/output formats and SerDe, not the Iceberg table storage
// handler. This mirrors HiveOperationsBase.storageDescriptor(..., false) in
// the Java implementation.
const (
	// hiveViewInputFormat is the StorageDescriptor input format for views.
	hiveViewInputFormat = "org.apache.hadoop.mapred.FileInputFormat"

	// hiveViewOutputFormat is the StorageDescriptor output format for views.
	hiveViewOutputFormat = "org.apache.hadoop.mapred.FileOutputFormat"

	// hiveViewSerDe is the serialization library recorded in the view's SerDeInfo.
	hiveViewSerDe = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
)
+
+// constructHiveViewTable builds an HMS Table for an Iceberg view. The table
type is VIRTUAL_VIEW,
+// parameters include table_type=ICEBERG_VIEW and metadata_location;
StorageDescriptor uses
+// generic Hive SerDe (not the Iceberg storage handler). ViewOriginalText and
ViewExpandedText
+// are set to viewSQL.
+func constructHiveViewTable(dbName, viewName, location, metadataLocation
string, schema *iceberg.Schema, viewSQL string, props map[string]string)
*hive_metastore.Table {
+ parameters := make(map[string]string)
+ parameters[TableTypeKey] = TableTypeIcebergView
+ parameters[MetadataLocationKey] = metadataLocation
+ parameters[ExternalKey] = "TRUE"
+ for k, v := range props {
+ if v != "" {
+ parameters[k] = v
+ }
+ }
+
+ // Ref:
https://github.com/apache/iceberg/blob/11dbe2f091edd4ac492f210c878d22386ec9d605/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java#L174-L178
+ tbl := &hive_metastore.Table{
+ TableName: viewName,
+ DbName: dbName,
+ TableType: TableTypeVirtualView,
+ ViewOriginalText: viewSQL,
+ ViewExpandedText: viewSQL,
+ Parameters: parameters,
+ Sd: &hive_metastore.StorageDescriptor{
+ Cols: schemaToHiveColumns(schema),
+ Location: location,
+ InputFormat: hiveViewInputFormat,
+ OutputFormat: hiveViewOutputFormat,
+ SerdeInfo: &hive_metastore.SerDeInfo{
+ SerializationLib: hiveViewSerDe,
+ },
+ },
+ }
+
+ return tbl
+}
+
func constructHiveTable(dbName, tableName, location, metadataLocation string,
schema *iceberg.Schema, props map[string]string) *hive_metastore.Table {
parameters := make(map[string]string)