This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 9d8ec298 feat(catalog): hadoop cli integration (#1034)
9d8ec298 is described below
commit 9d8ec298d32914007917df0c1599b1c059174f82
Author: Tanmay Rauth <[email protected]>
AuthorDate: Thu May 7 11:17:39 2026 -0700
feat(catalog): hadoop cli integration (#1034)
[7: CLI integration](https://github.com/apache/iceberg-go/issues/798#issuecomment-4320784323)
Added a `case catalog.Hadoop:` branch to the catalog switch in
cmd/iceberg/main.go, wired the cfg.Warehouse property, and added the blank
import for catalog/hadoop.
---
README.md | 14 +-
catalog/hadoop/hadoop.go | 209 +++++++++++++++++--
catalog/hadoop/hadoop_test.go | 470 ++++++++++++++++++++++++++++++++++++++++++
cmd/iceberg/main.go | 7 +
table/properties.go | 1 +
5 files changed, 682 insertions(+), 19 deletions(-)
diff --git a/README.md b/README.md
index 36279345..b68e55df 100644
--- a/README.md
+++ b/README.md
@@ -112,17 +112,17 @@ make lint-install
| Operation | REST | Hive | Glue | SQL | Hadoop |
|:----------------------------|:----:|:----:|:------:|:----:|:------:|
-| Load Table | X | X | X | X | |
+| Load Table | X | X | X | X | X |
| List Tables | X | X | X | X | X |
-| Create Table | X | X | X | X | |
+| Create Table | X | X | X | X | X |
| Register Table | X | X | X | | |
-| Update Current Snapshot | X | X | X | X | |
-| Create New Snapshot | X | X | X | X | |
+| Update Current Snapshot | X | X | X | X | X |
+| Create New Snapshot | X | X | X | X | X |
| Rename Table | X | X | X | X | |
| Drop Table | X | X | X | X | X |
-| Alter Table | X | X | X | X | |
-| Check Table Exists | X | X | X | X | |
-| Set Table Properties | X | X | X | X | |
+| Alter Table | X | X | X | X | X |
+| Check Table Exists | X | X | X | X | X |
+| Set Table Properties | X | X | X | X | X |
| List Namespaces | X | X | X | X | X |
| Create Namespace | X | X | X | X | X |
| Check Namespace Exists | X | X | X | X | X |
diff --git a/catalog/hadoop/hadoop.go b/catalog/hadoop/hadoop.go
index d3e7638f..cd1bacbd 100644
--- a/catalog/hadoop/hadoop.go
+++ b/catalog/hadoop/hadoop.go
@@ -33,6 +33,8 @@ import (
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
+ "github.com/apache/iceberg-go/catalog/internal"
+ icebergio "github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table"
"github.com/google/uuid"
)
@@ -296,12 +298,203 @@ func (c *Catalog) findVersion(ident table.Identifier)
(int, error) {
return c.scanForward(ident, maxVer), nil
}
-func (c *Catalog) CreateTable(_ context.Context, _ table.Identifier, _
*iceberg.Schema, _ ...catalog.CreateTableOpt) (*table.Table, error) {
- return nil, errors.New("hadoop catalog: CreateTable not yet
implemented")
+func (c *Catalog) CreateTable(ctx context.Context, ident table.Identifier, sc
*iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) {
+ var cfg catalog.CreateTableCfg
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ if len(ident) < 2 {
+ return nil, errors.New("hadoop catalog: table identifier must
have at least a namespace and table name")
+ }
+
+ ns := catalog.NamespaceFromIdent(ident)
+ nsPath := c.namespaceToPath(ns)
+
+ info, err := os.Stat(nsPath)
+ if os.IsNotExist(err) || (err == nil && !info.IsDir()) {
+ return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace,
strings.Join(ns, "."))
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("hadoop catalog: failed to stat
namespace directory: %w", err)
+ }
+
+ loc := c.defaultTableLocation(ident)
+ if cfg.Location != "" && cfg.Location != loc {
+ return nil, errors.New("hadoop catalog: custom table locations
are not supported")
+ }
+
+ if isTableDir(loc) {
+ return nil, fmt.Errorf("%w: %s", catalog.ErrTableAlreadyExists,
strings.Join(ident, "."))
+ }
+
+ metadata, err := table.NewMetadata(sc, cfg.PartitionSpec,
cfg.SortOrder, loc, cfg.Properties)
+ if err != nil {
+ return nil, fmt.Errorf("hadoop catalog: failed to create table
metadata: %w", err)
+ }
+
+ metaDir := c.metadataDir(ident)
+ if err := os.MkdirAll(metaDir, 0o755); err != nil {
+ return nil, fmt.Errorf("hadoop catalog: failed to create
metadata directory: %w", err)
+ }
+
+ version := 1
+ metaPath := c.metadataFilePath(ident, version)
+ tempPath := filepath.Join(metaDir, uuid.New().String()+".metadata.json")
+
+ compression := table.MetadataCompressionDefault
+ if cfg.Properties != nil {
+ if v, ok := cfg.Properties[table.MetadataCompressionKey]; ok {
+ compression = v
+ }
+ }
+
+ if err := internal.WriteTableMetadata(metadata, icebergio.LocalFS{},
tempPath, compression); err != nil {
+ os.Remove(tempPath)
+
+ return nil, fmt.Errorf("hadoop catalog: failed to write table
metadata: %w", err)
+ }
+
+ if err := os.Rename(tempPath, metaPath); err != nil {
+ os.Remove(tempPath)
+
+ return nil, fmt.Errorf("hadoop catalog: failed to commit
metadata file: %w", err)
+ }
+
+ c.writeVersionHint(ident, version)
+
+ tbl := table.New(
+ ident,
+ metadata,
+ metaPath,
+ icebergio.LoadFSFunc(c.props, metaPath),
+ c,
+ )
+
+ return tbl, nil
+}
+
+func (c *Catalog) LoadTable(ctx context.Context, ident table.Identifier)
(*table.Table, error) {
+ if len(ident) < 2 {
+ return nil, errors.New("hadoop catalog: table identifier must
have at least a namespace and table name")
+ }
+
+ ver, err := c.findVersion(ident)
+ if err != nil {
+ return nil, err
+ }
+
+ metaPath := c.metadataFilePath(ident, ver)
+
+ return table.NewFromLocation(ctx, ident, metaPath,
icebergio.LoadFSFunc(c.props, metaPath), c)
+}
+
+func (c *Catalog) CheckTableExists(_ context.Context, ident table.Identifier)
(bool, error) {
+ if len(ident) < 2 {
+ return false, nil
+ }
+
+ return isTableDir(c.tableToPath(ident)), nil
}
-func (c *Catalog) CommitTable(_ context.Context, _ table.Identifier, _
[]table.Requirement, _ []table.Update) (table.Metadata, string, error) {
- return nil, "", errors.New("hadoop catalog: CommitTable not yet
implemented")
+func (c *Catalog) CommitTable(ctx context.Context, ident table.Identifier,
reqs []table.Requirement, updates []table.Update) (table.Metadata, string,
error) {
+ if len(ident) < 2 {
+ return nil, "", errors.New("hadoop catalog: table identifier
must have at least a namespace and table name")
+ }
+
+ // Step 1: Load current table (nil for create-via-commit).
+ current, err := c.LoadTable(ctx, ident)
+ if err != nil && !errors.Is(err, catalog.ErrNoSuchTable) {
+ return nil, "", err
+ }
+
+ // Step 2: Validate requirements against current metadata.
+ if current != nil {
+ for _, r := range reqs {
+ if err := r.Validate(current.Metadata()); err != nil {
+ return nil, "", err
+ }
+ }
+ }
+
+ // Step 3: Apply updates to produce new metadata.
+ var baseMeta table.Metadata
+ var currentMetadataLoc string
+
+ if current != nil {
+ baseMeta = current.Metadata()
+ currentMetadataLoc = current.MetadataLocation()
+ } else {
+ baseMeta, err = table.NewMetadata(iceberg.NewSchema(0), nil,
table.UnsortedSortOrder, "", nil)
+ if err != nil {
+ return nil, "", fmt.Errorf("hadoop catalog: failed to
create base metadata: %w", err)
+ }
+ }
+
+ updated, err := internal.UpdateTableMetadata(baseMeta, updates,
currentMetadataLoc)
+ if err != nil {
+ return nil, "", fmt.Errorf("hadoop catalog: failed to apply
updates: %w", err)
+ }
+
+ // Step 4: Validate table location has not changed.
+ if current != nil && updated.Location() != current.Location() {
+ return nil, "", errors.New("hadoop catalog: table location
cannot be changed")
+ }
+
+ // Step 5: Reject write.metadata.location property.
+ if v := updated.Properties().Get(table.WriteMetadataLocationKey, ""); v
!= "" {
+ return nil, "", fmt.Errorf("hadoop catalog: %s property is not
supported", table.WriteMetadataLocationKey)
+ }
+
+ // Step 6: Determine next version number.
+ var currentVersion int
+
+ if current != nil {
+ currentVersion, err = c.findVersion(ident)
+ if err != nil {
+ return nil, "", err
+ }
+ }
+
+ newVersion := currentVersion + 1
+
+ // Step 7: Create metadata directory if needed (create-via-commit).
+ metaDir := c.metadataDir(ident)
+ if err := os.MkdirAll(metaDir, 0o755); err != nil {
+ return nil, "", fmt.Errorf("hadoop catalog: failed to create
metadata directory: %w", err)
+ }
+
+ newMetaPath := c.metadataFilePath(ident, newVersion)
+ tempPath := filepath.Join(metaDir, uuid.New().String()+".metadata.json")
+
+ compression := updated.Properties().Get(table.MetadataCompressionKey,
table.MetadataCompressionDefault)
+
+ if err := internal.WriteTableMetadata(updated, icebergio.LocalFS{},
tempPath, compression); err != nil {
+ os.Remove(tempPath)
+
+ return nil, "", fmt.Errorf("hadoop catalog: failed to write
table metadata: %w", err)
+ }
+
+ // Conflict detection: target file must not already exist.
+ if _, err := os.Stat(newMetaPath); err == nil {
+ os.Remove(tempPath)
+
+ return nil, "", fmt.Errorf("hadoop catalog: version %d already
exists for table %s",
+ newVersion, strings.Join(ident, "."))
+ }
+
+ // Atomic commit via rename.
+ if err := os.Rename(tempPath, newMetaPath); err != nil {
+ os.Remove(tempPath)
+
+ return nil, "", fmt.Errorf("hadoop catalog: failed to commit
metadata file: %w", err)
+ }
+
+ // Step 8: Best-effort version hint update.
+ c.writeVersionHint(ident, newVersion)
+
+ return updated, newMetaPath, nil
}
func (c *Catalog) ListTables(_ context.Context, ns table.Identifier)
iter.Seq2[table.Identifier, error] {
@@ -371,14 +564,6 @@ func (c *Catalog) RenameTable(_ context.Context, _, _
table.Identifier) (*table.
return nil, errors.New("hadoop catalog: rename table is not supported")
}
-func (c *Catalog) LoadTable(_ context.Context, _ table.Identifier)
(*table.Table, error) {
- return nil, errors.New("hadoop catalog: LoadTable not yet implemented")
-}
-
-func (c *Catalog) CheckTableExists(_ context.Context, _ table.Identifier)
(bool, error) {
- return false, errors.New("hadoop catalog: CheckTableExists not yet
implemented")
-}
-
func (c *Catalog) CreateNamespace(_ context.Context, ns table.Identifier,
props iceberg.Properties) error {
if err := validateIdentifier(ns); err != nil {
return err
diff --git a/catalog/hadoop/hadoop_test.go b/catalog/hadoop/hadoop_test.go
index f9a36ca9..dbc0a422 100644
--- a/catalog/hadoop/hadoop_test.go
+++ b/catalog/hadoop/hadoop_test.go
@@ -19,6 +19,7 @@ package hadoop
import (
"context"
+ "fmt"
"os"
"path/filepath"
"testing"
@@ -26,6 +27,7 @@ import (
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
"github.com/apache/iceberg-go/table"
+ "github.com/google/uuid"
"github.com/stretchr/testify/suite"
)
@@ -902,3 +904,471 @@ func (s *HadoopCatalogTestSuite)
TestListTablesIdentifierIsolation() {
// Each identifier should be independent — no aliasing.
s.NotEqual(tables[0][len(tables[0])-1], tables[1][len(tables[1])-1])
}
+
+// CreateTable tests
+
+func (s *HadoopCatalogTestSuite) testSchema() *iceberg.Schema {
+ return iceberg.NewSchema(1,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "data", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ )
+}
+
+func (s *HadoopCatalogTestSuite) TestCreateTable() {
+ ctx := context.Background()
+ s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+ tbl, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"},
s.testSchema())
+ s.Require().NoError(err)
+ s.NotNil(tbl)
+
+ // Verify metadata directory and files exist
+ metaDir := filepath.Join(s.warehouse, "ns", "tbl", "metadata")
+ s.DirExists(metaDir)
+ s.FileExists(filepath.Join(metaDir, "v1.metadata.json"))
+
+ // Verify version hint
+ data, err := os.ReadFile(filepath.Join(metaDir, "version-hint.text"))
+ s.Require().NoError(err)
+ s.Equal("1", string(data))
+
+ // Verify metadata
+ s.Equal(filepath.Join(metaDir, "v1.metadata.json"),
tbl.MetadataLocation())
+ s.Equal(filepath.Join(s.warehouse, "ns", "tbl"), tbl.Location())
+}
+
+func (s *HadoopCatalogTestSuite) TestCreateTableAndLoad() {
+ ctx := context.Background()
+ s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+ schema := s.testSchema()
+ created, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"}, schema)
+ s.Require().NoError(err)
+
+ loaded, err := s.cat.LoadTable(ctx, []string{"ns", "tbl"})
+ s.Require().NoError(err)
+
+ s.Equal(created.Metadata().TableUUID(), loaded.Metadata().TableUUID())
+ s.Equal(created.Location(), loaded.Location())
+ s.Equal(len(created.Schema().Fields()), len(loaded.Schema().Fields()))
+}
+
+func (s *HadoopCatalogTestSuite) TestCreateTableCustomLocation() {
+ ctx := context.Background()
+ s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+ _, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"}, s.testSchema(),
+ catalog.WithLocation("/some/other/path"))
+ s.Require().Error(err)
+ s.Contains(err.Error(), "custom table locations are not supported")
+}
+
+func (s *HadoopCatalogTestSuite) TestCreateTableSameLocationAllowed() {
+ ctx := context.Background()
+ s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+ loc := filepath.Join(s.warehouse, "ns", "tbl")
+ tbl, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"},
s.testSchema(),
+ catalog.WithLocation(loc))
+ s.Require().NoError(err)
+ s.Equal(loc, tbl.Location())
+}
+
+func (s *HadoopCatalogTestSuite) TestCreateTableNoNamespace() {
+ ctx := context.Background()
+
+ _, err := s.cat.CreateTable(ctx, []string{"nonexistent", "tbl"},
s.testSchema())
+ s.ErrorIs(err, catalog.ErrNoSuchNamespace)
+}
+
+func (s *HadoopCatalogTestSuite) TestCreateTableAlreadyExists() {
+ ctx := context.Background()
+ s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+ _, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"}, s.testSchema())
+ s.Require().NoError(err)
+
+ _, err = s.cat.CreateTable(ctx, []string{"ns", "tbl"}, s.testSchema())
+ s.ErrorIs(err, catalog.ErrTableAlreadyExists)
+}
+
+func (s *HadoopCatalogTestSuite) TestCreateTableWithPartitionSpec() {
+ ctx := context.Background()
+ s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+ spec := iceberg.NewPartitionSpec(
+ iceberg.PartitionField{
+ SourceIDs: []int{1},
+ FieldID: 1000,
+ Name: "id_bucket",
+ Transform: iceberg.BucketTransform{NumBuckets: 16},
+ },
+ )
+
+ tbl, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"},
s.testSchema(),
+ catalog.WithPartitionSpec(&spec))
+ s.Require().NoError(err)
+ s.False(tbl.Spec().IsUnpartitioned())
+}
+
+func (s *HadoopCatalogTestSuite) TestCreateTableWithSortOrder() {
+ ctx := context.Background()
+ s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+ sortOrder, err := table.NewSortOrder(1, []table.SortField{
+ {
+ SourceIDs: []int{1},
+ Transform: iceberg.IdentityTransform{},
+ Direction: table.SortASC,
+ NullOrder: table.NullsFirst,
+ },
+ })
+ s.Require().NoError(err)
+
+ tbl, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"},
s.testSchema(),
+ catalog.WithSortOrder(sortOrder))
+ s.Require().NoError(err)
+ s.Greater(tbl.SortOrder().Len(), 0)
+}
+
+func (s *HadoopCatalogTestSuite) TestCreateTableWithProperties() {
+ ctx := context.Background()
+ s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+ props := iceberg.Properties{
+ "custom.key": "custom.value",
+ }
+
+ tbl, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"},
s.testSchema(),
+ catalog.WithProperties(props))
+ s.Require().NoError(err)
+ s.Equal("custom.value", tbl.Properties()["custom.key"])
+}
+
+func (s *HadoopCatalogTestSuite) TestCreateTableShortIdentifier() {
+ ctx := context.Background()
+
+ _, err := s.cat.CreateTable(ctx, []string{"tbl"}, s.testSchema())
+ s.Require().Error(err)
+ s.Contains(err.Error(), "at least a namespace and table name")
+}
+
+// LoadTable tests
+
+func (s *HadoopCatalogTestSuite) TestLoadTableNotExists() {
+ ctx := context.Background()
+
+ _, err := s.cat.LoadTable(ctx, []string{"ns", "tbl"})
+ s.Require().Error(err)
+ s.ErrorIs(err, catalog.ErrNoSuchTable)
+}
+
+func (s *HadoopCatalogTestSuite) TestLoadTableStaleHint() {
+ ctx := context.Background()
+ s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+ // Create table (version 1)
+ _, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"}, s.testSchema())
+ s.Require().NoError(err)
+
+ // Manually set the hint to a stale value
+ ident := []string{"ns", "tbl"}
+ s.cat.writeVersionHint(ident, 99)
+
+ // LoadTable should still succeed by falling back to dir listing
+ tbl, err := s.cat.LoadTable(ctx, ident)
+ s.Require().NoError(err)
+ s.NotNil(tbl)
+}
+
+func (s *HadoopCatalogTestSuite) TestLoadTableShortIdentifier() {
+ ctx := context.Background()
+
+ _, err := s.cat.LoadTable(ctx, []string{"tbl"})
+ s.Require().Error(err)
+ s.Contains(err.Error(), "at least a namespace and table name")
+}
+
+// CheckTableExists tests
+
+func (s *HadoopCatalogTestSuite) TestCheckTableExistsTrue() {
+ ctx := context.Background()
+ s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+ _, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"}, s.testSchema())
+ s.Require().NoError(err)
+
+ exists, err := s.cat.CheckTableExists(ctx, []string{"ns", "tbl"})
+ s.Require().NoError(err)
+ s.True(exists)
+}
+
+func (s *HadoopCatalogTestSuite) TestCheckTableExistsFalse() {
+ exists, err := s.cat.CheckTableExists(context.Background(),
[]string{"ns", "tbl"})
+ s.Require().NoError(err)
+ s.False(exists)
+}
+
+func (s *HadoopCatalogTestSuite) TestCreateTableNestedNamespace() {
+ ctx := context.Background()
+ s.Require().NoError(os.MkdirAll(filepath.Join(s.warehouse, "a", "b"),
0o755))
+
+ tbl, err := s.cat.CreateTable(ctx, []string{"a", "b", "tbl"},
s.testSchema())
+ s.Require().NoError(err)
+ s.Equal(filepath.Join(s.warehouse, "a", "b", "tbl"), tbl.Location())
+
+ // Verify round-trip
+ loaded, err := s.cat.LoadTable(ctx, []string{"a", "b", "tbl"})
+ s.Require().NoError(err)
+ s.Equal(tbl.Metadata().TableUUID(), loaded.Metadata().TableUUID())
+}
+
+func (s *HadoopCatalogTestSuite) TestCreateTableMetadataFormatVersion() {
+ ctx := context.Background()
+ s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+
+ tbl, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"},
s.testSchema())
+ s.Require().NoError(err)
+
+ // Default format version should be V2
+ s.Equal(2, tbl.Metadata().Version())
+}
+
+// CommitTable tests
+
+func (s *HadoopCatalogTestSuite) createTestTable(ns, name string) *table.Table
{
+ ctx := context.Background()
+ s.Require().NoError(os.MkdirAll(filepath.Join(s.warehouse, ns), 0o755))
+
+ tbl, err := s.cat.CreateTable(ctx, []string{ns, name}, s.testSchema())
+ s.Require().NoError(err)
+
+ return tbl
+}
+
+func (s *HadoopCatalogTestSuite) TestCommitTableSingleUpdate() {
+ ctx := context.Background()
+ tbl := s.createTestTable("ns", "tbl")
+
+ meta, metaLoc, err := s.cat.CommitTable(ctx, []string{"ns", "tbl"},
+ []table.Requirement{
+ table.AssertTableUUID(tbl.Metadata().TableUUID()),
+ },
+ []table.Update{
+
table.NewSetPropertiesUpdate(iceberg.Properties{"test.key": "test.value"}),
+ },
+ )
+ s.Require().NoError(err)
+
+ // Version should have incremented to 2.
+ s.Contains(metaLoc, "v2.metadata.json")
+ s.Equal("test.value", meta.Properties()["test.key"])
+
+ // File should exist on disk.
+ s.FileExists(metaLoc)
+}
+
+func (s *HadoopCatalogTestSuite) TestCommitTableMultipleSequential() {
+ ctx := context.Background()
+ tbl := s.createTestTable("ns", "tbl")
+ ident := []string{"ns", "tbl"}
+
+ for i := 0; i < 3; i++ {
+ loaded, err := s.cat.LoadTable(ctx, ident)
+ s.Require().NoError(err)
+
+ _, metaLoc, err := s.cat.CommitTable(ctx, ident,
+ []table.Requirement{
+
table.AssertTableUUID(loaded.Metadata().TableUUID()),
+ },
+ []table.Update{
+ table.NewSetPropertiesUpdate(iceberg.Properties{
+ fmt.Sprintf("iter.%d", i): "val",
+ }),
+ },
+ )
+ s.Require().NoError(err)
+ s.Contains(metaLoc, fmt.Sprintf("v%d.metadata.json", i+2))
+ }
+
+ // Final load should have all properties.
+ final, err := s.cat.LoadTable(ctx, ident)
+ s.Require().NoError(err)
+ s.Equal("val", final.Properties()["iter.0"])
+ s.Equal("val", final.Properties()["iter.1"])
+ s.Equal("val", final.Properties()["iter.2"])
+
+ // Version hint should reflect the latest version.
+ _ = tbl // suppress unused
+ s.Equal(4, s.cat.readVersionHint(ident))
+}
+
+func (s *HadoopCatalogTestSuite) TestCommitTableNoChanges() {
+ ctx := context.Background()
+ tbl := s.createTestTable("ns", "tbl")
+
+ meta, metaLoc, err := s.cat.CommitTable(ctx, []string{"ns", "tbl"},
+ []table.Requirement{
+ table.AssertTableUUID(tbl.Metadata().TableUUID()),
+ },
+ nil, // no updates
+ )
+ s.Require().NoError(err)
+
+ // Even with no updates, UpdateTableMetadata produces a new metadata
+ // object (different timestamp), so a new version is written.
+ s.Contains(metaLoc, "v2.metadata.json")
+ s.Equal(tbl.Metadata().TableUUID(), meta.TableUUID())
+}
+
+func (s *HadoopCatalogTestSuite) TestCommitTableConflictDetection() {
+ // Conflict detection relies on os.Stat checking whether the target
+ // v{N+1}.metadata.json already exists before the atomic rename.
+ // In a single-threaded test we cannot trigger the TOCTOU race
+ // between findVersion and os.Stat. Instead, we verify that after
+ // multiple sequential commits, the version sequence is contiguous
+ // and no versions are skipped or duplicated.
+ ctx := context.Background()
+ s.createTestTable("ns", "tbl")
+ ident := []string{"ns", "tbl"}
+
+ for i := 2; i <= 5; i++ {
+ _, metaLoc, err := s.cat.CommitTable(ctx, ident,
+ nil,
+ []table.Update{
+ table.NewSetPropertiesUpdate(iceberg.Properties{
+ fmt.Sprintf("v%d", i): "committed",
+ }),
+ },
+ )
+ s.Require().NoError(err)
+ s.Contains(metaLoc, fmt.Sprintf("v%d.metadata.json", i))
+ s.FileExists(s.cat.metadataFilePath(ident, i))
+ }
+}
+
+func (s *HadoopCatalogTestSuite) TestCommitTableRequirementFailure() {
+ ctx := context.Background()
+ s.createTestTable("ns", "tbl")
+
+ wrongUUID := uuid.New()
+ _, _, err := s.cat.CommitTable(ctx, []string{"ns", "tbl"},
+ []table.Requirement{
+ table.AssertTableUUID(wrongUUID),
+ },
+ []table.Update{
+ table.NewSetPropertiesUpdate(iceberg.Properties{"k":
"v"}),
+ },
+ )
+ s.Require().Error(err)
+ s.Contains(err.Error(), "UUID")
+}
+
+func (s *HadoopCatalogTestSuite) TestCommitTableLocationChange() {
+ ctx := context.Background()
+ s.createTestTable("ns", "tbl")
+
+ _, _, err := s.cat.CommitTable(ctx, []string{"ns", "tbl"},
+ nil,
+ []table.Update{
+ table.NewSetLocationUpdate("/some/other/location"),
+ },
+ )
+ s.Require().Error(err)
+ s.Contains(err.Error(), "location cannot be changed")
+}
+
+func (s *HadoopCatalogTestSuite) TestCommitTableWriteMetadataLocation() {
+ ctx := context.Background()
+ s.createTestTable("ns", "tbl")
+
+ _, _, err := s.cat.CommitTable(ctx, []string{"ns", "tbl"},
+ nil,
+ []table.Update{
+ table.NewSetPropertiesUpdate(iceberg.Properties{
+ "write.metadata.location":
"/custom/metadata/path",
+ }),
+ },
+ )
+ s.Require().Error(err)
+ s.Contains(err.Error(), "write.metadata.location")
+}
+
+func (s *HadoopCatalogTestSuite) TestCommitTableNoOrphanedTempFiles() {
+ ctx := context.Background()
+ s.createTestTable("ns", "tbl")
+ ident := []string{"ns", "tbl"}
+
+ // Do several commits and verify no temp files are left behind.
+ for i := 0; i < 3; i++ {
+ _, _, err := s.cat.CommitTable(ctx, ident,
+ nil,
+ []table.Update{
+ table.NewSetPropertiesUpdate(iceberg.Properties{
+ fmt.Sprintf("iter.%d", i): "val",
+ }),
+ },
+ )
+ s.Require().NoError(err)
+ }
+
+ metaDir := s.cat.metadataDir(ident)
+ entries, err := os.ReadDir(metaDir)
+ s.Require().NoError(err)
+
+ for _, e := range entries {
+ if !e.IsDir() && !versionPattern.MatchString(e.Name()) &&
e.Name() != "version-hint.text" {
+ s.Failf("orphaned temp file", "found orphaned file:
%s", e.Name())
+ }
+ }
+}
+
+func (s *HadoopCatalogTestSuite) TestCommitTableVersionHintUpdated() {
+ ctx := context.Background()
+ s.createTestTable("ns", "tbl")
+ ident := []string{"ns", "tbl"}
+
+ _, _, err := s.cat.CommitTable(ctx, ident,
+ nil,
+ []table.Update{
+ table.NewSetPropertiesUpdate(iceberg.Properties{"k":
"v"}),
+ },
+ )
+ s.Require().NoError(err)
+
+ s.Equal(2, s.cat.readVersionHint(ident))
+}
+
+func (s *HadoopCatalogTestSuite) TestCommitTableCreateViaCommit() {
+ ctx := context.Background()
+ s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+ ident := []string{"ns", "tbl"}
+
+ loc := s.cat.defaultTableLocation(ident)
+ meta, metaLoc, err := s.cat.CommitTable(ctx, ident,
+ []table.Requirement{
+ table.AssertCreate(),
+ },
+ []table.Update{
+ table.NewAssignUUIDUpdate(uuid.New()),
+ table.NewUpgradeFormatVersionUpdate(2),
+ table.NewAddSchemaUpdate(s.testSchema()),
+ table.NewSetCurrentSchemaUpdate(-1),
+ table.NewSetLocationUpdate(loc),
+ },
+ )
+ s.Require().NoError(err)
+ s.Contains(metaLoc, "v1.metadata.json")
+ s.Equal(loc, meta.Location())
+
+ // Should be loadable.
+ loaded, err := s.cat.LoadTable(ctx, ident)
+ s.Require().NoError(err)
+ s.Equal(meta.TableUUID(), loaded.Metadata().TableUUID())
+}
+
+func (s *HadoopCatalogTestSuite) TestCommitTableShortIdentifier() {
+ _, _, err := s.cat.CommitTable(context.Background(), []string{"tbl"},
nil, nil)
+ s.Require().Error(err)
+ s.Contains(err.Error(), "at least a namespace and table name")
+}
diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go
index 7082becb..06942cf3 100644
--- a/cmd/iceberg/main.go
+++ b/cmd/iceberg/main.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
"github.com/apache/iceberg-go/catalog/glue"
+ "github.com/apache/iceberg-go/catalog/hadoop"
"github.com/apache/iceberg-go/catalog/hive"
"github.com/apache/iceberg-go/catalog/rest"
"github.com/apache/iceberg-go/config"
@@ -345,6 +346,12 @@ func initCatalog(ctx context.Context, args Args)
catalog.Catalog {
if cat, err = hive.NewCatalog(props); err != nil {
log.Fatal(err)
}
+ case catalog.Hadoop:
+ if cat, err = hadoop.NewCatalog("hadoop", args.Warehouse,
iceberg.Properties{
+ "warehouse": args.Warehouse,
+ }); err != nil {
+ log.Fatal(err)
+ }
default:
log.Fatal("unrecognized catalog type")
}
diff --git a/table/properties.go b/table/properties.go
index 196a402e..6c5d04c1 100644
--- a/table/properties.go
+++ b/table/properties.go
@@ -26,6 +26,7 @@ import (
const (
WriteDataPathKey = "write.data.path"
WriteMetadataPathKey = "write.metadata.path"
+ WriteMetadataLocationKey = "write.metadata.location"
WriteObjectStorePartitionedPathsKey =
"write.object-storage.partitioned-paths"
WriteObjectStorePartitionedPathsDefault = true
ObjectStoreEnabledKey = "write.object-storage.enabled"