This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 69fc3cdc feat(catalog): hadoop table and namespace CRUD operations
(#969)
69fc3cdc is described below
commit 69fc3cdc5b745971b7f77a9039a91bee83ddfb85
Author: Tanmay Rauth <[email protected]>
AuthorDate: Fri May 8 12:13:00 2026 -0700
feat(catalog): hadoop table and namespace CRUD operations (#969)
[4: CreateTable + LoadTable +
CheckTableExists](https://github.com/apache/iceberg-go/issues/798#issuecomment-4320784323)
Implement the three core table operations. CreateTable validates the
namespace exists, rejects custom locations, builds metadata via
table.NewMetadata, writes v1.metadata.json through a
temp-file-plus-rename pattern, and does a best-effort version-hint
write. LoadTable calls findVersion to get the current version, builds
the metadata path, and delegates to table.NewFromLocation.
CheckTableExists delegates to isTableDir. Tests cover create-and-load
round-trip, create with partition spec / sort order / properties, reject
custom location, create in non-existent namespace, create duplicate,
load non-existent, load with stale hint, and check exists true/false.
Depends on #968 and #963.
Relates to #798
---
catalog/hadoop/hadoop.go | 7 +-
catalog/hadoop/hadoop_integration_test.go | 84 +++++++++++++++++++++++-
catalog/hadoop/hadoop_test.go | 102 +++++++++++++++---------------
3 files changed, 137 insertions(+), 56 deletions(-)
diff --git a/catalog/hadoop/hadoop.go b/catalog/hadoop/hadoop.go
index cd1bacbd..be7e8c24 100644
--- a/catalog/hadoop/hadoop.go
+++ b/catalog/hadoop/hadoop.go
@@ -94,7 +94,7 @@ func validateIdentifier(ident table.Identifier) error {
// Catalog is a filesystem-based Iceberg catalog that requires no external
// metastore. All state lives on disk as directories and versioned JSON
-// metadata files.
+// metadata files. Currently only local filesystem paths are supported.
type Catalog struct {
name string
warehouse string
@@ -102,8 +102,9 @@ type Catalog struct {
}
// NewCatalog creates a new Hadoop catalog rooted at the given warehouse path.
-// The warehouse directory is not created on construction; it is created
-// implicitly by the first CreateNamespace call.
+// Currently only local filesystem paths are supported. The warehouse directory
+// is not created on construction; it is created implicitly by the first
+// CreateNamespace call.
func NewCatalog(name, warehouse string, props iceberg.Properties) (*Catalog,
error) {
if warehouse == "" {
return nil, errors.New("hadoop catalog requires a warehouse
path")
diff --git a/catalog/hadoop/hadoop_integration_test.go
b/catalog/hadoop/hadoop_integration_test.go
index ca97c6f2..ffddf42d 100644
--- a/catalog/hadoop/hadoop_integration_test.go
+++ b/catalog/hadoop/hadoop_integration_test.go
@@ -26,6 +26,7 @@ import (
"strings"
"testing"
+ "github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
"github.com/apache/iceberg-go/catalog/hadoop"
"github.com/apache/iceberg-go/internal/recipe"
@@ -44,8 +45,11 @@ type HadoopIntegrationSuite struct {
}
func (s *HadoopIntegrationSuite) SetupSuite() {
- // Use the same warehouse path as docker-compose and
hadoop_validation.py
- // so that both Go tests and Spark see the same directory.
+ // Use the same warehouse path that Spark is configured with inside the
+ // Docker container (see hadoop_validation.py and docker-compose.yml).
+ // The docker-compose volume mount maps host
/tmp/iceberg-hadoop-warehouse
+ // to container /tmp/iceberg-hadoop-warehouse, so both Go (on host) and
+ // Spark (in container) share the same directory.
s.warehouse = "/tmp/iceberg-hadoop-warehouse"
s.Require().NoError(os.MkdirAll(s.warehouse, 0o755))
@@ -228,6 +232,82 @@ func (s *HadoopIntegrationSuite)
TestDropTableThenListReflects() {
s.Equal("keep", tables[0][len(tables[0])-1])
}
+// TestGoCreateSparkDescribe creates a table from Go, then asks Spark to
+// DESCRIBE it and verifies that Spark sees the expected columns.
+func (s *HadoopIntegrationSuite) TestGoCreateSparkDescribe() {
+ // Create namespace from Go so the directory is owned by the runner
+ // process and Go can create table subdirectories inside it.
+ err := s.cat.CreateNamespace(s.ctx, []string{"describe_ns"}, nil)
+ s.Require().NoError(err)
+
+ schema := iceberg.NewSchema(1,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "name", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ )
+
+ tbl, err := s.cat.CreateTable(s.ctx, []string{"describe_ns",
"go_created_table"}, schema)
+ s.Require().NoError(err)
+ s.NotNil(tbl)
+
+ output := s.sparkSQL("DESCRIBE TABLE
hadoop_test.describe_ns.go_created_table")
+ s.Contains(output, "id")
+ s.Contains(output, "name")
+}
+
+// TestSparkCreateGoLoad creates a table from Spark, then loads it from
+// Go and verifies schema and metadata are consistent.
+func (s *HadoopIntegrationSuite) TestSparkCreateGoLoad() {
+ // Spark creates both namespace and table — Go only reads here.
+ s.sparkSQL("CREATE NAMESPACE IF NOT EXISTS hadoop_test.load_ns")
+ s.sparkSQL("CREATE TABLE hadoop_test.load_ns.spark_created_table (age
INT, city STRING) USING iceberg")
+
+ tbl, err := s.cat.LoadTable(s.ctx, []string{"load_ns",
"spark_created_table"})
+ s.Require().NoError(err)
+ s.NotNil(tbl)
+
+ fields := tbl.Schema().Fields()
+ fieldNames := make([]string, len(fields))
+ for i, f := range fields {
+ fieldNames[i] = f.Name
+ }
+
+ s.Contains(fieldNames, "age")
+ s.Contains(fieldNames, "city")
+}
+
+// TestGoCreateSparkSelect creates a table from Go, then runs a SELECT
+// from Spark to confirm the table is queryable (should return empty).
+func (s *HadoopIntegrationSuite) TestGoCreateSparkSelect() {
+ err := s.cat.CreateNamespace(s.ctx, []string{"select_ns"}, nil)
+ s.Require().NoError(err)
+
+ schema := iceberg.NewSchema(1,
+ iceberg.NestedField{ID: 1, Name: "value", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ )
+
+ _, err = s.cat.CreateTable(s.ctx, []string{"select_ns",
"go_select_table"}, schema)
+ s.Require().NoError(err)
+
+ output := s.sparkSQL("SELECT * FROM
hadoop_test.select_ns.go_select_table")
+ s.Contains(output, "value")
+}
+
+// TestGoCheckTableExistsForSparkTable verifies that CheckTableExists
+// returns true for a table created by Spark.
+func (s *HadoopIntegrationSuite) TestGoCheckTableExistsForSparkTable() {
+ // Spark creates both namespace and table — Go only checks existence.
+ s.sparkSQL("CREATE NAMESPACE IF NOT EXISTS hadoop_test.exists_ns")
+ s.sparkSQL("CREATE TABLE hadoop_test.exists_ns.spark_exists_table (x
INT) USING iceberg")
+
+ exists, err := s.cat.CheckTableExists(s.ctx, []string{"exists_ns",
"spark_exists_table"})
+ s.Require().NoError(err)
+ s.True(exists)
+
+ exists, err = s.cat.CheckTableExists(s.ctx, []string{"exists_ns",
"no_such_table"})
+ s.Require().NoError(err)
+ s.False(exists)
+}
+
func TestHadoopIntegration(t *testing.T) {
suite.Run(t, new(HadoopIntegrationSuite))
}
diff --git a/catalog/hadoop/hadoop_test.go b/catalog/hadoop/hadoop_test.go
index dbc0a422..f31fc398 100644
--- a/catalog/hadoop/hadoop_test.go
+++ b/catalog/hadoop/hadoop_test.go
@@ -854,57 +854,6 @@ func (s *HadoopCatalogTestSuite)
TestDropTableShortIdentifier() {
s.Contains(err.Error(), "at least a namespace and table name")
}
-func (s *HadoopCatalogTestSuite) TestDropTableNamespacePreserved() {
- // Dropping a table should not affect the parent namespace directory.
- ctx := context.Background()
- s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
- s.createFakeTable([]string{"ns", "tbl"})
-
- s.Require().NoError(s.cat.DropTable(ctx, []string{"ns", "tbl"}))
-
- // Namespace dir should still exist.
- info, err := os.Stat(filepath.Join(s.warehouse, "ns"))
- s.Require().NoError(err)
- s.True(info.IsDir())
-}
-
-// RenameTable tests
-
-func (s *HadoopCatalogTestSuite) TestRenameTableUnsupported() {
- _, err := s.cat.RenameTable(context.Background(), []string{"ns",
"old"}, []string{"ns", "new"})
- s.Require().Error(err)
- s.Contains(err.Error(), "not supported")
-}
-
-func (s *HadoopCatalogTestSuite) TestListTablesNamespaceIsFile() {
- ctx := context.Background()
- s.Require().NoError(os.WriteFile(filepath.Join(s.warehouse,
"not_a_ns"), nil, 0o644))
-
- for _, err := range s.cat.ListTables(ctx, []string{"not_a_ns"}) {
- s.ErrorIs(err, catalog.ErrNoSuchNamespace)
-
- break
- }
-}
-
-func (s *HadoopCatalogTestSuite) TestListTablesIdentifierIsolation() {
- // Verify that yielded identifiers don't alias the namespace slice.
- ctx := context.Background()
- s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
- s.createFakeTable([]string{"ns", "tbl1"})
- s.createFakeTable([]string{"ns", "tbl2"})
-
- var tables []table.Identifier
- for ident, err := range s.cat.ListTables(ctx, []string{"ns"}) {
- s.Require().NoError(err)
- tables = append(tables, ident)
- }
-
- s.Len(tables, 2)
- // Each identifier should be independent — no aliasing.
- s.NotEqual(tables[0][len(tables[0])-1], tables[1][len(tables[1])-1])
-}
-
// CreateTable tests
func (s *HadoopCatalogTestSuite) testSchema() *iceberg.Schema {
@@ -1053,6 +1002,57 @@ func (s *HadoopCatalogTestSuite)
TestCreateTableShortIdentifier() {
s.Contains(err.Error(), "at least a namespace and table name")
}
+func (s *HadoopCatalogTestSuite) TestDropTableNamespacePreserved() {
+ // Dropping a table should not affect the parent namespace directory.
+ ctx := context.Background()
+ s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+ s.createFakeTable([]string{"ns", "tbl"})
+
+ s.Require().NoError(s.cat.DropTable(ctx, []string{"ns", "tbl"}))
+
+ // Namespace dir should still exist.
+ info, err := os.Stat(filepath.Join(s.warehouse, "ns"))
+ s.Require().NoError(err)
+ s.True(info.IsDir())
+}
+
+// RenameTable tests
+
+func (s *HadoopCatalogTestSuite) TestRenameTableUnsupported() {
+ _, err := s.cat.RenameTable(context.Background(), []string{"ns",
"old"}, []string{"ns", "new"})
+ s.Require().Error(err)
+ s.Contains(err.Error(), "not supported")
+}
+
+func (s *HadoopCatalogTestSuite) TestListTablesNamespaceIsFile() {
+ ctx := context.Background()
+ s.Require().NoError(os.WriteFile(filepath.Join(s.warehouse,
"not_a_ns"), nil, 0o644))
+
+ for _, err := range s.cat.ListTables(ctx, []string{"not_a_ns"}) {
+ s.ErrorIs(err, catalog.ErrNoSuchNamespace)
+
+ break
+ }
+}
+
+func (s *HadoopCatalogTestSuite) TestListTablesIdentifierIsolation() {
+ // Verify that yielded identifiers don't alias the namespace slice.
+ ctx := context.Background()
+ s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))
+ s.createFakeTable([]string{"ns", "tbl1"})
+ s.createFakeTable([]string{"ns", "tbl2"})
+
+ var tables []table.Identifier
+ for ident, err := range s.cat.ListTables(ctx, []string{"ns"}) {
+ s.Require().NoError(err)
+ tables = append(tables, ident)
+ }
+
+ s.Len(tables, 2)
+ // Each identifier should be independent — no aliasing.
+ s.NotEqual(tables[0][len(tables[0])-1], tables[1][len(tables[1])-1])
+}
+
// LoadTable tests
func (s *HadoopCatalogTestSuite) TestLoadTableNotExists() {