This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d8ec298 feat(catalog): hadoop cli integration (#1034)
9d8ec298 is described below

commit 9d8ec298d32914007917df0c1599b1c059174f82
Author: Tanmay Rauth <[email protected]>
AuthorDate: Thu May 7 11:17:39 2026 -0700

    feat(catalog): hadoop cli integration (#1034)
    
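    [7: CLI integration](https://github.com/apache/iceberg-go/issues/798#issuecomment-4320784323)

    Added a `case catalog.Hadoop:` branch to the catalog switch in
    cmd/iceberg/main.go, passed args.Warehouse to hadoop.NewCatalog (as both
    the warehouse argument and the "warehouse" property), and imported
    catalog/hadoop. The commit also implements CreateTable, LoadTable,
    CheckTableExists and CommitTable for the Hadoop catalog and adds the
    table.WriteMetadataLocationKey ("write.metadata.location") constant.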
---
 README.md                     |  14 +-
 catalog/hadoop/hadoop.go      | 209 +++++++++++++++++--
 catalog/hadoop/hadoop_test.go | 470 ++++++++++++++++++++++++++++++++++++++++++
 cmd/iceberg/main.go           |   7 +
 table/properties.go           |   1 +
 5 files changed, 682 insertions(+), 19 deletions(-)

diff --git a/README.md b/README.md
index 36279345..b68e55df 100644
--- a/README.md
+++ b/README.md
@@ -112,17 +112,17 @@ make lint-install
 
 | Operation                   | REST | Hive |  Glue  | SQL  | Hadoop |
 |:----------------------------|:----:|:----:|:------:|:----:|:------:|
-| Load Table                  |  X   |  X   |   X    |  X   |        |
+| Load Table                  |  X   |  X   |   X    |  X   |   X    |
 | List Tables                 |  X   |  X   |   X    |  X   |   X    |
-| Create Table                |  X   |  X   |   X    |  X   |        |
+| Create Table                |  X   |  X   |   X    |  X   |   X    |
 | Register Table              |  X   |  X   |   X    |      |        |
-| Update Current Snapshot     |  X   |  X   |   X    |  X   |        |
-| Create New Snapshot         |  X   |  X   |   X    |  X   |        |
+| Update Current Snapshot     |  X   |  X   |   X    |  X   |   X    |
+| Create New Snapshot         |  X   |  X   |   X    |  X   |   X    |
 | Rename Table                |  X   |  X   |   X    |  X   |        |
 | Drop Table                  |  X   |  X   |   X    |  X   |   X    |
-| Alter Table                 |  X   |  X   |   X    |  X   |        |
-| Check Table Exists          |  X   |  X   |   X    |  X   |        |
-| Set Table Properties        |  X   |  X   |   X    |  X   |        |
+| Alter Table                 |  X   |  X   |   X    |  X   |   X    |
+| Check Table Exists          |  X   |  X   |   X    |  X   |   X    |
+| Set Table Properties        |  X   |  X   |   X    |  X   |   X    |
 | List Namespaces             |  X   |  X   |   X    |  X   |   X    |
 | Create Namespace            |  X   |  X   |   X    |  X   |   X    |
 | Check Namespace Exists      |  X   |  X   |   X    |  X   |   X    |
diff --git a/catalog/hadoop/hadoop.go b/catalog/hadoop/hadoop.go
index d3e7638f..cd1bacbd 100644
--- a/catalog/hadoop/hadoop.go
+++ b/catalog/hadoop/hadoop.go
@@ -33,6 +33,8 @@ import (
 
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/catalog"
+       "github.com/apache/iceberg-go/catalog/internal"
+       icebergio "github.com/apache/iceberg-go/io"
        "github.com/apache/iceberg-go/table"
        "github.com/google/uuid"
 )
@@ -296,12 +298,203 @@ func (c *Catalog) findVersion(ident table.Identifier) 
(int, error) {
        return c.scanForward(ident, maxVer), nil
 }
 
-func (c *Catalog) CreateTable(_ context.Context, _ table.Identifier, _ 
*iceberg.Schema, _ ...catalog.CreateTableOpt) (*table.Table, error) {
-       return nil, errors.New("hadoop catalog: CreateTable not yet 
implemented")
+// CreateTable creates a table at the catalog's default location for ident
+// (<warehouse>/<namespace...>/<table>) and writes its first metadata file,
+// <location>/metadata/v1.metadata.json, followed by a best-effort version hint.
+//
+// The namespace directory must already exist, otherwise
+// catalog.ErrNoSuchNamespace is returned. A custom location is rejected unless
+// it equals the default location. catalog.ErrTableAlreadyExists is returned if
+// the table already exists, including when a concurrent CreateTable publishes
+// v1.metadata.json first.
+func (c *Catalog) CreateTable(ctx context.Context, ident table.Identifier, sc *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) {
+       var cfg catalog.CreateTableCfg
+       for _, opt := range opts {
+               opt(&cfg)
+       }
+
+       if len(ident) < 2 {
+               return nil, errors.New("hadoop catalog: table identifier must have at least a namespace and table name")
+       }
+
+       ns := catalog.NamespaceFromIdent(ident)
+       nsPath := c.namespaceToPath(ns)
+
+       info, err := os.Stat(nsPath)
+       if os.IsNotExist(err) || (err == nil && !info.IsDir()) {
+               return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, strings.Join(ns, "."))
+       }
+
+       if err != nil {
+               return nil, fmt.Errorf("hadoop catalog: failed to stat namespace directory: %w", err)
+       }
+
+       loc := c.defaultTableLocation(ident)
+       if cfg.Location != "" && cfg.Location != loc {
+               return nil, errors.New("hadoop catalog: custom table locations are not supported")
+       }
+
+       // Fast-path rejection; the os.Link below is what actually guarantees
+       // that an existing v1.metadata.json is never replaced.
+       if isTableDir(loc) {
+               return nil, fmt.Errorf("%w: %s", catalog.ErrTableAlreadyExists, strings.Join(ident, "."))
+       }
+
+       metadata, err := table.NewMetadata(sc, cfg.PartitionSpec, cfg.SortOrder, loc, cfg.Properties)
+       if err != nil {
+               return nil, fmt.Errorf("hadoop catalog: failed to create table metadata: %w", err)
+       }
+
+       metaDir := c.metadataDir(ident)
+       if err := os.MkdirAll(metaDir, 0o755); err != nil {
+               return nil, fmt.Errorf("hadoop catalog: failed to create metadata directory: %w", err)
+       }
+
+       version := 1
+       metaPath := c.metadataFilePath(ident, version)
+       // Write under a unique temporary name first so readers never see a
+       // partially written v1.metadata.json.
+       tempPath := filepath.Join(metaDir, uuid.New().String()+".metadata.json")
+
+       compression := table.MetadataCompressionDefault
+       if v, ok := cfg.Properties[table.MetadataCompressionKey]; ok { // reading a nil map is safe
+               compression = v
+       }
+
+       if err := internal.WriteTableMetadata(metadata, icebergio.LocalFS{}, tempPath, compression); err != nil {
+               os.Remove(tempPath)
+
+               return nil, fmt.Errorf("hadoop catalog: failed to write table metadata: %w", err)
+       }
+
+       // Publish with a hard link instead of os.Rename. Rename replaces an
+       // existing destination, so two racing creators could overwrite each
+       // other's v1. os.Link fails when the destination exists.
+       // NOTE(review): this requires hard-link support from the filesystem.
+       linkErr := os.Link(tempPath, metaPath)
+       // The temporary name is not needed either way; removal is best effort.
+       _ = os.Remove(tempPath)
+       if linkErr != nil {
+               if errors.Is(linkErr, os.ErrExist) {
+                       return nil, fmt.Errorf("%w: %s", catalog.ErrTableAlreadyExists, strings.Join(ident, "."))
+               }
+
+               return nil, fmt.Errorf("hadoop catalog: failed to commit metadata file: %w", linkErr)
+       }
+
+       // Best effort: the hint is only an optimisation, and findVersion scans
+       // forward when it is stale.
+       c.writeVersionHint(ident, version)
+
+       return table.New(ident, metadata, metaPath, icebergio.LoadFSFunc(c.props, metaPath), c), nil
+}
+
+// LoadTable finds the newest metadata version of ident and opens the table
+// from that metadata file. Identifiers without a namespace are rejected.
+func (c *Catalog) LoadTable(ctx context.Context, ident table.Identifier) (*table.Table, error) {
+       const minIdentParts = 2 // namespace + table name
+       if len(ident) < minIdentParts {
+               return nil, errors.New("hadoop catalog: table identifier must have at least a namespace and table name")
+       }
+
+       latest, err := c.findVersion(ident)
+       if err != nil {
+               return nil, err
+       }
+
+       location := c.metadataFilePath(ident, latest)
+       fsys := icebergio.LoadFSFunc(c.props, location)
+
+       return table.NewFromLocation(ctx, ident, location, fsys, c)
+}
+
+// CheckTableExists reports whether ident resolves to a table directory. An
+// identifier without a namespace component never names a table, and this
+// method never returns an error.
+func (c *Catalog) CheckTableExists(_ context.Context, ident table.Identifier) (bool, error) {
+       exists := len(ident) >= 2 && isTableDir(c.tableToPath(ident))
+
+       return exists, nil
 }
 
-func (c *Catalog) CommitTable(_ context.Context, _ table.Identifier, _ 
[]table.Requirement, _ []table.Update) (table.Metadata, string, error) {
-       return nil, "", errors.New("hadoop catalog: CommitTable not yet 
implemented")
+// CommitTable applies updates to the table named by ident and publishes the
+// result as the next metadata file, metadata/v<N+1>.metadata.json.
+//
+// If the table does not exist, the commit is a create-via-commit:
+//   - the result becomes v1.metadata.json;
+//   - the namespace directory must already exist;
+//   - the table location must equal the catalog's default location for ident.
+//
+// Every requirement is validated against the current metadata, which is nil
+// for a missing table. A commit carrying, say, AssertTableUUID therefore does
+// not silently re-create a dropped table.
+//
+// Concurrency: the current version is resolved exactly once. The new file is
+// published with os.Link, which fails when the destination exists. If a
+// concurrent writer already produced v<N+1>, this commit fails instead of
+// overwriting it, which os.Rename would do.
+func (c *Catalog) CommitTable(ctx context.Context, ident table.Identifier, reqs []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
+       if len(ident) < 2 {
+               return nil, "", errors.New("hadoop catalog: table identifier must have at least a namespace and table name")
+       }
+
+       // Step 1: Resolve the current version once and load exactly that file.
+       // Resolving it twice (once to load, once to number the new version) lets
+       // a concurrent commit slip in between. Updates computed from vN would then
+       // be written as vN+2, silently discarding vN+1.
+       exists := true
+       currentVersion, err := c.findVersion(ident)
+       if errors.Is(err, catalog.ErrNoSuchTable) {
+               exists, currentVersion = false, 0
+       } else if err != nil {
+               return nil, "", err
+       }
+
+       var (
+               baseMeta           table.Metadata // stays nil while the table does not exist
+               currentMetadataLoc string
+       )
+
+       if exists {
+               currentMetadataLoc = c.metadataFilePath(ident, currentVersion)
+
+               current, err := table.NewFromLocation(ctx, ident, currentMetadataLoc,
+                       icebergio.LoadFSFunc(c.props, currentMetadataLoc), c)
+               if err != nil {
+                       return nil, "", err
+               }
+
+               baseMeta = current.Metadata()
+       }
+
+       // Step 2: Validate requirements against the current metadata, which is
+       // nil for a missing table.
+       // NOTE(review): this assumes Requirement.Validate accepts nil for
+       // AssertCreate and rejects nil for every other requirement type.
+       // Confirm in table/requirements.go.
+       for _, r := range reqs {
+               if err := r.Validate(baseMeta); err != nil {
+                       return nil, "", err
+               }
+       }
+
+       // Step 3: For create-via-commit, require the namespace and start from an
+       // empty placeholder that the updates fill in.
+       defaultLoc := c.defaultTableLocation(ident)
+       if !exists {
+               ns := catalog.NamespaceFromIdent(ident)
+               info, err := os.Stat(c.namespaceToPath(ns))
+               if os.IsNotExist(err) || (err == nil && !info.IsDir()) {
+                       return nil, "", fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, strings.Join(ns, "."))
+               }
+
+               if err != nil {
+                       return nil, "", fmt.Errorf("hadoop catalog: failed to stat namespace directory: %w", err)
+               }
+
+               baseMeta, err = table.NewMetadata(iceberg.NewSchema(0), nil, table.UnsortedSortOrder, "", nil)
+               if err != nil {
+                       return nil, "", fmt.Errorf("hadoop catalog: failed to create base metadata: %w", err)
+               }
+       }
+
+       // Step 4: Apply updates to produce new metadata.
+       updated, err := internal.UpdateTableMetadata(baseMeta, updates, currentMetadataLoc)
+       if err != nil {
+               return nil, "", fmt.Errorf("hadoop catalog: failed to apply updates: %w", err)
+       }
+
+       // Step 5: The table location is derived from ident. An existing table
+       // cannot move, and a new table must use the default location.
+       if exists && updated.Location() != baseMeta.Location() {
+               return nil, "", errors.New("hadoop catalog: table location cannot be changed")
+       }
+
+       if !exists && updated.Location() != defaultLoc {
+               return nil, "", fmt.Errorf("hadoop catalog: custom table locations are not supported (expected %s, got %q)",
+                       defaultLoc, updated.Location())
+       }
+
+       // Step 6: Reject write.metadata.location property.
+       if v := updated.Properties().Get(table.WriteMetadataLocationKey, ""); v != "" {
+               return nil, "", fmt.Errorf("hadoop catalog: %s property is not supported", table.WriteMetadataLocationKey)
+       }
+
+       // Step 7: Write the next version under a temporary name.
+       newVersion := currentVersion + 1
+
+       metaDir := c.metadataDir(ident)
+       if err := os.MkdirAll(metaDir, 0o755); err != nil {
+               return nil, "", fmt.Errorf("hadoop catalog: failed to create metadata directory: %w", err)
+       }
+
+       newMetaPath := c.metadataFilePath(ident, newVersion)
+       tempPath := filepath.Join(metaDir, uuid.New().String()+".metadata.json")
+
+       compression := updated.Properties().Get(table.MetadataCompressionKey, table.MetadataCompressionDefault)
+
+       if err := internal.WriteTableMetadata(updated, icebergio.LocalFS{}, tempPath, compression); err != nil {
+               os.Remove(tempPath)
+
+               return nil, "", fmt.Errorf("hadoop catalog: failed to write table metadata: %w", err)
+       }
+
+       // Step 8: Publish atomically without clobbering. os.Link fails if
+       // newMetaPath exists, which closes the check-then-rename race.
+       // NOTE(review): this requires hard-link support from the filesystem.
+       linkErr := os.Link(tempPath, newMetaPath)
+       // The temporary name is not needed either way; removal is best effort.
+       _ = os.Remove(tempPath)
+       if linkErr != nil {
+               if errors.Is(linkErr, os.ErrExist) {
+                       return nil, "", fmt.Errorf("hadoop catalog: version %d already exists for table %s",
+                               newVersion, strings.Join(ident, "."))
+               }
+
+               return nil, "", fmt.Errorf("hadoop catalog: failed to commit metadata file: %w", linkErr)
+       }
+
+       // Step 9: Best-effort version hint update.
+       c.writeVersionHint(ident, newVersion)
+
+       return updated, newMetaPath, nil
 }
 
 func (c *Catalog) ListTables(_ context.Context, ns table.Identifier) 
iter.Seq2[table.Identifier, error] {
@@ -371,14 +564,6 @@ func (c *Catalog) RenameTable(_ context.Context, _, _ 
table.Identifier) (*table.
        return nil, errors.New("hadoop catalog: rename table is not supported")
 }
 
-func (c *Catalog) LoadTable(_ context.Context, _ table.Identifier) 
(*table.Table, error) {
-       return nil, errors.New("hadoop catalog: LoadTable not yet implemented")
-}
-
-func (c *Catalog) CheckTableExists(_ context.Context, _ table.Identifier) 
(bool, error) {
-       return false, errors.New("hadoop catalog: CheckTableExists not yet 
implemented")
-}
-
 func (c *Catalog) CreateNamespace(_ context.Context, ns table.Identifier, 
props iceberg.Properties) error {
        if err := validateIdentifier(ns); err != nil {
                return err
diff --git a/catalog/hadoop/hadoop_test.go b/catalog/hadoop/hadoop_test.go
index f9a36ca9..dbc0a422 100644
--- a/catalog/hadoop/hadoop_test.go
+++ b/catalog/hadoop/hadoop_test.go
@@ -19,6 +19,7 @@ package hadoop
 
 import (
        "context"
+       "fmt"
        "os"
        "path/filepath"
        "testing"
@@ -26,6 +27,7 @@ import (
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/catalog"
        "github.com/apache/iceberg-go/table"
+       "github.com/google/uuid"
        "github.com/stretchr/testify/suite"
 )
 
@@ -902,3 +904,471 @@ func (s *HadoopCatalogTestSuite) 
TestListTablesIdentifierIsolation() {
        // Each identifier should be independent — no aliasing.
        s.NotEqual(tables[0][len(tables[0])-1], tables[1][len(tables[1])-1])
 }
+
+// CreateTable tests
+
+// testSchema returns the schema used by the table tests in this file. It has
+// a required int64 "id" and an optional string "data".
+func (s *HadoopCatalogTestSuite) testSchema() *iceberg.Schema {
+       idField := iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true}
+       dataField := iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false}
+
+       return iceberg.NewSchema(1, idField, dataField)
+}
+
+// TestCreateTable checks the on-disk layout CreateTable produces: the
+// metadata directory, v1.metadata.json, a version hint of "1", and the
+// reported metadata and table locations.
+func (s *HadoopCatalogTestSuite) TestCreateTable() {
+       ctx := context.Background()
+       s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+       created, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"}, s.testSchema())
+       s.Require().NoError(err)
+       s.NotNil(created)
+
+       tableDir := filepath.Join(s.warehouse, "ns", "tbl")
+       metadataDir := filepath.Join(tableDir, "metadata")
+       firstVersion := filepath.Join(metadataDir, "v1.metadata.json")
+
+       s.DirExists(metadataDir)
+       s.FileExists(firstVersion)
+
+       hint, err := os.ReadFile(filepath.Join(metadataDir, "version-hint.text"))
+       s.Require().NoError(err)
+       s.Equal("1", string(hint))
+
+       s.Equal(firstVersion, created.MetadataLocation())
+       s.Equal(tableDir, created.Location())
+}
+
+// TestCreateTableAndLoad verifies that a new table round-trips through
+// LoadTable with the same UUID, location and number of schema fields.
+func (s *HadoopCatalogTestSuite) TestCreateTableAndLoad() {
+       ctx := context.Background()
+       ident := []string{"ns", "tbl"}
+       s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+       created, err := s.cat.CreateTable(ctx, ident, s.testSchema())
+       s.Require().NoError(err)
+
+       loaded, err := s.cat.LoadTable(ctx, ident)
+       s.Require().NoError(err)
+
+       s.Equal(created.Metadata().TableUUID(), loaded.Metadata().TableUUID())
+       s.Equal(created.Location(), loaded.Location())
+       s.Len(loaded.Schema().Fields(), len(created.Schema().Fields()))
+}
+
+// TestCreateTableCustomLocation ensures that a location other than the
+// default table directory is rejected.
+func (s *HadoopCatalogTestSuite) TestCreateTableCustomLocation() {
+       s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+       elsewhere := catalog.WithLocation("/some/other/path")
+       _, err := s.cat.CreateTable(context.Background(), []string{"ns", "tbl"}, s.testSchema(), elsewhere)
+       s.Require().Error(err)
+       s.Contains(err.Error(), "custom table locations are not supported")
+}
+
+// TestCreateTableSameLocationAllowed checks that explicitly passing the
+// default location is accepted.
+func (s *HadoopCatalogTestSuite) TestCreateTableSameLocationAllowed() {
+       s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+       defaultLoc := filepath.Join(s.warehouse, "ns", "tbl")
+
+       created, err := s.cat.CreateTable(context.Background(), []string{"ns", "tbl"}, s.testSchema(),
+               catalog.WithLocation(defaultLoc))
+       s.Require().NoError(err)
+       s.Equal(defaultLoc, created.Location())
+}
+
+// TestCreateTableNoNamespace expects ErrNoSuchNamespace when the namespace
+// directory is missing.
+func (s *HadoopCatalogTestSuite) TestCreateTableNoNamespace() {
+       missing := []string{"nonexistent", "tbl"}
+
+       _, err := s.cat.CreateTable(context.Background(), missing, s.testSchema())
+       s.ErrorIs(err, catalog.ErrNoSuchNamespace)
+}
+
+// TestCreateTableAlreadyExists expects ErrTableAlreadyExists from a second
+// CreateTable for the same identifier.
+func (s *HadoopCatalogTestSuite) TestCreateTableAlreadyExists() {
+       ctx := context.Background()
+       ident := []string{"ns", "tbl"}
+       s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+       _, firstErr := s.cat.CreateTable(ctx, ident, s.testSchema())
+       s.Require().NoError(firstErr)
+
+       _, secondErr := s.cat.CreateTable(ctx, ident, s.testSchema())
+       s.ErrorIs(secondErr, catalog.ErrTableAlreadyExists)
+}
+
+// TestCreateTableWithPartitionSpec checks that a bucket partition spec given
+// at creation time is retained.
+func (s *HadoopCatalogTestSuite) TestCreateTableWithPartitionSpec() {
+       s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+       bucketField := iceberg.PartitionField{
+               SourceIDs: []int{1},
+               FieldID:   1000,
+               Name:      "id_bucket",
+               Transform: iceberg.BucketTransform{NumBuckets: 16},
+       }
+       spec := iceberg.NewPartitionSpec(bucketField)
+
+       created, err := s.cat.CreateTable(context.Background(), []string{"ns", "tbl"}, s.testSchema(),
+               catalog.WithPartitionSpec(&spec))
+       s.Require().NoError(err)
+       s.False(created.Spec().IsUnpartitioned())
+}
+
+// TestCreateTableWithSortOrder checks that a non-empty sort order given at
+// creation time is retained.
+func (s *HadoopCatalogTestSuite) TestCreateTableWithSortOrder() {
+       s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+       idAscending := table.SortField{
+               SourceIDs: []int{1},
+               Transform: iceberg.IdentityTransform{},
+               Direction: table.SortASC,
+               NullOrder: table.NullsFirst,
+       }
+       order, err := table.NewSortOrder(1, []table.SortField{idAscending})
+       s.Require().NoError(err)
+
+       created, err := s.cat.CreateTable(context.Background(), []string{"ns", "tbl"}, s.testSchema(),
+               catalog.WithSortOrder(order))
+       s.Require().NoError(err)
+       s.Greater(created.SortOrder().Len(), 0)
+}
+
+// TestCreateTableWithProperties checks that user-supplied table properties
+// are stored on the created table.
+func (s *HadoopCatalogTestSuite) TestCreateTableWithProperties() {
+       s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+       withProps := catalog.WithProperties(iceberg.Properties{"custom.key": "custom.value"})
+       created, err := s.cat.CreateTable(context.Background(), []string{"ns", "tbl"}, s.testSchema(), withProps)
+       s.Require().NoError(err)
+       s.Equal("custom.value", created.Properties()["custom.key"])
+}
+
+// TestCreateTableShortIdentifier rejects an identifier that has no namespace.
+func (s *HadoopCatalogTestSuite) TestCreateTableShortIdentifier() {
+       _, err := s.cat.CreateTable(context.Background(), []string{"tbl"}, s.testSchema())
+
+       s.Require().Error(err)
+       s.Contains(err.Error(), "at least a namespace and table name")
+}
+
+// LoadTable tests
+
+// TestLoadTableNotExists expects ErrNoSuchTable for a table that was never
+// created.
+func (s *HadoopCatalogTestSuite) TestLoadTableNotExists() {
+       _, err := s.cat.LoadTable(context.Background(), []string{"ns", "tbl"})
+
+       s.Require().Error(err)
+       s.ErrorIs(err, catalog.ErrNoSuchTable)
+}
+
+// TestLoadTableStaleHint checks that LoadTable still succeeds when the
+// version hint points at a version that does not exist on disk.
+func (s *HadoopCatalogTestSuite) TestLoadTableStaleHint() {
+       ctx := context.Background()
+       ident := []string{"ns", "tbl"}
+       s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+       // Only v1 exists on disk...
+       _, err := s.cat.CreateTable(ctx, ident, s.testSchema())
+       s.Require().NoError(err)
+
+       // ...but the hint claims v99.
+       s.cat.writeVersionHint(ident, 99)
+
+       loaded, err := s.cat.LoadTable(ctx, ident)
+       s.Require().NoError(err)
+       s.NotNil(loaded)
+}
+
+// TestLoadTableShortIdentifier rejects an identifier that has no namespace.
+func (s *HadoopCatalogTestSuite) TestLoadTableShortIdentifier() {
+       _, err := s.cat.LoadTable(context.Background(), []string{"tbl"})
+
+       s.Require().Error(err)
+       s.Contains(err.Error(), "at least a namespace and table name")
+}
+
+// CheckTableExists tests
+
+// TestCheckTableExistsTrue reports true for a table created through the
+// catalog.
+func (s *HadoopCatalogTestSuite) TestCheckTableExistsTrue() {
+       ctx := context.Background()
+       ident := []string{"ns", "tbl"}
+       s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+       _, err := s.cat.CreateTable(ctx, ident, s.testSchema())
+       s.Require().NoError(err)
+
+       exists, err := s.cat.CheckTableExists(ctx, ident)
+       s.Require().NoError(err)
+       s.True(exists)
+}
+
+// TestCheckTableExistsFalse reports false, without an error, for a missing
+// table.
+func (s *HadoopCatalogTestSuite) TestCheckTableExistsFalse() {
+       ctx := context.Background()
+
+       exists, err := s.cat.CheckTableExists(ctx, []string{"ns", "tbl"})
+       s.Require().NoError(err)
+       s.False(exists)
+}
+
+// TestCreateTableNestedNamespace creates a table under a two-level namespace.
+// It checks the resulting directory and that the table round-trips through
+// LoadTable.
+func (s *HadoopCatalogTestSuite) TestCreateTableNestedNamespace() {
+       ctx := context.Background()
+       ident := []string{"a", "b", "tbl"}
+       s.Require().NoError(os.MkdirAll(filepath.Join(s.warehouse, "a", "b"), 0o755))
+
+       created, err := s.cat.CreateTable(ctx, ident, s.testSchema())
+       s.Require().NoError(err)
+       s.Equal(filepath.Join(s.warehouse, "a", "b", "tbl"), created.Location())
+
+       loaded, err := s.cat.LoadTable(ctx, ident)
+       s.Require().NoError(err)
+       s.Equal(created.Metadata().TableUUID(), loaded.Metadata().TableUUID())
+}
+
+// TestCreateTableMetadataFormatVersion checks the format version a table gets
+// when none is requested.
+func (s *HadoopCatalogTestSuite) TestCreateTableMetadataFormatVersion() {
+       s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+       created, err := s.cat.CreateTable(context.Background(), []string{"ns", "tbl"}, s.testSchema())
+       s.Require().NoError(err)
+
+       const defaultFormatVersion = 2
+       s.Equal(defaultFormatVersion, created.Metadata().Version())
+}
+
+// CommitTable tests
+
+// createTestTable ensures namespace ns exists and creates table ns.name with
+// testSchema. Any error fails the test immediately.
+func (s *HadoopCatalogTestSuite) createTestTable(ns, name string) *table.Table {
+       s.Require().NoError(os.MkdirAll(filepath.Join(s.warehouse, ns), 0o755))
+
+       created, err := s.cat.CreateTable(context.Background(), []string{ns, name}, s.testSchema())
+       s.Require().NoError(err)
+
+       return created
+}
+
+// TestCommitTableSingleUpdate commits one property update guarded by a UUID
+// requirement. It expects v2 on disk carrying the new property.
+func (s *HadoopCatalogTestSuite) TestCommitTableSingleUpdate() {
+       original := s.createTestTable("ns", "tbl")
+
+       reqs := []table.Requirement{table.AssertTableUUID(original.Metadata().TableUUID())}
+       updates := []table.Update{table.NewSetPropertiesUpdate(iceberg.Properties{"test.key": "test.value"})}
+
+       meta, metaLoc, err := s.cat.CommitTable(context.Background(), []string{"ns", "tbl"}, reqs, updates)
+       s.Require().NoError(err)
+
+       // One commit on top of v1 yields v2.
+       s.Contains(metaLoc, "v2.metadata.json")
+       s.Equal("test.value", meta.Properties()["test.key"])
+       s.FileExists(metaLoc)
+}
+
+// TestCommitTableMultipleSequential reloads the table and commits one
+// property update, three times in a row. It expects v2, v3 and v4 in turn,
+// all three properties on the final load, and a version hint of 4.
+func (s *HadoopCatalogTestSuite) TestCommitTableMultipleSequential() {
+       ctx := context.Background()
+       // The returned table is not needed: every iteration reloads it.
+       s.createTestTable("ns", "tbl")
+       ident := []string{"ns", "tbl"}
+
+       for i := 0; i < 3; i++ {
+               loaded, err := s.cat.LoadTable(ctx, ident)
+               s.Require().NoError(err)
+
+               _, metaLoc, err := s.cat.CommitTable(ctx, ident,
+                       []table.Requirement{
+                               table.AssertTableUUID(loaded.Metadata().TableUUID()),
+                       },
+                       []table.Update{
+                               table.NewSetPropertiesUpdate(iceberg.Properties{
+                                       fmt.Sprintf("iter.%d", i): "val",
+                               }),
+                       },
+               )
+               s.Require().NoError(err)
+               s.Contains(metaLoc, fmt.Sprintf("v%d.metadata.json", i+2))
+       }
+
+       // Final load should have all properties.
+       final, err := s.cat.LoadTable(ctx, ident)
+       s.Require().NoError(err)
+
+       for i := 0; i < 3; i++ {
+               s.Equal("val", final.Properties()[fmt.Sprintf("iter.%d", i)])
+       }
+
+       // Version hint should reflect the latest version.
+       s.Equal(4, s.cat.readVersionHint(ident))
+}
+
+// TestCommitTableNoChanges commits a UUID requirement with no updates. It
+// expects a new version (v2) that keeps the table UUID.
+func (s *HadoopCatalogTestSuite) TestCommitTableNoChanges() {
+       original := s.createTestTable("ns", "tbl")
+       wantUUID := original.Metadata().TableUUID()
+
+       meta, metaLoc, err := s.cat.CommitTable(context.Background(), []string{"ns", "tbl"},
+               []table.Requirement{table.AssertTableUUID(wantUUID)},
+               nil, // no updates
+       )
+       s.Require().NoError(err)
+
+       // Even without updates, UpdateTableMetadata yields new metadata, so a new
+       // version file is written.
+       s.Contains(metaLoc, "v2.metadata.json")
+       s.Equal(wantUUID, meta.TableUUID())
+}
+
+// TestCommitTableConflictDetection covers the version sequence that commit
+// conflict detection protects. A single-threaded test cannot trigger the race
+// between resolving the current version and publishing v{N+1}. Instead, it
+// checks that sequential commits produce v2..v5 contiguously, with no version
+// skipped or duplicated.
+func (s *HadoopCatalogTestSuite) TestCommitTableConflictDetection() {
+       ctx := context.Background()
+       s.createTestTable("ns", "tbl")
+       ident := []string{"ns", "tbl"}
+
+       for want := 2; want <= 5; want++ {
+               props := iceberg.Properties{fmt.Sprintf("v%d", want): "committed"}
+
+               _, metaLoc, err := s.cat.CommitTable(ctx, ident, nil, []table.Update{table.NewSetPropertiesUpdate(props)})
+               s.Require().NoError(err)
+               s.Contains(metaLoc, fmt.Sprintf("v%d.metadata.json", want))
+               s.FileExists(s.cat.metadataFilePath(ident, want))
+       }
+}
+
+// TestCommitTableRequirementFailure expects a UUID requirement mismatch to
+// abort the commit.
+func (s *HadoopCatalogTestSuite) TestCommitTableRequirementFailure() {
+       s.createTestTable("ns", "tbl")
+
+       // A freshly generated UUID cannot match the table's UUID.
+       staleUUID := uuid.New()
+       _, _, err := s.cat.CommitTable(context.Background(), []string{"ns", "tbl"},
+               []table.Requirement{table.AssertTableUUID(staleUUID)},
+               []table.Update{table.NewSetPropertiesUpdate(iceberg.Properties{"k": "v"})},
+       )
+       s.Require().Error(err)
+       s.Contains(err.Error(), "UUID")
+}
+
+// TestCommitTableLocationChange expects a commit that moves the table to be
+// rejected.
+func (s *HadoopCatalogTestSuite) TestCommitTableLocationChange() {
+       s.createTestTable("ns", "tbl")
+
+       move := table.NewSetLocationUpdate("/some/other/location")
+       _, _, err := s.cat.CommitTable(context.Background(), []string{"ns", "tbl"}, nil, []table.Update{move})
+       s.Require().Error(err)
+       s.Contains(err.Error(), "location cannot be changed")
+}
+
+// TestCommitTableWriteMetadataLocation expects setting the unsupported
+// write.metadata.location property to be rejected.
+func (s *HadoopCatalogTestSuite) TestCommitTableWriteMetadataLocation() {
+       s.createTestTable("ns", "tbl")
+
+       setMetaLoc := table.NewSetPropertiesUpdate(iceberg.Properties{
+               "write.metadata.location": "/custom/metadata/path",
+       })
+       _, _, err := s.cat.CommitTable(context.Background(), []string{"ns", "tbl"}, nil, []table.Update{setMetaLoc})
+       s.Require().Error(err)
+       s.Contains(err.Error(), "write.metadata.location")
+}
+
+// TestCommitTableNoOrphanedTempFiles runs several commits and then checks
+// that the metadata directory holds only versioned metadata files and the
+// version hint.
+func (s *HadoopCatalogTestSuite) TestCommitTableNoOrphanedTempFiles() {
+       ctx := context.Background()
+       s.createTestTable("ns", "tbl")
+       ident := []string{"ns", "tbl"}
+
+       for i := 0; i < 3; i++ {
+               update := table.NewSetPropertiesUpdate(iceberg.Properties{fmt.Sprintf("iter.%d", i): "val"})
+               _, _, err := s.cat.CommitTable(ctx, ident, nil, []table.Update{update})
+               s.Require().NoError(err)
+       }
+
+       entries, err := os.ReadDir(s.cat.metadataDir(ident))
+       s.Require().NoError(err)
+
+       // Only entries matching versionPattern and the version hint may remain.
+       for _, entry := range entries {
+               name := entry.Name()
+               if entry.IsDir() || versionPattern.MatchString(name) || name == "version-hint.text" {
+                       continue
+               }
+
+               s.Failf("orphaned temp file", "found orphaned file: %s", name)
+       }
+}
+
+// TestCommitTableVersionHintUpdated checks that a commit advances the
+// version hint to the new version.
+func (s *HadoopCatalogTestSuite) TestCommitTableVersionHintUpdated() {
+       s.createTestTable("ns", "tbl")
+       ident := []string{"ns", "tbl"}
+
+       update := table.NewSetPropertiesUpdate(iceberg.Properties{"k": "v"})
+       _, _, err := s.cat.CommitTable(context.Background(), ident, nil, []table.Update{update})
+       s.Require().NoError(err)
+
+       s.Equal(2, s.cat.readVersionHint(ident))
+}
+
+// TestCommitTableCreateViaCommit creates a table purely through CommitTable.
+// It uses AssertCreate plus the updates that build v1 metadata, then checks
+// the result is loadable.
+func (s *HadoopCatalogTestSuite) TestCommitTableCreateViaCommit() {
+       ctx := context.Background()
+       s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+       ident := []string{"ns", "tbl"}
+
+       location := s.cat.defaultTableLocation(ident)
+       createUpdates := []table.Update{
+               table.NewAssignUUIDUpdate(uuid.New()),
+               table.NewUpgradeFormatVersionUpdate(2),
+               table.NewAddSchemaUpdate(s.testSchema()),
+               table.NewSetCurrentSchemaUpdate(-1),
+               table.NewSetLocationUpdate(location),
+       }
+
+       meta, metaLoc, err := s.cat.CommitTable(ctx, ident, []table.Requirement{table.AssertCreate()}, createUpdates)
+       s.Require().NoError(err)
+       s.Contains(metaLoc, "v1.metadata.json")
+       s.Equal(location, meta.Location())
+
+       loaded, err := s.cat.LoadTable(ctx, ident)
+       s.Require().NoError(err)
+       s.Equal(meta.TableUUID(), loaded.Metadata().TableUUID())
+}
+
+// TestCommitTableShortIdentifier rejects an identifier that has no namespace.
+func (s *HadoopCatalogTestSuite) TestCommitTableShortIdentifier() {
+       ctx := context.Background()
+
+       _, _, err := s.cat.CommitTable(ctx, []string{"tbl"}, nil, nil)
+       s.Require().Error(err)
+       s.Contains(err.Error(), "at least a namespace and table name")
+}
diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go
index 7082becb..06942cf3 100644
--- a/cmd/iceberg/main.go
+++ b/cmd/iceberg/main.go
@@ -33,6 +33,7 @@ import (
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/catalog"
        "github.com/apache/iceberg-go/catalog/glue"
+       "github.com/apache/iceberg-go/catalog/hadoop"
        "github.com/apache/iceberg-go/catalog/hive"
        "github.com/apache/iceberg-go/catalog/rest"
        "github.com/apache/iceberg-go/config"
@@ -345,6 +346,12 @@ func initCatalog(ctx context.Context, args Args) 
catalog.Catalog {
                if cat, err = hive.NewCatalog(props); err != nil {
                        log.Fatal(err)
                }
+       case catalog.Hadoop:
+               if cat, err = hadoop.NewCatalog("hadoop", args.Warehouse, 
iceberg.Properties{
+                       "warehouse": args.Warehouse,
+               }); err != nil {
+                       log.Fatal(err)
+               }
        default:
                log.Fatal("unrecognized catalog type")
        }
diff --git a/table/properties.go b/table/properties.go
index 196a402e..6c5d04c1 100644
--- a/table/properties.go
+++ b/table/properties.go
@@ -26,6 +26,7 @@ import (
 const (
        WriteDataPathKey                        = "write.data.path"
        WriteMetadataPathKey                    = "write.metadata.path"
+       WriteMetadataLocationKey                = "write.metadata.location"
        WriteObjectStorePartitionedPathsKey     = 
"write.object-storage.partitioned-paths"
        WriteObjectStorePartitionedPathsDefault = true
        ObjectStoreEnabledKey                   = "write.object-storage.enabled"

Reply via email to