laskoviymishka commented on code in PR #1078:
URL: https://github.com/apache/iceberg-go/pull/1078#discussion_r3243482896


##########
io/gocloud/blob.go:
##########
@@ -102,23 +115,81 @@ type blobFileIO struct {
        newRangeReader func(ctx context.Context, key string, offset, length 
int64) (io.ReadCloser, error)
 }
 
+// resolveBucket parses path into a bucket handle and object key. If the URI
+// references the primary bucket (or has no scheme), the primary bucket is
+// returned without any additional allocation. URIs that reference a different
+// bucket are resolved via BucketOpener and cached for reuse.
+func (bfs *blobFileIO) resolveBucket(path string) (*blob.Bucket, string, 
error) {
+       _, after, hasScheme := strings.Cut(path, "://")
+       if !hasScheme {
+               // No scheme: treat as a key in the primary bucket (legacy 
behavior).
+               key, err := bfs.keyExtractor(path)
+
+               return bfs.Bucket, key, err
+       }
+
+       bucketName, key, _ := strings.Cut(after, "/")
+       if key == "" {
+               return nil, "", fmt.Errorf("URI path is empty: %s", path)
+       }
+
+       // Fast path: primary bucket.
+       if bucketName == bfs.primaryBucket {
+               return bfs.Bucket, key, nil
+       }
+
+       // Secondary bucket: check cache first.
+       if bfs.bucketOpener == nil {
+               // No opener configured: fall back to primary bucket with 
legacy key extraction.
+               // This preserves backward compatibility for callers that don't 
set a BucketOpener.
+               key, err := bfs.keyExtractor(path)
+
+               return bfs.Bucket, key, err
+       }
+
+       bfs.mu.RLock()
+       b, ok := bfs.buckets[bucketName]
+       bfs.mu.RUnlock()
+       if ok {
+               return b, key, nil
+       }
+
+       // Open a new bucket handle.
+       b, err := bfs.bucketOpener(bfs.ctx, bucketName)
+       if err != nil {
+               return nil, "", fmt.Errorf("failed to open bucket %q: %w", 
bucketName, err)
+       }
+
+       bfs.mu.Lock()
+       // Double-check: another goroutine may have opened it concurrently.
+       if existing, ok := bfs.buckets[bucketName]; ok {
+               bfs.mu.Unlock()
+               _ = b.Close()
+
+               return existing, key, nil
+       }
+       bfs.buckets[bucketName] = b
+       bfs.mu.Unlock()
+
+       return b, key, nil
+}
+
 func (bfs *blobFileIO) preprocess(path string) (string, error) {
        return bfs.keyExtractor(path)
 }
 
 func (bfs *blobFileIO) Open(path string) (icebergio.File, error) {
-       var err error
-       path, err = bfs.preprocess(path)
+       bucket, key, err := bfs.resolveBucket(path)
        if err != nil {
                return nil, &fs.PathError{Op: "open", Path: path, Err: err}
        }
-       if !fs.ValidPath(path) {
-               return nil, &fs.PathError{Op: "open", Path: path, Err: 
fs.ErrInvalid}
+       if !fs.ValidPath(key) {
+               return nil, &fs.PathError{Op: "open", Path: key, Err: 
fs.ErrInvalid}
        }
 
-       key, name := path, filepath.Base(path)
+       name := filepath.Base(key)
 
-       r, err := bfs.NewReader(bfs.ctx, key, nil)
+       r, err := bucket.NewReader(bfs.ctx, key, nil)

Review Comment:
   I think there's a subtle issue — `Open` now routes through the resolved 
`bucket` here, but `blobOpenFile.ReadAt` (line 49) still calls 
`f.b.NewRangeReader(...)`, which dispatches to the embedded *primary* bucket. 
So the first `ReadAt` on any cross-bucket file opened here goes to the wrong 
bucket: it returns a 404, or worse, silently reads a different object if a 
same-named key exists in the primary bucket. That is the path the Parquet 
column readers and the puffin/DV reader actually take. 
`TestMultiBucketWriteAndRead` passes only because `io.ReadAll` uses `Read` 
and never calls `ReadAt`.
   
   Could we thread `bucket` into `blobOpenFile` and have `ReadAt` use 
`f.bucket.NewRangeReader(...)`? A `ReadAt`-specific test would help lock it in. 
wdyt?



##########
io/gocloud/blob.go:
##########
@@ -91,9 +92,21 @@ func defaultKeyExtractor(bucketName string) KeyExtractor {
        }
 }
 
+// BucketOpener creates a new blob.Bucket for the given bucket name.
+// Used to lazily open secondary buckets when a URI references a bucket
+// different from the primary one (e.g. write.metadata.path pointing to
+// a separate metadata bucket).
+type BucketOpener func(ctx context.Context, bucketName string) (*blob.Bucket, 
error)
+
 type blobFileIO struct {
        *blob.Bucket
 
+       primaryBucket string
+       bucketOpener  BucketOpener
+
+       mu      sync.RWMutex
+       buckets map[string]*blob.Bucket

Review Comment:
   These secondary buckets are never closed — `blobFileIO` only inherits 
`Close()` from the embedded primary. Each cached `*blob.Bucket` holds its own 
HTTP transport / connection pool, so a long-running process that touches many 
distinct metadata buckets will leak transports indefinitely.
   
   I'd add a `Close()` on `blobFileIO` that walks `bfs.buckets` under the lock 
and closes each cached secondary bucket before closing the primary one. What 
do you think?



##########
io/gocloud/blob.go:
##########
@@ -159,37 +228,53 @@ func (bfs *blobFileIO) WriteFile(name string, content 
[]byte) error {
 // The caller must call Close on the returned Writer, even if the write is
 // aborted.
 func (bfs *blobFileIO) NewWriter(ctx context.Context, path string, overwrite 
bool, opts *blob.WriterOptions) (w *blobWriteFile, err error) {
-       path, err = bfs.preprocess(path)
+       bucket, key, err := bfs.resolveBucket(path)
        if err != nil {
                return nil, &fs.PathError{Op: "new writer", Path: path, Err: 
err}
        }
-       if !fs.ValidPath(path) {
-               return nil, &fs.PathError{Op: "new writer", Path: path, Err: 
fs.ErrInvalid}
+       if !fs.ValidPath(key) {
+               return nil, &fs.PathError{Op: "new writer", Path: key, Err: 
fs.ErrInvalid}
        }
 
        if !overwrite {
-               if exists, err := bfs.Exists(ctx, path); err != nil || exists {
+               if exists, err := bucket.Exists(ctx, key); err != nil || exists 
{
                        if err != nil {
-                               return nil, &fs.PathError{Op: "new writer", 
Path: path, Err: err}
+                               return nil, &fs.PathError{Op: "new writer", 
Path: key, Err: err}
                        }
 
-                       return nil, &fs.PathError{Op: "new writer", Path: path, 
Err: fs.ErrInvalid}
+                       return nil, &fs.PathError{Op: "new writer", Path: key, 
Err: fs.ErrInvalid}
                }
        }
-       bw, err := bfs.Bucket.NewWriter(ctx, path, opts)
+       bw, err := bucket.NewWriter(ctx, key, opts)
        if err != nil {
                return nil, err
        }
 
        return &blobWriteFile{
                        Writer: bw,
-                       name:   path,
+                       name:   key,
                },
                nil
 }
 
 func createBlobFS(ctx context.Context, bucket *blob.Bucket, keyExtractor 
KeyExtractor) icebergio.IO {
-       return &blobFileIO{Bucket: bucket, keyExtractor: keyExtractor, ctx: ctx}
+       return &blobFileIO{
+               Bucket:       bucket,
+               keyExtractor: keyExtractor,
+               ctx:          ctx,
+               buckets:      make(map[string]*blob.Bucket),
+       }
+}
+
+func createMultiBucketBlobFS(ctx context.Context, bucket *blob.Bucket, 
primaryBucket string, opener BucketOpener) icebergio.IO {
+       return &blobFileIO{
+               Bucket:        bucket,
+               primaryBucket: primaryBucket,
+               bucketOpener:  opener,
+               buckets:       make(map[string]*blob.Bucket),
+               keyExtractor:  defaultKeyExtractor(primaryBucket),
+               ctx:           ctx,
+       }
 }
 
 func (bfs *blobFileIO) WalkDir(root string, fn fs.WalkDirFunc) error {

Review Comment:
   `WalkDir` is the one IO entry point that doesn't go through `resolveBucket` 
— it still hardcodes `fs.WalkDir(bfs.Bucket, ...)`. So if `root` names a 
secondary bucket (e.g., orphan-file enumeration on `write.metadata.path`), this 
silently walks the primary warehouse bucket and returns the wrong listing.
   
   Could we route this through `resolveBucket` too? Probably worth a small test 
covering the cross-bucket root.



##########
io/gocloud/register.go:
##########
@@ -38,7 +39,13 @@ func registerS3Schemes() {
                        return nil, err
                }
 
-               return createBlobFS(ctx, bucket, 
defaultKeyExtractor(parsed.Host)), nil
+               opener := func(ctx context.Context, bucketName string) 
(*blob.Bucket, error) {
+                       u := &url.URL{Scheme: parsed.Scheme, Host: bucketName}
+
+                       return createS3Bucket(ctx, u, props)
+               }
+
+               return createMultiBucketBlobFS(ctx, bucket, parsed.Host, 
opener), nil

Review Comment:
   The fix lands for S3 here, but `registerGCSScheme` (line 64) and 
`registerAzureSchemes` (line 76) still call `createBlobFS`, so a GCS or ADLS 
user with `write.metadata.path` on a separate bucket still hits the mangled-key 
bug we're fixing. PyIceberg and Java handle cross-bucket GCS transparently, so 
tables written by them become unreadable from Go in that layout.
   
   Could we either add `BucketOpener` impls for GCS/Azure here, or call out the 
S3-only scope in the PR description and track a follow-up issue? What do you 
think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to