Repository: beam
Updated Branches:
  refs/heads/master f138b3569 -> c489686e4


Fix bugs in fileio in the Temp IOFactory


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a679ab11
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a679ab11
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a679ab11

Branch: refs/heads/master
Commit: a679ab111a907193067a8698708faf570f4d8c9e
Parents: f138b35
Author: Sourabh Bajaj <[email protected]>
Authored: Tue Mar 28 14:15:20 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Tue Mar 28 15:52:42 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/__init__.py |  1 +
 sdks/python/apache_beam/io/fileio.py   | 27 +++++++++++++++------------
 2 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a679ab11/sdks/python/apache_beam/io/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/__init__.py b/sdks/python/apache_beam/io/__init__.py
index 4b434be..881ce68 100644
--- a/sdks/python/apache_beam/io/__init__.py
+++ b/sdks/python/apache_beam/io/__init__.py
@@ -33,6 +33,7 @@ from apache_beam.io.range_trackers import *
 try:
   from apache_beam.io.gcp.bigquery import *
   from apache_beam.io.gcp.pubsub import *
+  from apache_beam.io.gcp import gcsio
 except ImportError:
   pass
 # pylint: enable=wrong-import-order, wrong-import-position

http://git-wip-us.apache.org/repos/asf/beam/blob/a679ab11/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 0759ce4..f33942a 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -40,7 +40,7 @@ class ChannelFactory(object):
   @staticmethod
   def mkdir(path):
     bfs = get_filesystem(path)
-    bfs.mkdirs(path)
+    return bfs.mkdirs(path)
 
   @staticmethod
   def open(path,
@@ -59,14 +59,16 @@ class ChannelFactory(object):
 
   @staticmethod
   def rename(src, dest):
-    bfs = get_filesystem(path)
-    bfs.rename([src], [dest])
+    bfs = get_filesystem(src)
+    return bfs.rename([src], [dest])
 
   @staticmethod
   def rename_batch(src_dest_pairs):
     sources = [s for s, _ in src_dest_pairs]
     destinations = [d for _, d in src_dest_pairs]
-    bfs = get_filesystem()
+    if len(sources) == 0:
+      return []
+    bfs = get_filesystem(sources[0])
     try:
       bfs.rename(sources, destinations)
       return []
@@ -75,23 +77,23 @@ class ChannelFactory(object):
 
   @staticmethod
   def copytree(src, dest):
-    bfs = get_filesystem()
-    bfs.copy([src], [dest])
+    bfs = get_filesystem(src)
+    return bfs.copy([src], [dest])
 
   @staticmethod
   def exists(path):
     bfs = get_filesystem(path)
-    bfs.exists(path)
+    return bfs.exists(path)
 
   @staticmethod
   def rmdir(path):
     bfs = get_filesystem(path)
-    bfs.delete([path])
+    return bfs.delete([path])
 
   @staticmethod
   def rm(path):
     bfs = get_filesystem(path)
-    bfs.delete([path])
+    return bfs.delete([path])
 
   @staticmethod
   def glob(path, limit=None):
@@ -102,13 +104,13 @@ class ChannelFactory(object):
   @staticmethod
   def size_in_bytes(path):
     bfs = get_filesystem(path)
-    match_result = bfs.match([path], [limit])[0]
+    match_result = bfs.match([path])[0]
     return [f.size_in_bytes for f in match_result.metadata_list][0]
 
   @staticmethod
   def size_of_files_in_glob(path, file_names=None):
     bfs = get_filesystem(path)
-    match_result = bfs.match([path], [limit])[0]
+    match_result = bfs.match([path])[0]
     part_files = {f.path:f.size_in_bytes for f in match_result.metadata_list}
 
     if file_names is not None:
@@ -118,7 +120,8 @@ class ChannelFactory(object):
         for metadata in match_result.metadata_list:
           specific_files[metadata.path] = metadata.size_in_bytes
 
-    return part_files.update(specific_files)
+      part_files.update(specific_files)
+    return part_files
 
 
 class FileSink(iobase.Sink):

Reply via email to