zeroshade commented on code in PR #1078:
URL: https://github.com/apache/iceberg-go/pull/1078#discussion_r3523598465
##########
io/gocloud/blob.go:
##########
@@ -127,27 +198,25 @@ func (bfs *blobFileIO) Open(path string) (icebergio.File,
error) {
}
func (bfs *blobFileIO) Remove(name string) error {
- var err error
- name, err = bfs.preprocess(name)
+ bucket, key, err := bfs.resolveBucket(name)
if err != nil {
return &fs.PathError{Op: "remove", Path: name, Err: err}
}
- return bfs.Delete(bfs.ctx, name)
+ return bucket.Delete(bfs.ctx, key)
Review Comment:
Blocker: returning `bucket.Delete(...)` directly drops the existing mapping
from `gcerrors.NotFound` to `fs.ErrNotExist` in `Remove`, which changes the
FileIO not-found semantics. Please preserve that mapping after routing to the
resolved bucket.
##########
io/gocloud/blob.go:
##########
@@ -102,23 +115,81 @@ type blobFileIO struct {
newRangeReader func(ctx context.Context, key string, offset, length
int64) (io.ReadCloser, error)
}
+// resolveBucket parses path into a bucket handle and object key. If the URI
+// references the primary bucket (or has no scheme), the primary bucket is
+// returned without any additional allocation. URIs that reference a different
+// bucket are resolved via BucketOpener and cached for reuse.
+func (bfs *blobFileIO) resolveBucket(path string) (*blob.Bucket, string,
error) {
+ _, after, hasScheme := strings.Cut(path, "://")
+ if !hasScheme {
+ // No scheme: treat as a key in the primary bucket (legacy
behavior).
+ key, err := bfs.keyExtractor(path)
+
+ return bfs.Bucket, key, err
+ }
+
+ bucketName, key, _ := strings.Cut(after, "/")
+ if key == "" {
+ return nil, "", fmt.Errorf("URI path is empty: %s", path)
+ }
+
+ // Fast path: primary bucket.
+ if bucketName == bfs.primaryBucket {
+ return bfs.Bucket, key, nil
+ }
+
+ // Secondary bucket: check cache first.
+ if bfs.bucketOpener == nil {
+ // No opener configured: fall back to primary bucket with
legacy key extraction.
+ // This preserves backward compatibility for callers that don't
set a BucketOpener.
+ key, err := bfs.keyExtractor(path)
+
+ return bfs.Bucket, key, err
+ }
+
+ bfs.mu.RLock()
+ b, ok := bfs.buckets[bucketName]
+ bfs.mu.RUnlock()
+ if ok {
+ return b, key, nil
+ }
+
+ // Open a new bucket handle.
+ b, err := bfs.bucketOpener(bfs.ctx, bucketName)
+ if err != nil {
+ return nil, "", fmt.Errorf("failed to open bucket %q: %w",
bucketName, err)
+ }
+
+ bfs.mu.Lock()
+ // Double-check: another goroutine may have opened it concurrently.
+ if existing, ok := bfs.buckets[bucketName]; ok {
+ bfs.mu.Unlock()
+ _ = b.Close()
+
+ return existing, key, nil
+ }
+ bfs.buckets[bucketName] = b
+ bfs.mu.Unlock()
+
+ return b, key, nil
+}
+
func (bfs *blobFileIO) preprocess(path string) (string, error) {
return bfs.keyExtractor(path)
}
func (bfs *blobFileIO) Open(path string) (icebergio.File, error) {
- var err error
- path, err = bfs.preprocess(path)
+ bucket, key, err := bfs.resolveBucket(path)
if err != nil {
return nil, &fs.PathError{Op: "open", Path: path, Err: err}
}
- if !fs.ValidPath(path) {
- return nil, &fs.PathError{Op: "open", Path: path, Err:
fs.ErrInvalid}
+ if !fs.ValidPath(key) {
+ return nil, &fs.PathError{Op: "open", Path: key, Err:
fs.ErrInvalid}
}
- key, name := path, filepath.Base(path)
+ name := filepath.Base(key)
- r, err := bfs.NewReader(bfs.ctx, key, nil)
+ r, err := bucket.NewReader(bfs.ctx, key, nil)
Review Comment:
Blocker: `Open` resolves the correct bucket for the initial reader, but the
returned `blobOpenFile` still stores `b: bfs`, and `blobOpenFile.ReadAt` uses
`f.b.NewRangeReader`. Range reads for Parquet/Avro/Puffin files in a secondary
bucket will therefore still go through the primary bucket, where they can fail
with a not-found error or read the wrong object. Please store the resolved
bucket on `blobOpenFile`, use it in `ReadAt`, and add a cross-bucket `ReadAt`
test.
##########
io/gocloud/blob.go:
##########
@@ -102,23 +115,81 @@ type blobFileIO struct {
newRangeReader func(ctx context.Context, key string, offset, length
int64) (io.ReadCloser, error)
}
+// resolveBucket parses path into a bucket handle and object key. If the URI
+// references the primary bucket (or has no scheme), the primary bucket is
+// returned without any additional allocation. URIs that reference a different
+// bucket are resolved via BucketOpener and cached for reuse.
+func (bfs *blobFileIO) resolveBucket(path string) (*blob.Bucket, string,
error) {
+ _, after, hasScheme := strings.Cut(path, "://")
Review Comment:
Blocker: bucket resolution splits on `://` and then routes only by bucket
name, ignoring the URI scheme. That means a `gs://metadata-bucket/key` passed
to an S3 FileIO can be treated as an S3 bucket, which conflicts with the strict
scheme/authority validation merged in #1259. Please parse and validate scheme
plus authority, preferably by reusing the object-location parser from #1259, or
route through the global IO registry by full URI.
##########
io/gocloud/blob.go:
##########
@@ -159,37 +228,53 @@ func (bfs *blobFileIO) WriteFile(name string, content
[]byte) error {
// The caller must call Close on the returned Writer, even if the write is
// aborted.
func (bfs *blobFileIO) NewWriter(ctx context.Context, path string, overwrite
bool, opts *blob.WriterOptions) (w *blobWriteFile, err error) {
- path, err = bfs.preprocess(path)
+ bucket, key, err := bfs.resolveBucket(path)
if err != nil {
return nil, &fs.PathError{Op: "new writer", Path: path, Err:
err}
}
- if !fs.ValidPath(path) {
- return nil, &fs.PathError{Op: "new writer", Path: path, Err:
fs.ErrInvalid}
+ if !fs.ValidPath(key) {
+ return nil, &fs.PathError{Op: "new writer", Path: key, Err:
fs.ErrInvalid}
}
if !overwrite {
- if exists, err := bfs.Exists(ctx, path); err != nil || exists {
+ if exists, err := bucket.Exists(ctx, key); err != nil || exists
{
if err != nil {
- return nil, &fs.PathError{Op: "new writer",
Path: path, Err: err}
+ return nil, &fs.PathError{Op: "new writer",
Path: key, Err: err}
}
- return nil, &fs.PathError{Op: "new writer", Path: path,
Err: fs.ErrInvalid}
+ return nil, &fs.PathError{Op: "new writer", Path: key,
Err: fs.ErrInvalid}
}
}
- bw, err := bfs.Bucket.NewWriter(ctx, path, opts)
+ bw, err := bucket.NewWriter(ctx, key, opts)
if err != nil {
return nil, err
}
return &blobWriteFile{
Writer: bw,
- name: path,
+ name: key,
},
nil
}
func createBlobFS(ctx context.Context, bucket *blob.Bucket, keyExtractor
KeyExtractor) icebergio.IO {
- return &blobFileIO{Bucket: bucket, keyExtractor: keyExtractor, ctx: ctx}
+ return &blobFileIO{
+ Bucket: bucket,
+ keyExtractor: keyExtractor,
+ ctx: ctx,
+ buckets: make(map[string]*blob.Bucket),
+ }
+}
+
+func createMultiBucketBlobFS(ctx context.Context, bucket *blob.Bucket,
primaryBucket string, opener BucketOpener) icebergio.IO {
+ return &blobFileIO{
+ Bucket: bucket,
+ primaryBucket: primaryBucket,
+ bucketOpener: opener,
+ buckets: make(map[string]*blob.Bucket),
+ keyExtractor: defaultKeyExtractor(primaryBucket),
+ ctx: ctx,
+ }
}
func (bfs *blobFileIO) WalkDir(root string, fn fs.WalkDirFunc) error {
Review Comment:
High: `WalkDir` still walks `bfs.Bucket`, so secondary-bucket metadata paths
are not routed through `resolveBucket`. Cross-bucket support is also currently
S3-only, and cached secondary buckets are never closed. Please route `WalkDir`
through the resolved bucket and add a `Close` path that closes cached bucket
handles.
##########
io/gocloud/blob_test.go:
##########
@@ -379,3 +382,192 @@ func TestBlobFileIODeleteFilesEmpty(t *testing.T) {
require.NoError(t, err)
assert.Nil(t, deleted)
}
+
+// TestMultiBucketWriteAndRead verifies that blobFileIO routes reads and writes
+// to the correct bucket when a URI references a bucket different from the
+// primary one. This is the scenario triggered by Iceberg's write.metadata.path
+// table property pointing to a dedicated metadata bucket while data lives in
+// the warehouse bucket.
+func TestMultiBucketWriteAndRead(t *testing.T) {
+ ctx := context.Background()
+
+ warehouseBucket := memblob.OpenBucket(nil)
+ defer warehouseBucket.Close()
+ metadataBucket := memblob.OpenBucket(nil)
+ defer metadataBucket.Close()
+
+ bfs := &blobFileIO{
+ Bucket: warehouseBucket,
+ primaryBucket: "warehouse",
+ bucketOpener: func(_ context.Context, name string)
(*blob.Bucket, error) {
+ if name == "metadata" {
+ return metadataBucket, nil
+ }
+
+ return nil, fmt.Errorf("unknown bucket: %s", name)
+ },
+ buckets: make(map[string]*blob.Bucket),
+ keyExtractor: defaultKeyExtractor("warehouse"),
+ ctx: ctx,
+ }
+
+ // Write a data file to the primary (warehouse) bucket.
+ require.NoError(t,
bfs.WriteFile("s3://warehouse/db/table/data/file.parquet",
[]byte("data-content")))
+
+ // Write a manifest list to the metadata bucket (simulates
write.metadata.path).
+ require.NoError(t,
bfs.WriteFile("s3://metadata/db/table/snap-123.avro", []byte("manifest-list")))
+
+ // Verify the data file landed in the warehouse bucket, not the
metadata bucket.
+ exists, err := warehouseBucket.Exists(ctx, "db/table/data/file.parquet")
+ require.NoError(t, err)
+ assert.True(t, exists, "data file should exist in warehouse bucket")
+
+ exists, err = metadataBucket.Exists(ctx, "db/table/data/file.parquet")
+ require.NoError(t, err)
+ assert.False(t, exists, "data file should NOT exist in metadata bucket")
+
+ // Verify the manifest list landed in the metadata bucket, not the
warehouse bucket.
+ exists, err = metadataBucket.Exists(ctx, "db/table/snap-123.avro")
+ require.NoError(t, err)
+ assert.True(t, exists, "manifest list should exist in metadata bucket")
+
+ exists, err = warehouseBucket.Exists(ctx, "db/table/snap-123.avro")
+ require.NoError(t, err)
+ assert.False(t, exists, "manifest list should NOT exist in warehouse
bucket")
+
+ // Read back via Open (uses resolveBucket).
+ f, err := bfs.Open("s3://metadata/db/table/snap-123.avro")
+ require.NoError(t, err)
+ defer f.Close()
+ content, err := io.ReadAll(f)
+ require.NoError(t, err)
+ assert.Equal(t, "manifest-list", string(content))
+
+ // Read from the primary bucket.
+ f2, err := bfs.Open("s3://warehouse/db/table/data/file.parquet")
+ require.NoError(t, err)
+ defer f2.Close()
+ content2, err := io.ReadAll(f2)
+ require.NoError(t, err)
+ assert.Equal(t, "data-content", string(content2))
+}
+
+// TestMultiBucketDelete verifies that Remove and DeleteFiles route to the
+// correct bucket based on the URI.
+func TestMultiBucketDelete(t *testing.T) {
+ ctx := context.Background()
+
+ warehouseBucket := memblob.OpenBucket(nil)
+ defer warehouseBucket.Close()
+ metadataBucket := memblob.OpenBucket(nil)
+ defer metadataBucket.Close()
+
+ bfs := &blobFileIO{
+ Bucket: warehouseBucket,
+ primaryBucket: "warehouse",
+ bucketOpener: func(_ context.Context, name string)
(*blob.Bucket, error) {
+ if name == "metadata" {
+ return metadataBucket, nil
+ }
+
+ return nil, fmt.Errorf("unknown bucket: %s", name)
+ },
+ buckets: make(map[string]*blob.Bucket),
+ keyExtractor: defaultKeyExtractor("warehouse"),
+ ctx: ctx,
+ }
+
+ // Seed files in both buckets.
+ require.NoError(t, warehouseBucket.WriteAll(ctx, "data/file.parquet",
[]byte("d"), nil))
+ require.NoError(t, metadataBucket.WriteAll(ctx, "snap-456.avro",
[]byte("m"), nil))
+
+ // Remove from metadata bucket.
+ require.NoError(t, bfs.Remove("s3://metadata/snap-456.avro"))
+ exists, err := metadataBucket.Exists(ctx, "snap-456.avro")
+ require.NoError(t, err)
+ assert.False(t, exists, "metadata file should be deleted")
+
+ // Warehouse file should be untouched.
+ exists, err = warehouseBucket.Exists(ctx, "data/file.parquet")
+ require.NoError(t, err)
+ assert.True(t, exists, "warehouse file should still exist")
+
+ // DeleteFiles across both buckets.
+ require.NoError(t, warehouseBucket.WriteAll(ctx, "old-manifest.avro",
[]byte("x"), nil))
+ require.NoError(t, metadataBucket.WriteAll(ctx, "old-snap.avro",
[]byte("y"), nil))
+
+ deleted, err := bfs.DeleteFiles(ctx, []string{
+ "s3://warehouse/old-manifest.avro",
+ "s3://metadata/old-snap.avro",
+ })
+ require.NoError(t, err)
+ assert.Len(t, deleted, 2)
+}
+
+// TestMultiBucketOpenerCaching verifies that the bucket opener is called only
+// once per bucket name, and subsequent requests reuse the cached handle.
+func TestMultiBucketOpenerCaching(t *testing.T) {
+ ctx := context.Background()
+
+ warehouseBucket := memblob.OpenBucket(nil)
+ defer warehouseBucket.Close()
+ metadataBucket := memblob.OpenBucket(nil)
+ defer metadataBucket.Close()
+
+ openCount := 0
+ bfs := &blobFileIO{
+ Bucket: warehouseBucket,
+ primaryBucket: "warehouse",
+ bucketOpener: func(_ context.Context, name string)
(*blob.Bucket, error) {
+ openCount++
+ if name == "metadata" {
+ return metadataBucket, nil
+ }
+
+ return nil, fmt.Errorf("unknown bucket: %s", name)
+ },
+ buckets: make(map[string]*blob.Bucket),
+ keyExtractor: defaultKeyExtractor("warehouse"),
+ ctx: ctx,
+ }
+
+ require.NoError(t, metadataBucket.WriteAll(ctx, "file1.avro",
[]byte("a"), nil))
+ require.NoError(t, metadataBucket.WriteAll(ctx, "file2.avro",
[]byte("b"), nil))
+
+ // Two reads from the same secondary bucket.
+ f1, err := bfs.Open("s3://metadata/file1.avro")
+ require.NoError(t, err)
+ f1.Close()
+
+ f2, err := bfs.Open("s3://metadata/file2.avro")
+ require.NoError(t, err)
+ f2.Close()
+
+ assert.Equal(t, 1, openCount, "bucket opener should be called exactly
once for 'metadata'")
+}
+
+// TestMultiBucketFallbackWithoutOpener verifies backward compatibility: when
+// no BucketOpener is configured, cross-bucket URIs fall back to the primary
+// bucket with the legacy key extractor.
+func TestMultiBucketFallbackWithoutOpener(t *testing.T) {
+ ctx := context.Background()
+
+ bucket := memblob.OpenBucket(nil)
+ defer bucket.Close()
+
+ // No bucketOpener set: uses legacy createBlobFS path.
+ bfs := createBlobFS(ctx, bucket, defaultKeyExtractor("warehouse"))
+
+ // Write using a URI with a different bucket name. Without an opener,
+ // the key extractor strips "://other-bucket/" and the file ends up
+ // in the primary bucket under a mangled key (legacy behavior).
+ wfs := bfs.(icebergio.WriteFileIO)
+ require.NoError(t, wfs.WriteFile(
+ "s3://other-bucket/path/file.txt", []byte("hello")))
+
+ // The file lands in the primary bucket with the full
"other-bucket/path/file.txt" key
+ // because TrimPrefix("other-bucket/...", "warehouse/") is a no-op.
+ exists, err := bucket.Exists(ctx, "other-bucket/path/file.txt")
+ require.NoError(t, err)
+ assert.True(t, exists, "without opener, cross-bucket URI should fall
back to primary bucket with mangled key")
Review Comment:
Medium: this test locks in the unsafe fallback where `s3://other-bucket/...`
is written into the primary bucket. With #1259 merged, this should be rejected
as an unsupported authority unless an explicit cross-bucket opener is
configured. Please update the expected behavior instead of codifying the
fallback.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]