This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 32e62730 feat(io): add native in-memory FS (#1025)
32e62730 is described below
commit 32e6273077e803bae56d2a489ec9f1b515e9f926
Author: Alessandro Nori <[email protected]>
AuthorDate: Wed May 6 18:28:58 2026 +0200
feat(io): add native in-memory FS (#1025)
## Summary
- Adds `MemFS`: an in-memory `IO` implementation with no external
dependencies
- `MemFS` implements `WriteFileIO` and `ListableIO`
- Moves mem tests from `io/gocloud` to `io/` and removes the
now-unnecessary gocloud import from `table/snapshot_producers_test.go`
Part of #696.
---
io/gocloud/register.go | 11 ---
io/mem.go | 189 +++++++++++++++++++++++++++++++++++++++
io/{gocloud => }/mem_test.go | 32 ++++++-
io/registry.go | 11 +--
table/snapshot_producers_test.go | 1 -
5 files changed, 222 insertions(+), 22 deletions(-)
diff --git a/io/gocloud/register.go b/io/gocloud/register.go
index 7c2b4858..1d8307a3 100644
--- a/io/gocloud/register.go
+++ b/io/gocloud/register.go
@@ -22,13 +22,11 @@ import (
"net/url"
icebergio "github.com/apache/iceberg-go/io"
- "gocloud.dev/blob/memblob"
)
func init() {
registerS3Schemes()
registerGCSScheme()
- registerMemScheme()
registerAzureSchemes()
}
@@ -59,15 +57,6 @@ func registerGCSScheme() {
})
}
-// registerMemScheme registers the in-memory blob storage scheme (mem).
-func registerMemScheme() {
- icebergio.Register("mem", func(ctx context.Context, parsed *url.URL,
props map[string]string) (icebergio.IO, error) {
- bucket := memblob.OpenBucket(nil)
-
- return createBlobFS(ctx, bucket,
defaultKeyExtractor(parsed.Host)), nil
- })
-}
-
// registerAzureSchemes registers Azure Data Lake Storage schemes (abfs,
abfss, wasb, wasbs).
func registerAzureSchemes() {
azureFactory := func(ctx context.Context, parsed *url.URL, props
map[string]string) (icebergio.IO, error) {
diff --git a/io/mem.go b/io/mem.go
new file mode 100644
index 00000000..179bc9d0
--- /dev/null
+++ b/io/mem.go
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package io
+
+import (
+ "bytes"
+ "context"
+ "io"
+ "io/fs"
+ "net/url"
+ "path/filepath"
+ "strings"
+ "sync"
+ "time"
+)
+
+var (
+ memBuckets = map[string]*MemFS{}
+ memBucketsMu sync.Mutex
+)
+
+func openOrCreateMemBucket(bucket string) *MemFS {
+ memBucketsMu.Lock()
+ defer memBucketsMu.Unlock()
+ if b, ok := memBuckets[bucket]; ok {
+ return b
+ }
+ b := &MemFS{files: map[string][]byte{}}
+ memBuckets[bucket] = b
+
+ return b
+}
+
+func init() {
+ Register("mem", func(_ context.Context, parsed *url.URL, _
map[string]string) (IO, error) {
+ return openOrCreateMemBucket(parsed.Host), nil
+ })
+}
+
+// MemFS is a simple in-memory IO implementation used for testing and
lightweight workloads.
+// Files are stored per named bucket and persist for the lifetime of the
process.
+// Use the "mem://bucket/path" URI scheme to access files.
+type MemFS struct {
+ mu sync.RWMutex
+ files map[string][]byte // keyed by full URI
+}
+
+func (m *MemFS) Open(name string) (File, error) {
+ m.mu.RLock()
+ data, ok := m.files[name]
+ m.mu.RUnlock()
+ if !ok {
+ return nil, &fs.PathError{Op: "open", Path: name, Err:
fs.ErrNotExist}
+ }
+ cp := make([]byte, len(data))
+ copy(cp, data)
+
+ return &memFile{data: cp, name: filepath.Base(name)}, nil
+}
+
+func (m *MemFS) Remove(name string) error {
+ m.mu.Lock()
+ delete(m.files, name)
+ m.mu.Unlock()
+
+ return nil
+}
+
+func (m *MemFS) Create(name string) (FileWriter, error) {
+ return &memWriter{fs: m, name: name}, nil
+}
+
+func (m *MemFS) WriteFile(name string, content []byte) error {
+ cp := make([]byte, len(content))
+ copy(cp, content)
+ m.mu.Lock()
+ m.files[name] = cp
+ m.mu.Unlock()
+
+ return nil
+}
+
+func (m *MemFS) WalkDir(root string, fn fs.WalkDirFunc) error {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+
+ for key, data := range m.files {
+ if !strings.HasPrefix(key, root) {
+ continue
+ }
+ info := &memFileInfo{name: filepath.Base(key), size:
int64(len(data))}
+ if err := fn(key, fs.FileInfoToDirEntry(info), nil); err != nil
{
+ return err
+ }
+ }
+
+ return nil
+}
+
// memFile is the read handle returned by MemFS.Open. It reads from an
// in-memory byte slice. Read and Seek share a cursor and are not safe for
// concurrent use. ReadAt does not touch the cursor.
type memFile struct {
	data []byte
	name string
	pos  int64
}

// Read reads up to len(p) bytes from the cursor and advances it. It returns
// io.EOF once the cursor is at or past the end of the data.
func (f *memFile) Read(p []byte) (int, error) {
	if f.pos >= int64(len(f.data)) {
		return 0, io.EOF
	}
	n := copy(p, f.data[f.pos:])
	f.pos += int64(n)

	return n, nil
}

// Seek moves the cursor per io.Seeker semantics and returns the new absolute
// position. Seeking past the end is allowed; subsequent reads return io.EOF.
// An unknown whence or a resulting negative position is rejected with a
// *fs.PathError wrapping fs.ErrInvalid, and the cursor is left unchanged.
func (f *memFile) Seek(offset int64, whence int) (int64, error) {
	var abs int64
	switch whence {
	case io.SeekStart:
		abs = offset
	case io.SeekCurrent:
		abs = f.pos + offset
	case io.SeekEnd:
		abs = int64(len(f.data)) + offset
	default:
		// Reject unknown whence values rather than treating them as a seek to 0.
		return 0, &fs.PathError{Op: "seek", Path: f.name, Err: fs.ErrInvalid}
	}
	if abs < 0 {
		return 0, &fs.PathError{Op: "seek", Path: f.name, Err: fs.ErrInvalid}
	}
	f.pos = abs

	return f.pos, nil
}

// ReadAt reads len(p) bytes starting at off without moving the cursor. Per
// io.ReaderAt, a short read returns io.EOF. A negative offset is rejected
// with a *fs.PathError wrapping fs.ErrInvalid instead of panicking.
func (f *memFile) ReadAt(p []byte, off int64) (int, error) {
	if off < 0 {
		return 0, &fs.PathError{Op: "readat", Path: f.name, Err: fs.ErrInvalid}
	}
	if off >= int64(len(f.data)) {
		return 0, io.EOF
	}
	n := copy(p, f.data[off:])
	if n < len(p) {
		return n, io.EOF
	}

	return n, nil
}

// Close is a no-op; the handle holds no external resources.
func (f *memFile) Close() error { return nil }

// Stat reports the file's base name and size.
func (f *memFile) Stat() (fs.FileInfo, error) {
	return &memFileInfo{name: f.name, size: int64(len(f.data))}, nil
}

// memFileInfo is a minimal fs.FileInfo for in-memory files. They are always
// regular files with no permission bits and a zero modification time.
type memFileInfo struct {
	name string
	size int64
}

func (fi *memFileInfo) Name() string { return fi.name }

func (fi *memFileInfo) Size() int64 { return fi.size }

func (fi *memFileInfo) Mode() fs.FileMode { return 0 }

func (fi *memFileInfo) ModTime() time.Time { return time.Time{} }

func (fi *memFileInfo) IsDir() bool { return false }

func (fi *memFileInfo) Sys() any { return nil }
+
+type memWriter struct {
+ fs *MemFS
+ name string
+ buf bytes.Buffer
+}
+
+func (w *memWriter) Write(p []byte) (int, error) { return
w.buf.Write(p) }
+func (w *memWriter) ReadFrom(r io.Reader) (int64, error) { return
w.buf.ReadFrom(r) }
+func (w *memWriter) Close() error {
+ return w.fs.WriteFile(w.name, w.buf.Bytes())
+}
diff --git a/io/gocloud/mem_test.go b/io/mem_test.go
similarity index 78%
rename from io/gocloud/mem_test.go
rename to io/mem_test.go
index a7d536ba..c064c49c 100644
--- a/io/gocloud/mem_test.go
+++ b/io/mem_test.go
@@ -15,15 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-package gocloud_test
+package io_test
import (
"context"
"io"
+ "io/fs"
"testing"
icebergio "github.com/apache/iceberg-go/io"
- _ "github.com/apache/iceberg-go/io/gocloud"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -131,3 +131,31 @@ func TestMemIO_MultipleFiles(t *testing.T) {
require.NoError(t, err)
file3.Close()
}
+
+func TestMemIO_WalkDir(t *testing.T) {
+ ctx := context.Background()
+
+ memIO, err := icebergio.LoadFS(ctx, map[string]string{},
"mem://walkdir-bucket/")
+ require.NoError(t, err)
+
+ listable, ok := memIO.(icebergio.ListableIO)
+ require.True(t, ok, "mem IO should implement ListableIO")
+
+ writeIO := memIO.(icebergio.WriteFileIO)
+ require.NoError(t, writeIO.WriteFile("mem://walkdir-bucket/a/1.txt",
[]byte("1")))
+ require.NoError(t, writeIO.WriteFile("mem://walkdir-bucket/a/2.txt",
[]byte("2")))
+ require.NoError(t, writeIO.WriteFile("mem://walkdir-bucket/b/3.txt",
[]byte("3")))
+
+ var walked []string
+ err = listable.WalkDir("mem://walkdir-bucket/a", func(path string, _
fs.DirEntry, err error) error {
+ require.NoError(t, err)
+ walked = append(walked, path)
+
+ return nil
+ })
+ require.NoError(t, err)
+ assert.ElementsMatch(t, []string{
+ "mem://walkdir-bucket/a/1.txt",
+ "mem://walkdir-bucket/a/2.txt",
+ }, walked)
+}
diff --git a/io/registry.go b/io/registry.go
index 66b6bf34..aacea7c4 100644
--- a/io/registry.go
+++ b/io/registry.go
@@ -77,15 +77,10 @@ func init() {
func schemeRegistrationHint(scheme string) string {
switch scheme {
- case "s3", "s3a", "s3n", "gs", "abfs", "abfss", "wasb", "wasbs", "mem":
- {
- return `hint: import the matching IO module for
side-effect registration: _ "github.com/apache/iceberg-go/io/gocloud" // for
s3/gcs/azblob/file`
- }
-
+ case "s3", "s3a", "s3n", "gs", "abfs", "abfss", "wasb", "wasbs":
+ return `hint: import the matching IO module for side-effect
registration: _ "github.com/apache/iceberg-go/io/gocloud" // for s3/gcs/azblob`
default:
- {
- return ""
- }
+ return ""
}
}
diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go
index f43ea4c2..d9d31bf8 100644
--- a/table/snapshot_producers_test.go
+++ b/table/snapshot_producers_test.go
@@ -31,7 +31,6 @@ import (
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/internal"
iceio "github.com/apache/iceberg-go/io"
- _ "github.com/apache/iceberg-go/io/gocloud"
"github.com/stretchr/testify/require"
)