This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 32e62730 feat(io): add native in-memory FS (#1025)
32e62730 is described below

commit 32e6273077e803bae56d2a489ec9f1b515e9f926
Author: Alessandro Nori <[email protected]>
AuthorDate: Wed May 6 18:28:58 2026 +0200

    feat(io): add native in-memory FS (#1025)
    
    ## Summary
    
    - Adds `MemFS`: an in-memory `IO` implementation with no external
    dependencies, registered by the `io` package itself for the `mem://`
    scheme (the gocloud `memblob`-backed `mem` registration is removed)
    - `MemFS` implements `WriteFileIO` and `ListableIO`
    - Moves mem tests from `io/gocloud` to `io/` and removes the
    now-unnecessary gocloud import from `table/snapshot_producers_test.go`
    
    Part of #696.
---
 io/gocloud/register.go           |  11 ---
 io/mem.go                        | 189 +++++++++++++++++++++++++++++++++++++++
 io/{gocloud => }/mem_test.go     |  32 ++++++-
 io/registry.go                   |  11 +--
 table/snapshot_producers_test.go |   1 -
 5 files changed, 222 insertions(+), 22 deletions(-)

diff --git a/io/gocloud/register.go b/io/gocloud/register.go
index 7c2b4858..1d8307a3 100644
--- a/io/gocloud/register.go
+++ b/io/gocloud/register.go
@@ -22,13 +22,11 @@ import (
        "net/url"
 
        icebergio "github.com/apache/iceberg-go/io"
-       "gocloud.dev/blob/memblob"
 )
 
 func init() {
        registerS3Schemes()
        registerGCSScheme()
-       registerMemScheme()
        registerAzureSchemes()
 }
 
@@ -59,15 +57,6 @@ func registerGCSScheme() {
        })
 }
 
-// registerMemScheme registers the in-memory blob storage scheme (mem).
-func registerMemScheme() {
-       icebergio.Register("mem", func(ctx context.Context, parsed *url.URL, 
props map[string]string) (icebergio.IO, error) {
-               bucket := memblob.OpenBucket(nil)
-
-               return createBlobFS(ctx, bucket, 
defaultKeyExtractor(parsed.Host)), nil
-       })
-}
-
 // registerAzureSchemes registers Azure Data Lake Storage schemes (abfs, 
abfss, wasb, wasbs).
 func registerAzureSchemes() {
        azureFactory := func(ctx context.Context, parsed *url.URL, props 
map[string]string) (icebergio.IO, error) {
diff --git a/io/mem.go b/io/mem.go
new file mode 100644
index 00000000..179bc9d0
--- /dev/null
+++ b/io/mem.go
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package io
+
import (
	"bytes"
	"context"
	"io"
	"io/fs"
	"net/url"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"
)
+
+var (
+       memBuckets   = map[string]*MemFS{}
+       memBucketsMu sync.Mutex
+)
+
+func openOrCreateMemBucket(bucket string) *MemFS {
+       memBucketsMu.Lock()
+       defer memBucketsMu.Unlock()
+       if b, ok := memBuckets[bucket]; ok {
+               return b
+       }
+       b := &MemFS{files: map[string][]byte{}}
+       memBuckets[bucket] = b
+
+       return b
+}
+
+func init() {
+       Register("mem", func(_ context.Context, parsed *url.URL, _ 
map[string]string) (IO, error) {
+               return openOrCreateMemBucket(parsed.Host), nil
+       })
+}
+
+// MemFS is a simple in-memory IO implementation used for testing and 
lightweight workloads.
+// Files are stored per named bucket and persist for the lifetime of the 
process.
+// Use the "mem://bucket/path" URI scheme to access files.
+type MemFS struct {
+       mu    sync.RWMutex
+       files map[string][]byte // keyed by full URI
+}
+
+func (m *MemFS) Open(name string) (File, error) {
+       m.mu.RLock()
+       data, ok := m.files[name]
+       m.mu.RUnlock()
+       if !ok {
+               return nil, &fs.PathError{Op: "open", Path: name, Err: 
fs.ErrNotExist}
+       }
+       cp := make([]byte, len(data))
+       copy(cp, data)
+
+       return &memFile{data: cp, name: filepath.Base(name)}, nil
+}
+
+func (m *MemFS) Remove(name string) error {
+       m.mu.Lock()
+       delete(m.files, name)
+       m.mu.Unlock()
+
+       return nil
+}
+
+func (m *MemFS) Create(name string) (FileWriter, error) {
+       return &memWriter{fs: m, name: name}, nil
+}
+
+func (m *MemFS) WriteFile(name string, content []byte) error {
+       cp := make([]byte, len(content))
+       copy(cp, content)
+       m.mu.Lock()
+       m.files[name] = cp
+       m.mu.Unlock()
+
+       return nil
+}
+
+func (m *MemFS) WalkDir(root string, fn fs.WalkDirFunc) error {
+       m.mu.RLock()
+       defer m.mu.RUnlock()
+
+       for key, data := range m.files {
+               if !strings.HasPrefix(key, root) {
+                       continue
+               }
+               info := &memFileInfo{name: filepath.Base(key), size: 
int64(len(data))}
+               if err := fn(key, fs.FileInfoToDirEntry(info), nil); err != nil 
{
+                       return err
+               }
+       }
+
+       return nil
+}
+
// memFile is a read-only, seekable handle over a private byte snapshot of a
// MemFS file. Read and Seek update the single read position (pos) without
// any locking. ReadAt neither reads nor moves pos.
type memFile struct {
	data []byte
	name string
	pos  int64
}

// Read reads up to len(p) bytes from the current position and advances it.
// It returns io.EOF once the position is at or past the end of the data.
func (f *memFile) Read(p []byte) (int, error) {
	if f.pos >= int64(len(f.data)) {
		return 0, io.EOF
	}
	n := copy(p, f.data[f.pos:])
	f.pos += int64(n)

	return n, nil
}

// Seek sets the position for the next Read, relative to the start (io.SeekStart),
// the current position (io.SeekCurrent) or the end (io.SeekEnd). Seeking past
// the end is allowed; subsequent Reads return io.EOF.
//
// Seek returns an *fs.PathError wrapping fs.ErrInvalid, and leaves the
// position unchanged, when:
//   - the resulting position would be negative, or
//   - whence is not one of the three io.Seek* constants.
func (f *memFile) Seek(offset int64, whence int) (int64, error) {
	var abs int64
	switch whence {
	case io.SeekStart:
		abs = offset
	case io.SeekCurrent:
		abs = f.pos + offset
	case io.SeekEnd:
		abs = int64(len(f.data)) + offset
	default:
		// Reject unknown whence values instead of silently rewinding to 0.
		return 0, &fs.PathError{Op: "seek", Path: f.name, Err: fs.ErrInvalid}
	}
	if abs < 0 {
		return 0, &fs.PathError{Op: "seek", Path: f.name, Err: fs.ErrInvalid}
	}
	f.pos = abs

	return f.pos, nil
}

// ReadAt reads len(p) bytes starting at byte offset off, without using or
// moving the read position. As io.ReaderAt requires, it returns io.EOF
// whenever fewer than len(p) bytes are available. A negative off yields an
// *fs.PathError wrapping fs.ErrInvalid.
func (f *memFile) ReadAt(p []byte, off int64) (int, error) {
	// Guard before slicing: f.data[off:] panics for a negative off.
	if off < 0 {
		return 0, &fs.PathError{Op: "readat", Path: f.name, Err: fs.ErrInvalid}
	}
	if off >= int64(len(f.data)) {
		return 0, io.EOF
	}
	n := copy(p, f.data[off:])
	if n < len(p) {
		return n, io.EOF
	}

	return n, nil
}

// Close is a no-op; the handle holds no resources beyond its byte slice.
func (f *memFile) Close() error { return nil }

// Stat reports the file's base name and its size in bytes.
func (f *memFile) Stat() (fs.FileInfo, error) {
	return &memFileInfo{name: f.name, size: int64(len(f.data))}, nil
}

// memFileInfo is the fs.FileInfo for MemFS files. The entries it describes
// are always regular files, with no permission bits and no modification time.
type memFileInfo struct {
	name string
	size int64
}

func (fi *memFileInfo) Name() string       { return fi.name }
func (fi *memFileInfo) Size() int64        { return fi.size }
func (fi *memFileInfo) Mode() fs.FileMode  { return 0 }
func (fi *memFileInfo) ModTime() time.Time { return time.Time{} }
func (fi *memFileInfo) IsDir() bool        { return false }
func (fi *memFileInfo) Sys() any           { return nil }
+
+type memWriter struct {
+       fs   *MemFS
+       name string
+       buf  bytes.Buffer
+}
+
+func (w *memWriter) Write(p []byte) (int, error)         { return 
w.buf.Write(p) }
+func (w *memWriter) ReadFrom(r io.Reader) (int64, error) { return 
w.buf.ReadFrom(r) }
+func (w *memWriter) Close() error {
+       return w.fs.WriteFile(w.name, w.buf.Bytes())
+}
diff --git a/io/gocloud/mem_test.go b/io/mem_test.go
similarity index 78%
rename from io/gocloud/mem_test.go
rename to io/mem_test.go
index a7d536ba..c064c49c 100644
--- a/io/gocloud/mem_test.go
+++ b/io/mem_test.go
@@ -15,15 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package gocloud_test
+package io_test
 
 import (
        "context"
        "io"
+       "io/fs"
        "testing"
 
        icebergio "github.com/apache/iceberg-go/io"
-       _ "github.com/apache/iceberg-go/io/gocloud"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 )
@@ -131,3 +131,31 @@ func TestMemIO_MultipleFiles(t *testing.T) {
        require.NoError(t, err)
        file3.Close()
 }
+
+func TestMemIO_WalkDir(t *testing.T) {
+       ctx := context.Background()
+
+       memIO, err := icebergio.LoadFS(ctx, map[string]string{}, 
"mem://walkdir-bucket/")
+       require.NoError(t, err)
+
+       listable, ok := memIO.(icebergio.ListableIO)
+       require.True(t, ok, "mem IO should implement ListableIO")
+
+       writeIO := memIO.(icebergio.WriteFileIO)
+       require.NoError(t, writeIO.WriteFile("mem://walkdir-bucket/a/1.txt", 
[]byte("1")))
+       require.NoError(t, writeIO.WriteFile("mem://walkdir-bucket/a/2.txt", 
[]byte("2")))
+       require.NoError(t, writeIO.WriteFile("mem://walkdir-bucket/b/3.txt", 
[]byte("3")))
+
+       var walked []string
+       err = listable.WalkDir("mem://walkdir-bucket/a", func(path string, _ 
fs.DirEntry, err error) error {
+               require.NoError(t, err)
+               walked = append(walked, path)
+
+               return nil
+       })
+       require.NoError(t, err)
+       assert.ElementsMatch(t, []string{
+               "mem://walkdir-bucket/a/1.txt",
+               "mem://walkdir-bucket/a/2.txt",
+       }, walked)
+}
diff --git a/io/registry.go b/io/registry.go
index 66b6bf34..aacea7c4 100644
--- a/io/registry.go
+++ b/io/registry.go
@@ -77,15 +77,10 @@ func init() {
 
 func schemeRegistrationHint(scheme string) string {
        switch scheme {
-       case "s3", "s3a", "s3n", "gs", "abfs", "abfss", "wasb", "wasbs", "mem":
-               {
-                       return `hint: import the matching IO module for 
side-effect registration: _ "github.com/apache/iceberg-go/io/gocloud"  // for 
s3/gcs/azblob/file`
-               }
-
+       case "s3", "s3a", "s3n", "gs", "abfs", "abfss", "wasb", "wasbs":
+               return `hint: import the matching IO module for side-effect 
registration: _ "github.com/apache/iceberg-go/io/gocloud"  // for s3/gcs/azblob`
        default:
-               {
-                       return ""
-               }
+               return ""
        }
 }
 
diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go
index f43ea4c2..d9d31bf8 100644
--- a/table/snapshot_producers_test.go
+++ b/table/snapshot_producers_test.go
@@ -31,7 +31,6 @@ import (
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/internal"
        iceio "github.com/apache/iceberg-go/io"
-       _ "github.com/apache/iceberg-go/io/gocloud"
        "github.com/stretchr/testify/require"
 )
 

Reply via email to