laskoviymishka commented on code in PR #1104:
URL: https://github.com/apache/iceberg-go/pull/1104#discussion_r3282065506


##########
catalog/hadoop/hadoop.go:
##########
@@ -561,6 +561,10 @@ func (c *Catalog) DropTable(_ context.Context, ident table.Identifier) error {
        return os.RemoveAll(tablePath)
 }
 
+func (c *Catalog) PurgeTable(ctx context.Context, identifier table.Identifier) error {

Review Comment:
   This delegates to `os.RemoveAll(tablePath)`, which means any data written to 
`write.data.path` or `write.metadata.path` outside the table root just survives 
the purge. The interface godoc explicitly claims "files written outside the 
root… are deleted", so Hadoop silently violates its own contract. At minimum, 
I'd either load the table and run the same union-walk + bulk-delete path the 
other catalogs use, or call out the caveat in the Hadoop-specific godoc.
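To make the gap concrete, here's a rough sketch, assuming referenced paths are plain slash-separated paths already in the same form as the table root. `filesOutsideRoot` is a hypothetical helper, not something in the PR. It returns exactly the files that a recursive delete of the table directory would leave behind, which a Hadoop purge would still have to remove one by one.

```go
package main

import (
	"fmt"
	"path"
	"sort"
	"strings"
)

// filesOutsideRoot is a hypothetical helper: it returns the referenced
// paths that do not live under root, i.e. the files that os.RemoveAll(root)
// alone would miss. Inputs are assumed to be plain slash-separated paths
// (not URIs), all in the same canonical form.
func filesOutsideRoot(root string, referenced []string) []string {
	prefix := strings.TrimSuffix(path.Clean(root), "/") + "/"
	var out []string
	for _, p := range referenced {
		// Compare against root + "/" so "/wh/db/tbl2/x" is not treated
		// as living under "/wh/db/tbl".
		if !strings.HasPrefix(path.Clean(p), prefix) {
			out = append(out, p)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	refs := []string{
		"/wh/db/tbl/metadata/v2.metadata.json",
		"/wh/db/tbl/data/a.parquet",
		"/data-elsewhere/b.parquet", // e.g. written via write.data.path
		"/wh/db/tbl2/data/c.parquet",
	}
	fmt.Println(filesOutsideRoot("/wh/db/tbl", refs))
	// prints [/data-elsewhere/b.parquet /wh/db/tbl2/data/c.parquet]
}
```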



##########
catalog/internal/utils.go:
##########
@@ -241,3 +245,79 @@ func UpdateAndStageTable(ctx context.Context, current *table.Table, ident table.
                ),
        }, nil
 }
+
+func normalizeURI(uri string) string {
+       if strings.HasPrefix(uri, "file:") {
+               // Clean "file:/", "file://", "file:///" to all consistently have "file:///" prefix
+               cleaned := strings.TrimPrefix(uri, "file:")
+               cleaned = strings.TrimPrefix(cleaned, "//")
+               cleaned = strings.TrimPrefix(cleaned, "/")
+
+               return "file:///" + cleaned
+       }
+
+       return uri
+}
+
+// PurgeTableFiles physically deletes all files under the table's warehouse location
+// and any referenced files written outside the location root (e.g., via write.data.path
+// or write.metadata.path properties).

Review Comment:
   We're deleting data files unconditionally here, but Iceberg gates this on 
the `gc.enabled` table property: Java's `CatalogUtil.dropTableData` and 
PyIceberg both refuse to purge data when `gc.enabled=false`, because the same 
physical files may be referenced by snapshotted clones or branched tables. 
On a `gc.enabled=false` table this PR will silently corrupt those other readers. 
I'd read `tbl.Metadata().Properties().Get("gc.enabled", "true")` at the top of 
`PurgeTableFiles` and short-circuit (or at least skip the data-file portion) 
when it's false. wdyt?
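A minimal sketch of the "skip the data-file portion" option. `planPurge` and `purgePlan` are hypothetical names, and table properties are modeled as a plain `map[string]string` rather than `iceberg.Properties`. The property name and its `true` default come from the comment above.

```go
package main

import (
	"fmt"
	"strconv"
)

// purgePlan is a hypothetical summary of what a purge is allowed to delete.
type purgePlan struct {
	DeleteMetadata  bool // metadata JSON, manifest lists, manifests
	DeleteDataFiles bool // data and delete files referenced by manifests
}

// planPurge reads gc.enabled (default "true"); when it is false, metadata
// cleanup still runs but the data-file portion is skipped. Values that
// strconv.ParseBool cannot read are treated as false, so shared files are
// never removed by accident.
func planPurge(props map[string]string) purgePlan {
	raw, ok := props["gc.enabled"]
	if !ok {
		raw = "true"
	}
	gc, err := strconv.ParseBool(raw)
	if err != nil {
		gc = false
	}
	return purgePlan{DeleteMetadata: true, DeleteDataFiles: gc}
}

func main() {
	fmt.Printf("%+v\n", planPurge(map[string]string{}))
	fmt.Printf("%+v\n", planPurge(map[string]string{"gc.enabled": "false"}))
	// second line prints {DeleteMetadata:true DeleteDataFiles:false}
}
```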



##########
catalog/internal/utils.go:
##########
@@ -241,3 +245,79 @@ func UpdateAndStageTable(ctx context.Context, current *table.Table, ident table.
                ),
        }, nil
 }

Review Comment:
   This only strips `file://` — so the LocalFS walk callback (which yields bare 
OS paths like `/tmp/foo`) and the metadata-side URIs (`file:///tmp/foo`) land 
in the set as two distinct keys. Today it's masked because the second `Remove` 
hits `ErrNoSuchFile` and we swallow it, but the union-dedup intent is broken, 
and for any cloud FS where `WalkDir` emits a different scheme form than the 
metadata, we'll get spurious `BulkRemovableIO` errors. There's already a 
`normalizeFilePath` helper in `table/orphan_cleanup.go` that does full URL 
normalization across schemes — I'd reuse that instead of rolling our own here.
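For illustration only, a sketch of what "one canonical key per file" could look like, assuming the only inputs are absolute local paths and scheme-qualified URIs. `canonicalKey` is hypothetical and is not the existing `normalizeFilePath`, whose behavior may differ. The point is that the bare walk path and every `file:` spelling collapse to a single set key.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
	"strings"
)

// canonicalKey is a hypothetical normalizer (not normalizeFilePath from
// table/orphan_cleanup.go). It maps a bare absolute OS path and every file:
// spelling ("file:/a", "file:///a") onto one "file:///a" key, and cleans the
// path component of other schemes. It assumes paths contain no '?' or '#',
// which url.Parse would split off as query or fragment.
func canonicalKey(p string) string {
	if strings.HasPrefix(p, "/") {
		// Bare OS path, as yielded by a local directory walk.
		return "file://" + path.Clean(p)
	}
	u, err := url.Parse(p)
	if err != nil || u.Scheme == "" {
		return p // leave anything we cannot interpret untouched
	}
	// "file:/a" and "file:///a" both parse with an empty Host and Path "/a".
	return u.Scheme + "://" + u.Host + path.Clean("/"+u.Path)
}

func main() {
	seen := map[string]struct{}{}
	for _, p := range []string{"/tmp/foo", "file:/tmp/foo", "file:///tmp/foo", "s3://bkt/a//b"} {
		seen[canonicalKey(p)] = struct{}{}
	}
	fmt.Println(len(seen)) // 2
}
```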



##########
catalog/internal/utils.go:
##########
@@ -241,3 +245,79 @@ func UpdateAndStageTable(ctx context.Context, current *table.Table, ident table.
                ),
        }, nil
 }
+
+func normalizeURI(uri string) string {
+       if strings.HasPrefix(uri, "file:") {
+               // Clean "file:/", "file://", "file:///" to all consistently have "file:///" prefix
+               cleaned := strings.TrimPrefix(uri, "file:")
+               cleaned = strings.TrimPrefix(cleaned, "//")
+               cleaned = strings.TrimPrefix(cleaned, "/")
+
+               return "file:///" + cleaned
+       }
+
+       return uri
+}
+
+// PurgeTableFiles physically deletes all files under the table's warehouse location
+// and any referenced files written outside the location root (e.g., via write.data.path

Review Comment:
   The godoc calls this "best-effort", but `errors.Join` propagates every 
per-file failure and the catalogs wrap it as "dropped table but failed to purge 
files". That's not best-effort — that's fail-loud, with no retry path because 
the catalog row is already gone. Either match Java/PyIceberg (log per-file 
failures, return nil so the catalog drop stays the source of truth) or keep 
fail-loud and return a structured `PurgeResult{CatalogDropped bool, FileErrors 
[]error}` so the caller can tell that the catalog state is durable. Right now 
the doc and the code disagree, which is the worst of both.



##########
catalog/internal/utils.go:
##########
@@ -170,6 +173,7 @@ func getDefaultWarehouseLocation(dbname, tablename string, nsprops, catprops ice
 // ([\w-]{36})      -> UUID (36 characters, including hyphens)
 // (?:\.\w+)?       -> optional codec name
 // \.metadata\.json -> file extension
+// var tableMetadataFileNameRegex = regexp.MustCompile(`^(\d+)-([\w-]{36})(?:\.\w+)?\.metadata\.json`)

Review Comment:
   Stray commented-out `tableMetadataFileNameRegex` line that the diff left 
behind. All three of us stopped on it. Worth a quick clean.



##########
catalog/catalog.go:
##########
@@ -167,6 +167,26 @@ type TransactionalCatalog interface {
        CommitTransaction(ctx context.Context, commits []table.TableCommit) error
 }
 
+// PurgeableTable is an optional interface that catalogs can implement
+// to support physical table deletion (catalog entry + underlying files).
+// Callers should check for this capability via a type assertion:
+//
+//     if purger, ok := cat.(catalog.PurgeableTable); ok {
+//         err := purger.PurgeTable(ctx, ident)
+//     }
+//
+// For REST catalogs the purge is delegated server-side. For client-side
+// catalogs (SQL, Glue, Hive, Hadoop) the table is first dropped from
+// the catalog, then all files under the table's [table.Metadata.Location]
+// root, plus any referenced files written outside the root (e.g. via
+// write.data.path or write.metadata.path table properties), are deleted.
+// File-deletion errors are propagated to the caller, but because the
+// catalog entry is already removed at that point there is no automatic
+// retry path.
+type PurgeableTable interface {

Review Comment:
   The catalogs that intentionally satisfy this interface would benefit from a 
`var _ catalog.PurgeableTable = (*Catalog)(nil)` assertion (REST especially, 
since it's not exercised in the same test fixture). Cheap compile-time guard 
against someone renaming the method out from under the interface.



##########
cmd/iceberg/drop_test.go:
##########
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+       "context"
+       "errors"
+       "iter"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/catalog"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+type mockCatalogForDrop struct {
+       catalogType    catalog.Type
+       dropCalled     bool
+       dropIdent      table.Identifier
+       dropErr        error
+       checkExists    bool
+       checkExistsErr error
+}
+
+func (m *mockCatalogForDrop) CatalogType() catalog.Type {
+       return m.catalogType
+}
+
+func (m *mockCatalogForDrop) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) {
+       return nil, nil
+}
+
+func (m *mockCatalogForDrop) CommitTable(ctx context.Context, identifier table.Identifier, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
+       return nil, "", nil
+}
+
+func (m *mockCatalogForDrop) ListTables(ctx context.Context, namespace table.Identifier) iter.Seq2[table.Identifier, error] {
+       return nil
+}
+
+func (m *mockCatalogForDrop) LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) {
+       return nil, nil
+}
+
+func (m *mockCatalogForDrop) DropTable(ctx context.Context, identifier table.Identifier) error {
+       m.dropCalled = true
+       m.dropIdent = identifier
+
+       return m.dropErr
+}
+
+func (m *mockCatalogForDrop) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) {
+       return nil, nil
+}
+
+func (m *mockCatalogForDrop) CheckTableExists(ctx context.Context, identifier table.Identifier) (bool, error) {
+       return m.checkExists, m.checkExistsErr
+}
+
+func (m *mockCatalogForDrop) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) {
+       return nil, nil
+}
+
+func (m *mockCatalogForDrop) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error {
+       return nil
+}
+
+func (m *mockCatalogForDrop) DropNamespace(ctx context.Context, namespace table.Identifier) error {
+       return nil
+}
+
+func (m *mockCatalogForDrop) CheckNamespaceExists(ctx context.Context, namespace table.Identifier) (bool, error) {
+       return false, nil
+}
+
+func (m *mockCatalogForDrop) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) {
+       return nil, nil
+}
+
+func (m *mockCatalogForDrop) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier, removals []string, updates iceberg.Properties) (catalog.PropertiesUpdateSummary, error) {
+       return catalog.PropertiesUpdateSummary{}, nil
+}
+
+type mockPurgeableCatalog struct {
+       mockCatalogForDrop
+       purgeCalled bool
+       purgeIdent  table.Identifier
+       purgeErr    error

Review Comment:
   `purgeErr` is declared on the mock but never used in a test case — I'd add a 
`TestRunDropTablePurgeError` that returns an error here and asserts the 
wrapping ("dropped table but failed to purge files") so the contract can't 
silently regress.



##########
catalog/sql/sql_test.go:
##########
@@ -684,6 +686,106 @@ func (s *SqliteCatalogTestSuite) TestDropTableNotExist() {
        }
 }
 
+func (s *SqliteCatalogTestSuite) TestPurgeTable() {
+       tests := []struct {
+               cat   *sqlcat.Catalog
+               tblID table.Identifier
+       }{
+               {s.getCatalogMemory(), s.randomTableIdentifier()},
+               {s.getCatalogSqlite(), s.randomHierarchicalIdentifier()},
+       }
+
+       for _, tt := range tests {
+               ns := catalog.NamespaceFromIdent(tt.tblID)
+               s.Require().NoError(tt.cat.CreateNamespace(context.Background(), ns, nil))
+
+               schema := iceberg.NewSchema(1, iceberg.NestedField{
+                       ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String, Required: true,
+               })
+               tbl, err := tt.cat.CreateTable(context.Background(), tt.tblID, schema)
+               s.Require().NoError(err)
+
+               // Append data to create data files and manifest files
+               arrowSchema := arrow.NewSchema([]arrow.Field{
+                       {Name: "foo", Type: arrow.BinaryTypes.String},
+               }, nil)
+
+               bldr := array.NewStringBuilder(memory.DefaultAllocator)
+               bldr.Append("bar")
+               arr := bldr.NewArray()
+
+               rec := array.NewRecordBatch(arrowSchema, []arrow.Array{arr}, 1)
+               arrTable := array.NewTableFromRecords(arrowSchema, []arrow.RecordBatch{rec})
+
+               tx := tbl.NewTransaction()
+               s.Require().NoError(tx.AppendTable(context.Background(), arrTable, 1024, nil))
+               tbl, err = tx.Commit(context.Background())
+               s.Require().NoError(err)
+
+               arr.Release()
+               bldr.Release()
+               rec.Release()
+               arrTable.Release()
+
+               metaLoc := strings.TrimPrefix(tbl.MetadataLocation(), "file://")
+               tableLoc := strings.TrimPrefix(tbl.Location(), "file://")
+               s.FileExists(metaLoc)
+
+               // Create a dummy statistics file at an external path outside the table Location()
+               externalStatsPath := filepath.Join(s.warehouse, "external-path", "stats.puffin")
+               s.Require().NoError(os.MkdirAll(filepath.Dir(externalStatsPath), 0o755))
+               s.Require().NoError(os.WriteFile(externalStatsPath, []byte("dummy puffin data"), 0o644))
+               s.FileExists(externalStatsPath)
+
+               // Add this external statistics file to the table metadata JSON
+               metaBytes, err := os.ReadFile(metaLoc)
+               s.Require().NoError(err)
+               var metaMap map[string]any
+               s.Require().NoError(json.Unmarshal(metaBytes, &metaMap))
+               metaMap["statistics"] = []any{
+                       map[string]any{
+                               "snapshot-id":               int64(1),

Review Comment:
   Hard-coding `"snapshot-id": int64(1)` here is fragile — if it ever drifts 
from `tbl.CurrentSnapshot().SnapshotID`, the test will still pass, but for the 
wrong reason (the external file gets removed by the walk rather than by the 
referenced-files path, which is what this test is supposed to exercise). I'd 
read the snapshot id off the loaded table instead.



##########
cmd/iceberg/main.go:
##########
@@ -537,10 +538,21 @@ func runDrop(ctx context.Context, output Output, cat catalog.Catalog, cmd *DropC
                        os.Exit(1)
                }
        case cmd.Table != nil:
-               err := cat.DropTable(ctx, catalog.ToIdentifier(cmd.Table.Identifier))
+               ident := catalog.ToIdentifier(cmd.Table.Identifier)
+               var err error
+               if cmd.Table.Purge {
+                       if purger, ok := cat.(catalog.PurgeableTable); ok {
+                               err = purger.PurgeTable(ctx, ident)
+                       } else {
+                               output.Error(fmt.Errorf("catalog %s does not support purge", cat.CatalogType()))
+                               osExit(1)

Review Comment:
   The purge branch uses `osExit(1)` (the swappable test hook), but the 
namespace branch right next to it still calls bare `os.Exit(1)`. Functionally 
fine today, but it's the kind of thing that makes future test additions land on 
a confusing failure mode. I'd make both use `osExit` for consistency.



##########
table/orphan_cleanup.go:
##########
@@ -225,10 +225,10 @@ func (t Table) executeOrphanCleanup(ctx context.Context, cfg *orphanCleanupConfi
        return result, nil
 }
 
-// getReferencedFiles collects all files referenced by table metadata: previous metadata
+// GetReferencedFiles collects all files referenced by table metadata: previous metadata
 // files, statistics and partition-statistics paths (Puffin, etc.), and all paths reachable
 // from current snapshots (manifest lists, manifests, data files).
-func (t Table) getReferencedFiles(fs iceio.IO) (map[string]bool, error) {
+func (t Table) GetReferencedFiles(fs iceio.IO) (map[string]bool, error) {

Review Comment:
   This is a public method on `Table` whose only caller is `catalog/internal`. 
It takes an `iceio.IO` that it dereferences, and panics on, if the table has 
snapshots and `fs` is nil, and the returned `map[string]bool` has no documented 
contract about path 
normalization. I'd consider either folding this into a `Table.PurgeFiles(ctx)` 
that owns its own FS, or pushing it onto `Metadata` and documenting the nil-fs 
case. Exporting it as-is locks in an awkward surface for one internal caller.
   
   Also worth a note: this walk doesn't include deletion-vector blob paths. 
With #1100 just merged, V3 tables with DVs will leak `.dv` files on stores 
where `ListableIO` isn't available.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

