IMPALA-1878: Support INSERT and LOAD DATA on S3 and between filesystems

Previously Impala disallowed LOAD DATA and INSERT on S3. This patch
makes both statements functionally work on S3, but makes no major
changes aimed at improving their performance there. This patch also
enables both INSERT and LOAD DATA between filesystems.

S3 does not support the rename operation, so the staged files in S3
are copied instead of renamed, which contributes to the slow
performance on S3.

The FinalizeSuccessfulInsert() function no longer makes any
assumptions about the filesystem it operates on and works across all
supported filesystems. To make this possible, TInsertPartitionStatus
now carries the full URI of the base directory for each partition.
Likewise, the HdfsOp class no longer assumes a single filesystem; it
obtains a connection to the right filesystem based on the URI of the
file it is operating on.
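
As a rough illustration of the idea above (not the C++ code in this
patch), the sketch below keys a connection cache on the
scheme://authority part of a path and picks RENAME when source and
destination share a filesystem, MOVE otherwise. The names
filesystem_key, ConnectionCache and choose_move_op, and the default
filesystem URI, are assumptions made for the example.

```python
from urllib.parse import urlparse

# Assumed default filesystem for paths that carry no scheme (illustrative only).
DEFAULT_FS = "hdfs://localhost:20500"


def filesystem_key(path, default_fs=DEFAULT_FS):
    """Return 'scheme://authority' for a path; bare paths use the default filesystem."""
    parsed = urlparse(path)
    if not parsed.scheme:
        parsed = urlparse(default_fs)
    return "{0}://{1}".format(parsed.scheme, parsed.netloc)


class ConnectionCache(object):
    """Hypothetical local cache holding one connection per filesystem."""

    def __init__(self, connect):
        self._connect = connect  # callable that opens a connection for a key
        self._connections = {}

    def get(self, path):
        key = filesystem_key(path)
        if key not in self._connections:
            self._connections[key] = self._connect(key)
        return self._connections[key]


def choose_move_op(src, dst):
    """RENAME within one filesystem, MOVE when the filesystems differ."""
    return "RENAME" if filesystem_key(src) == filesystem_key(dst) else "MOVE"
```

With such a cache, each operation looks up its own connection from the
path it acts on, so a single operation set can span several
filesystems.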

Added the Python S3 client library 'boto3' so that the Python tests
can access S3. A new class, S3Client, wraps the boto3 functions and
exposes the same function signatures as PyWebHdfsClient; both derive
from an abstract base class, BaseFileSystem, so that they can be used
interchangeably through a 'generic_client'. test_load.py is refactored
to use this generic client. The ImpalaTestSuite setup creates a client
according to the TARGET_FILESYSTEM environment variable and assigns it
to the 'generic_client'.
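
A minimal sketch of that arrangement is shown below. It is not the
code added in tests/util; the method names, the InMemoryClient stand-in
(used so the example runs without boto3 or WebHDFS) and the 'hdfs'
default are all assumptions made for illustration.

```python
import os
from abc import ABC, abstractmethod


class BaseFileSystem(ABC):
    """Interface every filesystem client implements (method names are illustrative)."""

    @abstractmethod
    def create_file(self, path, data):
        pass

    @abstractmethod
    def exists(self, path):
        pass

    @abstractmethod
    def delete_file(self, path):
        pass


class InMemoryClient(BaseFileSystem):
    """Fake client; a real one would wrap boto3 or WebHDFS calls instead."""

    def __init__(self, name):
        self.name = name
        self._files = {}

    def create_file(self, path, data):
        self._files[path] = data

    def exists(self, path):
        return path in self._files

    def delete_file(self, path):
        self._files.pop(path, None)


def make_generic_client(target_filesystem=None):
    """Pick a client from TARGET_FILESYSTEM; defaulting to 'hdfs' is an assumption."""
    if target_filesystem is None:
        target_filesystem = os.environ.get("TARGET_FILESYSTEM", "hdfs")
    return InMemoryClient(target_filesystem)
```

A test written against BaseFileSystem then runs unchanged whichever
concrete client the setup code hands it.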

Note: test_load.py currently runs 4x slower on S3 than on HDFS;
performance needs to be improved in future patches. INSERT is also
slower on S3 than on HDFS, mainly because of an extra copy between the
staging location and the final location of a file. However, larger
INSERTs come closer to HDFS performance than smaller ones.

ACLs are not handled for S3 in this patch. How to handle them still
needs to be discussed before anything is implemented.

Change-Id: I94e15ad67752dce21c9b7c1dced6e114905a942d
Reviewed-on: http://gerrit.cloudera.org:8080/2574
Reviewed-by: Sailesh Mukil <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ed7f5ebf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ed7f5ebf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ed7f5ebf

Branch: refs/heads/master
Commit: ed7f5ebf53e9e318fbf1a9842e0e77c1b81498d3
Parents: 99879aa
Author: Sailesh Mukil <[email protected]>
Authored: Thu Jan 28 10:52:16 2016 -0800
Committer: Tim Armstrong <[email protected]>
Committed: Thu May 12 14:17:49 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-table-writer.cc        |   2 +-
 be/src/exec/hdfs-table-sink.cc                  |  39 ++---
 be/src/exec/hdfs-table-sink.h                   |  22 ++-
 be/src/runtime/coordinator.cc                   |  65 +++++---
 be/src/runtime/disk-io-mgr-scan-range.cc        |   2 +-
 be/src/runtime/disk-io-mgr.cc                   |   4 +-
 be/src/runtime/hdfs-fs-cache.h                  |   2 +-
 be/src/util/CMakeLists.txt                      |   1 +
 be/src/util/hdfs-bulk-ops.cc                    | 120 +++++++++------
 be/src/util/hdfs-bulk-ops.h                     |  39 ++---
 be/src/util/hdfs-util-test.cc                   |  78 ++++++++++
 be/src/util/hdfs-util.cc                        |  43 +++++-
 be/src/util/hdfs-util.h                         |   4 +-
 common/thrift/ImpalaInternalService.thrift      |   3 +
 .../cloudera/impala/analysis/InsertStmt.java    |   6 +-
 .../cloudera/impala/analysis/LoadDataStmt.java  |  18 +--
 .../cloudera/impala/common/FileSystemUtil.java  |  98 ++++++++----
 .../com/cloudera/impala/service/Frontend.java   |   9 +-
 .../impala/analysis/AnalyzeStmtsTest.java       |   6 +-
 .../cloudera/impala/planner/S3PlannerTest.java  |   2 +-
 infra/python/deps/requirements.txt              |   8 +-
 .../queries/QueryTest/insert_permutation.test   |   5 +-
 .../queries/QueryTest/multiple-filesystems.test |  63 +++++---
 .../queries/QueryTest/truncate-table.test       |   8 +-
 .../workloads/tpch/queries/insert_parquet.test  |   5 +-
 tests/common/impala_test_suite.py               |   3 +
 tests/common/skip.py                            |   8 +-
 tests/custom_cluster/test_insert_behaviour.py   |   2 +-
 .../test_parquet_max_page_header.py             |   1 -
 tests/data_errors/test_data_errors.py           |   1 -
 tests/metadata/test_col_stats.py                |   1 -
 tests/metadata/test_compute_stats.py            |   3 -
 tests/metadata/test_ddl.py                      | 151 +++++++++----------
 tests/metadata/test_explain.py                  |  18 +--
 tests/metadata/test_hdfs_encryption.py          |   4 +-
 tests/metadata/test_hdfs_permissions.py         |   2 +-
 tests/metadata/test_last_ddl_time_update.py     |   1 -
 tests/metadata/test_load.py                     |  30 ++--
 tests/metadata/test_partition_metadata.py       |   7 +-
 tests/metadata/test_recover_partitions.py       |  46 +++---
 tests/metadata/test_show_create_table.py        |   1 -
 tests/query_test/test_aggregation.py            |   1 -
 tests/query_test/test_cancellation.py           |   1 -
 tests/query_test/test_chars.py                  |   1 -
 tests/query_test/test_compressed_formats.py     |   2 -
 tests/query_test/test_delimited_text.py         |   3 -
 tests/query_test/test_insert.py                 |   3 -
 tests/query_test/test_insert_behaviour.py       |  81 +++++-----
 tests/query_test/test_insert_parquet.py         |  16 +-
 tests/query_test/test_insert_permutation.py     |   1 -
 tests/query_test/test_join_queries.py           |   1 -
 tests/query_test/test_multiple_filesystems.py   |   1 -
 tests/query_test/test_partitioning.py           |   4 +-
 tests/query_test/test_queries.py                |   1 -
 tests/query_test/test_scanners.py               |   2 -
 tests/shell/test_shell_commandline.py           |   1 -
 tests/util/filesystem_base.py                   |  61 ++++++++
 tests/util/filesystem_utils.py                  |   3 +
 tests/util/hdfs_util.py                         |  46 +++---
 tests/util/s3_util.py                           |  99 ++++++++++++
 60 files changed, 809 insertions(+), 450 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index e2fb59c..3743503 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -814,7 +814,7 @@ Status HdfsParquetTableWriter::InitNewFile() {
   per_file_mem_pool_->Clear();
 
   // Get the file limit
-  RETURN_IF_ERROR(HdfsTableSink::GetFileBlockSize(output_, &file_size_limit_));
+  file_size_limit_ = output_->block_size;
   if (file_size_limit_ < HDFS_MIN_FILE_SIZE) {
     stringstream ss;
     ss << "Hdfs file size (" << file_size_limit_ << ") is too small.";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 74ff018..d145d9a 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -68,7 +68,7 @@ HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc,
 
 OutputPartition::OutputPartition()
     : hdfs_connection(NULL), tmp_hdfs_file(NULL), num_rows(0), num_files(0),
-      partition_descriptor(NULL) {
+      partition_descriptor(NULL), block_size(0) {
 }
 
 Status HdfsTableSink::PrepareExprs(RuntimeState* state) {
@@ -135,7 +135,7 @@ Status HdfsTableSink::Prepare(RuntimeState* state) {
     return Status(error_msg.str());
   }
 
-  staging_dir_ = Substitute("$0/_impala_insert_staging/$1/", table_desc_->hdfs_base_dir(),
+  staging_dir_ = Substitute("$0/_impala_insert_staging/$1", table_desc_->hdfs_base_dir(),
       PrintId(state->query_id(), "_"));
 
   RETURN_IF_ERROR(PrepareExprs(state));
@@ -298,6 +298,25 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
 
   output_partition->tmp_hdfs_file = hdfsOpenFile(hdfs_connection_,
       tmp_hdfs_file_name_cstr, O_WRONLY, 0, 0, block_size);
+
+  if (IsS3APath(output_partition->current_file_name.c_str())) {
+    // On S3A, the file cannot be stat'ed until after it's closed, and even so, the block
+    // size reported will be just the filesystem default. So, remember the requested
+    // block size.
+    output_partition->block_size = block_size;
+  } else {
+    // HDFS may choose to override the block size that we've recommended, so for non-S3
+    // files, we get the block size by stat-ing the file.
+    hdfsFileInfo* info = hdfsGetPathInfo(output_partition->hdfs_connection,
+        output_partition->current_file_name.c_str());
+    if (info == NULL) {
+      return Status(GetHdfsErrorMsg("Failed to get info on temporary HDFS file: ",
+          output_partition->current_file_name));
+    }
+    output_partition->block_size = info->mBlockSize;
+    hdfsFreeFileInfo(info, 1);
+  }
+
   VLOG_FILE << "hdfsOpenFile() file=" << tmp_hdfs_file_name_cstr;
   if (output_partition->tmp_hdfs_file == NULL) {
     return Status(GetHdfsErrorMsg("Failed to open HDFS file for writing: ",
@@ -478,6 +497,7 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state,
     partition_status.__set_num_appended_rows(0L);
     partition_status.__set_id(partition_descriptor->id());
     partition_status.__set_stats(TInsertStats());
+    partition_status.__set_partition_base_dir(table_desc_->hdfs_base_dir());
     state->per_partition_status()->insert(
         make_pair(partition->partition_name, partition_status));
 
@@ -644,21 +664,6 @@ void HdfsTableSink::Close(RuntimeState* state) {
   closed_ = true;
 }
 
-Status HdfsTableSink::GetFileBlockSize(OutputPartition* output_partition, int64_t* size) {
-  hdfsFileInfo* info = hdfsGetPathInfo(output_partition->hdfs_connection,
-      output_partition->current_file_name.c_str());
-
-  if (info == NULL) {
-    return Status(GetHdfsErrorMsg("Failed to get info on temporary HDFS file: ",
-        output_partition->current_file_name));
-  }
-
-  *size = info->mBlockSize;
-  hdfsFreeFileInfo(info, 1);
-
-  return Status::OK();
-}
-
 string HdfsTableSink::DebugString() const {
   stringstream out;
   out << "HdfsTableSink(overwrite=" << (overwrite_ ? "true" : "false")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/exec/hdfs-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 9c5ff71..2083ab2 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -35,16 +35,16 @@ class RuntimeState;
 class HdfsTableWriter;
 class MemTracker;
 
-/// Records the temporary and final Hdfs file name, the opened temporary Hdfs file, and the
-/// number of appended rows of an output partition.
+/// Records the temporary and final Hdfs file name, the opened temporary Hdfs file, and
+/// the number of appended rows of an output partition.
 struct OutputPartition {
   /// In the below, <unique_id_str> is the unique ID passed to HdfsTableSink in string
   /// form. It is typically the fragment ID that owns the sink.
 
   /// Full path to root of the group of files that will be created for this partition.
   /// Each file will have a sequence number appended.  A table writer may produce multiple
-  /// files per partition. The root is either partition_descriptor->location (if non-empty,
-  /// i.e. the partition has a custom location) or table_dir/partition_name/
+  /// files per partition. The root is either partition_descriptor->location (if
+  /// non-empty, i.e. the partition has a custom location) or table_dir/partition_name/
   /// Path: <root>/<unique_id_str>
   std::string final_hdfs_file_name_prefix;
 
@@ -85,12 +85,15 @@ struct OutputPartition {
   /// The descriptor for this partition.
   const HdfsPartitionDescriptor* partition_descriptor;
 
+  /// The block size decided on for this file.
+  int64_t block_size;
+
   OutputPartition();
 };
 
-/// The sink consumes all row batches of its child execution tree, and writes the evaluated
-/// output_exprs into temporary Hdfs files. The query coordinator moves the temporary files
-/// into their final locations after the sinks have finished executing.
+/// The sink consumes all row batches of its child execution tree, and writes the
+/// evaluated output_exprs into temporary Hdfs files. The query coordinator moves the
+/// temporary files into their final locations after the sinks have finished executing.
 //
 /// This sink supports static and dynamic partition inserts (Hive terminology),
 /// as well as inserting into unpartitioned tables,
@@ -143,11 +146,6 @@ class HdfsTableSink : public DataSink {
   /// Closes output_exprs and partition_key_exprs.
   virtual void Close(RuntimeState* state);
 
-  /// Get the block size of the current file opened for this partition.
-  /// This is a utility routine that can be called by specific table
-  /// writers.  Currently used by the parquet writer.
-  static Status GetFileBlockSize(OutputPartition* output_partition, int64_t* size);
-
   int skip_header_line_count() const { return skip_header_line_count_; }
 
   virtual RuntimeProfile* profile() { return runtime_profile_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 0a792f6..451c5e9 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -760,17 +760,12 @@ void Coordinator::PopulatePathPermissionCache(hdfsFS fs, const string& path_str,
 
 Status Coordinator::FinalizeSuccessfulInsert() {
   PermissionCache permissions_cache;
-  hdfsFS hdfs_connection;
-  // InsertStmt ensures that all partitions are on the same filesystem as the table's
-  // base directory, so opening a single connection is okay.
-  // TODO: modify this code so that restriction can be lifted.
-  RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
-      finalize_params_.hdfs_base_dir, &hdfs_connection));
+  HdfsFsCache::HdfsFsMap filesystem_connection_cache;
+  HdfsOperationSet partition_create_ops(&filesystem_connection_cache);
 
   // INSERT finalization happens in the five following steps
   // 1. If OVERWRITE, remove all the files in the target directory
   // 2. Create all the necessary partition directories.
-  HdfsOperationSet partition_create_ops(&hdfs_connection);
   DescriptorTbl* descriptor_table;
   DescriptorTbl::Create(obj_pool(), desc_tbl_, &descriptor_table);
   HdfsTableDescriptor* hdfs_table = static_cast<HdfsTableDescriptor*>(
@@ -783,6 +778,14 @@ Status Coordinator::FinalizeSuccessfulInsert() {
   BOOST_FOREACH(const PartitionStatusMap::value_type& partition, per_partition_status_) {
     SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer",
           "FinalizationTimer"));
+    // INSERT allows writes to tables that have partitions on multiple filesystems.
+    // So we need to open connections to different filesystems as necessary. We use a
+    // local connection cache and populate it with one connection per filesystem that the
+    // partitions are on.
+    hdfsFS partition_fs_connection;
+    RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
+      partition.second.partition_base_dir, &partition_fs_connection,
+          &filesystem_connection_cache));
 
     // Look up the partition in the descriptor table.
     stringstream part_path_ss;
@@ -798,6 +801,7 @@ Status Coordinator::FinalizeSuccessfulInsert() {
       part_path_ss << part->location();
     }
     const string& part_path = part_path_ss.str();
+    bool is_s3_path = IsS3APath(part_path.c_str());
 
     // If this is an overwrite insert, we will need to delete any updated partitions
     if (finalize_params_.is_overwrite) {
@@ -816,11 +820,11 @@ Status Coordinator::FinalizeSuccessfulInsert() {
         // Once HDFS-8407 is fixed, the errno reset won't be needed.
         errno = 0;
         hdfsFileInfo* existing_files =
-            hdfsListDirectory(hdfs_connection, part_path.c_str(), &num_files);
+            hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
         if (existing_files == NULL && errno == EAGAIN) {
           errno = 0;
           existing_files =
-              hdfsListDirectory(hdfs_connection, part_path.c_str(), &num_files);
+              hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
         }
         // hdfsListDirectory() returns NULL not only when there is an error but also
         // when the directory is empty(HDFS-8407). Need to check errno to make sure
@@ -840,10 +844,18 @@ Status Coordinator::FinalizeSuccessfulInsert() {
         // recursively with abandon, after checking that it ever existed.
         // TODO: There's a potential race here between checking for the directory
         // and a third-party deleting it.
-        if (FLAGS_insert_inherit_permissions) {
-          PopulatePathPermissionCache(hdfs_connection, part_path, &permissions_cache);
+        if (FLAGS_insert_inherit_permissions && !is_s3_path) {
+          // There is no directory structure in S3, so "inheriting" permissions is not
+          // possible.
+          // TODO: Try to mimic inheriting permissions for S3.
+          PopulatePathPermissionCache(
+              partition_fs_connection, part_path, &permissions_cache);
         }
-        if (hdfsExists(hdfs_connection, part_path.c_str()) != -1) {
+        // S3 doesn't have a directory structure, so we technically wouldn't need to
+        // CREATE_DIR on S3. However, libhdfs always checks if a path exists before
+        // carrying out an operation on that path. So we still need to call CREATE_DIR
+        // before we access that path due to this limitation.
+        if (hdfsExists(partition_fs_connection, part_path.c_str()) != -1) {
           partition_create_ops.Add(DELETE_THEN_CREATE, part_path);
         } else {
           // Otherwise just create the directory.
@@ -851,10 +863,11 @@ Status Coordinator::FinalizeSuccessfulInsert() {
         }
       }
     } else {
-      if (FLAGS_insert_inherit_permissions) {
-        PopulatePathPermissionCache(hdfs_connection, part_path, &permissions_cache);
+      if (FLAGS_insert_inherit_permissions && !is_s3_path) {
+        PopulatePathPermissionCache(
+            partition_fs_connection, part_path, &permissions_cache);
       }
-      if (hdfsExists(hdfs_connection, part_path.c_str()) == -1) {
+      if (hdfsExists(partition_fs_connection, part_path.c_str()) == -1) {
         partition_create_ops.Add(CREATE_DIR, part_path);
       }
     }
@@ -868,18 +881,17 @@ Status Coordinator::FinalizeSuccessfulInsert() {
         // It's ok to ignore errors creating the directories, since they may already
         // exist. If there are permission errors, we'll run into them later.
         if (err.first->op() != CREATE_DIR) {
-          stringstream ss;
-          ss << "Error(s) deleting partition directories. First error (of "
-             << partition_create_ops.errors().size() << ") was: " << err.second;
-          return Status(ss.str());
+          return Status(Substitute(
+              "Error(s) deleting partition directories. First error (of $0) was: $1",
+              partition_create_ops.errors().size(), err.second));
         }
       }
     }
   }
 
   // 3. Move all tmp files
-  HdfsOperationSet move_ops(&hdfs_connection);
-  HdfsOperationSet dir_deletion_ops(&hdfs_connection);
+  HdfsOperationSet move_ops(&filesystem_connection_cache);
+  HdfsOperationSet dir_deletion_ops(&filesystem_connection_cache);
 
   BOOST_FOREACH(FileMoveMap::value_type& move, files_to_move_) {
     // Empty destination means delete, so this is a directory. These get deleted in a
@@ -889,7 +901,11 @@ Status Coordinator::FinalizeSuccessfulInsert() {
       dir_deletion_ops.Add(DELETE, move.first);
     } else {
       VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second;
-      move_ops.Add(RENAME, move.first, move.second);
+      if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) {
+        move_ops.Add(RENAME, move.first, move.second);
+      } else {
+        move_ops.Add(MOVE, move.first, move.second);
+      }
     }
   }
 
@@ -917,9 +933,9 @@ Status Coordinator::FinalizeSuccessfulInsert() {
   }
 
   // 5. Optionally update the permissions of the created partition directories
-  // Do this last in case we make the dirs unwriteable.
+  // Do this last so that we don't make a dir unwritable before we write to it.
   if (FLAGS_insert_inherit_permissions) {
-    HdfsOperationSet chmod_ops(&hdfs_connection);
+    HdfsOperationSet chmod_ops(&filesystem_connection_cache);
     BOOST_FOREACH(const PermissionCache::value_type& perm, permissions_cache) {
       bool new_dir = perm.second.first;
       if (new_dir) {
@@ -1528,6 +1544,7 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
       TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
       status->num_appended_rows += partition.second.num_appended_rows;
       status->id = partition.second.id;
+      status->partition_base_dir = partition.second.partition_base_dir;
       if (!status->__isset.stats) status->__set_stats(TInsertStats());
       DataSink::MergeInsertStats(partition.second.stats, &status->stats);
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 52a8e66..01ea4b5 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -299,7 +299,7 @@ void DiskIoMgr::ScanRange::Close() {
     if (hdfs_file_ == NULL) return;
 
     struct hdfsReadStatistics* stats;
-    if (IsDfsPath(file())) {
+    if (IsHdfsPath(file())) {
       int success = hdfsFileGetReadStatistics(hdfs_file_->file(), &stats);
       if (success == 0) {
         reader_->bytes_read_local_.Add(stats->totalLocalBytesRead);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 4788739..68745d9 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -1153,7 +1153,9 @@ Status DiskIoMgr::AddWriteRange(RequestContext* writer, WriteRange* write_range)
 int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) {
   // If it's a remote range, check for an appropriate remote disk queue.
   if (!expected_local) {
-    if (IsDfsPath(file) && FLAGS_num_remote_hdfs_io_threads > 0) return RemoteDfsDiskId();
+    if (IsHdfsPath(file) && FLAGS_num_remote_hdfs_io_threads > 0) {
+      return RemoteDfsDiskId();
+    }
     if (IsS3APath(file)) return RemoteS3DiskId();
   }
   // Assign to a local disk queue.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/runtime/hdfs-fs-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/hdfs-fs-cache.h b/be/src/runtime/hdfs-fs-cache.h
index a89ad0f..baf163f 100644
--- a/be/src/runtime/hdfs-fs-cache.h
+++ b/be/src/runtime/hdfs-fs-cache.h
@@ -20,7 +20,7 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/unordered_map.hpp>
 #include <boost/thread/mutex.hpp>
-#include <hdfs.h>
+#include "common/hdfs.h"
 
 #include "common/status.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 4b98ff5..ef20afda 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -134,3 +134,4 @@ ADD_BE_TEST(bitmap-test)
 ADD_BE_TEST(fixed-size-hash-table-test)
 ADD_BE_TEST(bloom-filter-test)
 ADD_BE_TEST(logging-support-test)
+ADD_BE_TEST(hdfs-util-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/util/hdfs-bulk-ops.cc
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-bulk-ops.cc b/be/src/util/hdfs-bulk-ops.cc
index d1508fc..d4d94ed 100644
--- a/be/src/util/hdfs-bulk-ops.cc
+++ b/be/src/util/hdfs-bulk-ops.cc
@@ -33,7 +33,7 @@ HdfsOp::HdfsOp(HdfsOpType op, const std::string& src, HdfsOperationSet* op_set)
 
 HdfsOp::HdfsOp(HdfsOpType op, const std::string& src, const std::string& dst,
     HdfsOperationSet* op_set) : op_(op), src_(src), dst_(dst), op_set_(op_set) {
-  DCHECK(op == RENAME);
+  DCHECK(op == RENAME || op == MOVE);
   DCHECK(!src_.empty());
   DCHECK(!dst_.empty());
 }
@@ -48,64 +48,85 @@ HdfsOp::HdfsOp(HdfsOpType op, const string& src, short permissions,
 // Required for ThreadPool
 HdfsOp::HdfsOp() { }
 
+void HdfsOp::AddError(const string& error_msg) const {
+  stringstream ss;
+  ss << "Hdfs op (";
+  switch (op_) {
+    case DELETE:
+      ss << "DELETE " << src_;
+      break;
+    case CREATE_DIR:
+      ss << "CREATE_DIR " << src_;
+      break;
+    case RENAME:
+      ss << "RENAME " << src_ << " TO " << dst_;
+      break;
+    case MOVE:
+      ss << "MOVE " << src_ << " TO " << dst_;
+      break;
+    case DELETE_THEN_CREATE:
+      ss << "DELETE_THEN_CREATE " << src_;
+      break;
+    case CHMOD:
+      ss << "CHMOD " << src_ << " " << oct << permissions_;
+      break;
+  }
+  ss << ") failed, error was: " << error_msg;
+  op_set_->AddError(ss.str(), this);
+}
+
 void HdfsOp::Execute() const {
   if (op_set_->ShouldAbort()) return;
   int err = 0;
-  hdfsFS* hdfs_connection = op_set_->hdfs_connection();
+  hdfsFS src_connection;
+  Status connection_status = HdfsFsCache::instance()->GetConnection(src_, &src_connection,
+      op_set_->connection_cache());
+
+  if (!connection_status.ok()) {
+    AddError(connection_status.GetDetail());
+    op_set_->MarkOneOpDone();
+    return;
+  }
   switch (op_) {
     case DELETE:
-      err = hdfsDelete(*hdfs_connection, src_.c_str(), 1);
-      VLOG_FILE << "hdfsDelete() file=" << src_.c_str();
+      err = hdfsDelete(src_connection, src_.c_str(), 1);
+      VLOG_FILE << "hdfsDelete() file=" << src_;
       break;
     case CREATE_DIR:
-      err = hdfsCreateDirectory(*hdfs_connection, src_.c_str());
-      VLOG_FILE << "hdfsCreateDirectory() file=" << src_.c_str();
+      err = hdfsCreateDirectory(src_connection, src_.c_str());
+      VLOG_FILE << "hdfsCreateDirectory() file=" << src_;
       break;
     case RENAME:
-      err = hdfsRename(*hdfs_connection, src_.c_str(), dst_.c_str());
-      VLOG_FILE << "hdfsRename() src_file=" << src_.c_str()
-                << " dst_file=" << dst_.c_str();
+      err = hdfsRename(src_connection, src_.c_str(), dst_.c_str());
+      VLOG_FILE << "hdfsRename() src_file=" << src_ << " dst_file=" << dst_;
+      break;
+    case MOVE:
+      hdfsFS dst_connection;
+      connection_status = HdfsFsCache::instance()->GetConnection(dst_, &dst_connection,
+          op_set_->connection_cache());
+      if (!connection_status.ok()) break;
+      err = hdfsMove(src_connection, src_.c_str(), dst_connection, dst_.c_str());
+      VLOG_FILE << "hdfsMove() src_file=" << src_ << " dst_file=" << dst_;
       break;
     case DELETE_THEN_CREATE:
-      err = hdfsDelete(*hdfs_connection, src_.c_str(), 1);
-      VLOG_FILE << "hdfsDelete() file=" << src_.c_str();
+      err = hdfsDelete(src_connection, src_.c_str(), 1);
+      VLOG_FILE << "hdfsDelete() file=" << src_;
       if (err != -1) {
-        err = hdfsCreateDirectory(*hdfs_connection, src_.c_str());
-        VLOG_FILE << "hdfsCreateDirectory() file=" << src_.c_str();
+        err = hdfsCreateDirectory(src_connection, src_.c_str());
+        VLOG_FILE << "hdfsCreateDirectory() file=" << src_;
       }
       break;
     case CHMOD:
-      err = hdfsChmod(*hdfs_connection, src_.c_str(), permissions_);
-      VLOG_FILE << "hdfsChmod() file=" << src_.c_str();
+      err = hdfsChmod(src_connection, src_.c_str(), permissions_);
+      VLOG_FILE << "hdfsChmod() file=" << src_;
       break;
   }
 
-  if (err == -1) {
-    string error_msg = GetStrErrMsg();
-    stringstream ss;
-    ss << "Hdfs op (";
-    switch (op_) {
-      case DELETE:
-        ss << "DELETE " << src_;
-        break;
-      case CREATE_DIR:
-        ss << "CREATE_DIR " << src_;
-        break;
-      case RENAME:
-        ss << "RENAME " << src_ << " TO " << dst_;
-        break;
-      case DELETE_THEN_CREATE:
-        ss << "DELETE_THEN_CREATE " << src_;
-        break;
-      case CHMOD:
-        ss << "CHMOD " << src_ << " " << oct << permissions_;
-        break;
-    }
-    ss << ") failed, error was: " << error_msg;
-
-    op_set_->AddError(ss.str(), this);
+  if (err == -1 || !connection_status.ok()) {
+    string error_msg =
+        connection_status.ok() ? GetStrErrMsg() : connection_status.GetDetail();
+    AddError(error_msg);
   }
-
   op_set_->MarkOneOpDone();
 }
 
@@ -122,24 +143,25 @@ HdfsOpThreadPool* impala::CreateHdfsOpThreadPool(const string& name, uint32_t nu
       max_queue_length, &HdfsThreadPoolHelper);
 }
 
-HdfsOperationSet::HdfsOperationSet(hdfsFS* hdfs_connection)
-    : num_ops_(0L), hdfs_connection_(hdfs_connection) {
-}
+HdfsOperationSet::HdfsOperationSet(HdfsFsCache::HdfsFsMap* connection_cache)
+    : connection_cache_(connection_cache) { }
 
-bool HdfsOperationSet::Execute(ThreadPool<HdfsOp>* pool,
-    bool abort_on_error) {
+bool HdfsOperationSet::Execute(ThreadPool<HdfsOp>* pool, bool abort_on_error) {
   {
     lock_guard<mutex> l(errors_lock_);
     abort_on_error_ = abort_on_error;
   }
+
   int64_t num_ops = ops_.size();
   if (num_ops == 0) return true;
-  num_ops_.Add(num_ops);
 
+  ops_complete_barrier_.reset(new CountingBarrier(num_ops));
   BOOST_FOREACH(const HdfsOp& op, ops_) {
     pool->Offer(op);
   }
-  return promise_.Get();
+
+  ops_complete_barrier_->Wait();
+  return errors().size() == 0;
 }
 
 void HdfsOperationSet::Add(HdfsOpType op, const string& src) {
@@ -160,9 +182,7 @@ void HdfsOperationSet::AddError(const string& err, const HdfsOp* op) {
 }
 
 void HdfsOperationSet::MarkOneOpDone() {
-  if (num_ops_.Add(-1) == 0) {
-    promise_.Set(errors().size() == 0);
-  }
+  ops_complete_barrier_->Notify();
 }
 
 bool HdfsOperationSet::ShouldAbort() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/util/hdfs-bulk-ops.h
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-bulk-ops.h b/be/src/util/hdfs-bulk-ops.h
index 432029f..e66f0d1 100644
--- a/be/src/util/hdfs-bulk-ops.h
+++ b/be/src/util/hdfs-bulk-ops.h
@@ -22,6 +22,8 @@
 #include "common/atomic.h"
 #include "common/status.h"
 #include "util/thread-pool.h"
+#include "util/counting-barrier.h"
+#include "runtime/hdfs-fs-cache.h"
 
 namespace impala {
 
@@ -29,6 +31,7 @@ enum HdfsOpType {
   DELETE,
   CREATE_DIR,
   RENAME,
+  MOVE,
   DELETE_THEN_CREATE,
   CHMOD
 };
@@ -65,7 +68,7 @@ class HdfsOp {
   /// First operand
   std::string src_;
 
-  /// Second string operand, ignored except for RENAME
+  /// Second string operand, ignored except for RENAME and MOVE
   std::string dst_;
 
   /// Permission operand, ignored except for CHMOD
@@ -73,6 +76,9 @@ class HdfsOp {
 
   /// Containing operation set, used to record errors and to signal completion.
   HdfsOperationSet* op_set_;
+
+  /// Records an error if it happens during an operation.
+  void AddError(const string& error_msg) const;
 };
 
 typedef ThreadPool<HdfsOp> HdfsOpThreadPool;
@@ -86,16 +92,14 @@ HdfsOpThreadPool* CreateHdfsOpThreadPool(const std::string& name, uint32_t num_t
 /// added.
 class HdfsOperationSet {
  public:
-  /// Constructs a new operation set. The hdfsFS parameter is shared between all
-  /// operations, and is not owned by this class (and therefore should remain valid until
-  /// Execute returns).
-  HdfsOperationSet(hdfsFS* hdfs_connection);
+  /// Initializes an operation set. 'connection_cache' is not owned.
+  HdfsOperationSet(HdfsFsCache::HdfsFsMap* connection_cache);
 
   /// Add an operation that takes only a single 'src' parameter (e.g. DELETE, CREATE_DIR,
   /// DELETE_THEN_CREATE)
   void Add(HdfsOpType op, const std::string& src);
 
-  /// Add an operation that takes two parameters (e.g. RENAME)
+  /// Add an operation that takes two parameters (e.g. RENAME, MOVE)
   void Add(HdfsOpType op, const std::string& src, const std::string& dst);
 
   /// Add an operation that takes a permission argument (i.e. CHMOD)
@@ -103,7 +107,7 @@ class HdfsOperationSet {
 
   /// Run all operations on the given pool, blocking until all are complete. Returns false
   /// if there were any errors, true otherwise.
-  /// If abort_on_error is true, execution will finish after the first error seen.
+  /// If 'abort_on_error' is true, execution will finish after the first error seen.
   bool Execute(HdfsOpThreadPool* pool, bool abort_on_error);
 
   typedef std::pair<const HdfsOp*, std::string> Error;
@@ -113,22 +117,19 @@ class HdfsOperationSet {
   /// until Execute has returned.
   const Errors& errors() { return errors_; }
 
-  hdfsFS* hdfs_connection() const { return hdfs_connection_; }
+  HdfsFsCache::HdfsFsMap* connection_cache() { return connection_cache_; }
 
  private:
   /// The set of operations to be submitted to HDFS
   std::vector<HdfsOp> ops_;
 
-  /// Used to coordinate between the executing threads and the caller; Execute blocks until
-  /// this is signalled.
-  Promise<bool> promise_;
-
-  /// The number of ops remaining to be executed. Used to coordinate between executor
-  /// threads so that when all ops are finished, promise_ is signalled.
-  AtomicInt64 num_ops_;
+  /// Used to coordinate between the executing threads and the caller. This is initialized
+  /// with the number of operations to be executed. Unblocks once all the operations in
+  /// the set are complete.
+  boost::scoped_ptr<CountingBarrier> ops_complete_barrier_;
 
-  /// HDFS connection shared between all operations. Not owned by this class.
-  hdfsFS* hdfs_connection_;
+  /// A connection cache used by this operation set. Not owned.
+  HdfsFsCache::HdfsFsMap* connection_cache_;
 
   /// Protects errors_ and abort_on_error_ during Execute
   boost::mutex errors_lock_;
@@ -141,8 +142,8 @@ class HdfsOperationSet {
 
   friend class HdfsOp;
 
-  /// Called by HdfsOp to signal its completion. When the last op has finished, this method
-  /// signals Execute() so that it can return.
+  /// Called by HdfsOp to signal its completion. When the last op has finished, this
+  /// method signals Execute() so that it can return.
   void MarkOneOpDone();
 
   /// Called by HdfsOp to record an error

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/util/hdfs-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-util-test.cc b/be/src/util/hdfs-util-test.cc
new file mode 100644
index 0000000..73ff4bf
--- /dev/null
+++ b/be/src/util/hdfs-util-test.cc
@@ -0,0 +1,78 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "hdfs-util.h"
+
+#include "testutil/gtest-util.h"
+#include "common/init.h"
+#include "common/logging.h"
+#include "util/test-info.h"
+#include "runtime/exec-env.h"
+#include "service/fe-support.h"
+
+using namespace impala;
+
+DECLARE_bool(enable_webserver);
+
+TEST(HdfsUtilTest, CheckFilesystemsMatch) {
+  // We do not want to start the webserver.
+  FLAGS_enable_webserver = false;
+  ExecEnv* exec_env = new ExecEnv();
+
+  // We do this to retrieve the default FS from the frontend.
+  exec_env->StartServices();
+
+  // Tests with both paths qualified.
+  EXPECT_TRUE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path",
+                               "s3a://dummybucket/temp_dir_2/temp_path_2"));
+  EXPECT_FALSE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path",
+                                "s3a://dummybucket_2/temp_dir_2/temp_path_2"));
+  EXPECT_FALSE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path",
+                                "hdfs://namenode/temp_dir2/temp_path_2"));
+  EXPECT_FALSE(FilesystemsMatch("hdfs://namenode/temp_dir/temp_path",
+                                "hdfs://namenode_2/temp_dir2/temp_path_2"));
+  EXPECT_TRUE(FilesystemsMatch("hdfs://namenode:9999/temp_dir/temp_path",
+                                "hdfs://namenode:9999/temp_dir2/temp_path_2"));
+  EXPECT_FALSE(FilesystemsMatch("hdfs://namenode:9999/temp_dir/temp_path",
+                                "hdfs://namenode:8888/temp_dir2/temp_path_2"));
+  EXPECT_TRUE(FilesystemsMatch("file:/path/to/dir/filename.parq",
+                               "file:///path/to/dir/filename.parq"));
+  EXPECT_TRUE(FilesystemsMatch("file:/path/to/dir/filename.parq",
+                               "file:/path_2/to/dir/filename.parq"));
+  EXPECT_TRUE(FilesystemsMatch("file:///path/to/dir/filename.parq",
+                               "file:/path_2/to/dir/filename.parq"));
+  EXPECT_FALSE(FilesystemsMatch("file:/path/to/dir/filename.parq",
+                                "file2:/path/to/dir/filename.parq"));
+  EXPECT_FALSE(FilesystemsMatch("hdfs://", "s3a://dummybucket/temp_dir/temp_path"));
+  EXPECT_TRUE(FilesystemsMatch("hdfs://namenode", "hdfs://namenode/"));
+
+  // Tests with both paths unqualified.
+  EXPECT_TRUE(FilesystemsMatch("tempdir/temppath", "tempdir2/temppath2"));
+
+  // Tests with one path qualified and the other unqualified.
+  const char* default_fs = exec_env->default_fs().c_str();
+  EXPECT_TRUE(FilesystemsMatch(default_fs, "temp_dir/temp_path"));
+  EXPECT_TRUE(FilesystemsMatch("temp_dir/temp_path", default_fs));
+  EXPECT_FALSE(FilesystemsMatch("badscheme://namenode/temp_dir/temp_path",
+                                "temp_dir/temp_path"));
+  EXPECT_FALSE(FilesystemsMatch("badscheme://namenode:1234/temp_dir/temp_path",
+                                "temp_dir/temp_path"));
+}
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST);
+  InitFeSupport();
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/util/hdfs-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-util.cc b/be/src/util/hdfs-util.cc
index 6c1b57a..836fa12 100644
--- a/be/src/util/hdfs-util.cc
+++ b/be/src/util/hdfs-util.cc
@@ -64,7 +64,7 @@ Status CopyHdfsFile(const hdfsFS& src_conn, const string& src_path,
   return Status::OK();
 }
 
-bool IsDfsPath(const char* path) {
+bool IsHdfsPath(const char* path) {
   if (strstr(path, ":/") == NULL) {
     return ExecEnv::GetInstance()->default_fs().compare(0, 7, "hdfs://") == 0;
   }
@@ -78,4 +78,45 @@ bool IsS3APath(const char* path) {
   return strncmp(path, "s3a://", 6) == 0;
 }
 
+// Returns the length of the filesystem name in 'path' which is the length of the
+// 'scheme://authority'. Returns 0 if the path is unqualified.
+static int GetFilesystemNameLength(const char* path) {
+  // Special case for "file:/". It will not have an authority following it.
+  if (strncmp(path, "file:", 5) == 0) return 5;
+
+  const char* after_scheme = strstr(path, "://");
+  if (after_scheme == NULL) return 0;
+  // Some paths may come only with a scheme. We add 3 to skip over "://".
+  if (*(after_scheme + 3) == '\0') return strlen(path);
+
+  const char* after_authority = strstr(after_scheme + 3, "/");
+  if (after_authority == NULL) return strlen(path);
+  return after_authority - path;
+}
+
+bool FilesystemsMatch(const char* path_a, const char* path_b) {
+  int fs_a_name_length = GetFilesystemNameLength(path_a);
+  int fs_b_name_length = GetFilesystemNameLength(path_b);
+
+  const char* default_fs = ExecEnv::GetInstance()->default_fs().c_str();
+  int default_fs_name_length = GetFilesystemNameLength(default_fs);
+
+  // Neither is fully qualified: both are on default_fs.
+  if (fs_a_name_length == 0 && fs_b_name_length == 0) return true;
+  // One is a relative path: check fully-qualified one against default_fs.
+  if (fs_a_name_length == 0) {
+    DCHECK_GT(fs_b_name_length, 0);
+    return strncmp(path_b, default_fs, default_fs_name_length) == 0;
+  }
+  if (fs_b_name_length == 0) {
+    DCHECK_GT(fs_a_name_length, 0);
+    return strncmp(path_a, default_fs, default_fs_name_length) == 0;
+  }
+  DCHECK_GT(fs_a_name_length, 0);
+  DCHECK_GT(fs_b_name_length, 0);
+  // Both fully qualified: check the filesystem prefix.
+  if (fs_a_name_length != fs_b_name_length) return false;
+  return strncmp(path_a, path_b, fs_a_name_length) == 0;
+}
+
 }

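The prefix comparison in FilesystemsMatch() can be tried outside the backend with a small standalone sketch. It follows the logic of the hunk above, but takes the default filesystem as an explicit argument instead of reading it from ExecEnv. The names SketchFsNameLength and SketchFilesystemsMatch, and the default filesystem value used in the usage line, are assumptions made for illustration only.

```cpp
#include <cstring>
#include <string>

// Length of the 'scheme://authority' prefix of 'path', or 0 if the path is
// unqualified. Follows GetFilesystemNameLength() from the patch.
static int SketchFsNameLength(const char* path) {
  // "file:/" has no authority, so the filesystem name is just "file:".
  if (std::strncmp(path, "file:", 5) == 0) return 5;

  const char* after_scheme = std::strstr(path, "://");
  if (after_scheme == nullptr) return 0;
  // Scheme only, e.g. "hdfs://".
  if (*(after_scheme + 3) == '\0') return static_cast<int>(std::strlen(path));

  const char* after_authority = std::strchr(after_scheme + 3, '/');
  if (after_authority == nullptr) return static_cast<int>(std::strlen(path));
  return static_cast<int>(after_authority - path);
}

// Same comparison as FilesystemsMatch(), with 'default_fs' passed in by the
// caller (an assumption of this sketch; the patch reads it from ExecEnv).
bool SketchFilesystemsMatch(const char* path_a, const char* path_b,
                            const std::string& default_fs) {
  int len_a = SketchFsNameLength(path_a);
  int len_b = SketchFsNameLength(path_b);
  int len_default = SketchFsNameLength(default_fs.c_str());

  // Neither is qualified: both live on the default filesystem.
  if (len_a == 0 && len_b == 0) return true;
  // Exactly one is qualified: compare it against the default filesystem.
  if (len_a == 0) return std::strncmp(path_b, default_fs.c_str(), len_default) == 0;
  if (len_b == 0) return std::strncmp(path_a, default_fs.c_str(), len_default) == 0;
  // Both qualified: the prefixes must have equal length and equal content.
  if (len_a != len_b) return false;
  return std::strncmp(path_a, path_b, len_a) == 0;
}
```

For example, SketchFilesystemsMatch("file:/a/b", "file:///c/d", "hdfs://localhost:20500") returns true, because both paths reduce to the prefix "file:". Note that when only one path is qualified, the comparison covers just the first len_default characters. A qualified path whose authority merely starts with the default authority therefore also compares equal in this sketch.
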
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/util/hdfs-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-util.h b/be/src/util/hdfs-util.h
index 3a3d6b4..0a6b968 100644
--- a/be/src/util/hdfs-util.h
+++ b/be/src/util/hdfs-util.h
@@ -42,10 +42,12 @@ Status CopyHdfsFile(const hdfsFS& src_conn, const std::string& src_path,
                     const hdfsFS& dst_conn, const std::string& dst_path);
 
 /// Returns true iff the path refers to a location on an HDFS filesystem.
-bool IsDfsPath(const char* path);
+bool IsHdfsPath(const char* path);
 
 /// Returns true iff the path refers to a location on an S3A filesystem.
 bool IsS3APath(const char* path);
 
+/// Returns true iff 'pathA' and 'pathB' are on the same filesystem.
+bool FilesystemsMatch(const char* pathA, const char* pathB);
 }
 #endif // IMPALA_UTIL_HDFS_UTIL_H

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index aa2901e..1fcce1b 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -396,6 +396,9 @@ struct TInsertPartitionStatus {
 
   // Detailed statistics gathered by table writers for this partition
   3: optional TInsertStats stats
+
+  // Fully qualified URI to the base directory for this partition.
+  4: required string partition_base_dir
 }
 
 // The results of an INSERT query, sent to the coordinator as part of

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java
index 5eb9a26..dc859c1 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java
@@ -373,17 +373,13 @@ public class InsertStmt extends StatementBase {
             "(%s) because Impala does not have WRITE access to at least one HDFS path" +
             ": %s", targetTableName_, hdfsTable.getFirstLocationWithoutWriteAccess()));
       }
-      if (hdfsTable.spansMultipleFileSystems()) {
-        throw new AnalysisException(String.format("Unable to INSERT into target table " +
-            "(%s) because the table spans multiple filesystems.", targetTableName_));
-      }
       StringBuilder error = new StringBuilder();
       hdfsTable.parseSkipHeaderLineCount(error);
       if (error.length() > 0) throw new AnalysisException(error.toString());
       try {
         if (!FileSystemUtil.isImpalaWritableFilesystem(hdfsTable.getLocation())) {
           throw new AnalysisException(String.format("Unable to INSERT into target " +
-              "table (%s) because %s is not an HDFS filesystem.", targetTableName_,
+              "table (%s) because %s is not a supported filesystem.", targetTableName_,
               hdfsTable.getLocation()));
         }
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java
index 49966f8..ecc7279 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 
 import com.cloudera.impala.authorization.Privilege;
@@ -133,9 +134,9 @@ public class LoadDataStmt extends StatementBase {
     try {
       Path source = sourceDataPath_.getPath();
       FileSystem fs = source.getFileSystem(FileSystemUtil.getConfiguration());
-      if (!(fs instanceof DistributedFileSystem)) {
+      if (!(fs instanceof DistributedFileSystem) && !(fs instanceof S3AFileSystem)) {
         throw new AnalysisException(String.format("INPATH location '%s' " +
-            "must point to an HDFS filesystem.", sourceDataPath_));
+            "must point to an HDFS or S3A filesystem.", sourceDataPath_));
       }
       if (!fs.exists(source)) {
         throw new AnalysisException(String.format(
@@ -154,7 +155,8 @@ public class LoadDataStmt extends StatementBase {
         }
         if (FileSystemUtil.containsVisibleSubdirectory(source)) {
           throw new AnalysisException(String.format(
-              "INPATH location '%s' cannot contain non-hidden subdirectories.", sourceDataPath_));
+              "INPATH location '%s' cannot contain non-hidden subdirectories.",
+              sourceDataPath_));
         }
         if (!checker.getPermissions(fs, source).checkPermissions(
             FsAction.READ_WRITE)) {
@@ -206,16 +208,6 @@ public class LoadDataStmt extends StatementBase {
       }
       Preconditions.checkNotNull(partition);
 
-      // Until Frontend.loadTableData() can handle cross-filesystem and filesystems
-      // that aren't HDFS, require that source and dest are on the same HDFS.
-      if (!FileSystemUtil.isPathOnFileSystem(new Path(location), fs)) {
-        throw new AnalysisException(String.format(
-            "Unable to LOAD DATA into target table (%s) because source path (%s) and " +
-            "destination %s (%s) are on different filesystems.",
-            hdfsTable.getFullName(),
-            source, partitionSpec_ == null ? "table" : "partition",
-            partition.getLocation()));
-      }
       // Verify the files being loaded are supported.
       for (FileStatus fStatus: fs.listStatus(source)) {
         if (fs.isDirectory(fStatus.getPath())) continue;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java b/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java
index 969fe55..1f3da2e 100644
--- a/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java
@@ -81,10 +81,14 @@ public class FileSystemUtil {
   }
 
   /**
-   * Returns true if path p1 and path p2 are in the same encryption zone.
+   * Returns true if path p1 and path p2 are in the same encryption zone in HDFS.
+   * Returns false if they are in different encryption zones or if either of the paths
+   * are not on HDFS.
    */
-  private static boolean arePathsInSameEncryptionZone(FileSystem fs, Path p1,
+  private static boolean arePathsInSameHdfsEncryptionZone(FileSystem fs, Path p1,
       Path p2) throws IOException {
+    // Only distributed file systems have encryption zones.
+    if (!isDistributedFileSystem(p1) || !isDistributedFileSystem(p2)) return false;
     HdfsAdmin hdfsAdmin = new HdfsAdmin(fs.getUri(), CONF);
     EncryptionZone z1 = hdfsAdmin.getEncryptionZoneForPath(p1);
     EncryptionZone z2 = hdfsAdmin.getEncryptionZoneForPath(p2);
@@ -104,9 +108,10 @@ public class FileSystemUtil {
    */
   public static int relocateAllVisibleFiles(Path sourceDir, Path destDir)
       throws IOException {
-    FileSystem fs = destDir.getFileSystem(CONF);
-    Preconditions.checkState(fs.isDirectory(destDir));
-    Preconditions.checkState(fs.isDirectory(sourceDir));
+    FileSystem destFs = destDir.getFileSystem(CONF);
+    FileSystem sourceFs = sourceDir.getFileSystem(CONF);
+    Preconditions.checkState(destFs.isDirectory(destDir));
+    Preconditions.checkState(sourceFs.isDirectory(sourceDir));
 
     // Use the same UUID to resolve all file name conflicts. This helps mitigate problems
     // that might happen if there is a conflict moving a set of files that have
@@ -115,7 +120,7 @@ public class FileSystemUtil {
 
     // Enumerate all the files in the source
     int numFilesMoved = 0;
-    for (FileStatus fStatus: fs.listStatus(sourceDir)) {
+    for (FileStatus fStatus: sourceFs.listStatus(sourceDir)) {
       if (fStatus.isDirectory()) {
         LOG.debug("Skipping copy of directory: " + fStatus.getPath());
         continue;
@@ -124,7 +129,7 @@ public class FileSystemUtil {
       }
 
       Path destFile = new Path(destDir, fStatus.getPath().getName());
-      if (fs.exists(destFile)) {
+      if (destFs.exists(destFile)) {
         destFile = new Path(destDir,
             appendToBaseFileName(destFile.getName(), uuid.toString()));
       }
@@ -136,44 +141,65 @@ public class FileSystemUtil {
 
   /**
    * Relocates the given file to a new location (either another directory or a
-   * file. The file is moved (renamed) to the new location unless the source and
-   * destination are in different encryption zones, in which case the file is copied
-   * so that the file can be decrypted and/or encrypted. If renameIfAlreadyExists
-   * is true, no error will be thrown if a file with the same name already exists in the
+   * file in the same or different filesystem). The file is generally moved (renamed) to
+   * the new location. However, the file is copied if the source and destination are in
+   * different encryption zones so that the file can be decrypted and/or encrypted, or if
+   * the source and destination are in different filesystems. If renameIfAlreadyExists is
+   * true, no error will be thrown if a file with the same name already exists in the
    * destination location. Instead, a UUID will be appended to the base file name,
-   * preserving the the existing file extension. If renameIfAlreadyExists is false, an
+   * preserving the existing file extension. If renameIfAlreadyExists is false, an
    * IOException will be thrown if there is a file name conflict.
    */
   public static void relocateFile(Path sourceFile, Path dest,
       boolean renameIfAlreadyExists) throws IOException {
-    FileSystem fs = dest.getFileSystem(CONF);
-    // TODO: Handle moving between file systems
-    Preconditions.checkArgument(isPathOnFileSystem(sourceFile, fs));
+    FileSystem destFs = dest.getFileSystem(CONF);
+    FileSystem sourceFs = sourceFile.getFileSystem(CONF);
 
-    Path destFile = fs.isDirectory(dest) ? new Path(dest, sourceFile.getName()) : dest;
+    Path destFile =
+        destFs.isDirectory(dest) ? new Path(dest, sourceFile.getName()) : dest;
     // If a file with the same name does not already exist in the destination location
     // then use the same file name. Otherwise, generate a unique file name.
-    if (renameIfAlreadyExists && fs.exists(destFile)) {
-      Path destDir = fs.isDirectory(dest) ? dest : dest.getParent();
+    if (renameIfAlreadyExists && destFs.exists(destFile)) {
+      Path destDir = destFs.isDirectory(dest) ? dest : dest.getParent();
       destFile = new Path(destDir,
           appendToBaseFileName(destFile.getName(), UUID.randomUUID().toString()));
     }
-
-    if (arePathsInSameEncryptionZone(fs, sourceFile, destFile)) {
+    boolean sameFileSystem = isPathOnFileSystem(sourceFile, destFs);
+    boolean destIsDfs = isDistributedFileSystem(destFs);
+
+    // If the source and the destination are on different file systems, or in different
+    // encryption zones, files can't be moved from one location to the other and must be
+    // copied instead.
+    boolean sameEncryptionZone =
+        arePathsInSameHdfsEncryptionZone(destFs, sourceFile, destFile);
+    // We can do a rename if the src and dst are in the same encryption zone in the same
+    // distributed filesystem.
+    boolean doRename = destIsDfs && sameFileSystem && sameEncryptionZone;
+    // Alternatively, we can do a rename if the src and dst are on the same
+    // non-distributed filesystem.
+    if (!doRename) doRename = !destIsDfs && sameFileSystem;
+    if (doRename) {
       LOG.debug(String.format(
           "Moving '%s' to '%s'", sourceFile.toString(), destFile.toString()));
       // Move (rename) the file.
-      fs.rename(sourceFile, destFile);
-    } else {
-      // We must copy rather than move if the source and dest are in different encryption
-      // zones. A move would return an error from the NN because a move is a metadata-only
-      // operation and the files would not be encrypted/decrypted properly on the DNs.
+      destFs.rename(sourceFile, destFile);
+      return;
+    }
+    if (destIsDfs && sameFileSystem) {
+      Preconditions.checkState(!doRename);
+      // We must copy rather than move if the source and dest are in different
+      // encryption zones. A move would return an error from the NN because a move is a
+      // metadata-only operation and the files would not be encrypted/decrypted properly
+      // on the DNs.
       LOG.info(String.format(
-          "Copying source '%s' to '%s' because HDFS encryption zones are different",
+          "Copying source '%s' to '%s' because HDFS encryption zones are different.",
+          sourceFile, destFile));
+    } else {
+      Preconditions.checkState(!sameFileSystem);
+      LOG.info(String.format("Copying '%s' to '%s' between filesystems.",
           sourceFile, destFile));
-      FileUtil.copy(sourceFile.getFileSystem(CONF), sourceFile, fs, destFile,
-          true, true, CONF);
     }
+    FileUtil.copy(sourceFs, sourceFile, destFs, destFile, true, true, CONF);
   }
 
   /**
@@ -256,6 +282,20 @@ public class FileSystemUtil {
   }
 
   /**
+   * Returns true iff the filesystem is a S3AFileSystem.
+   */
+  public static boolean isS3AFileSystem(FileSystem fs) {
+    return fs instanceof S3AFileSystem;
+  }
+
+  /**
+   * Returns true iff the path is on a S3AFileSystem.
+   */
+  public static boolean isS3AFileSystem(Path path) throws IOException {
+    return isS3AFileSystem(path.getFileSystem(CONF));
+  }
+
+  /**
    * Returns true iff the filesystem is an instance of LocalFileSystem.
    */
   public static boolean isLocalFileSystem(FileSystem fs) {
@@ -378,6 +418,6 @@ public class FileSystemUtil {
       throws IOException {
     Path path = new Path(location);
     return (FileSystemUtil.isDistributedFileSystem(path) ||
-        FileSystemUtil.isLocalFileSystem(path));
+        FileSystemUtil.isLocalFileSystem(path) || FileSystemUtil.isS3AFileSystem(path));
   }
 }

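The rename-versus-copy decision in relocateFile() depends on three booleans, so it can be written out as a small pure function. The sketch below restates that branch structure in C++ for consistency with the backend code in this patch. RelocateAction and ChooseRelocateAction are hypothetical names that do not appear in the patch, and the real method also performs the filesystem calls and logging that are left out here.

```cpp
// Possible outcomes of the decision made in relocateFile().
enum class RelocateAction {
  kRename,                     // Same filesystem, rename is possible.
  kCopyAcrossEncryptionZones,  // Same HDFS, different encryption zones.
  kCopyAcrossFilesystems       // Source and destination filesystems differ.
};

// 'dest_is_dfs' corresponds to destIsDfs, 'same_filesystem' to sameFileSystem
// and 'same_encryption_zone' to sameEncryptionZone in the Java code. As in the
// patch, 'same_encryption_zone' is expected to be false whenever either path
// is not on HDFS.
RelocateAction ChooseRelocateAction(bool dest_is_dfs, bool same_filesystem,
                                    bool same_encryption_zone) {
  // Rename within one distributed filesystem and one encryption zone.
  bool do_rename = dest_is_dfs && same_filesystem && same_encryption_zone;
  // Alternatively, rename within the same non-distributed filesystem.
  if (!do_rename) do_rename = !dest_is_dfs && same_filesystem;
  if (do_rename) return RelocateAction::kRename;

  // Same distributed filesystem but the rename was ruled out: the encryption
  // zones differ, so the data must be copied.
  if (dest_is_dfs && same_filesystem) {
    return RelocateAction::kCopyAcrossEncryptionZones;
  }
  // Otherwise the source and destination are on different filesystems.
  return RelocateAction::kCopyAcrossFilesystems;
}
```

For instance, ChooseRelocateAction(false, true, false) yields kRename, which corresponds to a source and destination on the same non-distributed filesystem. ChooseRelocateAction(true, false, false) yields kCopyAcrossFilesystems.
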
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/fe/src/main/java/com/cloudera/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/service/Frontend.java b/fe/src/main/java/com/cloudera/impala/service/Frontend.java
index d6aa517..4d21895 100644
--- a/fe/src/main/java/com/cloudera/impala/service/Frontend.java
+++ b/fe/src/main/java/com/cloudera/impala/service/Frontend.java
@@ -542,15 +542,16 @@ public class Frontend {
     }
 
     Path destPath = new Path(destPathString);
-    FileSystem fs = destPath.getFileSystem(FileSystemUtil.getConfiguration());
+    Path sourcePath = new Path(request.source_path);
+    FileSystem destFs = destPath.getFileSystem(FileSystemUtil.getConfiguration());
+    FileSystem sourceFs = sourcePath.getFileSystem(FileSystemUtil.getConfiguration());
 
     // Create a temporary directory within the final destination directory to stage the
     // file move.
     Path tmpDestPath = FileSystemUtil.makeTmpSubdirectory(destPath);
 
-    Path sourcePath = new Path(request.source_path);
     int filesLoaded = 0;
-    if (fs.isDirectory(sourcePath)) {
+    if (sourceFs.isDirectory(sourcePath)) {
       filesLoaded = FileSystemUtil.relocateAllVisibleFiles(sourcePath, tmpDestPath);
     } else {
       FileSystemUtil.relocateFile(sourcePath, tmpDestPath, true);
@@ -565,7 +566,7 @@ public class Frontend {
     // Move the files from the temporary location to the final destination.
     FileSystemUtil.relocateAllVisibleFiles(tmpDestPath, destPath);
     // Cleanup the tmp directory.
-    fs.delete(tmpDestPath, true);
+    destFs.delete(tmpDestPath, true);
     TLoadDataResp response = new TLoadDataResp();
     TColumnValue col = new TColumnValue();
     String loadMsg = String.format(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
index 60de666..08d86a6 100644
--- a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
@@ -2873,15 +2873,15 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
           "/test-warehouse/tpch.lineitem/", overwrite),
           "Table does not exist: functional.notbl");
 
-      // Source must be HDFS.
+      // Source must be HDFS or S3A.
       AnalysisError(String.format("load data inpath '%s' %s into table " +
           "tpch.lineitem", "file:///test-warehouse/test.out", overwrite),
           "INPATH location 'file:/test-warehouse/test.out' must point to an " +
-          "HDFS filesystem");
+          "HDFS or S3A filesystem");
       AnalysisError(String.format("load data inpath '%s' %s into table " +
           "tpch.lineitem", "s3n://bucket/test-warehouse/test.out", overwrite),
           "INPATH location 's3n://bucket/test-warehouse/test.out' must point to an " +
-          "HDFS filesystem");
+          "HDFS or S3A filesystem");
 
       // File type / table type mismatch.
       AnalyzesOk(String.format("load data inpath '%s' %s into table " +

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/fe/src/test/java/com/cloudera/impala/planner/S3PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/planner/S3PlannerTest.java b/fe/src/test/java/com/cloudera/impala/planner/S3PlannerTest.java
index 62f3cc8..74fe1a8 100644
--- a/fe/src/test/java/com/cloudera/impala/planner/S3PlannerTest.java
+++ b/fe/src/test/java/com/cloudera/impala/planner/S3PlannerTest.java
@@ -49,7 +49,7 @@ public class S3PlannerTest extends PlannerTestBase {
     String targetFs = System.getenv("TARGET_FILESYSTEM");
     // Skip if the config property was not set. i.e. not running against S3.
     assumeTrue(targetFs != null && targetFs.equals("s3"));
-    String fsNameStr = System.getenv("FILESYSTEM_PREFIX");
+    String fsNameStr = System.getenv("DEFAULT_FS");
     fsName = new Path(fsNameStr);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/infra/python/deps/requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index 3a3b6da..9ed842c 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -7,6 +7,13 @@
 
 allpairs == 2.0.1
 argparse == 1.4.0
+boto3 == 1.2.3
+  simplejson == 3.3.0 # For python version 2.6
+  botocore == 1.3.30
+  python_dateutil == 2.5.2
+  docutils == 0.12
+  jmespath == 0.9.0
+  futures == 3.0.5
 cm-api == 10.0.0
   # Already available as part of python on Linux.
   readline == 6.2.4.1; sys_platform == 'darwin'
@@ -45,6 +52,5 @@ requests == 2.7.0
 sh == 1.11
 sqlparse == 0.1.15
 texttable == 0.8.3
-
 # For dev purposes, not used in scripting. Version 1.2.1 is the latest that supports 2.6.
 ipython == 1.2.1

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/testdata/workloads/functional-query/queries/QueryTest/insert_permutation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/insert_permutation.test b/testdata/workloads/functional-query/queries/QueryTest/insert_permutation.test
index 7b8171c..dfdb2ed 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/insert_permutation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/insert_permutation.test
@@ -1,6 +1,7 @@
 ====
 ---- QUERY
-create database insert_permutation_test
+create database insert_permutation_test location
+'$FILESYSTEM_PREFIX/test-warehouse/insert_permutation_test'
 ---- RESULTS
 ====
 ---- QUERY
@@ -332,4 +333,4 @@ select * from perm_nopart
 1,'NULL',NULL
 ---- TYPES
 INT,STRING,INT
-====
\ No newline at end of file
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test b/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test
index 5a1582b..8b758fa 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test
@@ -30,12 +30,6 @@ show table stats tinytable_like
 BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
 ====
 ---- QUERY
-# Verify INSERT into non-hdfs table is disallowed.
-insert into tinytable_like select * from functional.tinytable
----- CATCH
-Unable to INSERT into target table (multi_fs_db.tinytable_like) because $NAMENODE/test-warehouse/tinytable is not an HDFS filesystem.
-====
----- QUERY
 create external table tinytable_copy (a string, b string)
 row format delimited fields terminated by ','
 location '$FILESYSTEM_PREFIX/test-warehouse/tinytable'
@@ -101,13 +95,6 @@ select * from alltypes where id%100=0 order by id
 INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
 ====
 ---- QUERY
-# Verify INSERT into table with partitions on a different filesystem than the table's
-# base directory is disallowed.
-insert into alltypes select * from functional.alltypes
----- CATCH
-Unable to INSERT into target table (multi_fs_db.alltypes) because the table spans multiple filesystems.
-====
----- QUERY
 # Verify DROP PARTITION for non-default filesystem.
 alter table alltypes drop partition(year=2010, month=1)
 ---- RESULTS
@@ -242,12 +229,6 @@ select * from alltypes where id%100=0 order by id
 INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
 ====
 ---- QUERY
-# Verify INSERT into table with partitions on a different filesystems is disallowed.
-insert into alltypes select * from functional.alltypes
----- CATCH
-Unable to INSERT into target table (multi_fs_db.alltypes) because the table spans multiple filesystems.
-====
----- QUERY
 compute stats alltypes
 ---- RESULTS
 'Updated 4 partition(s) and 11 column(s).'
@@ -303,3 +284,47 @@ order by id
 ---- TYPES
 INT, STRING
 ====
+---- QUERY
+create table alltypes_multipart_insert like functional_parquet.alltypes
+---- RESULTS
+====
+---- QUERY
+# ADD PARTITION on a non-default filesystem.
+alter table alltypes_multipart_insert add partition(year=2009, month=1)
+location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_multipart_insert/year=2009/month=1'
+---- RESULTS
+====
+---- QUERY
+insert into alltypes_multipart_insert partition (year=2009, month=1)
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
+float_col, double_col, date_string_col, string_col, timestamp_col
+from functional.alltypes where year = 2009 and month = 1
+---- RESULTS
+year=2009/month=1/: 310
+====
+---- QUERY
+# ADD PARTITION on the default filesystem.
+alter table alltypes_multipart_insert add partition(year=2009, month=2)
+location '/test-warehouse/alltypes_multipart_insert/year=2009/month=2'
+---- RESULTS
+====
+---- QUERY
+insert into alltypes_multipart_insert partition (year=2009, month=2)
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
+float_col, double_col, date_string_col, string_col, timestamp_col
+from functional.alltypes where year = 2009 and month = 2
+---- RESULTS
+year=2009/month=2/: 280
+====
+---- QUERY
+select * from alltypes_multipart_insert where id%100=0 order by id;
+---- RESULTS
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+100,true,0,0,0,0,0,0,'01/11/09','0',2009-01-11 01:40:04.500000000,2009,1
+200,true,0,0,0,0,0,0,'01/21/09','0',2009-01-21 03:20:09,2009,1
+300,true,0,0,0,0,0,0,'01/31/09','0',2009-01-31 05:00:13.500000000,2009,1
+400,true,0,0,0,0,0,0,'02/10/09','0',2009-02-10 01:30:04.500000000,2009,2
+500,true,0,0,0,0,0,0,'02/20/09','0',2009-02-20 03:10:08.550000000,2009,2
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/testdata/workloads/functional-query/queries/QueryTest/truncate-table.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/truncate-table.test b/testdata/workloads/functional-query/queries/QueryTest/truncate-table.test
index 8382d59..19480ff 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/truncate-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/truncate-table.test
@@ -1,7 +1,7 @@
 ====
 ---- QUERY
 # First create a partitioned table
-create table t1 like functional.alltypes;
+create table t1 like functional.alltypes location '$FILESYSTEM_PREFIX/test-warehouse/t1';
 insert into t1 partition(year, month) select * from functional.alltypes;
 compute incremental stats t1;
 show table stats t1;
@@ -117,7 +117,7 @@ STRING, STRING, BIGINT, BIGINT, INT, DOUBLE
 ====
 ---- QUERY
 # Create an unpartitioned table.
-create table t2 like functional.tinytable;
+create table t2 like functional.tinytable location '$FILESYSTEM_PREFIX/test-warehouse/t2';
 insert into t2 select * from functional.tinytable;
 compute incremental stats t2;
 show table stats t2;
@@ -167,7 +167,7 @@ truncate table if exists non_existent;
 ====
 ---- QUERY
 # Create an unpartitioned table.
-create table t3 like functional.tinytable;
+create table t3 like functional.tinytable location '$FILESYSTEM_PREFIX/test-warehouse/t3';
 insert into t3 select * from functional.tinytable;
 select count(*) from t3;
 ---- RESULTS
@@ -187,4 +187,4 @@ select count(*) from t3;
 0
 ---- TYPES
 BIGINT
-====
\ No newline at end of file
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/testdata/workloads/tpch/queries/insert_parquet.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpch/queries/insert_parquet.test b/testdata/workloads/tpch/queries/insert_parquet.test
index 55f84e3..35b7cde 100644
--- a/testdata/workloads/tpch/queries/insert_parquet.test
+++ b/testdata/workloads/tpch/queries/insert_parquet.test
@@ -1,7 +1,8 @@
 ====
 ---- QUERY
 # Tests using a larger table.
-create table if not exists orders_insert_test like orders;
+create table if not exists orders_insert_test like orders location
+'$FILESYSTEM_PREFIX/test-warehouse/orders_insert_table';
 insert overwrite table orders_insert_test select * from tpch.orders
 ---- RESULTS
 : 1500000
@@ -55,7 +56,7 @@ bigint
 ---- QUERY
 # Test to verify that huge (larger than 64k) values can be written, see IMPALA-1705
 create table if not exists test_insert_huge_vals (s string) stored as parquet
-location '$FILESYSTEM_PREFIX/test_insert_huge_vals';
+location '$FILESYSTEM_PREFIX/test-warehouse/test_insert_huge_vals';
 insert overwrite table test_insert_huge_vals
   select cast(l_orderkey as string) from tpch.lineitem
   union select group_concat(concat(s_name, s_address, s_phone)) from tpch.supplier

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 9ef70fe..0ed587b 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -40,6 +40,8 @@ from tests.performance.query import Query
 from tests.performance.query_executor import JdbcQueryExecConfig
 from tests.performance.query_exec_functions import execute_using_jdbc
 from tests.util.hdfs_util import HdfsConfig, get_hdfs_client, get_hdfs_client_from_conf
+from tests.util.s3_util import S3Client
+from tests.util.filesystem_utils import IS_S3, S3_BUCKET_NAME
 
 # Imports required for Hive Metastore Client
 from hive_metastore import ThriftHiveMetastore
@@ -115,6 +117,7 @@ class ImpalaTestSuite(BaseTestSuite):
 
     cls.impalad_test_service = cls.create_impala_service()
     cls.hdfs_client = cls.create_hdfs_client()
+    cls.filesystem_client = S3Client(S3_BUCKET_NAME) if IS_S3 else cls.hdfs_client
 
   @classmethod
   def teardown_class(cls):
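
The `filesystem_client` selection in the hunk above can be illustrated with a minimal sketch. It assumes a common abstract base (called `BaseFileSystem` in the commit message) and replaces the real HDFS and S3 clients with a hypothetical in-memory stand-in. `InMemoryClient`, `make_filesystem_client`, and the `create_file`/`exists` method names are illustrative assumptions, not the actual Impala test API.

```python
import abc
import os


class BaseFileSystem(abc.ABC):
  """Common interface for test filesystem clients (method names are assumed)."""

  @abc.abstractmethod
  def create_file(self, path, data):
    """Store data at path and return True on success."""

  @abc.abstractmethod
  def exists(self, path):
    """Return True if path has been created."""


class InMemoryClient(BaseFileSystem):
  """Hypothetical stand-in for a real HDFS or S3 client; keeps files in a dict."""

  def __init__(self, scheme):
    self.scheme = scheme
    self._files = {}

  def create_file(self, path, data):
    self._files[path] = data
    return True

  def exists(self, path):
    return path in self._files


def make_filesystem_client(target_filesystem):
  """Return an S3-flavoured client when S3 is targeted, else an HDFS-flavoured one."""
  if target_filesystem == "s3":
    return InMemoryClient("s3a")
  return InMemoryClient("hdfs")


# Tests talk to whichever client was selected through the same interface.
client = make_filesystem_client(os.environ.get("TARGET_FILESYSTEM", "hdfs"))
client.create_file("test-warehouse/t1/data.txt", b"1,a\n")
print(client.scheme, client.exists("test-warehouse/t1/data.txt"))
```

Because both clients share one interface, a test written against `filesystem_client` does not need to know which filesystem it is running on.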

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/common/skip.py
----------------------------------------------------------------------
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 99d7e2d..3c4fe27 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -27,17 +27,19 @@ from tests.util.filesystem_utils import IS_DEFAULT_FS, IS_S3, IS_ISILON, IS_LOCA
 class SkipIfS3:
 
   # These ones are skipped due to product limitations.
-  insert = pytest.mark.skipif(IS_S3, reason="INSERT not implemented for S3")
-  load_data = pytest.mark.skipif(IS_S3, reason="LOAD DATA not implemented for S3")
   caching = pytest.mark.skipif(IS_S3, reason="SET CACHED not implemented for S3")
   hive = pytest.mark.skipif(IS_S3, reason="Hive doesn't work with S3")
   hdfs_block_size = pytest.mark.skipif(IS_S3, reason="S3 uses it's own block size")
+  hdfs_acls = pytest.mark.skipif(IS_S3, reason="HDFS acls are not supported on S3")
   jira = partial(pytest.mark.skipif, IS_S3)
+  hdfs_encryption = pytest.mark.skipif(IS_S3,
+      reason="HDFS encryption is not supported with S3")
+  hdfs_purge = pytest.mark.skipif(IS_S3,
+      reason="PURGE has no effect on S3")
 
   # These ones need test infra work to re-enable.
   udfs = pytest.mark.skipif(IS_S3, reason="udas/udfs not copied to S3")
   datasrc = pytest.mark.skipif(IS_S3, reason="data sources not copied to S3")
-  hdfs_client = pytest.mark.skipif(IS_S3, reason="hdfs_client doesn't work with S3")
   hbase = pytest.mark.skipif(IS_S3, reason="HBase not started with S3")
   qualified_path = pytest.mark.skipif(IS_S3,
       reason="Tests rely on HDFS qualified paths, IMPALA-1872")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/custom_cluster/test_insert_behaviour.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_insert_behaviour.py b/tests/custom_cluster/test_insert_behaviour.py
index 03d5188..73acb6c 100644
--- a/tests/custom_cluster/test_insert_behaviour.py
+++ b/tests/custom_cluster/test_insert_behaviour.py
@@ -21,7 +21,7 @@ from tests.util.hdfs_util import HdfsConfig, get_hdfs_client, get_hdfs_client_fr
 
 TEST_TBL = "insert_inherit_permission"
 
-@SkipIfS3.insert
+@SkipIfS3.hdfs_acls
 class TestInsertBehaviourCustomCluster(CustomClusterTestSuite):
 
   @classmethod

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/custom_cluster/test_parquet_max_page_header.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_parquet_max_page_header.py b/tests/custom_cluster/test_parquet_max_page_header.py
index c1210ff..8eaf64b 100644
--- a/tests/custom_cluster/test_parquet_max_page_header.py
+++ b/tests/custom_cluster/test_parquet_max_page_header.py
@@ -90,7 +90,6 @@ class TestParquetMaxPageHeader(CustomClusterTestSuite):
     put.wait()
 
   @SkipIfS3.hive
-  @SkipIfS3.insert
   @SkipIfIsilon.hive
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("-max_page_header_size=31457280")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/data_errors/test_data_errors.py
----------------------------------------------------------------------
diff --git a/tests/data_errors/test_data_errors.py b/tests/data_errors/test_data_errors.py
index 145ab94..019c771 100644
--- a/tests/data_errors/test_data_errors.py
+++ b/tests/data_errors/test_data_errors.py
@@ -96,7 +96,6 @@ class TestHBaseDataErrors(TestDataErrors):
     vector.get_value('exec_option')['abort_on_error'] = 0
     self.run_test_case('DataErrorsTest/hbase-insert-errors', vector)
 
-@SkipIfS3.insert
 class TestTimestampErrors(TestDataErrors):
   """
   Create test table with various valid/invalid timestamp values, then run

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_col_stats.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_col_stats.py b/tests/metadata/test_col_stats.py
index f3440ae..447861a 100644
--- a/tests/metadata/test_col_stats.py
+++ b/tests/metadata/test_col_stats.py
@@ -25,7 +25,6 @@ from tests.common.skip import SkipIfS3
 
 TEST_DB = 'colstats_test_db'
 
-@SkipIfS3.insert
 # End-to-end validation of Impala column stats usage.
 class TestColStats(ImpalaTestSuite):
   @classmethod

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_compute_stats.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_compute_stats.py b/tests/metadata/test_compute_stats.py
index 145238f..b2f6653 100644
--- a/tests/metadata/test_compute_stats.py
+++ b/tests/metadata/test_compute_stats.py
@@ -22,7 +22,6 @@ from tests.util.filesystem_utils import WAREHOUSE
 
 # Tests the COMPUTE STATS command for gathering table and column stats.
 # TODO: Merge this test file with test_col_stats.py
-@SkipIfS3.insert # S3: missing coverage: compute stats
 @SkipIf.not_default_fs # Isilon: Missing coverage: compute stats
 class TestComputeStats(ImpalaTestSuite):
   TEST_DB_NAME = "compute_stats_db"
@@ -71,7 +70,6 @@ class TestComputeStats(ImpalaTestSuite):
 
   @pytest.mark.execute_serially
   @SkipIfS3.hive
-  @SkipIfS3.insert
   @SkipIfIsilon.hive
   @SkipIfLocal.hive
   def test_compute_stats_impala_2201(self, vector):
@@ -120,7 +118,6 @@ class TestComputeStats(ImpalaTestSuite):
     assert("1\tpval\t8" in show_result.data[0])
 
 
-@SkipIfS3.insert # S3: missing coverage: compute stats
 @SkipIf.not_default_fs # Isilon: Missing coverage: compute stats
 class TestCorruptTableStats(TestComputeStats):
 
