Repository: incubator-impala
Updated Branches:
  refs/heads/master e89d7057a -> f15589573


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index b70c9c8..e2a92ec 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -31,11 +31,12 @@
 #include "common/hdfs.h"
 #include "common/object-pool.h"
 #include "common/status.h"
+#include "runtime/disk-io-mgr-handle-cache.h"
 #include "runtime/thread-resource-mgr.h"
+#include "util/aligned-new.h"
 #include "util/bit-util.h"
 #include "util/error-util.h"
 #include "util/internal-queue.h"
-#include "util/lru-cache.h"
 #include "util/runtime-profile.h"
 #include "util/thread.h"
 
@@ -196,39 +197,12 @@ class MemTracker;
 
 class DiskIoRequestContext;
 
-class DiskIoMgr {
+// This is cache line aligned because the FileHandleCache needs cache line alignment
+// for its partitions.
+class DiskIoMgr : public CacheLineAligned {
  public:
   class ScanRange;
 
-  /// This class is a small wrapper around the hdfsFile handle and the file system
-  /// instance which is needed to close the file handle in case of eviction. It
-  /// additionally encapsulates the last modified time of the associated file when it was
-  /// last opened.
-  class HdfsCachedFileHandle {
-   public:
-
-    /// Constructor will open the file
-    HdfsCachedFileHandle(const hdfsFS& fs, const char* fname, int64_t mtime);
-
-    /// Destructor will close the file handle
-    ~HdfsCachedFileHandle();
-
-    hdfsFile file() const { return hdfs_file_;  }
-
-    int64_t mtime() const { return mtime_; }
-
-    /// This method is called to release acquired resources by the cached handle when it
-    /// is evicted.
-    static void Release(HdfsCachedFileHandle** h);
-
-    bool ok() const { return hdfs_file_ != NULL; }
-
-   private:
-    hdfsFS fs_;
-    hdfsFile hdfs_file_;
-    int64_t mtime_;
-  };
-
   /// Buffer struct that is used by the caller and IoMgr to pass read buffers.
  /// It is expected that only one thread has ownership of this object at a time.
@@ -306,7 +280,7 @@ class DiskIoMgr {
     /// true if the current scan range is complete
     bool eosr_;
 
-    /// Status of the read to this buffer. if status is not ok, 'buffer' is NULL
+    /// Status of the read to this buffer. If status is not ok, 'buffer' is nullptr
     Status status_;
 
     int64_t scan_range_offset_;
@@ -327,6 +301,7 @@ class DiskIoMgr {
    public:
     hdfsFS fs() const { return fs_; }
     const char* file() const { return file_.c_str(); }
+    std::string* file_string() { return &file_; }
     int64_t offset() const { return offset_; }
     int64_t len() const { return len_; }
     int disk_id() const { return disk_id_; }
@@ -334,9 +309,9 @@ class DiskIoMgr {
 
    protected:
     RequestRange(RequestType::type request_type)
-      : fs_(NULL), offset_(-1), len_(-1), disk_id_(-1), request_type_(request_type) {}
+      : fs_(nullptr), offset_(-1), len_(-1), disk_id_(-1), request_type_(request_type) {}
 
-    /// Hadoop filesystem that contains file_, or set to NULL for local filesystem.
+    /// Hadoop filesystem that contains file_, or set to nullptr for local filesystem.
     hdfsFS fs_;
 
     /// Path to file being read or written.
@@ -364,12 +339,12 @@ class DiskIoMgr {
     BufferOpts(bool try_cache, int64_t mtime)
       : try_cache_(try_cache),
         mtime_(mtime),
-        client_buffer_(NULL),
+        client_buffer_(nullptr),
         client_buffer_len_(-1) {}
 
     /// Set options for an uncached read into an IoMgr-allocated buffer.
     static BufferOpts Uncached() {
-      return BufferOpts(false, NEVER_CACHE, NULL, -1);
+      return BufferOpts(false, NEVER_CACHE, nullptr, -1);
     }
 
    /// Set options to read the entire scan range into 'client_buffer'. The length of the
@@ -400,7 +375,7 @@ class DiskIoMgr {
     /// NEVER_CACHE, caching is disabled.
     const int64_t mtime_;
 
-    /// A destination buffer provided by the client, NULL and -1 if no buffer.
+    /// A destination buffer provided by the client, nullptr and -1 if no buffer.
     uint8_t* const client_buffer_;
     const int64_t client_buffer_len_;
   };
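
For orientation, here is a sketch (not part of this patch) of how a caller combines
these options with the ScanRange::Reset() declared below. 'pool', 'fs', the path, and
the sizes are illustrative; a default-constructible ScanRange and ObjectPool-based
allocation are assumptions borrowed from typical Impala usage.

    // Sketch only: configure a range for a cached read keyed by the file's mtime.
    const int64_t kMtime = 1494000000;  // illustrative last-modified time
    DiskIoMgr::ScanRange* range = pool->Add(new DiskIoMgr::ScanRange());
    range->Reset(fs, "/warehouse/t/part-0.txt", /*len=*/4096, /*offset=*/0,
        /*disk_id=*/0, /*expected_local=*/true,
        DiskIoMgr::BufferOpts(/*try_cache=*/true, kMtime));

    // Or an uncached read into an IoMgr-allocated buffer:
    range->Reset(fs, "/warehouse/t/part-0.txt", /*len=*/4096, /*offset=*/0,
        /*disk_id=*/0, /*expected_local=*/false, DiskIoMgr::BufferOpts::Uncached());
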
@@ -416,7 +391,7 @@ class DiskIoMgr {
     virtual ~ScanRange();
 
    /// Resets this scan range object with the scan range description. The scan range
-    /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is NULL for the
+    /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
    /// local filesystem). The scan range must fall within the file bounds (offset >= 0
    /// and offset + len <= file_length). 'disk_id' is the disk queue to add the range
    /// to. If 'expected_local' is true, a warning is generated if the read did not
@@ -424,7 +399,7 @@ class DiskIoMgr {
    /// see the DiskIoMgr class comment and the BufferOpts comments for details.
    /// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data.
    void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
-        bool expected_local, const BufferOpts& buffer_opts, void* meta_data = NULL);
+        bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr);
 
     void* meta_data() const { return meta_data_; }
     bool try_cache() const { return try_cache_; }
@@ -433,7 +408,7 @@ class DiskIoMgr {
 
    /// Returns the next buffer for this scan range. buffer is an output parameter.
    /// This function blocks until a buffer is ready or an error occurred. If this is
-    /// called when all buffers have been returned, *buffer is set to NULL and Status::OK
+    /// called when all buffers have been returned, *buffer is set to nullptr and Status::OK
     /// is returned.
     /// Only one thread can be in GetNext() at any time.
     Status GetNext(BufferDescriptor** buffer) WARN_UNUSED_RESULT;
@@ -474,7 +449,15 @@ class DiskIoMgr {
     int64_t MaxReadChunkSize() const;
 
    /// Opens the file for this range. This function only modifies state in this range.
-    Status Open();
+    /// If 'use_file_handle_cache' is true and this is a local hdfs file, then this scan
+    /// range will not maintain an exclusive file handle. It will borrow an hdfs file
+    /// handle from the file handle cache for each Read(), so Open() does nothing.
+    /// If 'use_file_handle_cache' is false or this is a remote hdfs file or this is
+    /// a local OS file, Open() will maintain a file handle on the scan range for
+    /// exclusive use by this scan range. An exclusive hdfs file handle still comes
+    /// from the cache, but it is held for the entire duration of a scan range's lifetime.
+    /// All local OS files are opened using normal OS file APIs.
+    Status Open(bool use_file_handle_cache) WARN_UNUSED_RESULT;
 
    /// Closes the file for this range. This function only modifies state in this range.
     void Close();
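
The three-way policy that the Open() comment above describes can be summarized in a
small, self-contained sketch (hypothetical names; the real method acts on member state
rather than taking these booleans directly):

    // Sketch of the handle-acquisition policy described above.
    enum class HandleMode {
      BORROW_PER_READ,       // local hdfs file + cache on: Open() is a no-op
      EXCLUSIVE_FROM_CACHE,  // remote hdfs file or cache off: held Open()..Close()
      LOCAL_OS_FILE          // plain OS file: normal OS file APIs
    };

    HandleMode ClassifyRange(bool use_file_handle_cache, bool is_hdfs,
        bool expected_local) {
      if (!is_hdfs) return HandleMode::LOCAL_OS_FILE;
      if (use_file_handle_cache && expected_local) return HandleMode::BORROW_PER_READ;
      return HandleMode::EXCLUSIVE_FROM_CACHE;
    }
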
@@ -484,6 +467,12 @@ class DiskIoMgr {
     Status Read(uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read,
         bool* eosr) WARN_UNUSED_RESULT;
 
+    /// Get the read statistics from the Hdfs file handle and aggregate them to
+    /// the DiskIoRequestContext. This clears the statistics on this file handle.
+    /// It is safe to pass hdfsFile by value, as hdfsFile's underlying type is a pointer.
+    void GetHdfsStatistics(hdfsFile fh);
+
    /// Reads from the DN cache. On success, sets cached_buffer_ to the DN buffer
    /// and *read_succeeded to true.
    /// If the data is not cached, returns ok() and *read_succeeded is set to false.
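
GetHdfsStatistics() maps naturally onto the stock libhdfs read-statistics calls. A
hedged sketch of that underlying API usage follows; the aggregation into
DiskIoRequestContext is elided, and the function name here is illustrative.

    #include <hdfs.h>

    // Collect bytes read remotely since the last collection, then clear the
    // handle's counters so the next collection starts fresh.
    int64_t CollectRemoteBytes(hdfsFile fh) {
      struct hdfsReadStatistics* stats = nullptr;
      int64_t remote_bytes = 0;
      if (hdfsFileGetReadStatistics(fh, &stats) == 0) {
        remote_bytes = static_cast<int64_t>(
            stats->totalBytesRead - stats->totalLocalBytesRead);
        hdfsFileFreeReadStatistics(stats);
      }
      hdfsFileClearReadStatistics(fh);
      return remote_bytes;
    }
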
@@ -492,7 +481,7 @@ class DiskIoMgr {
 
    /// Pointer to caller specified metadata. This is untouched by the io manager
     /// and the caller can put whatever auxiliary data in here.
-    void* meta_data_;
+    void* meta_data_ = nullptr;
 
    /// If true, this scan range is expected to be cached. Note that this might be wrong
     /// since the block could have been uncached. In that case, the cached path
@@ -506,18 +495,31 @@ class DiskIoMgr {
     /// TODO: we can do more with this
     bool expected_local_;
 
-    DiskIoMgr* io_mgr_;
+    /// Total number of bytes read remotely. This is necessary to maintain a count of
+    /// the number of remote scan ranges. Since IO statistics can be collected multiple
+    /// times for a scan range, it is necessary to keep some state about whether this
+    /// scan range has already been counted as remote. There is also a requirement to
+    /// log the number of unexpected remote bytes for a scan range. To solve both
+    /// requirements, maintain num_remote_bytes_ on the ScanRange and push it to the
+    /// reader_ once at the close of the scan range.
+    int64_t num_remote_bytes_;
+
+    DiskIoMgr* io_mgr_ = nullptr;
 
     /// Reader/owner of the scan range
-    DiskIoRequestContext* reader_;
+    DiskIoRequestContext* reader_ = nullptr;
 
     /// File handle either to hdfs or local fs (FILE*)
-    ///
-    /// TODO: The pointer to HdfsCachedFileHandle is manually managed and should be
-    /// replaced by unique_ptr in C++11
+    /// The hdfs file handle is only stored here in three cases:
+    /// 1. The file handle cache is off (max_cached_file_handles == 0).
+    /// 2. The scan range is using hdfs caching.
+    /// -OR-
+    /// 3. The hdfs file is expected to be remote (expected_local_ == false)
+    /// In each case, the scan range gets a file handle from the file handle cache
+    /// at Open() and holds it exclusively until Close() is called.
     union {
-      FILE* local_file_;
-      HdfsCachedFileHandle* hdfs_file_;
+      FILE* local_file_ = nullptr;
+      HdfsFileHandle* exclusive_hdfs_fh_;
     };
 
    /// Tagged union that holds a buffer for the cases when there is a buffer allocated
@@ -536,7 +538,7 @@ class DiskIoMgr {
 
      /// Valid and non-NULL if the external_buffer_tag_ is CACHED_BUFFER, which means
      /// that a cached read succeeded and all the bytes for the range are in this buffer.
-      struct hadoopRzBuffer* cached_buffer_;
+      struct hadoopRzBuffer* cached_buffer_ = nullptr;
     };
 
     /// Lock protecting fields below.
@@ -697,7 +699,7 @@ class DiskIoMgr {
  /// Returns the next unstarted scan range for this reader. When the range is returned,
  /// the disk threads in the IoMgr will already have started reading from it. The
   /// caller is expected to call ScanRange::GetNext on the returned range.
-  /// If there are no more unstarted ranges, NULL is returned.
+  /// If there are no more unstarted ranges, nullptr is returned.
   /// This call is blocking.
  Status GetNextRange(DiskIoRequestContext* reader, ScanRange** range) WARN_UNUSED_RESULT;
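
The calling convention above implies a simple drain loop. A sketch, assuming 'io_mgr'
and 'reader' were set up elsewhere and that BufferDescriptor exposes the usual eosr()
accessor (not shown in this hunk):

    DiskIoMgr::ScanRange* range = nullptr;
    while (io_mgr->GetNextRange(reader, &range).ok() && range != nullptr) {
      DiskIoMgr::BufferDescriptor* buffer = nullptr;
      do {
        if (!range->GetNext(&buffer).ok() || buffer == nullptr) break;
        // ... consume the buffer, then hand it back to the IoMgr ...
      } while (!buffer->eosr());
    }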
 
@@ -732,6 +734,8 @@ class DiskIoMgr {
   int64_t bytes_read_dn_cache(DiskIoRequestContext* reader) const;
   int num_remote_ranges(DiskIoRequestContext* reader) const;
   int64_t unexpected_remote_bytes(DiskIoRequestContext* reader) const;
+  int cached_file_handles_hit_count(DiskIoRequestContext* reader) const;
+  int cached_file_handles_miss_count(DiskIoRequestContext* reader) const;
 
   /// Returns the read throughput across all readers.
  /// TODO: should this be a sliding window?  This should report metrics for the
@@ -763,16 +767,20 @@ class DiskIoMgr {
   /// for debugging.
   bool Validate() const;
 
-  /// Given a FS handle, name and last modified time of the file, tries to open that file
-  /// and return an instance of HdfsCachedFileHandle. In case of an error returns NULL.
-  HdfsCachedFileHandle* OpenHdfsFile(const hdfsFS& fs, const char* fname, int64_t mtime);
+  /// Given a FS handle, name and last modified time of the file, gets an HdfsFileHandle
+  /// from the file handle cache. On success, records statistics about whether this was
+  /// a cache hit or miss in the `reader` as well as at the system level. In case of an
+  /// error returns nullptr.
+  HdfsFileHandle* GetCachedHdfsFileHandle(const hdfsFS& fs,
+      std::string* fname, int64_t mtime, DiskIoRequestContext *reader);
 
-  /// When the file handle is no longer in use by the scan range, return it and try to
-  /// unbuffer the handle. If unbuffering, closing sockets and dropping buffers in the
-  /// libhdfs client, is not supported, close the file handle. If the unbuffer operation
-  /// is supported, put the file handle together with the mtime in the LRU cache for
-  /// later reuse.
-  void CacheOrCloseFileHandle(const char* fname, HdfsCachedFileHandle* fid, bool close);
+  /// Releases a file handle back to the file handle cache when it is no longer in use.
+  void ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid);
+
+  /// Reopens a file handle by destroying the file handle and getting a fresh
+  /// file handle from the cache. Returns an error if the file could not be reopened.
+  Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
+      HdfsFileHandle** fid);
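
Together these three methods imply a borrow/use/release pattern for each read on a
non-exclusive handle. A sketch; the error text, the retry placement, and the file()
accessor on HdfsFileHandle are assumptions, not quotes from this patch:

    HdfsFileHandle* fh = io_mgr->GetCachedHdfsFileHandle(
        fs, range->file_string(), mtime, reader);
    if (fh == nullptr) return Status("failed to open HDFS file");
    // ... hdfsPread() against fh->file() ...
    // On a stale handle, a fresh one can be fetched in place:
    // RETURN_IF_ERROR(io_mgr->ReopenCachedHdfsFileHandle(
    //     fs, range->file_string(), mtime, &fh));
    io_mgr->ReleaseCachedHdfsFileHandle(range->file_string(), fh);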
 
  /// Garbage collect unused I/O buffers up to 'bytes_to_free', or all the buffers if
   /// 'bytes_to_free' is -1.
@@ -823,7 +831,7 @@ class DiskIoMgr {
   ThreadGroup disk_thread_group_;
 
   /// Options object for cached hdfs reads. Set on startup and never modified.
-  struct hadoopRzOptions* cached_read_options_;
+  struct hadoopRzOptions* cached_read_options_ = nullptr;
 
   /// True if the IoMgr should be torn down. Worker threads watch for this to
   /// know to terminate. This variable is read/written to by different threads.
@@ -876,10 +884,13 @@ class DiskIoMgr {
   /// round-robin assignment for that case.
   static AtomicInt32 next_disk_id_;
 
+  // Number of file handle cache partitions to use
+  static const size_t NUM_FILE_HANDLE_CACHE_PARTITIONS = 16;
+
  // Caching structure that maps file names to cached file handles. The cache has an upper
  // limit of entries defined by FLAGS_max_cached_file_handles. Evicted cached file
   // handles are closed.
-  FifoMultimap<std::string, HdfsCachedFileHandle*> file_handle_cache_;
+  FileHandleCache<NUM_FILE_HANDLE_CACHE_PARTITIONS> file_handle_cache_;
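
The partitioning explains why DiskIoMgr now derives from CacheLineAligned: each
partition carries its own lock, and padding partitions to cache line boundaries keeps
disk threads that hash to different partitions from false-sharing. A self-contained
sketch of the shape (a deliberate simplification, not the FileHandleCache itself):

    #include <functional>
    #include <map>
    #include <mutex>
    #include <string>

    template <size_t NUM_PARTITIONS>
    class PartitionedCacheSketch {
      // Each partition is padded to a (typically 64-byte) cache line.
      struct alignas(64) Partition {
        std::mutex lock;
        std::multimap<std::string, void*> entries;  // real cache keys on (name, mtime)
      };
      Partition partitions_[NUM_PARTITIONS];

     public:
      Partition& GetPartition(const std::string& file_name) {
        // Hash the file name to pick a partition, as the real cache does.
        return partitions_[std::hash<std::string>()(file_name) % NUM_PARTITIONS];
      }
    };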
 
   /// Returns the index into free_buffers_ for a given buffer size
   int free_buffers_idx(int64_t buffer_size);
@@ -906,7 +917,7 @@ class DiskIoMgr {
   /// Returns a buffer desc object which can now be used for another reader.
   void ReturnBufferDesc(BufferDescriptor* desc);
 
-  /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be NULL), either
+  /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be nullptr), either
  /// freeing it or returning it to 'free_buffers_'. Memory tracking is updated to
  /// reflect the transfer of ownership from desc->mem_tracker_ to the disk I/O mgr.
   void FreeBufferMemory(BufferDescriptor* desc);
@@ -953,9 +964,9 @@ class DiskIoMgr {
  void ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader, ScanRange* range);
 
  /// Try to allocate the next buffer for the scan range, returning the new buffer
-  /// if successful. If 'reader' is cancelled, cancels the range and returns NULL.
+  /// if successful. If 'reader' is cancelled, cancels the range and returns nullptr.
  /// If there is memory pressure and buffers are already queued, adds the range
-  /// to the blocked ranges and returns NULL.
+  /// to the blocked ranges and returns nullptr.
   BufferDescriptor* TryAllocateNextBufferForRange(DiskQueue* disk_queue,
       DiskIoRequestContext* reader, ScanRange* range, int64_t buffer_size);
 };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/tests/custom_cluster/test_hdfs_fd_caching.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_hdfs_fd_caching.py b/tests/custom_cluster/test_hdfs_fd_caching.py
index c030c8c..bcf50e6 100644
--- a/tests/custom_cluster/test_hdfs_fd_caching.py
+++ b/tests/custom_cluster/test_hdfs_fd_caching.py
@@ -58,41 +58,41 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args="--max_cached_file_handles=5",
+      impalad_args="--max_cached_file_handles=16",
       catalogd_args="--load_catalog_in_background=false")
   def test_scan_does_cache_fd(self, vector):
     """Tests that an hdfs scan will lead to caching HDFS file descriptors."""
 
     # Maximum number of file handles cached
-    assert self.max_cached_handles() <= 5
-    # One table, one file, one handle
+    assert self.max_cached_handles() <= 16
+    # The table has one file, so there should be one more handle cached after the
+    # first select.
     num_handles_before = self.cached_handles()
-    self.execute_query("select * from cachefd.simple limit 1", vector=vector)
+    self.execute_query("select * from cachefd.simple", vector=vector)
     num_handles_after = self.cached_handles()
-    assert self.max_cached_handles() <= 5
+    assert self.max_cached_handles() <= 16
 
-    # Should have at least one more handle cached and not more than three more
-    # as there are three Impalads.
-    assert (num_handles_before + 1) <= num_handles_after <= (num_handles_before + 3)
+    # Should have one more file handle
+    assert num_handles_after == (num_handles_before + 1)
 
     # No open handles if scanning is finished
     assert self.outstanding_handles() == 0
 
     # No change when reading the table again
     for x in range(10):
-      self.execute_query("select * from cachefd.simple limit 1", vector=vector)
+      self.execute_query("select * from cachefd.simple", vector=vector)
+      assert self.cached_handles() == num_handles_after
+      assert self.max_cached_handles() <= 16
+      assert self.outstanding_handles() == 0
 
-    assert self.max_cached_handles() <= 5
-    assert num_handles_after == self.cached_handles()
-    assert self.outstanding_handles() == 0
-
-    # Create more files
-    self.create_n_files(10)
+    # Create more files. This means there are more files than the cache size.
+    # The cache size should still be enforced.
+    self.create_n_files(100)
 
     # Read all the files of the table and make sure no FD leak
     for x in range(10):
       self.execute_query("select count(*) from cachefd.simple;", vector=vector)
-      assert self.max_cached_handles() <= 5
+      assert self.max_cached_handles() <= 16
     assert self.outstanding_handles() == 0
 
   def cached_handles(self):

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/tests/metadata/test_refresh_partition.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_refresh_partition.py b/tests/metadata/test_refresh_partition.py
index a8b5042..187055c 100644
--- a/tests/metadata/test_refresh_partition.py
+++ b/tests/metadata/test_refresh_partition.py
@@ -144,10 +144,12 @@ class TestRefreshPartition(ImpalaTestSuite):
 
     self.run_stmt_in_hive(
         'alter table %s drop partition (y=333, z=5309)' % table_name)
-        # Query the table and check for expected error.
+
+    # Query the table. With file handle caching, this may not produce an error,
+    # because the file handles are still open in the cache. If the system does
+    # produce an error, it should be the expected error.
     try:
       self.client.execute("select * from %s" % table_name)
-      assert False, "Query was expected to fail"
     except ImpalaBeeswaxException as e:
       assert expected_error in str(e)
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/tests/query_test/test_hdfs_fd_caching.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_hdfs_fd_caching.py b/tests/query_test/test_hdfs_fd_caching.py
index 6961f3b..fa03b24 100644
--- a/tests/query_test/test_hdfs_fd_caching.py
+++ b/tests/query_test/test_hdfs_fd_caching.py
@@ -21,74 +21,104 @@ import pytest
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfS3, SkipIfADLS
+from tests.common.test_vector import ImpalaTestDimension
+from subprocess import call
+from tests.util.filesystem_utils import FILESYSTEM_PREFIX
 
+# Modifications to test with the file handle cache
+MODIFICATION_TYPES=["delete_files", "delete_directory", "move_file", "append"]
 
 @SkipIfS3.caching
 @SkipIfADLS.caching
 class TestHdfsFdCaching(ImpalaTestSuite):
   """
-  This test suite tests the behavior of HDFS file descriptor caching by evaluating the
-  metrics exposed by the Impala daemon.
+  This test suite tests the behavior of HDFS file descriptor caching, including
+  potential error cases such as file deletes.
   """
 
-  NUM_ROWS = 10000
-
   @classmethod
   def file_format_constraint(cls, v):
-    return v.get_value('table_format').file_format in ["parquet"]
+    return v.get_value('table_format').file_format in ["text"]
 
   @classmethod
   def add_test_dimensions(cls):
     super(TestHdfsFdCaching, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('modification_type',\
+        *MODIFICATION_TYPES))
     cls.ImpalaTestMatrix.add_constraint(cls.file_format_constraint)
 
   @classmethod
   def get_workload(cls):
     return 'functional-query'
 
+  def setup_ext_table(self, vector, unique_database, new_table_location):
+    # Use HDFS commands to clone the table's files at the hdfs level
+    old_table_location = "{0}/test-warehouse/tinytable".format(FILESYSTEM_PREFIX)
+    call(["hdfs", "dfs", "-mkdir", new_table_location])
+    call(["hdfs", "dfs", "-cp", old_table_location + "/*", new_table_location])
 
-  def setup_method(self, method):
-    self.cleanup_db("cachefd")
-    self.client.execute("create database cachefd")
-    self.client.execute("create table cachefd.simple(id int, col1 int, col2 
int) "
-                        "stored as parquet")
-    buf = "insert into cachefd.simple values"
-    self.client.execute(buf + ", ".join(["({0},{0},{0})".format(x) for x in range(self.NUM_ROWS)]))
-
-  def teardown_method(self, methd):
-    self.cleanup_db("cachedfd")
-
+    # Create an external table with the new files (tinytable has two string columns)
+    create_table = "create external table {0}.t1 (a string, b string) "\
+        + "row format delimited fields terminated by \',\' location \'{1}\'"
+    self.client.execute(create_table.format(unique_database, new_table_location))
 
   @pytest.mark.execute_serially
-  def test_simple_scan(self, vector):
-    """Tests that in the default configuration, file handle caching is 
disabled and no
-    file handles are cached."""
-
-    num_handles_before = self.cached_handles()
-    assert 0 == num_handles_before
-    self.execute_query("select * from cachefd.simple limit 1", vector=vector)
-    num_handles_after = self.cached_handles()
-    assert 0 == num_handles_after
-    assert num_handles_after == num_handles_before
-    assert 0 == self.outstanding_handles()
-
-    # No change when reading the table again
-    for x in range(10):
-      self.execute_query("select * from cachefd.simple limit 1", vector=vector)
-
-    # TODO This assertion fails reliably in the Kudu feature branch build for reasons yet
-    # unknown, since it seems unrelated to other changes. Once the root cause for the
-    # failure is known this assertion should be uncommented.
-    # assert num_handles_after == self.cached_handles()
-    assert 0 == self.outstanding_handles()
-
-  def cached_handles(self):
-    return self.get_agg_metric("impala-server.io.mgr.num-cached-file-handles")
-
-  def outstanding_handles(self):
-    return self.get_agg_metric("impala-server.io.mgr.num-file-handles-outstanding")
-
-  def get_agg_metric(self, key, fun=sum):
-    cluster = ImpalaCluster()
-    return fun([s.service.get_metric_value(key) for s
-                in cluster.impalads])
+  def test_file_modifications(self, vector, unique_database):
+    """Tests file modifications on a file that is cached in the file handle 
cache."""
+
+    new_table_location = "{0}/test-warehouse/{1}".format(FILESYSTEM_PREFIX,\
+        unique_database)
+    self.setup_ext_table(vector, unique_database, new_table_location)
+
+    # Query the table (puts file handle in the cache)
+    count_query = "select count(*) from {0}.t1".format(unique_database)
+    original_result = self.execute_query_expect_success(self.client, count_query)
+    assert(original_result.data[0] == '3')
+
+    # Do the modification based on the test settings
+    modification_type = vector.get_value('modification_type')
+    if (modification_type == 'delete_files'):
+      # Delete the data file (not the directory)
+      call(["hdfs", "dfs", "-rm", "-skipTrash", new_table_location + "/*"])
+    elif (modification_type == 'delete_directory'):
+      # Delete the whole directory (including data file)
+      call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", new_table_location])
+    elif (modification_type == 'move_file'):
+      # Move the file underneath the directory
+      call(["hdfs", "dfs", "-mv", new_table_location + "/data.csv", \
+          new_table_location + "/data.csv.moved"])
+    elif (modification_type == 'append'):
+      # Append a copy of the hdfs file to itself (duplicating all entries)
+      local_tmp_location = "/tmp/{0}.data.csv".format(unique_database)
+      call(["hdfs", "dfs", "-copyToLocal", new_table_location + "/data.csv", \
+           local_tmp_location])
+      call(["hdfs", "dfs", "-appendToFile", local_tmp_location, \
+           new_table_location + "/data.csv"])
+      call(["rm", local_tmp_location])
+    else:
+      assert False
+
+    # The query might fail, but nothing should crash.
+    self.execute_query(count_query)
+
+    # Invalidate metadata
+    invalidate_metadata_sql = "invalidate metadata {0}.t1".format(unique_database)
+    self.execute_query_expect_success(self.client, invalidate_metadata_sql)
+
+    # Verify that nothing crashes and that the query succeeds
+    new_result = self.execute_query_expect_success(self.client, count_query)
+    if (modification_type == 'move_file'):
+      assert(new_result.data[0] == '3')
+    elif (modification_type == 'delete_files' or \
+          modification_type == 'delete_directory'):
+      assert(new_result.data[0] == '0')
+    elif (modification_type == 'append'):
+      assert(new_result.data[0] == '6')
+
+    # Drop table
+    drop_table_sql = "drop table {0}.t1".format(unique_database)
+    self.execute_query_expect_success(self.client, drop_table_sql)
+
+    # Cleanup directory (which may already be gone)
+    call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", new_table_location])
+
