IMPALA-3452: S3: Disable Impala staging for INSERTs via flag for speedup

INSERTs on S3 are slower because of double buffering: we buffer the data
once locally and once in a staging directory in S3 before moving the
file(s) to the final location. On HDFS, moving a file from the staging
directory to the final location is a quick rename, which is only a
metadata operation. S3, however, does not support renames, so the move
becomes a full file copy instead of a metadata-only rename.
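As a quick illustration of the option this patch introduces (described in
the next paragraph), a minimal sketch of toggling it in a SQL session; the
table names s3_sales and staging_sales are hypothetical, with s3_sales
assumed to be backed by an s3a:// location:

  -- Skip the staging step so the table sinks write directly to the
  -- final S3 location (the default once this patch is in).
  SET S3_SKIP_INSERT_STAGING=true;
  INSERT INTO s3_sales SELECT * FROM staging_sales;

  -- Re-enable staging to get the stronger all-or-nothing move semantics.
  SET S3_SKIP_INSERT_STAGING=false;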
This patch introduces a boolean query option "s3_skip_insert_staging"
which avoids the staging step on S3 and allows the table sinks to write
to the final location directly. This trades consistency for performance:
if a node fails during the query, we can end up with inconsistent
results in the final location.

Note: This option is disabled for INSERT OVERWRITE queries, since those
require cleaning the destination directory before moving the final files
there. The coordinator is responsible for that cleaning, and it takes
place only after the table sinks have moved the files to the final
location. Thus, INSERT OVERWRITE queries must still have their files
moved to a staging location by the table sinks.

Performance gains:
 - For non-partitioned tables, INSERT queries run 4-4.5x faster on S3.
   (Tested on a 63GB INSERT to a table.)
 - For heavily partitioned tables, there is a considerable improvement,
   on the order of 4-5 minutes on queries that take ~27 minutes, but
   queries are still slow because of IMPALA-3482, where the catalog takes
   too long to update all the metadata. (Tested with a query that creates
   2.4K partitions in a table totalling ~19GB.)

Change-Id: Iff9620d41ba0d5fb1aa0c9f4abb48866fc2b0698
Reviewed-on: http://gerrit.cloudera.org:8080/2905
Reviewed-by: Sailesh Mukil <[email protected]>
Tested-by: Internal Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/27815818
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/27815818
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/27815818

Branch: refs/heads/master
Commit: 27815818b92362bc8b913a3da789c6debc88d551
Parents: 616eb2f
Author: Sailesh Mukil <[email protected]>
Authored: Fri Apr 29 12:30:59 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Thu May 12 14:18:00 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hdfs-table-sink.cc             | 37 ++++++----
 be/src/exec/hdfs-table-sink.h              |  9 +++
 be/src/runtime/coordinator.cc              |  4 +-
 be/src/service/query-options.cc            |  5 ++
 be/src/service/query-options.h             |  5 +-
 common/thrift/ImpalaInternalService.thrift |  6 ++
 common/thrift/ImpalaService.thrift         | 13 +++-
 .../queries/QueryTest/insert.test          | 76 ++++++++++++++++++++
 8 files changed, 137 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index f345b73..d3a2ade 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -278,11 +278,21 @@ void HdfsTableSink::BuildHdfsFileNames(
 Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
     OutputPartition* output_partition) {
   SCOPED_TIMER(ADD_TIMER(profile(), "TmpFileCreateTimer"));
-  stringstream filename;
-  filename << output_partition->tmp_hdfs_file_name_prefix
-      << "." << output_partition->num_files
-      << "." << output_partition->writer->file_extension();
-  output_partition->current_file_name = filename.str();
+  string final_location = Substitute("$0.$1.$2",
+      output_partition->final_hdfs_file_name_prefix, output_partition->num_files,
+      output_partition->writer->file_extension());
+
+  // If ShouldSkipStaging() is true, then the table sink will write the file(s) for this
+  // partition to the final location directly. If it is false, the file(s) will be written
+  // to a temporary staging location which will be moved by the coordinator to the final
+  // location.
+  if (ShouldSkipStaging(state, output_partition)) {
+    output_partition->current_file_name = final_location;
+  } else {
+    output_partition->current_file_name = Substitute("$0.$1.$2",
+        output_partition->tmp_hdfs_file_name_prefix, output_partition->num_files,
+        output_partition->writer->file_extension());
+  }
 
   // Check if tmp_hdfs_file_name exists.
   const char* tmp_hdfs_file_name_cstr =
       output_partition->current_file_name.c_str();
@@ -323,12 +333,10 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
   ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(1);
   COUNTER_ADD(files_created_counter_, 1);
 
-  // Save the ultimate destination for this file (it will be moved by the coordinator)
-  stringstream dest;
-  dest << output_partition->final_hdfs_file_name_prefix
-      << "." << output_partition->num_files
-      << "." << output_partition->writer->file_extension();
-  (*state->hdfs_files_to_move())[output_partition->current_file_name] = dest.str();
+  if (!ShouldSkipStaging(state, output_partition)) {
+    // Save the ultimate destination for this file (it will be moved by the coordinator)
+    (*state->hdfs_files_to_move())[output_partition->current_file_name] = final_location;
+  }
 
   ++output_partition->num_files;
   output_partition->num_rows = 0;
@@ -498,7 +506,7 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state,
     state->per_partition_status()->insert(
         make_pair(partition->partition_name, partition_status));
 
-    if (!no_more_rows) {
+    if (!no_more_rows && !ShouldSkipStaging(state, partition)) {
       // Indicate that temporary directory is to be deleted after execution
       (*state->hdfs_files_to_move())[partition->tmp_hdfs_dir_name] = "";
     }
@@ -660,6 +668,11 @@ void HdfsTableSink::Close(RuntimeState* state) {
   closed_ = true;
 }
 
+bool HdfsTableSink::ShouldSkipStaging(RuntimeState* state, OutputPartition* partition) {
+  return IsS3APath(partition->final_hdfs_file_name_prefix.c_str()) && !overwrite_ &&
+      state->query_options().s3_skip_insert_staging;
+}
+
 string HdfsTableSink::DebugString() const {
   stringstream out;
   out << "HdfsTableSink(overwrite=" << (overwrite_ ? "true" : "false")


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/be/src/exec/hdfs-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 2083ab2..a06e66d 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -56,6 +56,7 @@ struct OutputPartition {
   /// Name of the temporary directory that files for this partition are staged to before
   /// the coordinator moves them to their permanent location once the query completes.
+  /// Not used if 'skip_staging' is true.
   /// Path: <base_table_dir/<staging_dir>/<unique_id>_dir/
   std::string tmp_hdfs_dir_name;
@@ -210,6 +211,14 @@ class HdfsTableSink : public DataSink {
   /// Closes the hdfs file for this partition as well as the writer.
   void ClosePartitionFile(RuntimeState* state, OutputPartition* partition);
 
+  // Returns TRUE if the staging step should be skipped for this partition. This allows
+  // for faster INSERT query completion time for the S3A filesystem as the coordinator
+  // does not have to copy the file(s) from the staging location to the final location.
+  // We do not skip for INSERT OVERWRITEs because the coordinator will delete all files
+  // in the final location before moving the staged files there, so we cannot write
+  // directly to the final location and need to write to the temporary staging location.
+  bool ShouldSkipStaging(RuntimeState* state, OutputPartition* partition);
+
   /// Descriptor of target table. Set in Prepare().
   const HdfsTableDescriptor* table_desc_;


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 6753c2b..2af596c 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -861,7 +861,9 @@ Status Coordinator::FinalizeSuccessfulInsert() {
           partition_create_ops.Add(CREATE_DIR, part_path);
         }
       }
-    } else {
+    } else if (!is_s3_path || !query_ctx_.request.query_options.s3_skip_insert_staging) {
+      // If the S3_SKIP_INSERT_STAGING query option is set, then the partition directories
+      // would have already been created by the table sinks.
       if (FLAGS_insert_inherit_permissions && !is_s3_path) {
         PopulatePathPermissionCache(
             partition_fs_connection, part_path, &permissions_cache);


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 45571bf..ce538bf 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -400,6 +400,11 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_mt_num_cores(num_cores);
         break;
       }
+      case TImpalaQueryOptions::S3_SKIP_INSERT_STAGING: {
+        query_options->__set_s3_skip_insert_staging(
+            iequals(value, "true") || iequals(value, "1"));
+        break;
+      }
       default:
         // We hit this DCHECK(false) if we forgot to add the corresponding entry here
         // when we add a new query option.


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index a727c8d..56e2e1a 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -32,7 +32,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::MT_NUM_CORES + 1);\
+      TImpalaQueryOptions::S3_SKIP_INSERT_STAGING + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -76,7 +76,8 @@ class TQueryOptions;
   QUERY_OPT_FN(max_num_runtime_filters, MAX_NUM_RUNTIME_FILTERS)\
   QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8)\
   QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION)\
-  QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES);
+  QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES)\
+  QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING);
 
 /// Converts a TQueryOptions struct into a map of key, value pairs.
 void TQueryOptionsToMap(const TQueryOptions& query_options,


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 6c2fc3e..611155c 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -186,6 +186,12 @@ struct TQueryOptions {
   // 1: single-threaded execution mode
   // 0: multi-threaded execution mode, number of cores is the pool default
   44: optional i32 mt_num_cores = 1
+
+  // If true, INSERT writes to S3 go directly to their final location rather than being
+  // copied there by the coordinator. We cannot do this for INSERT OVERWRITEs because
+  // for those queries, the coordinator deletes all files in the final location before
+  // copying the files there.
+  45: optional bool s3_skip_insert_staging = true
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index c9535eb..0a030ad 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -204,14 +204,21 @@ enum TImpalaQueryOptions {
   // If true, use UTF-8 annotation for string columns. Note that char and varchar columns
   // always use the annotation.
-  PARQUET_ANNOTATE_STRINGS_UTF8
+  PARQUET_ANNOTATE_STRINGS_UTF8,
 
   // Determines how to resolve Parquet files' schemas in the absence of field IDs (which
   // is always, since fields IDs are NYI). Valid values are "position" and "name".
-  PARQUET_FALLBACK_SCHEMA_RESOLUTION
+  PARQUET_FALLBACK_SCHEMA_RESOLUTION,
 
   // Multi-threaded execution: number of cores per machine
-  MT_NUM_CORES
+  MT_NUM_CORES,
+
+  // If true, INSERT writes to S3 go directly to their final location rather than being
+  // copied there by the coordinator. We cannot do this for INSERT OVERWRITEs because
+  // for those queries, the coordinator deletes all files in the final location before
+  // copying the files there.
+  // TODO: Find a way to get this working for INSERT OVERWRITEs too.
+  S3_SKIP_INSERT_STAGING
 }
 
 // The summary of an insert.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27815818/testdata/workloads/functional-query/queries/QueryTest/insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/insert.test b/testdata/workloads/functional-query/queries/QueryTest/insert.test
index 29450cd..157fd9a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/insert.test
@@ -768,3 +768,79 @@ select * from table_with_header_insert;
 ---- TYPES
 INT
 ====
+---- QUERY
+# The following 4 queries are for IMPALA-3452 and test S3 INSERTs with staging.
+SET S3_SKIP_INSERT_STAGING=false;
+# static partition overwrite
+insert overwrite table alltypesinsert
+partition (year=2009, month=4)
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
+float_col, double_col, date_string_col, string_col, timestamp_col
+from alltypessmall
+where year=2009 and month=4
+---- SETUP
+DROP PARTITIONS alltypesinsert
+RESET alltypesinsert
+---- RESULTS
+year=2009/month=4/: 25
+====
+---- QUERY
+# search the overwritten partition to verify the results
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col,
+double_col, date_string_col, string_col
+from alltypesinsert
+where year=2009 and month=4
+---- RESULTS
+75,false,0,0,0,0,0,0,'04/01/09','0'
+76,true,1,1,1,10,1.100000023841858,10.1,'04/01/09','1'
+77,false,2,2,2,20,2.200000047683716,20.2,'04/01/09','2'
+78,true,3,3,3,30,3.299999952316284,30.3,'04/01/09','3'
+79,false,4,4,4,40,4.400000095367432,40.4,'04/01/09','4'
+80,true,5,5,5,50,5.5,50.5,'04/01/09','5'
+81,false,6,6,6,60,6.599999904632568,60.6,'04/01/09','6'
+82,true,7,7,7,70,7.699999809265137,70.7,'04/01/09','7'
+83,false,8,8,8,80,8.800000190734863,80.8,'04/01/09','8'
+84,true,9,9,9,90,9.899999618530273,90.90000000000001,'04/01/09','9'
+85,false,0,0,0,0,0,0,'04/02/09','0'
+86,true,1,1,1,10,1.100000023841858,10.1,'04/02/09','1'
+87,false,2,2,2,20,2.200000047683716,20.2,'04/02/09','2'
+88,true,3,3,3,30,3.299999952316284,30.3,'04/02/09','3'
+89,false,4,4,4,40,4.400000095367432,40.4,'04/02/09','4'
+90,true,5,5,5,50,5.5,50.5,'04/02/09','5'
+91,false,6,6,6,60,6.599999904632568,60.6,'04/02/09','6'
+92,true,7,7,7,70,7.699999809265137,70.7,'04/02/09','7'
+93,false,8,8,8,80,8.800000190734863,80.8,'04/02/09','8'
+94,true,9,9,9,90,9.899999618530273,90.90000000000001,'04/02/09','9'
+95,false,0,0,0,0,0,0,'04/03/09','0'
+96,true,1,1,1,10,1.100000023841858,10.1,'04/03/09','1'
+97,false,2,2,2,20,2.200000047683716,20.2,'04/03/09','2'
+98,true,3,3,3,30,3.299999952316284,30.3,'04/03/09','3'
+99,false,4,4,4,40,4.400000095367432,40.4,'04/03/09','4'
+---- TYPES
+int, boolean, tinyint, smallint, int, bigint, float, double, string, string
+====
+---- QUERY
+SET S3_SKIP_INSERT_STAGING=false;
+# fully dynamic partition insert$TABLE, check partition creation
+insert into table alltypesinsert
+partition (year, month)
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
+float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+from alltypessmall
+---- SETUP
+DROP PARTITIONS alltypesinsert
+---- RESULTS
+year=2009/month=1/: 25
+year=2009/month=2/: 25
+year=2009/month=3/: 25
+year=2009/month=4/: 25
+====
+---- QUERY
+# search the partitions to verify they contain all 100 rows
+select count(timestamp_col) from alltypesinsert
+where year=2009 and month>=1 and month<=4
+---- RESULTS
+100
+---- TYPES
+bigint
+====
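The tests above exercise the staging path by setting
S3_SKIP_INSERT_STAGING=false, since the option now defaults to true. As a
hypothetical companion snippet, not part of this patch, a sketch of the
skip-staging path against the same tables used by the tests:

  -- With the option left at its default (true), the table sinks write
  -- directly to the final S3 location and the coordinator performs no
  -- file moves on a successful INSERT.
  SET S3_SKIP_INSERT_STAGING=true;
  insert into table alltypesinsert
  partition (year, month)
  select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
  float_col, double_col, date_string_col, string_col, timestamp_col, year, month
  from alltypessmall;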
