IMPALA-1878: Support INSERT and LOAD DATA on S3 and between filesystems

Previously, Impala disallowed LOAD DATA and INSERT on S3. This patch enables
LOAD DATA and INSERT on S3 functionally, without making major changes aimed at
improving performance on S3. It also enables both INSERT and LOAD DATA between
filesystems.
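For illustration, statements like the following (adapted from the tests added
in this patch; the bucket name and paths are examples only) are now accepted,
whereas the analyzer previously rejected INSERTs and LOADs that touched a
non-HDFS filesystem or crossed filesystems:

  -- LOAD DATA from an S3A location into a table.
  LOAD DATA INPATH 's3a://mybucket/staging/lineitem'
  INTO TABLE tpch.lineitem;

  -- INSERT into a partition that lives on a different filesystem than the
  -- table's base directory.
  ALTER TABLE alltypes ADD PARTITION (year=2009, month=1)
  LOCATION 's3a://mybucket/test-warehouse/alltypes/year=2009/month=1';
  INSERT INTO alltypes PARTITION (year=2009, month=1)
  SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
         float_col, double_col, date_string_col, string_col, timestamp_col
  FROM functional.alltypes WHERE year = 2009 AND month = 1;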
S3 does not support the rename operation, so staged files on S3 are copied
instead of renamed, which contributes to the slow performance on S3.

The FinalizeSuccessfulInsert() function no longer makes any assumptions about
the filesystem it operates on and works across all supported filesystems. This
is done by adding a field to TInsertPartitionStatus that carries the fully
qualified URI of a partition's base directory. The HdfsOp class also no longer
assumes a single filesystem; it obtains a connection to the appropriate
filesystem based on the URI of the file it is operating on.

Added the Python S3 client 'boto3' so that the Python tests can access S3. A
new class called S3Client wraps the boto3 functions and exposes the same
function signatures as PyWebHdfsClient. Both derive from an abstract base
class, BaseFileSystem, so that they can be used interchangeably through a
'generic_client'. test_load.py is refactored to use this generic client. The
ImpalaTestSuite setup creates a client according to the TARGET_FILESYSTEM
environment variable and assigns it to the 'generic_client'.

P.S.: Currently, test_load.py runs 4x slower on S3 than on HDFS. Performance
needs to be improved in future patches. INSERT performance is also slower than
on HDFS, mainly because of an extra copy between a file's staging and final
locations. However, larger INSERTs come closer to HDFS performance than
smaller ones. ACLs are not handled for S3 in this patch; that still needs to
be discussed before being implemented.

Change-Id: I94e15ad67752dce21c9b7c1dced6e114905a942d
Reviewed-on: http://gerrit.cloudera.org:8080/2574
Reviewed-by: Sailesh Mukil <[email protected]>
Tested-by: Internal Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ed7f5ebf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ed7f5ebf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ed7f5ebf

Branch: refs/heads/master
Commit: ed7f5ebf53e9e318fbf1a9842e0e77c1b81498d3
Parents: 99879aa
Author: Sailesh Mukil <[email protected]>
Authored: Thu Jan 28 10:52:16 2016 -0800
Committer: Tim Armstrong <[email protected]>
Committed: Thu May 12 14:17:49 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-table-writer.cc        |   2 +-
 be/src/exec/hdfs-table-sink.cc                  |  39 ++---
 be/src/exec/hdfs-table-sink.h                   |  22 ++-
 be/src/runtime/coordinator.cc                   |  65 +++++---
 be/src/runtime/disk-io-mgr-scan-range.cc        |   2 +-
 be/src/runtime/disk-io-mgr.cc                   |   4 +-
 be/src/runtime/hdfs-fs-cache.h                  |   2 +-
 be/src/util/CMakeLists.txt                      |   1 +
 be/src/util/hdfs-bulk-ops.cc                    | 120 +++++++++------
 be/src/util/hdfs-bulk-ops.h                     |  39 ++---
 be/src/util/hdfs-util-test.cc                   |  78 ++++++++++
 be/src/util/hdfs-util.cc                        |  43 +++++-
 be/src/util/hdfs-util.h                         |   4 +-
 common/thrift/ImpalaInternalService.thrift      |   3 +
 .../cloudera/impala/analysis/InsertStmt.java    |   6 +-
 .../cloudera/impala/analysis/LoadDataStmt.java  |  18 +--
 .../cloudera/impala/common/FileSystemUtil.java  |  98 ++++++++----
 .../com/cloudera/impala/service/Frontend.java   |   9 +-
 .../impala/analysis/AnalyzeStmtsTest.java       |   6 +-
 .../cloudera/impala/planner/S3PlannerTest.java  |   2 +-
 infra/python/deps/requirements.txt              |   8 +-
 .../queries/QueryTest/insert_permutation.test   |   5 +-
 .../queries/QueryTest/multiple-filesystems.test |  63 +++++---
 .../queries/QueryTest/truncate-table.test       |   8 +-
 .../workloads/tpch/queries/insert_parquet.test  |   5 +-
tests/common/impala_test_suite.py | 3 + tests/common/skip.py | 8 +- tests/custom_cluster/test_insert_behaviour.py | 2 +- .../test_parquet_max_page_header.py | 1 - tests/data_errors/test_data_errors.py | 1 - tests/metadata/test_col_stats.py | 1 - tests/metadata/test_compute_stats.py | 3 - tests/metadata/test_ddl.py | 151 +++++++++---------- tests/metadata/test_explain.py | 18 +-- tests/metadata/test_hdfs_encryption.py | 4 +- tests/metadata/test_hdfs_permissions.py | 2 +- tests/metadata/test_last_ddl_time_update.py | 1 - tests/metadata/test_load.py | 30 ++-- tests/metadata/test_partition_metadata.py | 7 +- tests/metadata/test_recover_partitions.py | 46 +++--- tests/metadata/test_show_create_table.py | 1 - tests/query_test/test_aggregation.py | 1 - tests/query_test/test_cancellation.py | 1 - tests/query_test/test_chars.py | 1 - tests/query_test/test_compressed_formats.py | 2 - tests/query_test/test_delimited_text.py | 3 - tests/query_test/test_insert.py | 3 - tests/query_test/test_insert_behaviour.py | 81 +++++----- tests/query_test/test_insert_parquet.py | 16 +- tests/query_test/test_insert_permutation.py | 1 - tests/query_test/test_join_queries.py | 1 - tests/query_test/test_multiple_filesystems.py | 1 - tests/query_test/test_partitioning.py | 4 +- tests/query_test/test_queries.py | 1 - tests/query_test/test_scanners.py | 2 - tests/shell/test_shell_commandline.py | 1 - tests/util/filesystem_base.py | 61 ++++++++ tests/util/filesystem_utils.py | 3 + tests/util/hdfs_util.py | 46 +++--- tests/util/s3_util.py | 99 ++++++++++++ 60 files changed, 809 insertions(+), 450 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/exec/hdfs-parquet-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc index e2fb59c..3743503 100644 --- a/be/src/exec/hdfs-parquet-table-writer.cc +++ b/be/src/exec/hdfs-parquet-table-writer.cc @@ -814,7 +814,7 @@ Status HdfsParquetTableWriter::InitNewFile() { per_file_mem_pool_->Clear(); // Get the file limit - RETURN_IF_ERROR(HdfsTableSink::GetFileBlockSize(output_, &file_size_limit_)); + file_size_limit_ = output_->block_size; if (file_size_limit_ < HDFS_MIN_FILE_SIZE) { stringstream ss; ss << "Hdfs file size (" << file_size_limit_ << ") is too small."; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/exec/hdfs-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc index 74ff018..d145d9a 100644 --- a/be/src/exec/hdfs-table-sink.cc +++ b/be/src/exec/hdfs-table-sink.cc @@ -68,7 +68,7 @@ HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc, OutputPartition::OutputPartition() : hdfs_connection(NULL), tmp_hdfs_file(NULL), num_rows(0), num_files(0), - partition_descriptor(NULL) { + partition_descriptor(NULL), block_size(0) { } Status HdfsTableSink::PrepareExprs(RuntimeState* state) { @@ -135,7 +135,7 @@ Status HdfsTableSink::Prepare(RuntimeState* state) { return Status(error_msg.str()); } - staging_dir_ = Substitute("$0/_impala_insert_staging/$1/", table_desc_->hdfs_base_dir(), + staging_dir_ = Substitute("$0/_impala_insert_staging/$1", table_desc_->hdfs_base_dir(), PrintId(state->query_id(), "_")); RETURN_IF_ERROR(PrepareExprs(state)); @@ -298,6 +298,25 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* 
state, output_partition->tmp_hdfs_file = hdfsOpenFile(hdfs_connection_, tmp_hdfs_file_name_cstr, O_WRONLY, 0, 0, block_size); + + if (IsS3APath(output_partition->current_file_name.c_str())) { + // On S3A, the file cannot be stat'ed until after it's closed, and even so, the block + // size reported will be just the filesystem default. So, remember the requested + // block size. + output_partition->block_size = block_size; + } else { + // HDFS may choose to override the block size that we've recommended, so for non-S3 + // files, we get the block size by stat-ing the file. + hdfsFileInfo* info = hdfsGetPathInfo(output_partition->hdfs_connection, + output_partition->current_file_name.c_str()); + if (info == NULL) { + return Status(GetHdfsErrorMsg("Failed to get info on temporary HDFS file: ", + output_partition->current_file_name)); + } + output_partition->block_size = info->mBlockSize; + hdfsFreeFileInfo(info, 1); + } + VLOG_FILE << "hdfsOpenFile() file=" << tmp_hdfs_file_name_cstr; if (output_partition->tmp_hdfs_file == NULL) { return Status(GetHdfsErrorMsg("Failed to open HDFS file for writing: ", @@ -478,6 +497,7 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, partition_status.__set_num_appended_rows(0L); partition_status.__set_id(partition_descriptor->id()); partition_status.__set_stats(TInsertStats()); + partition_status.__set_partition_base_dir(table_desc_->hdfs_base_dir()); state->per_partition_status()->insert( make_pair(partition->partition_name, partition_status)); @@ -644,21 +664,6 @@ void HdfsTableSink::Close(RuntimeState* state) { closed_ = true; } -Status HdfsTableSink::GetFileBlockSize(OutputPartition* output_partition, int64_t* size) { - hdfsFileInfo* info = hdfsGetPathInfo(output_partition->hdfs_connection, - output_partition->current_file_name.c_str()); - - if (info == NULL) { - return Status(GetHdfsErrorMsg("Failed to get info on temporary HDFS file: ", - output_partition->current_file_name)); - } - - *size = info->mBlockSize; - hdfsFreeFileInfo(info, 1); - - return Status::OK(); -} - string HdfsTableSink::DebugString() const { stringstream out; out << "HdfsTableSink(overwrite=" << (overwrite_ ? "true" : "false") http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/exec/hdfs-table-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h index 9c5ff71..2083ab2 100644 --- a/be/src/exec/hdfs-table-sink.h +++ b/be/src/exec/hdfs-table-sink.h @@ -35,16 +35,16 @@ class RuntimeState; class HdfsTableWriter; class MemTracker; -/// Records the temporary and final Hdfs file name, the opened temporary Hdfs file, and the -/// number of appended rows of an output partition. +/// Records the temporary and final Hdfs file name, the opened temporary Hdfs file, and +/// the number of appended rows of an output partition. struct OutputPartition { /// In the below, <unique_id_str> is the unique ID passed to HdfsTableSink in string /// form. It is typically the fragment ID that owns the sink. /// Full path to root of the group of files that will be created for this partition. /// Each file will have a sequence number appended. A table writer may produce multiple - /// files per partition. The root is either partition_descriptor->location (if non-empty, - /// i.e. the partition has a custom location) or table_dir/partition_name/ + /// files per partition. The root is either partition_descriptor->location (if + /// non-empty, i.e. 
the partition has a custom location) or table_dir/partition_name/ /// Path: <root>/<unique_id_str> std::string final_hdfs_file_name_prefix; @@ -85,12 +85,15 @@ struct OutputPartition { /// The descriptor for this partition. const HdfsPartitionDescriptor* partition_descriptor; + /// The block size decided on for this file. + int64_t block_size; + OutputPartition(); }; -/// The sink consumes all row batches of its child execution tree, and writes the evaluated -/// output_exprs into temporary Hdfs files. The query coordinator moves the temporary files -/// into their final locations after the sinks have finished executing. +/// The sink consumes all row batches of its child execution tree, and writes the +/// evaluated output_exprs into temporary Hdfs files. The query coordinator moves the +/// temporary files into their final locations after the sinks have finished executing. // /// This sink supports static and dynamic partition inserts (Hive terminology), /// as well as inserting into unpartitioned tables, @@ -143,11 +146,6 @@ class HdfsTableSink : public DataSink { /// Closes output_exprs and partition_key_exprs. virtual void Close(RuntimeState* state); - /// Get the block size of the current file opened for this partition. - /// This is a utility routine that can be called by specific table - /// writers. Currently used by the parquet writer. - static Status GetFileBlockSize(OutputPartition* output_partition, int64_t* size); - int skip_header_line_count() const { return skip_header_line_count_; } virtual RuntimeProfile* profile() { return runtime_profile_; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 0a792f6..451c5e9 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -760,17 +760,12 @@ void Coordinator::PopulatePathPermissionCache(hdfsFS fs, const string& path_str, Status Coordinator::FinalizeSuccessfulInsert() { PermissionCache permissions_cache; - hdfsFS hdfs_connection; - // InsertStmt ensures that all partitions are on the same filesystem as the table's - // base directory, so opening a single connection is okay. - // TODO: modify this code so that restriction can be lifted. - RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection( - finalize_params_.hdfs_base_dir, &hdfs_connection)); + HdfsFsCache::HdfsFsMap filesystem_connection_cache; + HdfsOperationSet partition_create_ops(&filesystem_connection_cache); // INSERT finalization happens in the five following steps // 1. If OVERWRITE, remove all the files in the target directory // 2. Create all the necessary partition directories. - HdfsOperationSet partition_create_ops(&hdfs_connection); DescriptorTbl* descriptor_table; DescriptorTbl::Create(obj_pool(), desc_tbl_, &descriptor_table); HdfsTableDescriptor* hdfs_table = static_cast<HdfsTableDescriptor*>( @@ -783,6 +778,14 @@ Status Coordinator::FinalizeSuccessfulInsert() { BOOST_FOREACH(const PartitionStatusMap::value_type& partition, per_partition_status_) { SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer", "FinalizationTimer")); + // INSERT allows writes to tables that have partitions on multiple filesystems. + // So we need to open connections to different filesystems as necessary. We use a + // local connection cache and populate it with one connection per filesystem that the + // partitions are on. 
+ hdfsFS partition_fs_connection; + RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection( + partition.second.partition_base_dir, &partition_fs_connection, + &filesystem_connection_cache)); // Look up the partition in the descriptor table. stringstream part_path_ss; @@ -798,6 +801,7 @@ Status Coordinator::FinalizeSuccessfulInsert() { part_path_ss << part->location(); } const string& part_path = part_path_ss.str(); + bool is_s3_path = IsS3APath(part_path.c_str()); // If this is an overwrite insert, we will need to delete any updated partitions if (finalize_params_.is_overwrite) { @@ -816,11 +820,11 @@ Status Coordinator::FinalizeSuccessfulInsert() { // Once HDFS-8407 is fixed, the errno reset won't be needed. errno = 0; hdfsFileInfo* existing_files = - hdfsListDirectory(hdfs_connection, part_path.c_str(), &num_files); + hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files); if (existing_files == NULL && errno == EAGAIN) { errno = 0; existing_files = - hdfsListDirectory(hdfs_connection, part_path.c_str(), &num_files); + hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files); } // hdfsListDirectory() returns NULL not only when there is an error but also // when the directory is empty(HDFS-8407). Need to check errno to make sure @@ -840,10 +844,18 @@ Status Coordinator::FinalizeSuccessfulInsert() { // recursively with abandon, after checking that it ever existed. // TODO: There's a potential race here between checking for the directory // and a third-party deleting it. - if (FLAGS_insert_inherit_permissions) { - PopulatePathPermissionCache(hdfs_connection, part_path, &permissions_cache); + if (FLAGS_insert_inherit_permissions && !is_s3_path) { + // There is no directory structure in S3, so "inheriting" permissions is not + // possible. + // TODO: Try to mimic inheriting permissions for S3. + PopulatePathPermissionCache( + partition_fs_connection, part_path, &permissions_cache); } - if (hdfsExists(hdfs_connection, part_path.c_str()) != -1) { + // S3 doesn't have a directory structure, so we technically wouldn't need to + // CREATE_DIR on S3. However, libhdfs always checks if a path exists before + // carrying out an operation on that path. So we still need to call CREATE_DIR + // before we access that path due to this limitation. + if (hdfsExists(partition_fs_connection, part_path.c_str()) != -1) { partition_create_ops.Add(DELETE_THEN_CREATE, part_path); } else { // Otherwise just create the directory. @@ -851,10 +863,11 @@ Status Coordinator::FinalizeSuccessfulInsert() { } } } else { - if (FLAGS_insert_inherit_permissions) { - PopulatePathPermissionCache(hdfs_connection, part_path, &permissions_cache); + if (FLAGS_insert_inherit_permissions && !is_s3_path) { + PopulatePathPermissionCache( + partition_fs_connection, part_path, &permissions_cache); } - if (hdfsExists(hdfs_connection, part_path.c_str()) == -1) { + if (hdfsExists(partition_fs_connection, part_path.c_str()) == -1) { partition_create_ops.Add(CREATE_DIR, part_path); } } @@ -868,18 +881,17 @@ Status Coordinator::FinalizeSuccessfulInsert() { // It's ok to ignore errors creating the directories, since they may already // exist. If there are permission errors, we'll run into them later. if (err.first->op() != CREATE_DIR) { - stringstream ss; - ss << "Error(s) deleting partition directories. First error (of " - << partition_create_ops.errors().size() << ") was: " << err.second; - return Status(ss.str()); + return Status(Substitute( + "Error(s) deleting partition directories. 
First error (of $0) was: $1", + partition_create_ops.errors().size(), err.second)); } } } } // 3. Move all tmp files - HdfsOperationSet move_ops(&hdfs_connection); - HdfsOperationSet dir_deletion_ops(&hdfs_connection); + HdfsOperationSet move_ops(&filesystem_connection_cache); + HdfsOperationSet dir_deletion_ops(&filesystem_connection_cache); BOOST_FOREACH(FileMoveMap::value_type& move, files_to_move_) { // Empty destination means delete, so this is a directory. These get deleted in a @@ -889,7 +901,11 @@ Status Coordinator::FinalizeSuccessfulInsert() { dir_deletion_ops.Add(DELETE, move.first); } else { VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second; - move_ops.Add(RENAME, move.first, move.second); + if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) { + move_ops.Add(RENAME, move.first, move.second); + } else { + move_ops.Add(MOVE, move.first, move.second); + } } } @@ -917,9 +933,9 @@ Status Coordinator::FinalizeSuccessfulInsert() { } // 5. Optionally update the permissions of the created partition directories - // Do this last in case we make the dirs unwriteable. + // Do this last so that we don't make a dir unwritable before we write to it. if (FLAGS_insert_inherit_permissions) { - HdfsOperationSet chmod_ops(&hdfs_connection); + HdfsOperationSet chmod_ops(&filesystem_connection_cache); BOOST_FOREACH(const PermissionCache::value_type& perm, permissions_cache) { bool new_dir = perm.second.first; if (new_dir) { @@ -1528,6 +1544,7 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para TInsertPartitionStatus* status = &(per_partition_status_[partition.first]); status->num_appended_rows += partition.second.num_appended_rows; status->id = partition.second.id; + status->partition_base_dir = partition.second.partition_base_dir; if (!status->__isset.stats) status->__set_stats(TInsertStats()); DataSink::MergeInsertStats(partition.second.stats, &status->stats); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/runtime/disk-io-mgr-scan-range.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc index 52a8e66..01ea4b5 100644 --- a/be/src/runtime/disk-io-mgr-scan-range.cc +++ b/be/src/runtime/disk-io-mgr-scan-range.cc @@ -299,7 +299,7 @@ void DiskIoMgr::ScanRange::Close() { if (hdfs_file_ == NULL) return; struct hdfsReadStatistics* stats; - if (IsDfsPath(file())) { + if (IsHdfsPath(file())) { int success = hdfsFileGetReadStatistics(hdfs_file_->file(), &stats); if (success == 0) { reader_->bytes_read_local_.Add(stats->totalLocalBytesRead); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/runtime/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index 4788739..68745d9 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -1153,7 +1153,9 @@ Status DiskIoMgr::AddWriteRange(RequestContext* writer, WriteRange* write_range) int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) { // If it's a remote range, check for an appropriate remote disk queue. 
if (!expected_local) { - if (IsDfsPath(file) && FLAGS_num_remote_hdfs_io_threads > 0) return RemoteDfsDiskId(); + if (IsHdfsPath(file) && FLAGS_num_remote_hdfs_io_threads > 0) { + return RemoteDfsDiskId(); + } if (IsS3APath(file)) return RemoteS3DiskId(); } // Assign to a local disk queue. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/runtime/hdfs-fs-cache.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/hdfs-fs-cache.h b/be/src/runtime/hdfs-fs-cache.h index a89ad0f..baf163f 100644 --- a/be/src/runtime/hdfs-fs-cache.h +++ b/be/src/runtime/hdfs-fs-cache.h @@ -20,7 +20,7 @@ #include <boost/scoped_ptr.hpp> #include <boost/unordered_map.hpp> #include <boost/thread/mutex.hpp> -#include <hdfs.h> +#include "common/hdfs.h" #include "common/status.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 4b98ff5..ef20afda 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -134,3 +134,4 @@ ADD_BE_TEST(bitmap-test) ADD_BE_TEST(fixed-size-hash-table-test) ADD_BE_TEST(bloom-filter-test) ADD_BE_TEST(logging-support-test) +ADD_BE_TEST(hdfs-util-test) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/util/hdfs-bulk-ops.cc ---------------------------------------------------------------------- diff --git a/be/src/util/hdfs-bulk-ops.cc b/be/src/util/hdfs-bulk-ops.cc index d1508fc..d4d94ed 100644 --- a/be/src/util/hdfs-bulk-ops.cc +++ b/be/src/util/hdfs-bulk-ops.cc @@ -33,7 +33,7 @@ HdfsOp::HdfsOp(HdfsOpType op, const std::string& src, HdfsOperationSet* op_set) HdfsOp::HdfsOp(HdfsOpType op, const std::string& src, const std::string& dst, HdfsOperationSet* op_set) : op_(op), src_(src), dst_(dst), op_set_(op_set) { - DCHECK(op == RENAME); + DCHECK(op == RENAME || op == MOVE); DCHECK(!src_.empty()); DCHECK(!dst_.empty()); } @@ -48,64 +48,85 @@ HdfsOp::HdfsOp(HdfsOpType op, const string& src, short permissions, // Required for ThreadPool HdfsOp::HdfsOp() { } +void HdfsOp::AddError(const string& error_msg) const { + stringstream ss; + ss << "Hdfs op ("; + switch (op_) { + case DELETE: + ss << "DELETE " << src_; + break; + case CREATE_DIR: + ss << "CREATE_DIR " << src_; + break; + case RENAME: + ss << "RENAME " << src_ << " TO " << dst_; + break; + case MOVE: + ss << "MOVE " << src_ << " TO " << dst_; + break; + case DELETE_THEN_CREATE: + ss << "DELETE_THEN_CREATE " << src_; + break; + case CHMOD: + ss << "CHMOD " << src_ << " " << oct << permissions_; + break; + } + ss << ") failed, error was: " << error_msg; + op_set_->AddError(ss.str(), this); +} + void HdfsOp::Execute() const { if (op_set_->ShouldAbort()) return; int err = 0; - hdfsFS* hdfs_connection = op_set_->hdfs_connection(); + hdfsFS src_connection; + Status connection_status = HdfsFsCache::instance()->GetConnection(src_, &src_connection, + op_set_->connection_cache()); + + if (!connection_status.ok()) { + AddError(connection_status.GetDetail()); + op_set_->MarkOneOpDone(); + return; + } switch (op_) { case DELETE: - err = hdfsDelete(*hdfs_connection, src_.c_str(), 1); - VLOG_FILE << "hdfsDelete() file=" << src_.c_str(); + err = hdfsDelete(src_connection, src_.c_str(), 1); + VLOG_FILE << "hdfsDelete() file=" << src_; break; case CREATE_DIR: - err = hdfsCreateDirectory(*hdfs_connection, src_.c_str()); - VLOG_FILE << "hdfsCreateDirectory() 
file=" << src_.c_str(); + err = hdfsCreateDirectory(src_connection, src_.c_str()); + VLOG_FILE << "hdfsCreateDirectory() file=" << src_; break; case RENAME: - err = hdfsRename(*hdfs_connection, src_.c_str(), dst_.c_str()); - VLOG_FILE << "hdfsRename() src_file=" << src_.c_str() - << " dst_file=" << dst_.c_str(); + err = hdfsRename(src_connection, src_.c_str(), dst_.c_str()); + VLOG_FILE << "hdfsRename() src_file=" << src_ << " dst_file=" << dst_; + break; + case MOVE: + hdfsFS dst_connection; + connection_status = HdfsFsCache::instance()->GetConnection(dst_, &dst_connection, + op_set_->connection_cache()); + if (!connection_status.ok()) break; + err = hdfsMove(src_connection, src_.c_str(), dst_connection, dst_.c_str()); + VLOG_FILE << "hdfsMove() src_file=" << src_ << " dst_file=" << dst_; break; case DELETE_THEN_CREATE: - err = hdfsDelete(*hdfs_connection, src_.c_str(), 1); - VLOG_FILE << "hdfsDelete() file=" << src_.c_str(); + err = hdfsDelete(src_connection, src_.c_str(), 1); + VLOG_FILE << "hdfsDelete() file=" << src_; if (err != -1) { - err = hdfsCreateDirectory(*hdfs_connection, src_.c_str()); - VLOG_FILE << "hdfsCreateDirectory() file=" << src_.c_str(); + err = hdfsCreateDirectory(src_connection, src_.c_str()); + VLOG_FILE << "hdfsCreateDirectory() file=" << src_; } break; case CHMOD: - err = hdfsChmod(*hdfs_connection, src_.c_str(), permissions_); - VLOG_FILE << "hdfsChmod() file=" << src_.c_str(); + err = hdfsChmod(src_connection, src_.c_str(), permissions_); + VLOG_FILE << "hdfsChmod() file=" << src_; break; } - if (err == -1) { - string error_msg = GetStrErrMsg(); - stringstream ss; - ss << "Hdfs op ("; - switch (op_) { - case DELETE: - ss << "DELETE " << src_; - break; - case CREATE_DIR: - ss << "CREATE_DIR " << src_; - break; - case RENAME: - ss << "RENAME " << src_ << " TO " << dst_; - break; - case DELETE_THEN_CREATE: - ss << "DELETE_THEN_CREATE " << src_; - break; - case CHMOD: - ss << "CHMOD " << src_ << " " << oct << permissions_; - break; - } - ss << ") failed, error was: " << error_msg; - - op_set_->AddError(ss.str(), this); + if (err == -1 || !connection_status.ok()) { + string error_msg = + connection_status.ok() ? 
GetStrErrMsg() : connection_status.GetDetail(); + AddError(error_msg); } - op_set_->MarkOneOpDone(); } @@ -122,24 +143,25 @@ HdfsOpThreadPool* impala::CreateHdfsOpThreadPool(const string& name, uint32_t nu max_queue_length, &HdfsThreadPoolHelper); } -HdfsOperationSet::HdfsOperationSet(hdfsFS* hdfs_connection) - : num_ops_(0L), hdfs_connection_(hdfs_connection) { -} +HdfsOperationSet::HdfsOperationSet(HdfsFsCache::HdfsFsMap* connection_cache) + : connection_cache_(connection_cache) { } -bool HdfsOperationSet::Execute(ThreadPool<HdfsOp>* pool, - bool abort_on_error) { +bool HdfsOperationSet::Execute(ThreadPool<HdfsOp>* pool, bool abort_on_error) { { lock_guard<mutex> l(errors_lock_); abort_on_error_ = abort_on_error; } + int64_t num_ops = ops_.size(); if (num_ops == 0) return true; - num_ops_.Add(num_ops); + ops_complete_barrier_.reset(new CountingBarrier(num_ops)); BOOST_FOREACH(const HdfsOp& op, ops_) { pool->Offer(op); } - return promise_.Get(); + + ops_complete_barrier_->Wait(); + return errors().size() == 0; } void HdfsOperationSet::Add(HdfsOpType op, const string& src) { @@ -160,9 +182,7 @@ void HdfsOperationSet::AddError(const string& err, const HdfsOp* op) { } void HdfsOperationSet::MarkOneOpDone() { - if (num_ops_.Add(-1) == 0) { - promise_.Set(errors().size() == 0); - } + ops_complete_barrier_->Notify(); } bool HdfsOperationSet::ShouldAbort() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/util/hdfs-bulk-ops.h ---------------------------------------------------------------------- diff --git a/be/src/util/hdfs-bulk-ops.h b/be/src/util/hdfs-bulk-ops.h index 432029f..e66f0d1 100644 --- a/be/src/util/hdfs-bulk-ops.h +++ b/be/src/util/hdfs-bulk-ops.h @@ -22,6 +22,8 @@ #include "common/atomic.h" #include "common/status.h" #include "util/thread-pool.h" +#include "util/counting-barrier.h" +#include "runtime/hdfs-fs-cache.h" namespace impala { @@ -29,6 +31,7 @@ enum HdfsOpType { DELETE, CREATE_DIR, RENAME, + MOVE, DELETE_THEN_CREATE, CHMOD }; @@ -65,7 +68,7 @@ class HdfsOp { /// First operand std::string src_; - /// Second string operand, ignored except for RENAME + /// Second string operand, ignored except for RENAME and MOVE std::string dst_; /// Permission operand, ignored except for CHMOD @@ -73,6 +76,9 @@ class HdfsOp { /// Containing operation set, used to record errors and to signal completion. HdfsOperationSet* op_set_; + + /// Records an error if it happens during an operation. + void AddError(const string& error_msg) const; }; typedef ThreadPool<HdfsOp> HdfsOpThreadPool; @@ -86,16 +92,14 @@ HdfsOpThreadPool* CreateHdfsOpThreadPool(const std::string& name, uint32_t num_t /// added. class HdfsOperationSet { public: - /// Constructs a new operation set. The hdfsFS parameter is shared between all - /// operations, and is not owned by this class (and therefore should remain valid until - /// Execute returns). - HdfsOperationSet(hdfsFS* hdfs_connection); + /// Initializes an operation set. 'connection_cache' is not owned. + HdfsOperationSet(HdfsFsCache::HdfsFsMap* connection_cache); /// Add an operation that takes only a single 'src' parameter (e.g. DELETE, CREATE_DIR, /// DELETE_THEN_CREATE) void Add(HdfsOpType op, const std::string& src); - /// Add an operation that takes two parameters (e.g. RENAME) + /// Add an operation that takes two parameters (e.g. RENAME, MOVE) void Add(HdfsOpType op, const std::string& src, const std::string& dst); /// Add an operation that takes a permission argument (i.e. 
CHMOD) @@ -103,7 +107,7 @@ class HdfsOperationSet { /// Run all operations on the given pool, blocking until all are complete. Returns false /// if there were any errors, true otherwise. - /// If abort_on_error is true, execution will finish after the first error seen. + /// If 'abort_on_error' is true, execution will finish after the first error seen. bool Execute(HdfsOpThreadPool* pool, bool abort_on_error); typedef std::pair<const HdfsOp*, std::string> Error; @@ -113,22 +117,19 @@ class HdfsOperationSet { /// until Execute has returned. const Errors& errors() { return errors_; } - hdfsFS* hdfs_connection() const { return hdfs_connection_; } + HdfsFsCache::HdfsFsMap* connection_cache() { return connection_cache_; } private: /// The set of operations to be submitted to HDFS std::vector<HdfsOp> ops_; - /// Used to coordinate between the executing threads and the caller; Execute blocks until - /// this is signalled. - Promise<bool> promise_; - - /// The number of ops remaining to be executed. Used to coordinate between executor - /// threads so that when all ops are finished, promise_ is signalled. - AtomicInt64 num_ops_; + /// Used to coordinate between the executing threads and the caller. This is initialized + /// with the number of operations to be executed. Unblocks once all the operations in + /// the set are complete. + boost::scoped_ptr<CountingBarrier> ops_complete_barrier_; - /// HDFS connection shared between all operations. Not owned by this class. - hdfsFS* hdfs_connection_; + /// A connection cache used by this operation set. Not owned. + HdfsFsCache::HdfsFsMap* connection_cache_; /// Protects errors_ and abort_on_error_ during Execute boost::mutex errors_lock_; @@ -141,8 +142,8 @@ class HdfsOperationSet { friend class HdfsOp; - /// Called by HdfsOp to signal its completion. When the last op has finished, this method - /// signals Execute() so that it can return. + /// Called by HdfsOp to signal its completion. When the last op has finished, this + /// method signals Execute() so that it can return. void MarkOneOpDone(); /// Called by HdfsOp to record an error http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/util/hdfs-util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/hdfs-util-test.cc b/be/src/util/hdfs-util-test.cc new file mode 100644 index 0000000..73ff4bf --- /dev/null +++ b/be/src/util/hdfs-util-test.cc @@ -0,0 +1,78 @@ +// Copyright 2016 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "hdfs-util.h" + +#include "testutil/gtest-util.h" +#include "common/init.h" +#include "common/logging.h" +#include "util/test-info.h" +#include "runtime/exec-env.h" +#include "service/fe-support.h" + +using namespace impala; + +DECLARE_bool(enable_webserver); + +TEST(HdfsUtilTest, CheckFilesystemsMatch) { + // We do not want to start the webserver. 
+ FLAGS_enable_webserver = false; + ExecEnv* exec_env = new ExecEnv(); + + // We do this to retrieve the default FS from the frontend. + exec_env->StartServices(); + + // Tests with both paths qualified. + EXPECT_TRUE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path", + "s3a://dummybucket/temp_dir_2/temp_path_2")); + EXPECT_FALSE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path", + "s3a://dummybucket_2/temp_dir_2/temp_path_2")); + EXPECT_FALSE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path", + "hdfs://namenode/temp_dir2/temp_path_2")); + EXPECT_FALSE(FilesystemsMatch("hdfs://namenode/temp_dir/temp_path", + "hdfs://namenode_2/temp_dir2/temp_path_2")); + EXPECT_TRUE(FilesystemsMatch("hdfs://namenode:9999/temp_dir/temp_path", + "hdfs://namenode:9999/temp_dir2/temp_path_2")); + EXPECT_FALSE(FilesystemsMatch("hdfs://namenode:9999/temp_dir/temp_path", + "hdfs://namenode:8888/temp_dir2/temp_path_2")); + EXPECT_TRUE(FilesystemsMatch("file:/path/to/dir/filename.parq", + "file:///path/to/dir/filename.parq")); + EXPECT_TRUE(FilesystemsMatch("file:/path/to/dir/filename.parq", + "file:/path_2/to/dir/filename.parq")); + EXPECT_TRUE(FilesystemsMatch("file:///path/to/dir/filename.parq", + "file:/path_2/to/dir/filename.parq")); + EXPECT_FALSE(FilesystemsMatch("file:/path/to/dir/filename.parq", + "file2:/path/to/dir/filename.parq")); + EXPECT_FALSE(FilesystemsMatch("hdfs://", "s3a://dummybucket/temp_dir/temp_path")); + EXPECT_TRUE(FilesystemsMatch("hdfs://namenode", "hdfs://namenode/")); + + // Tests with both paths paths unqualified. + EXPECT_TRUE(FilesystemsMatch("tempdir/temppath", "tempdir2/temppath2")); + + // Tests with one path qualified and the other unqualified. + const char* default_fs = exec_env->default_fs().c_str(); + EXPECT_TRUE(FilesystemsMatch(default_fs, "temp_dir/temp_path")); + EXPECT_TRUE(FilesystemsMatch("temp_dir/temp_path", default_fs)); + EXPECT_FALSE(FilesystemsMatch("badscheme://namenode/temp_dir/temp_path", + "temp_dir/temp_path")); + EXPECT_FALSE(FilesystemsMatch("badscheme://namenode:1234/temp_dir/temp_path", + "temp_dir/temp_path")); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST); + InitFeSupport(); + return RUN_ALL_TESTS(); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/util/hdfs-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/hdfs-util.cc b/be/src/util/hdfs-util.cc index 6c1b57a..836fa12 100644 --- a/be/src/util/hdfs-util.cc +++ b/be/src/util/hdfs-util.cc @@ -64,7 +64,7 @@ Status CopyHdfsFile(const hdfsFS& src_conn, const string& src_path, return Status::OK(); } -bool IsDfsPath(const char* path) { +bool IsHdfsPath(const char* path) { if (strstr(path, ":/") == NULL) { return ExecEnv::GetInstance()->default_fs().compare(0, 7, "hdfs://") == 0; } @@ -78,4 +78,45 @@ bool IsS3APath(const char* path) { return strncmp(path, "s3a://", 6) == 0; } +// Returns the length of the filesystem name in 'path' which is the length of the +// 'scheme://authority'. Returns 0 if the path is unqualified. +static int GetFilesystemNameLength(const char* path) { + // Special case for "file:/". It will not have an authority following it. + if (strncmp(path, "file:", 5) == 0) return 5; + + const char* after_scheme = strstr(path, "://"); + if (after_scheme == NULL) return 0; + // Some paths may come only with a scheme. We add 3 to skip over "://". 
+ if (*(after_scheme + 3) == '\0') return strlen(path); + + const char* after_authority = strstr(after_scheme + 3, "/"); + if (after_authority == NULL) return strlen(path); + return after_authority - path; +} + +bool FilesystemsMatch(const char* path_a, const char* path_b) { + int fs_a_name_length = GetFilesystemNameLength(path_a); + int fs_b_name_length = GetFilesystemNameLength(path_b); + + const char* default_fs = ExecEnv::GetInstance()->default_fs().c_str(); + int default_fs_name_length = GetFilesystemNameLength(default_fs); + + // Neither is fully qualified: both are on default_fs. + if (fs_a_name_length == 0 && fs_b_name_length == 0) return true; + // One is a relative path: check fully-qualified one against default_fs. + if (fs_a_name_length == 0) { + DCHECK_GT(fs_b_name_length, 0); + return strncmp(path_b, default_fs, default_fs_name_length) == 0; + } + if (fs_b_name_length == 0) { + DCHECK_GT(fs_a_name_length, 0); + return strncmp(path_a, default_fs, default_fs_name_length) == 0; + } + DCHECK_GT(fs_a_name_length, 0); + DCHECK_GT(fs_b_name_length, 0); + // Both fully qualified: check the filesystem prefix. + if (fs_a_name_length != fs_b_name_length) return false; + return strncmp(path_a, path_b, fs_a_name_length) == 0; +} + } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/be/src/util/hdfs-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/hdfs-util.h b/be/src/util/hdfs-util.h index 3a3d6b4..0a6b968 100644 --- a/be/src/util/hdfs-util.h +++ b/be/src/util/hdfs-util.h @@ -42,10 +42,12 @@ Status CopyHdfsFile(const hdfsFS& src_conn, const std::string& src_path, const hdfsFS& dst_conn, const std::string& dst_path); /// Returns true iff the path refers to a location on an HDFS filesystem. -bool IsDfsPath(const char* path); +bool IsHdfsPath(const char* path); /// Returns true iff the path refers to a location on an S3A filesystem. bool IsS3APath(const char* path); +/// Returns true iff 'pathA' and 'pathB' are on the same filesystem. +bool FilesystemsMatch(const char* pathA, const char* pathB); } #endif // IMPALA_UTIL_HDFS_UTIL_H http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index aa2901e..1fcce1b 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -396,6 +396,9 @@ struct TInsertPartitionStatus { // Detailed statistics gathered by table writers for this partition 3: optional TInsertStats stats + + // Fully qualified URI to the base directory for this partition. 
+ 4: required string partition_base_dir } // The results of an INSERT query, sent to the coordinator as part of http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java index 5eb9a26..dc859c1 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java @@ -373,17 +373,13 @@ public class InsertStmt extends StatementBase { "(%s) because Impala does not have WRITE access to at least one HDFS path" + ": %s", targetTableName_, hdfsTable.getFirstLocationWithoutWriteAccess())); } - if (hdfsTable.spansMultipleFileSystems()) { - throw new AnalysisException(String.format("Unable to INSERT into target table " + - "(%s) because the table spans multiple filesystems.", targetTableName_)); - } StringBuilder error = new StringBuilder(); hdfsTable.parseSkipHeaderLineCount(error); if (error.length() > 0) throw new AnalysisException(error.toString()); try { if (!FileSystemUtil.isImpalaWritableFilesystem(hdfsTable.getLocation())) { throw new AnalysisException(String.format("Unable to INSERT into target " + - "table (%s) because %s is not an HDFS filesystem.", targetTableName_, + "table (%s) because %s is not a supported filesystem.", targetTableName_, hdfsTable.getLocation())); } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java index 49966f8..ecc7279 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem; import com.cloudera.impala.authorization.Privilege; @@ -133,9 +134,9 @@ public class LoadDataStmt extends StatementBase { try { Path source = sourceDataPath_.getPath(); FileSystem fs = source.getFileSystem(FileSystemUtil.getConfiguration()); - if (!(fs instanceof DistributedFileSystem)) { + if (!(fs instanceof DistributedFileSystem) && !(fs instanceof S3AFileSystem)) { throw new AnalysisException(String.format("INPATH location '%s' " + - "must point to an HDFS filesystem.", sourceDataPath_)); + "must point to an HDFS or S3A filesystem.", sourceDataPath_)); } if (!fs.exists(source)) { throw new AnalysisException(String.format( @@ -154,7 +155,8 @@ public class LoadDataStmt extends StatementBase { } if (FileSystemUtil.containsVisibleSubdirectory(source)) { throw new AnalysisException(String.format( - "INPATH location '%s' cannot contain non-hidden subdirectories.", sourceDataPath_)); + "INPATH location '%s' cannot contain non-hidden subdirectories.", + sourceDataPath_)); } if (!checker.getPermissions(fs, source).checkPermissions( FsAction.READ_WRITE)) { @@ -206,16 +208,6 @@ public class LoadDataStmt extends StatementBase { } Preconditions.checkNotNull(partition); - 
// Until Frontend.loadTableData() can handle cross-filesystem and filesystems - // that aren't HDFS, require that source and dest are on the same HDFS. - if (!FileSystemUtil.isPathOnFileSystem(new Path(location), fs)) { - throw new AnalysisException(String.format( - "Unable to LOAD DATA into target table (%s) because source path (%s) and " + - "destination %s (%s) are on different filesystems.", - hdfsTable.getFullName(), - source, partitionSpec_ == null ? "table" : "partition", - partition.getLocation())); - } // Verify the files being loaded are supported. for (FileStatus fStatus: fs.listStatus(source)) { if (fs.isDirectory(fStatus.getPath())) continue; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java b/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java index 969fe55..1f3da2e 100644 --- a/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java +++ b/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java @@ -81,10 +81,14 @@ public class FileSystemUtil { } /** - * Returns true if path p1 and path p2 are in the same encryption zone. + * Returns true if path p1 and path p2 are in the same encryption zone in HDFS. + * Returns false if they are in different encryption zones or if either of the paths + * are not on HDFS. */ - private static boolean arePathsInSameEncryptionZone(FileSystem fs, Path p1, + private static boolean arePathsInSameHdfsEncryptionZone(FileSystem fs, Path p1, Path p2) throws IOException { + // Only distributed file systems have encryption zones. + if (!isDistributedFileSystem(p1) || !isDistributedFileSystem(p2)) return false; HdfsAdmin hdfsAdmin = new HdfsAdmin(fs.getUri(), CONF); EncryptionZone z1 = hdfsAdmin.getEncryptionZoneForPath(p1); EncryptionZone z2 = hdfsAdmin.getEncryptionZoneForPath(p2); @@ -104,9 +108,10 @@ public class FileSystemUtil { */ public static int relocateAllVisibleFiles(Path sourceDir, Path destDir) throws IOException { - FileSystem fs = destDir.getFileSystem(CONF); - Preconditions.checkState(fs.isDirectory(destDir)); - Preconditions.checkState(fs.isDirectory(sourceDir)); + FileSystem destFs = destDir.getFileSystem(CONF); + FileSystem sourceFs = sourceDir.getFileSystem(CONF); + Preconditions.checkState(destFs.isDirectory(destDir)); + Preconditions.checkState(sourceFs.isDirectory(sourceDir)); // Use the same UUID to resolve all file name conflicts. This helps mitigate problems // that might happen if there is a conflict moving a set of files that have @@ -115,7 +120,7 @@ public class FileSystemUtil { // Enumerate all the files in the source int numFilesMoved = 0; - for (FileStatus fStatus: fs.listStatus(sourceDir)) { + for (FileStatus fStatus: sourceFs.listStatus(sourceDir)) { if (fStatus.isDirectory()) { LOG.debug("Skipping copy of directory: " + fStatus.getPath()); continue; @@ -124,7 +129,7 @@ public class FileSystemUtil { } Path destFile = new Path(destDir, fStatus.getPath().getName()); - if (fs.exists(destFile)) { + if (destFs.exists(destFile)) { destFile = new Path(destDir, appendToBaseFileName(destFile.getName(), uuid.toString())); } @@ -136,44 +141,65 @@ public class FileSystemUtil { /** * Relocates the given file to a new location (either another directory or a - * file. 
The file is moved (renamed) to the new location unless the source and - * destination are in different encryption zones, in which case the file is copied - * so that the file can be decrypted and/or encrypted. If renameIfAlreadyExists - * is true, no error will be thrown if a file with the same name already exists in the + * file in the same or different filesystem). The file is generally moved (renamed) to + * the new location. However, the file is copied if the source and destination are in + * different encryption zones so that the file can be decrypted and/or encrypted, or if + * the source and destination are in different filesystems. If renameIfAlreadyExists is + * true, no error will be thrown if a file with the same name already exists in the * destination location. Instead, a UUID will be appended to the base file name, - * preserving the the existing file extension. If renameIfAlreadyExists is false, an + * preserving the existing file extension. If renameIfAlreadyExists is false, an * IOException will be thrown if there is a file name conflict. */ public static void relocateFile(Path sourceFile, Path dest, boolean renameIfAlreadyExists) throws IOException { - FileSystem fs = dest.getFileSystem(CONF); - // TODO: Handle moving between file systems - Preconditions.checkArgument(isPathOnFileSystem(sourceFile, fs)); + FileSystem destFs = dest.getFileSystem(CONF); + FileSystem sourceFs = sourceFile.getFileSystem(CONF); - Path destFile = fs.isDirectory(dest) ? new Path(dest, sourceFile.getName()) : dest; + Path destFile = + destFs.isDirectory(dest) ? new Path(dest, sourceFile.getName()) : dest; // If a file with the same name does not already exist in the destination location // then use the same file name. Otherwise, generate a unique file name. - if (renameIfAlreadyExists && fs.exists(destFile)) { - Path destDir = fs.isDirectory(dest) ? dest : dest.getParent(); + if (renameIfAlreadyExists && destFs.exists(destFile)) { + Path destDir = destFs.isDirectory(dest) ? dest : dest.getParent(); destFile = new Path(destDir, appendToBaseFileName(destFile.getName(), UUID.randomUUID().toString())); } - - if (arePathsInSameEncryptionZone(fs, sourceFile, destFile)) { + boolean sameFileSystem = isPathOnFileSystem(sourceFile, destFs); + boolean destIsDfs = isDistributedFileSystem(destFs); + + // If the source and the destination are on different file systems, or in different + // encryption zones, files can't be moved from one location to the other and must be + // copied instead. + boolean sameEncryptionZone = + arePathsInSameHdfsEncryptionZone(destFs, sourceFile, destFile); + // We can do a rename if the src and dst are in the same encryption zone in the same + // distributed filesystem. + boolean doRename = destIsDfs && sameFileSystem && sameEncryptionZone; + // Alternatively, we can do a rename if the src and dst are on the same + // non-distributed filesystem. + if (!doRename) doRename = !destIsDfs && sameFileSystem; + if (doRename) { LOG.debug(String.format( "Moving '%s' to '%s'", sourceFile.toString(), destFile.toString())); // Move (rename) the file. - fs.rename(sourceFile, destFile); - } else { - // We must copy rather than move if the source and dest are in different encryption - // zones. A move would return an error from the NN because a move is a metadata-only - // operation and the files would not be encrypted/decrypted properly on the DNs. 
+ destFs.rename(sourceFile, destFile); + return; + } + if (destIsDfs && sameFileSystem) { + Preconditions.checkState(!doRename); + // We must copy rather than move if the source and dest are in different + // encryption zones. A move would return an error from the NN because a move is a + // metadata-only operation and the files would not be encrypted/decrypted properly + // on the DNs. LOG.info(String.format( - "Copying source '%s' to '%s' because HDFS encryption zones are different", + "Copying source '%s' to '%s' because HDFS encryption zones are different.", + sourceFile, destFile)); + } else { + Preconditions.checkState(!sameFileSystem); + LOG.info(String.format("Copying '%s' to '%s' between filesystems.", sourceFile, destFile)); - FileUtil.copy(sourceFile.getFileSystem(CONF), sourceFile, fs, destFile, - true, true, CONF); } + FileUtil.copy(sourceFs, sourceFile, destFs, destFile, true, true, CONF); } /** @@ -256,6 +282,20 @@ public class FileSystemUtil { } /** + * Returns true iff the filesystem is a S3AFileSystem. + */ + public static boolean isS3AFileSystem(FileSystem fs) { + return fs instanceof S3AFileSystem; + } + + /** + * Returns true iff the path is on a S3AFileSystem. + */ + public static boolean isS3AFileSystem(Path path) throws IOException { + return isS3AFileSystem(path.getFileSystem(CONF)); + } + + /** * Returns true iff the filesystem is an instance of LocalFileSystem. */ public static boolean isLocalFileSystem(FileSystem fs) { @@ -378,6 +418,6 @@ public class FileSystemUtil { throws IOException { Path path = new Path(location); return (FileSystemUtil.isDistributedFileSystem(path) || - FileSystemUtil.isLocalFileSystem(path)); + FileSystemUtil.isLocalFileSystem(path) || FileSystemUtil.isS3AFileSystem(path)); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/fe/src/main/java/com/cloudera/impala/service/Frontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/service/Frontend.java b/fe/src/main/java/com/cloudera/impala/service/Frontend.java index d6aa517..4d21895 100644 --- a/fe/src/main/java/com/cloudera/impala/service/Frontend.java +++ b/fe/src/main/java/com/cloudera/impala/service/Frontend.java @@ -542,15 +542,16 @@ public class Frontend { } Path destPath = new Path(destPathString); - FileSystem fs = destPath.getFileSystem(FileSystemUtil.getConfiguration()); + Path sourcePath = new Path(request.source_path); + FileSystem destFs = destPath.getFileSystem(FileSystemUtil.getConfiguration()); + FileSystem sourceFs = sourcePath.getFileSystem(FileSystemUtil.getConfiguration()); // Create a temporary directory within the final destination directory to stage the // file move. Path tmpDestPath = FileSystemUtil.makeTmpSubdirectory(destPath); - Path sourcePath = new Path(request.source_path); int filesLoaded = 0; - if (fs.isDirectory(sourcePath)) { + if (sourceFs.isDirectory(sourcePath)) { filesLoaded = FileSystemUtil.relocateAllVisibleFiles(sourcePath, tmpDestPath); } else { FileSystemUtil.relocateFile(sourcePath, tmpDestPath, true); @@ -565,7 +566,7 @@ public class Frontend { // Move the files from the temporary location to the final destination. FileSystemUtil.relocateAllVisibleFiles(tmpDestPath, destPath); // Cleanup the tmp directory. 
- fs.delete(tmpDestPath, true); + destFs.delete(tmpDestPath, true); TLoadDataResp response = new TLoadDataResp(); TColumnValue col = new TColumnValue(); String loadMsg = String.format( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java index 60de666..08d86a6 100644 --- a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java +++ b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java @@ -2873,15 +2873,15 @@ public class AnalyzeStmtsTest extends AnalyzerTest { "/test-warehouse/tpch.lineitem/", overwrite), "Table does not exist: functional.notbl"); - // Source must be HDFS. + // Source must be HDFS or S3A. AnalysisError(String.format("load data inpath '%s' %s into table " + "tpch.lineitem", "file:///test-warehouse/test.out", overwrite), "INPATH location 'file:/test-warehouse/test.out' must point to an " + - "HDFS filesystem"); + "HDFS or S3A filesystem"); AnalysisError(String.format("load data inpath '%s' %s into table " + "tpch.lineitem", "s3n://bucket/test-warehouse/test.out", overwrite), "INPATH location 's3n://bucket/test-warehouse/test.out' must point to an " + - "HDFS filesystem"); + "HDFS or S3A filesystem"); // File type / table type mismatch. AnalyzesOk(String.format("load data inpath '%s' %s into table " + http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/fe/src/test/java/com/cloudera/impala/planner/S3PlannerTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/com/cloudera/impala/planner/S3PlannerTest.java b/fe/src/test/java/com/cloudera/impala/planner/S3PlannerTest.java index 62f3cc8..74fe1a8 100644 --- a/fe/src/test/java/com/cloudera/impala/planner/S3PlannerTest.java +++ b/fe/src/test/java/com/cloudera/impala/planner/S3PlannerTest.java @@ -49,7 +49,7 @@ public class S3PlannerTest extends PlannerTestBase { String targetFs = System.getenv("TARGET_FILESYSTEM"); // Skip if the config property was not set. i.e. not running against S3. assumeTrue(targetFs != null && targetFs.equals("s3")); - String fsNameStr = System.getenv("FILESYSTEM_PREFIX"); + String fsNameStr = System.getenv("DEFAULT_FS"); fsName = new Path(fsNameStr); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/infra/python/deps/requirements.txt ---------------------------------------------------------------------- diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt index 3a3b6da..9ed842c 100644 --- a/infra/python/deps/requirements.txt +++ b/infra/python/deps/requirements.txt @@ -7,6 +7,13 @@ allpairs == 2.0.1 argparse == 1.4.0 +boto3 == 1.2.3 + simplejson == 3.3.0 # For python version 2.6 + botocore == 1.3.30 + python_dateutil == 2.5.2 + docutils == 0.12 + jmespath == 0.9.0 + futures == 3.0.5 cm-api == 10.0.0 # Already available as part of python on Linux. readline == 6.2.4.1; sys_platform == 'darwin' @@ -45,6 +52,5 @@ requests == 2.7.0 sh == 1.11 sqlparse == 0.1.15 texttable == 0.8.3 - # For dev purposes, not used in scripting. Version 1.2.1 is the latest that supports 2.6. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/testdata/workloads/functional-query/queries/QueryTest/insert_permutation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/insert_permutation.test b/testdata/workloads/functional-query/queries/QueryTest/insert_permutation.test
index 7b8171c..dfdb2ed 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/insert_permutation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/insert_permutation.test
@@ -1,6 +1,7 @@
 ====
 ---- QUERY
-create database insert_permutation_test
+create database insert_permutation_test location
+'$FILESYSTEM_PREFIX/test-warehouse/insert_permutation_test'
 ---- RESULTS
 ====
 ---- QUERY
@@ -332,4 +333,4 @@ select * from perm_nopart
 1,'NULL',NULL
 ---- TYPES
 INT,STRING,INT
-====
\ No newline at end of file
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test b/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test
index 5a1582b..8b758fa 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test
@@ -30,12 +30,6 @@ show table stats tinytable_like
 BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
 ====
 ---- QUERY
-# Verify INSERT into non-hdfs table is disallowed.
-insert into tinytable_like select * from functional.tinytable
----- CATCH
-Unable to INSERT into target table (multi_fs_db.tinytable_like) because $NAMENODE/test-warehouse/tinytable is not an HDFS filesystem.
-====
----- QUERY
 create external table tinytable_copy (a string, b string)
 row format delimited fields terminated by ','
 location '$FILESYSTEM_PREFIX/test-warehouse/tinytable'
@@ -101,13 +95,6 @@ select * from alltypes where id%100=0 order by id
 INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
 ====
 ---- QUERY
-# Verify INSERT into table with partitions on a different filesystem than the table's
-# base directory is disallowed.
-insert into alltypes select * from functional.alltypes
----- CATCH
-Unable to INSERT into target table (multi_fs_db.alltypes) because the table spans multiple filesystems.
-====
----- QUERY
 # Verify DROP PARTITION for non-default filesystem.
 alter table alltypes drop partition(year=2010, month=1)
 ---- RESULTS
@@ -242,12 +229,6 @@ select * from alltypes where id%100=0 order by id
 INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
 ====
 ---- QUERY
-# Verify INSERT into table with partitions on a different filesystems is disallowed.
-insert into alltypes select * from functional.alltypes
----- CATCH
-Unable to INSERT into target table (multi_fs_db.alltypes) because the table spans multiple filesystems.
-====
----- QUERY
 compute stats alltypes
 ---- RESULTS
 'Updated 4 partition(s) and 11 column(s).'
@@ -303,3 +284,47 @@ order by id
 ---- TYPES
 INT, STRING
 ====
+---- QUERY
+create table alltypes_multipart_insert like functional_parquet.alltypes
+---- RESULTS
+====
+---- QUERY
+# ADD PARTITION on a non-default filesystem.
+alter table alltypes_multipart_insert add partition(year=2009, month=1)
+location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_multipart_insert/year=2009/month=1'
+---- RESULTS
+====
+---- QUERY
+insert into alltypes_multipart_insert partition (year=2009, month=1)
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
+float_col, double_col, date_string_col, string_col, timestamp_col
+from functional.alltypes where year = 2009 and month = 1
+---- RESULTS
+year=2009/month=1/: 310
+====
+---- QUERY
+# ADD PARTITION on the default filesystem.
+alter table alltypes_multipart_insert add partition(year=2009, month=2)
+location '/test-warehouse/alltypes_multipart_insert/year=2009/month=2'
+---- RESULTS
+====
+---- QUERY
+insert into alltypes_multipart_insert partition (year=2009, month=2)
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
+float_col, double_col, date_string_col, string_col, timestamp_col
+from functional.alltypes where year = 2009 and month = 2
+---- RESULTS
+year=2009/month=2/: 280
+====
+---- QUERY
+select * from alltypes_multipart_insert where id%100=0 order by id;
+---- RESULTS
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+100,true,0,0,0,0,0,0,'01/11/09','0',2009-01-11 01:40:04.500000000,2009,1
+200,true,0,0,0,0,0,0,'01/21/09','0',2009-01-21 03:20:09,2009,1
+300,true,0,0,0,0,0,0,'01/31/09','0',2009-01-31 05:00:13.500000000,2009,1
+400,true,0,0,0,0,0,0,'02/10/09','0',2009-02-10 01:30:04.500000000,2009,2
+500,true,0,0,0,0,0,0,'02/20/09','0',2009-02-20 03:10:08.550000000,2009,2
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/testdata/workloads/functional-query/queries/QueryTest/truncate-table.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/truncate-table.test b/testdata/workloads/functional-query/queries/QueryTest/truncate-table.test
index 8382d59..19480ff 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/truncate-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/truncate-table.test
@@ -1,7 +1,7 @@
 ====
 ---- QUERY
 # First create a partitioned table
-create table t1 like functional.alltypes;
+create table t1 like functional.alltypes location '$FILESYSTEM_PREFIX/test-warehouse/t1';
 insert into t1 partition(year, month) select * from functional.alltypes;
 compute incremental stats t1;
 show table stats t1;
@@ -117,7 +117,7 @@ STRING, STRING, BIGINT, BIGINT, INT, DOUBLE
 ====
 ---- QUERY
 # Create an unpartitioned table.
-create table t2 like functional.tinytable;
+create table t2 like functional.tinytable location '$FILESYSTEM_PREFIX/test-warehouse/t2';
 insert into t2 select * from functional.tinytable;
 compute incremental stats t2;
 show table stats t2;
@@ -167,7 +167,7 @@ truncate table if exists non_existent;
 ====
 ---- QUERY
 # Create an unpartitioned table.
-create table t3 like functional.tinytable;
+create table t3 like functional.tinytable location '$FILESYSTEM_PREFIX/test-warehouse/t3';
 insert into t3 select * from functional.tinytable;
 select count(*) from t3;
 ---- RESULTS
@@ -187,4 +187,4 @@ select count(*) from t3;
 0
 ---- TYPES
 BIGINT
-====
\ No newline at end of file
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/testdata/workloads/tpch/queries/insert_parquet.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpch/queries/insert_parquet.test b/testdata/workloads/tpch/queries/insert_parquet.test
index 55f84e3..35b7cde 100644
--- a/testdata/workloads/tpch/queries/insert_parquet.test
+++ b/testdata/workloads/tpch/queries/insert_parquet.test
@@ -1,7 +1,8 @@
 ====
 ---- QUERY
 # Tests using a larger table.
-create table if not exists orders_insert_test like orders;
+create table if not exists orders_insert_test like orders location
+'$FILESYSTEM_PREFIX/test-warehouse/orders_insert_table';
 insert overwrite table orders_insert_test select * from tpch.orders
 ---- RESULTS
 : 1500000
@@ -55,7 +56,7 @@ bigint
 ---- QUERY
 # Test to verify that huge (larger than 64k) values can be written, see IMPALA-1705
 create table if not exists test_insert_huge_vals (s string) stored as parquet
-location '$FILESYSTEM_PREFIX/test_insert_huge_vals';
+location '$FILESYSTEM_PREFIX/test-warehouse/test_insert_huge_vals';
 insert overwrite table test_insert_huge_vals
 select cast(l_orderkey as string) from tpch.lineitem
 union
 select group_concat(concat(s_name, s_address, s_phone)) from tpch.supplier

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 9ef70fe..0ed587b 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -40,6 +40,8 @@ from tests.performance.query import Query
 from tests.performance.query_executor import JdbcQueryExecConfig
 from tests.performance.query_exec_functions import execute_using_jdbc
 from tests.util.hdfs_util import HdfsConfig, get_hdfs_client, get_hdfs_client_from_conf
+from tests.util.s3_util import S3Client
+from tests.util.filesystem_utils import IS_S3, S3_BUCKET_NAME

 # Imports required for Hive Metastore Client
 from hive_metastore import ThriftHiveMetastore
@@ -115,6 +117,7 @@ class ImpalaTestSuite(BaseTestSuite):
     cls.impalad_test_service = cls.create_impala_service()
     cls.hdfs_client = cls.create_hdfs_client()
+    cls.filesystem_client = S3Client(S3_BUCKET_NAME) if IS_S3 else cls.hdfs_client

   @classmethod
   def teardown_class(cls):
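The one-line change to setup_class() above is what lets the same test code run against either HDFS or S3: one client object is chosen up front, keyed on the target filesystem, and tests only ever go through it. Below is a stripped-down, self-contained sketch of that pattern; the class names, method names, and bucket are invented for illustration and are not the real Impala test classes.

    import os

    class BaseFilesystemClient(object):
        """Illustrative common interface; the method names are made up."""
        def create_file(self, path, data):
            raise NotImplementedError()
        def delete_file_dir(self, path, recursive=False):
            raise NotImplementedError()

    class FakeHdfsClient(BaseFilesystemClient):
        def create_file(self, path, data):
            print("webhdfs: create %s (%d bytes)" % (path, len(data)))
        def delete_file_dir(self, path, recursive=False):
            print("webhdfs: delete %s recursive=%s" % (path, recursive))

    class FakeS3Client(BaseFilesystemClient):
        def __init__(self, bucket):
            self.bucket = bucket
        def create_file(self, path, data):
            print("s3: put s3a://%s/%s (%d bytes)" % (self.bucket, path, len(data)))
        def delete_file_dir(self, path, recursive=False):
            # S3 has no directories; "recursive" just means delete every key
            # under the prefix.
            print("s3: delete prefix s3a://%s/%s" % (self.bucket, path))

    # Pick the client once, the way setup_class() does above, then write tests
    # only against the shared interface.
    if os.environ.get("TARGET_FILESYSTEM") == "s3":
        filesystem_client = FakeS3Client("impala-test-bucket")  # hypothetical bucket
    else:
        filesystem_client = FakeHdfsClient()

    filesystem_client.create_file("test-warehouse/tinytable/data.csv", b"a,b\n")
    filesystem_client.delete_file_dir("test-warehouse/tinytable", recursive=True)

Keeping the switch in one place means individual tests never have to check the target filesystem themselves.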
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/common/skip.py
----------------------------------------------------------------------
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 99d7e2d..3c4fe27 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -27,17 +27,19 @@ from tests.util.filesystem_utils import IS_DEFAULT_FS, IS_S3, IS_ISILON, IS_LOCA

 class SkipIfS3:
   # These ones are skipped due to product limitations.
-  insert = pytest.mark.skipif(IS_S3, reason="INSERT not implemented for S3")
-  load_data = pytest.mark.skipif(IS_S3, reason="LOAD DATA not implemented for S3")
   caching = pytest.mark.skipif(IS_S3, reason="SET CACHED not implemented for S3")
   hive = pytest.mark.skipif(IS_S3, reason="Hive doesn't work with S3")
   hdfs_block_size = pytest.mark.skipif(IS_S3, reason="S3 uses it's own block size")
+  hdfs_acls = pytest.mark.skipif(IS_S3, reason="HDFS acls are not supported on S3")
   jira = partial(pytest.mark.skipif, IS_S3)
+  hdfs_encryption = pytest.mark.skipif(IS_S3,
+      reason="HDFS encryption is not supported with S3")
+  hdfs_purge = pytest.mark.skipif(IS_S3,
+      reason="PURGE has no effect on S3")

   # These ones need test infra work to re-enable.
   udfs = pytest.mark.skipif(IS_S3, reason="udas/udfs not copied to S3")
   datasrc = pytest.mark.skipif(IS_S3, reason="data sources not copied to S3")
-  hdfs_client = pytest.mark.skipif(IS_S3, reason="hdfs_client doesn't work with S3")
   hbase = pytest.mark.skipif(IS_S3, reason="HBase not started with S3")
   qualified_path = pytest.mark.skipif(IS_S3,
       reason="Tests rely on HDFS qualified paths, IMPALA-1872")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/custom_cluster/test_insert_behaviour.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_insert_behaviour.py b/tests/custom_cluster/test_insert_behaviour.py
index 03d5188..73acb6c 100644
--- a/tests/custom_cluster/test_insert_behaviour.py
+++ b/tests/custom_cluster/test_insert_behaviour.py
@@ -21,7 +21,7 @@ from tests.util.hdfs_util import HdfsConfig, get_hdfs_client, get_hdfs_client_fr

 TEST_TBL = "insert_inherit_permission"

[email protected]
[email protected]_acls
 class TestInsertBehaviourCustomCluster(CustomClusterTestSuite):

   @classmethod

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/custom_cluster/test_parquet_max_page_header.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_parquet_max_page_header.py b/tests/custom_cluster/test_parquet_max_page_header.py
index c1210ff..8eaf64b 100644
--- a/tests/custom_cluster/test_parquet_max_page_header.py
+++ b/tests/custom_cluster/test_parquet_max_page_header.py
@@ -90,7 +90,6 @@ class TestParquetMaxPageHeader(CustomClusterTestSuite):
     put.wait()

   @SkipIfS3.hive
-  @SkipIfS3.insert
   @SkipIfIsilon.hive
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("-max_page_header_size=31457280")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/data_errors/test_data_errors.py
----------------------------------------------------------------------
diff --git a/tests/data_errors/test_data_errors.py b/tests/data_errors/test_data_errors.py
index 145ab94..019c771 100644
--- a/tests/data_errors/test_data_errors.py
+++ b/tests/data_errors/test_data_errors.py
@@ -96,7 +96,6 @@ class TestHBaseDataErrors(TestDataErrors):
     vector.get_value('exec_option')['abort_on_error'] = 0
     self.run_test_case('DataErrorsTest/hbase-insert-errors', vector)

[email protected]
 class TestTimestampErrors(TestDataErrors):
   """
   Create test table with various valid/invalid timestamp values, then run

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_col_stats.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_col_stats.py b/tests/metadata/test_col_stats.py
index f3440ae..447861a 100644
--- a/tests/metadata/test_col_stats.py
+++ b/tests/metadata/test_col_stats.py
@@ -25,7 +25,6 @@ from tests.common.skip import SkipIfS3

 TEST_DB = 'colstats_test_db'

[email protected]
 # End-to-end validation of Impala column stats usage.
 class TestColStats(ImpalaTestSuite):
   @classmethod

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_compute_stats.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_compute_stats.py b/tests/metadata/test_compute_stats.py
index 145238f..b2f6653 100644
--- a/tests/metadata/test_compute_stats.py
+++ b/tests/metadata/test_compute_stats.py
@@ -22,7 +22,6 @@ from tests.util.filesystem_utils import WAREHOUSE

 # Tests the COMPUTE STATS command for gathering table and column stats.
 # TODO: Merge this test file with test_col_stats.py
[email protected]  # S3: missing coverage: compute stats
 @SkipIf.not_default_fs # Isilon: Missing coverage: compute stats
 class TestComputeStats(ImpalaTestSuite):
   TEST_DB_NAME = "compute_stats_db"
@@ -71,7 +70,6 @@ class TestComputeStats(ImpalaTestSuite):

   @pytest.mark.execute_serially
   @SkipIfS3.hive
-  @SkipIfS3.insert
   @SkipIfIsilon.hive
   @SkipIfLocal.hive
   def test_compute_stats_impala_2201(self, vector):
@@ -120,7 +118,6 @@ class TestComputeStats(ImpalaTestSuite):
     assert("1\tpval\t8" in show_result.data[0])


[email protected]  # S3: missing coverage: compute stats
 @SkipIf.not_default_fs # Isilon: Missing coverage: compute stats
 class TestCorruptTableStats(TestComputeStats):
