[
https://issues.apache.org/jira/browse/BEAM-4172?focusedWorklogId=99493&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-99493
]
ASF GitHub Bot logged work on BEAM-4172:
----------------------------------------
Author: ASF GitHub Bot
Created on: 08/May/18 12:32
Start Date: 08/May/18 12:32
Worklog Time Spent: 10m
Work Description: kennknowles closed pull request #5227: [BEAM-4172] Move
textio.FileSystem and its registry to a separate package
URL: https://github.com/apache/beam/pull/5227
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance. Because this is a foreign pull request (from a fork),
the diff would not otherwise be visible on GitHub:
diff --git a/sdks/go/examples/minimal_wordcount/minimal_wordcount.go b/sdks/go/examples/minimal_wordcount/minimal_wordcount.go
index e3a45e0dcdb..628429368b0 100644
--- a/sdks/go/examples/minimal_wordcount/minimal_wordcount.go
+++ b/sdks/go/examples/minimal_wordcount/minimal_wordcount.go
@@ -43,10 +43,11 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
- _ "github.com/apache/beam/sdks/go/pkg/beam/io/textio/gcs"
- _ "github.com/apache/beam/sdks/go/pkg/beam/io/textio/local"
"github.com/apache/beam/sdks/go/pkg/beam/runners/direct"
"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+
+ _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+ _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
)
var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
diff --git a/sdks/go/pkg/beam/io/textio/filesystem.go b/sdks/go/pkg/beam/io/filesystem/filesystem.go
similarity index 51%
rename from sdks/go/pkg/beam/io/textio/filesystem.go
rename to sdks/go/pkg/beam/io/filesystem/filesystem.go
index 135a97fe6b5..727dd43aedb 100644
--- a/sdks/go/pkg/beam/io/textio/filesystem.go
+++ b/sdks/go/pkg/beam/io/filesystem/filesystem.go
@@ -13,29 +13,40 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package textio
+package filesystem
import (
"context"
"fmt"
"io"
+ "strings"
)
-var registry = make(map[string]func(context.Context) FileSystem)
+var registry = make(map[string]func(context.Context) Interface)
-// RegisterFileSystem registers a file system backend for textio.Read/Write,
-// under the given scheme.For example, "hdfs" would be registered a HFDS file
-// system and HDFS paths used transparently.
-func RegisterFileSystem(scheme string, fs func(context.Context) FileSystem) {
+// Register registers a file system backend under the given scheme. For
+// example, "hdfs" would be registered a HFDS file system and HDFS paths used
+// transparently.
+func Register(scheme string, fs func(context.Context) Interface) {
if _, ok := registry[scheme]; ok {
panic(fmt.Sprintf("scheme %v already registered", scheme))
}
registry[scheme] = fs
}
-// FileSystem is a filesystem abstraction that allows textio to use various
-// underlying storage systems transparently.
-type FileSystem interface {
+// New returns a new Interface for the given file path's scheme.
+func New(ctx context.Context, path string) (Interface, error) {
+ scheme := getScheme(path)
+ mkfs, ok := registry[scheme]
+ if !ok {
+ return nil, fmt.Errorf("file system scheme %v not registered for %v", scheme, path)
+ }
+ return mkfs(ctx), nil
+}
+
+// Interface is a filesystem abstraction that allows beam io sources and sinks
+// to use various underlying storage systems transparently.
+type Interface interface {
io.Closer
// List expands a patten to a list of filenames.
@@ -47,3 +58,22 @@ type FileSystem interface {
// overwritten.
OpenWrite(ctx context.Context, filename string) (io.WriteCloser, error)
}
+
+func getScheme(path string) string {
+ if index := strings.Index(path, "://"); index > 0 {
+ return path[:index]
+ }
+ return "default"
+}
+
+// ValidateScheme panics if the given path's scheme does not have a
+// corresponding file system registered.
+func ValidateScheme(path string) {
+ if strings.TrimSpace(path) == "" {
+ panic("empty file glob provided")
+ }
+ scheme := getScheme(path)
+ if _, ok := registry[scheme]; !ok {
+ panic(fmt.Sprintf("filesystem scheme %v not registered", scheme))
+ }
+}
diff --git a/sdks/go/pkg/beam/io/textio/gcs/gcs.go b/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go
similarity index 96%
rename from sdks/go/pkg/beam/io/textio/gcs/gcs.go
rename to sdks/go/pkg/beam/io/filesystem/gcs/gcs.go
index a5a6edfd5cd..8d82d5cb581 100644
--- a/sdks/go/pkg/beam/io/textio/gcs/gcs.go
+++ b/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go
@@ -23,14 +23,14 @@ import (
"path/filepath"
"strings"
- "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem"
"github.com/apache/beam/sdks/go/pkg/beam/log"
"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
"google.golang.org/api/storage/v1"
)
func init() {
- textio.RegisterFileSystem("gs", New)
+ filesystem.Register("gs", New)
}
type fs struct {
@@ -40,7 +40,7 @@ type fs struct {
// New creates a new Google Cloud Storage filesystem using application
// default credentials. If it fails, it falls back to unauthenticated
// access.
-func New(ctx context.Context) textio.FileSystem {
+func New(ctx context.Context) filesystem.Interface {
client, err := gcsx.NewClient(ctx, storage.DevstorageReadWriteScope)
if err != nil {
log.Warnf(ctx, "Warning: falling back to unauthenticated GCS access: %v", err)
diff --git a/sdks/go/pkg/beam/io/textio/local/local.go b/sdks/go/pkg/beam/io/filesystem/local/local.go
similarity index 90%
rename from sdks/go/pkg/beam/io/textio/local/local.go
rename to sdks/go/pkg/beam/io/filesystem/local/local.go
index d63afd9919f..301af447173 100644
--- a/sdks/go/pkg/beam/io/textio/local/local.go
+++ b/sdks/go/pkg/beam/io/filesystem/local/local.go
@@ -21,17 +21,17 @@ import (
"os"
"path/filepath"
- "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem"
)
func init() {
- textio.RegisterFileSystem("default", New)
+ filesystem.Register("default", New)
}
type fs struct{}
// New creates a new local filesystem.
-func New(ctx context.Context) textio.FileSystem {
+func New(ctx context.Context) filesystem.Interface {
return &fs{}
}
diff --git a/sdks/go/pkg/beam/io/textio/textio.go b/sdks/go/pkg/beam/io/textio/textio.go
index 926251fb6bc..71955efdcfe 100644
--- a/sdks/go/pkg/beam/io/textio/textio.go
+++ b/sdks/go/pkg/beam/io/textio/textio.go
@@ -18,12 +18,12 @@ package textio
import (
"bufio"
"context"
- "fmt"
"os"
"reflect"
"strings"
"github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem"
"github.com/apache/beam/sdks/go/pkg/beam/log"
)
@@ -38,36 +38,10 @@ func init() {
func Read(s beam.Scope, glob string) beam.PCollection {
s = s.Scope("textio.Read")
- validateScheme(glob)
+ filesystem.ValidateScheme(glob)
return read(s, beam.Create(s, glob))
}
-func validateScheme(glob string) {
- if strings.TrimSpace(glob) == "" {
- panic("empty file glob provided")
- }
- scheme := getScheme(glob)
- if _, ok := registry[scheme]; !ok {
- panic(fmt.Sprintf("textio scheme %v not registered", scheme))
- }
-}
-
-func getScheme(glob string) string {
- if index := strings.Index(glob, "://"); index > 0 {
- return glob[:index]
- }
- return "default"
-}
-
-func newFileSystem(ctx context.Context, glob string) (FileSystem, error) {
- scheme := getScheme(glob)
- mkfs, ok := registry[scheme]
- if !ok {
- return nil, fmt.Errorf("textio scheme %v not registered for %v", scheme, glob)
- }
- return mkfs(ctx), nil
-}
-
// ReadAll expands and reads the filename given as globs by the incoming
// PCollection<string>. It returns the lines of all files as a single
// PCollection<string>. The newlines are not part of the lines.
@@ -87,7 +61,7 @@ func expandFn(ctx context.Context, glob string, emit func(string)) error {
return nil // ignore empty string elements here
}
- fs, err := newFileSystem(ctx, glob)
+ fs, err := filesystem.New(ctx, glob)
if err != nil {
return err
}
@@ -106,7 +80,7 @@ func expandFn(ctx context.Context, glob string, emit func(string)) error {
func readFn(ctx context.Context, filename string, emit func(string)) error {
log.Infof(ctx, "Reading from %v", filename)
- fs, err := newFileSystem(ctx, filename)
+ fs, err := filesystem.New(ctx, filename)
if err != nil {
return err
}
@@ -133,7 +107,7 @@ func readFn(ctx context.Context, filename string, emit func(string)) error {
func Write(s beam.Scope, filename string, col beam.PCollection) {
s = s.Scope("textio.Write")
- validateScheme(filename)
+ filesystem.ValidateScheme(filename)
// NOTE(BEAM-3579): We may never call Teardown for non-local runners and
// FinishBundle doesn't have the right granularity. We therefore
@@ -151,7 +125,7 @@ type writeFileFn struct {
}
func (w *writeFileFn) ProcessElement(ctx context.Context, _ int, lines func(*string) bool) error {
- fs, err := newFileSystem(ctx, w.Filename)
+ fs, err := filesystem.New(ctx, w.Filename)
if err != nil {
return err
}
diff --git a/sdks/go/pkg/beam/x/beamx/run.go b/sdks/go/pkg/beam/x/beamx/run.go
index c6eaf294b4c..3380b068583 100644
--- a/sdks/go/pkg/beam/x/beamx/run.go
+++ b/sdks/go/pkg/beam/x/beamx/run.go
@@ -23,8 +23,8 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam"
// Import the reflection-optimized runtime.
_ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/optimized"
- _ "github.com/apache/beam/sdks/go/pkg/beam/io/textio/gcs"
- _ "github.com/apache/beam/sdks/go/pkg/beam/io/textio/local"
+ _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+ _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
// The imports here are for the side effect of runner registration.
_ "github.com/apache/beam/sdks/go/pkg/beam/runners/dataflow"
_ "github.com/apache/beam/sdks/go/pkg/beam/runners/direct"
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 99493)
Time Spent: 50m (was: 40m)
> Make public FileSystem registry
> -------------------------------
>
> Key: BEAM-4172
> URL: https://issues.apache.org/jira/browse/BEAM-4172
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Cody Schroeder
> Assignee: Cody Schroeder
> Priority: Major
> Labels: io
> Time Spent: 50m
> Remaining Estimate: 0h
>
> [https://github.com/apache/beam/blob/5743a37/sdks/go/pkg/beam/io/textio/filesystem.go]
> The current _beam/io/textio_ package includes a useful _FileSystem_ interface
> and corresponding _RegisterFileSystem_ function. The _textio_ package uses
> this internally to expose a _Read(beam.Scope, string)_ function that will
> work for any file path corresponding to a registered FileSystem.
> It would be extremely useful to expose the _FileSystem_ interface outside of
> just the _textio_ package and add global analogs for each of the _FileSystem_
> interface functions using the registry. This would allow for easier
> implementation of other file reading sources.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)