IMPALA-1346/1590/2344: fix sorter buffer mgmt when spilling

The Sorter's memory management logic failed to correctly manage buffers when spilling. It would try to make use of all buffers in the system, neglecting to account for other operators' buffer usage.
This patch adjusts the logic so that it handles contention for buffers so long as it can get enough buffers to make progress. Instead of precalculating the number of buffers it thinks it should be able to pin, it just makes a best-effort attempt to pin the initial buffers of as many runs as possible, up to a limit. As long as it can pin three runs, it can make progress. Testing: Added an additional test that failed before the patch without OOM. An analytic function test that was meant to fail also started succeeding so I had to adjust the limit there too. Change-Id: Idfe55cc13c7f2b54cba1d05ade44cbcf6bb573c0 Reviewed-on: http://gerrit.cloudera.org:8080/2908 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Tim Armstrong <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ee53ddb3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ee53ddb3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ee53ddb3 Branch: refs/heads/master Commit: ee53ddb389549247f5bfe760d446dc7b3b963a29 Parents: 37ec253 Author: Tim Armstrong <[email protected]> Authored: Wed Apr 20 01:11:27 2016 -0700 Committer: Tim Armstrong <[email protected]> Committed: Mon Jun 6 17:34:07 2016 -0700 ---------------------------------------------------------------------- be/src/runtime/buffered-block-mgr.h | 6 - be/src/runtime/sorter.cc | 228 +++++++++++-------- be/src/runtime/sorter.h | 24 +- .../queries/QueryTest/analytic-fns.test | 7 +- .../queries/QueryTest/spilling.test | 39 ++++ 5 files changed, 188 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee53ddb3/be/src/runtime/buffered-block-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr.h b/be/src/runtime/buffered-block-mgr.h index 
9e54c4f..78fdb37 100644 --- a/be/src/runtime/buffered-block-mgr.h +++ b/be/src/runtime/buffered-block-mgr.h @@ -389,12 +389,6 @@ class BufferedBlockMgr { /// only for error reporting. Status MemLimitTooLowError(Client* client, int node_id); - /// TODO: Remove these two. Not clear what the sorter really needs. - /// TODO: Those are dirty, dangerous reads to two lists whose all other accesses are - /// protected by the lock_. Using those two functions is looking for trouble. - int available_allocated_buffers() const { return all_io_buffers_.size(); } - int num_free_buffers() const { return free_io_buffers_.size(); } - int num_pinned_buffers(Client* client) const; int num_reserved_buffers_remaining(Client* client) const; MemTracker* get_tracker(Client* client) const; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee53ddb3/be/src/runtime/sorter.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc index c242b6b..2e9e6ec 100644 --- a/be/src/runtime/sorter.cc +++ b/be/src/runtime/sorter.cc @@ -32,20 +32,26 @@ using namespace strings; namespace impala { -// Number of pinned blocks required for a merge. -const int BLOCKS_REQUIRED_FOR_MERGE = 3; +// Number of pinned blocks required for a merge with fixed-length data only. +const int MIN_BUFFERS_PER_MERGE = 3; -// Error message when pinning fixed or variable length blocks failed. -// TODO: Add the node id that iniated the sort -const string PIN_FAILED_ERROR_MSG = "Failed to pin block for $0-length data needed " - "for sorting. Reducing query concurrency or increasing the memory limit may help " - "this query to complete successfully."; +// Maximum number of buffers to use in each merge to prevent sorter trying to grab +// all the memory when spilling. Given 8mb buffers, this limits the sorter to using +// 1GB of buffers when merging. +// TODO: this is an arbitrary limit. 
Once we have reliable reservations (IMPALA-3200) +// we should base this on the number of reservations. +const int MAX_BUFFERS_PER_MERGE = 128; const string MEM_ALLOC_FAILED_ERROR_MSG = "Failed to allocate block for $0-length " "data needed for sorting. Reducing query concurrency or increasing the " "memory limit may help this query to complete successfully."; -/// Delete all non-null blocks in 'blocks' and clear vector. +const string MERGE_FAILED_ERROR_MSG = "Failed to allocate block to merge spilled runs " + "during sorting. Only $0 runs could be merged, but must be able to merge at least 2 " + "to make progress. Reducing query concurrency or increasing the memory limit may " + "help this query to complete successfully."; + +/// Delete all non-null blocks in blocks and clear vector. static void DeleteAndClearBlocks(vector<BufferedBlockMgr::Block*>* blocks) { for (BufferedBlockMgr::Block* block: *blocks) { if (block != NULL) block->Delete(); @@ -135,8 +141,12 @@ class Sorter::Run { void DeleteAllBlocks(); /// Prepare to read a sorted run. Pins the first block(s) in the run if the run was - /// previously unpinned. - Status PrepareRead(); + /// previously unpinned. If the run was unpinned, try to pin the initial fixed and + /// var len blocks in the run. If it couldn't pin them, set pinned_all_blocks to false. + /// In that case, none or one of the initial blocks may be pinned and it is valid to + /// call PrepareRead() again to retry pinning the remainder. pinned_all_blocks is + /// always set to true if the run is pinned. + Status PrepareRead(bool* pinned_all_blocks); /// Interface for merger - get the next batch of rows from this run. This run still /// owns the returned batch. Calls GetNext(RowBatch*, bool*). 
@@ -723,7 +733,7 @@ Status Sorter::Run::UnpinAllBlocks() { return Status::OK(); } -Status Sorter::Run::PrepareRead() { +Status Sorter::Run::PrepareRead(bool* pinned_all_blocks) { DCHECK(is_finalized_); DCHECK(is_sorted_); @@ -737,33 +747,32 @@ Status Sorter::Run::PrepareRead() { sorter_->state_->batch_size(), sorter_->mem_tracker_)); // If the run is pinned, all blocks are already pinned, so we're ready to read. - if (is_pinned_) return Status::OK(); + if (is_pinned_) { + *pinned_all_blocks = true; + return Status::OK(); + } // Attempt to pin the first fixed and var-length blocks. In either case, pinning may // fail if the number of reserved blocks is oversubscribed, see IMPALA-1590. if (fixed_len_blocks_.size() > 0) { bool pinned; RETURN_IF_ERROR(fixed_len_blocks_[0]->Pin(&pinned)); - // Temporary work-around for IMPALA-1868. Fail the query with OOM rather than - // DCHECK in case block pin fails. if (!pinned) { - Status status = Status::MemLimitExceeded(); - status.AddDetail(Substitute(PIN_FAILED_ERROR_MSG, "fixed")); - return status; + *pinned_all_blocks = false; + return Status::OK(); } } if (HasVarLenBlocks()) { bool pinned; RETURN_IF_ERROR(var_len_blocks_[0]->Pin(&pinned)); - // Temporary work-around for IMPALA-1590. Fail the query with OOM rather than - // DCHECK in case block pin fails. 
if (!pinned) { - Status status = Status::MemLimitExceeded(); - status.AddDetail(Substitute(PIN_FAILED_ERROR_MSG, "variable")); - return status; + *pinned_all_blocks = false; + return Status::OK(); } } + + *pinned_all_blocks = true; return Status::OK(); } @@ -1328,6 +1337,7 @@ Sorter::Sorter(const TupleRowComparator& compare_less_than, mem_tracker_(mem_tracker), output_row_desc_(output_row_desc), unsorted_run_(NULL), + merge_output_run_(NULL), profile_(profile), initial_runs_counter_(NULL), num_merges_counter_(NULL), @@ -1339,6 +1349,7 @@ Sorter::~Sorter() { DCHECK(sorted_runs_.empty()); DCHECK(merging_runs_.empty()); DCHECK(unsorted_run_ == NULL); + DCHECK(merge_output_run_ == NULL); } Status Sorter::Init() { @@ -1355,13 +1366,13 @@ Status Sorter::Init() { in_mem_sort_timer_ = ADD_TIMER(profile_, "InMemorySortTime"); sorted_data_size_ = ADD_COUNTER(profile_, "SortDataSize", TUnit::BYTES); - int min_blocks_required = BLOCKS_REQUIRED_FOR_MERGE; - // Fixed and var-length blocks are separate, so we need BLOCKS_REQUIRED_FOR_MERGE + int min_buffers_required = MIN_BUFFERS_PER_MERGE; + // Fixed and var-length blocks are separate, so we need MIN_BUFFERS_PER_MERGE // blocks for both if there is var-length data. - if (has_var_len_slots_) min_blocks_required *= 2; + if (has_var_len_slots_) min_buffers_required *= 2; RETURN_IF_ERROR(block_mgr_->RegisterClient(Substitute("Sorter ptr=$0", this), - min_blocks_required, false, mem_tracker_, state_, &block_mgr_client_)); + min_buffers_required, false, mem_tracker_, state_, &block_mgr_client_)); DCHECK(unsorted_run_ != NULL); RETURN_IF_ERROR(unsorted_run_->Init()); @@ -1394,44 +1405,25 @@ Status Sorter::InputDone() { RETURN_IF_ERROR(SortCurrentInputRun()); if (sorted_runs_.size() == 1) { - // The entire input fit in one run. Read sorted rows in GetNext() directly - // from the sorted run. - RETURN_IF_ERROR(sorted_runs_.back()->PrepareRead()); + // The entire input fit in one run. 
Read sorted rows in GetNext() directly from the + // in-memory sorted run. + DCHECK(sorted_runs_.back()->is_pinned()); + bool success; + RETURN_IF_ERROR(sorted_runs_.back()->PrepareRead(&success)); + DCHECK(success) << "Should always be able to prepare pinned run for read."; return Status::OK(); } - // At least one merge is necessary. - int blocks_per_run = has_var_len_slots_ ? 2 : 1; - int min_buffers_for_merge = sorted_runs_.size() * blocks_per_run; - // Check if the final run needs to be unpinned. - bool unpinned_final = false; - if (block_mgr_->num_free_buffers() < min_buffers_for_merge - blocks_per_run) { - // Number of available buffers is less than the size of the final run and - // the buffers needed to read the remainder of the runs in memory. - // Unpin the final run. - RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks()); - unpinned_final = true; - } else { - // No need to unpin the current run. There is enough memory to stream the - // other runs. - // TODO: revisit. It might be better to unpin some from this run if it means - // we can get double buffering in the other runs. - } + // Unpin the final run to free up memory for the merge. + // TODO: we could keep it in memory in some circumstances as an optimisation, once + // we have a buffer pool with more reliable reservations (IMPALA-3200). + RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllBlocks()); - // For an intermediate merge, intermediate_merge_batch contains deep-copied rows from - // the input runs. If (unmerged_sorted_runs_.size() > max_runs_per_final_merge), - // one or more intermediate merges are required. + // Merge intermediate runs until we have a final merge set-up. // TODO: Attempt to allocate more memory before doing intermediate merges. This may // be possible if other operators have relinquished memory after the sort has built - // its runs. 
- if (min_buffers_for_merge > block_mgr_->available_allocated_buffers()) { - DCHECK(unpinned_final); - RETURN_IF_ERROR(MergeIntermediateRuns()); - } - - // Create the final merger. - RETURN_IF_ERROR(CreateMerger(sorted_runs_.size())); - return Status::OK(); + // its runs. This depends on more reliable reservations (IMPALA-3200) + return MergeIntermediateRuns(); } Status Sorter::GetNext(RowBatch* output_batch, bool* eos) { @@ -1444,7 +1436,7 @@ Status Sorter::GetNext(RowBatch* output_batch, bool* eos) { } Status Sorter::Reset() { - DCHECK(unsorted_run_ == NULL); + DCHECK(unsorted_run_ == NULL) << "Cannot Reset() before calling InputDone()"; merger_.reset(); // Free resources from the current runs. CleanupAllRuns(); @@ -1466,6 +1458,8 @@ void Sorter::CleanupAllRuns() { Run::CleanupRuns(&merging_runs_); if (unsorted_run_ != NULL) unsorted_run_->DeleteAllBlocks(); unsorted_run_ = NULL; + if (merge_output_run_ != NULL) merge_output_run_->DeleteAllBlocks(); + merge_output_run_ = NULL; } Status Sorter::SortCurrentInputRun() { @@ -1484,55 +1478,52 @@ Status Sorter::SortCurrentInputRun() { } Status Sorter::MergeIntermediateRuns() { - int blocks_per_run = has_var_len_slots_ ? 2 : 1; - int max_runs_per_final_merge = - block_mgr_->available_allocated_buffers() / blocks_per_run; + DCHECK_GE(sorted_runs_.size(), 2); + int pinned_blocks_per_run = has_var_len_slots_ ? 2 : 1; + int max_runs_per_final_merge = MAX_BUFFERS_PER_MERGE / pinned_blocks_per_run; - // During an intermediate merge, blocks from the output sorted run will have to be - // pinned. + // During an intermediate merge, the one or two blocks from the output sorted run + // that are being written must be pinned. int max_runs_per_intermediate_merge = max_runs_per_final_merge - 1; DCHECK_GT(max_runs_per_intermediate_merge, 1); - // For an intermediate merge, intermediate_merge_batch contains deep-copied rows from - // the input runs. 
If (sorted_runs_.size() > max_runs_per_final_merge), - // one or more intermediate merges are required. - scoped_ptr<RowBatch> intermediate_merge_batch; - while (sorted_runs_.size() > max_runs_per_final_merge) { + + while (true) { // An intermediate merge adds one merge to unmerged_sorted_runs_. - // Merging 'runs - (max_runs_final_ - 1)' number of runs is sufficient to guarantee - // that the final merge can be performed. - int num_runs_to_merge = min<int>(max_runs_per_intermediate_merge, - sorted_runs_.size() - max_runs_per_intermediate_merge); - RETURN_IF_ERROR(CreateMerger(num_runs_to_merge)); - RowBatch intermediate_merge_batch(*output_row_desc_, state_->batch_size(), - mem_tracker_); - // 'merged_run' is the new sorted run that is produced by the intermediate merge. - // We added 'merged_run' to 'sorted_runs_' immediately so that it is cleaned up - // in Close(). - Run* merged_run = obj_pool_.Add( + // TODO: once we have reliable reservations (IMPALA-3200), we should calculate this + // based on the available reservations. + int num_runs_to_merge = + min<int>(max_runs_per_intermediate_merge, sorted_runs_.size()); + + DCHECK(merge_output_run_ == NULL) << "Should have finished previous merge."; + // Create the merged run in case we need to do intermediate merges. We need the + // output run and at least two input runs in memory to make progress on the + // intermediate merges. + // TODO: this isn't optimal: we could defer creating the merged run if we have + // reliable reservations (IMPALA-3200). + merge_output_run_ = obj_pool_.Add( new Run(this, output_row_desc_->tuple_descriptors()[0], false)); - sorted_runs_.push_back(merged_run); - RETURN_IF_ERROR(merged_run->Init()); - bool eos = false; - while (!eos) { - // Copy rows into the new run until done. 
- int num_copied; - RETURN_IF_CANCELLED(state_); - RETURN_IF_ERROR(merger_->GetNext(&intermediate_merge_batch, &eos)); - RETURN_IF_ERROR( - merged_run->AddIntermediateBatch(&intermediate_merge_batch, 0, &num_copied)); - - DCHECK_EQ(num_copied, intermediate_merge_batch.num_rows()); - intermediate_merge_batch.Reset(); - } + RETURN_IF_ERROR(merge_output_run_->Init()); + RETURN_IF_ERROR(CreateMerger(num_runs_to_merge)); - RETURN_IF_ERROR(merged_run->FinalizeInput()); + // If CreateMerger() consumed all the sorted runs, we have set up the final merge. + if (sorted_runs_.empty()) { + // Don't need intermediate run for final merge. + if (merge_output_run_ != NULL) { + merge_output_run_->DeleteAllBlocks(); + merge_output_run_ = NULL; + } + return Status::OK(); + } + RETURN_IF_ERROR(ExecuteIntermediateMerge(merge_output_run_)); + sorted_runs_.push_back(merge_output_run_); + merge_output_run_ = NULL; } - return Status::OK(); } -Status Sorter::CreateMerger(int num_runs) { - DCHECK_GT(num_runs, 1); +Status Sorter::CreateMerger(int max_num_runs) { + DCHECK_GE(max_num_runs, 2); + DCHECK_GE(sorted_runs_.size(), 2); // Clean up the runs from the previous merge. Run::CleanupRuns(&merging_runs_); @@ -1543,10 +1534,24 @@ Status Sorter::CreateMerger(int num_runs) { new SortedRunMerger(compare_less_than_, output_row_desc_, profile_, true)); vector<function<Status (RowBatch**)> > merge_runs; - merge_runs.reserve(num_runs); - for (int i = 0; i < num_runs; ++i) { + merge_runs.reserve(max_num_runs); + for (int i = 0; i < max_num_runs; ++i) { Run* run = sorted_runs_.front(); - RETURN_IF_ERROR(run->PrepareRead()); + bool success; + RETURN_IF_ERROR(run->PrepareRead(&success)); + if (!success) { + // If we can merge at least two runs, we can continue, otherwise we have a problem + // because we can't make progress on the merging. + // TODO: IMPALA-3200: we should not need this logic once we have reliable + // reservations (IMPALA-3200). 
+ if (merging_runs_.size() < 2) { + Status status = Status::MemLimitExceeded(); + status.AddDetail(Substitute(MERGE_FAILED_ERROR_MSG, merging_runs_.size())); + return status; + } + // Merge the runs that we were able to prepare. + break; + } // Run::GetNextBatch() is used by the merger to retrieve a batch of rows to merge // from this run. merge_runs.push_back(bind<Status>(mem_fn(&Run::GetNextBatch), run, _1)); @@ -1559,4 +1564,25 @@ Status Sorter::CreateMerger(int num_runs) { return Status::OK(); } + +Status Sorter::ExecuteIntermediateMerge(Sorter::Run* merged_run) { + RowBatch intermediate_merge_batch(*output_row_desc_, state_->batch_size(), + mem_tracker_); + bool eos = false; + while (!eos) { + // Copy rows into the new run until done. + int num_copied; + RETURN_IF_CANCELLED(state_); + RETURN_IF_ERROR(merger_->GetNext(&intermediate_merge_batch, &eos)); + RETURN_IF_ERROR( + merged_run->AddIntermediateBatch(&intermediate_merge_batch, 0, &num_copied)); + + DCHECK_EQ(num_copied, intermediate_merge_batch.num_rows()); + intermediate_merge_batch.Reset(); + } + + RETURN_IF_ERROR(merged_run->FinalizeInput()); + return Status::OK(); +} + } // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee53ddb3/be/src/runtime/sorter.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h index 16b55c2..2b76fa9 100644 --- a/be/src/runtime/sorter.h +++ b/be/src/runtime/sorter.h @@ -124,20 +124,28 @@ class Sorter { class TupleIterator; class TupleSorter; - /// Create a SortedRunMerger from the first 'num_runs' sorted runs in sorted_runs_ and - /// assign it to merger_. The runs to be merged are removed from sorted_runs_. - /// The Sorter sets the deep_copy_input flag to true for the merger, since the blocks - /// containing input run data will be unpinned as input runs are read. 
- Status CreateMerger(int num_runs); + /// Create a SortedRunMerger from sorted runs in 'sorted_runs_' and assign it to + /// 'merger_'. Attempts to set up merger with 'max_num_runs' runs but may set it + /// up with fewer if it cannot pin the initial blocks of all of the runs. Fails + /// if it cannot merge at least two runs. The runs to be merged are removed from + /// 'sorted_runs_'. The Sorter sets the 'deep_copy_input' flag to true for the + /// merger, since the blocks containing input run data will be deleted as input + /// runs are read. + Status CreateMerger(int max_num_runs); /// Repeatedly replaces multiple smaller runs in sorted_runs_ with a single larger - /// merged run until the number of remaining runs is small enough for a single merge. + /// merged run until there are few enough runs to be merged with a single merger. + /// Returns when 'merger_' is set up to merge the final runs. /// At least 1 (2 if var-len slots) block from each sorted run must be pinned for /// a merge. If the number of sorted runs is too large, merge sets of smaller runs /// into large runs until a final merge can be performed. An intermediate row batch /// containing deep copied rows is used for the output of each intermediate merge. Status MergeIntermediateRuns(); + /// Execute a single step of the intermediate merge, pulling rows from 'merger_' + /// and adding them to 'merged_run'. + Status ExecuteIntermediateMerge(Sorter::Run* merged_run); + /// Called once there no more rows to be added to 'unsorted_run_'. Sorts /// 'unsorted_run_' and appends it to the list of sorted runs. Status SortCurrentInputRun(); @@ -195,6 +203,10 @@ class Sorter { /// These runs can be deleted when we are done with the current merge. std::deque<Run*> merging_runs_; + /// Output run for the merge. Stored in Sorter() so that it can be cleaned up + /// in Sorter::Close() in case of errors. + Run* merge_output_run_; + /// Pool of owned Run objects. 
Maintains Runs objects across non-freeing Reset() calls. ObjectPool obj_pool_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee53ddb3/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test index 02feb31..b31620f 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test +++ b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test @@ -1585,15 +1585,16 @@ int, bigint, bigint, double ---- QUERY # Regression test for IMPALA-2265, IMPALA-2559. The max_block_mgr_memory is tuned to # reproduce the issue when running this query against functional_parquet. -SET max_block_mgr_memory=36m; +SET max_block_mgr_memory=16m; SELECT lag(-180, 13) over (ORDER BY t1.int_col ASC, t2.int_col ASC) AS int_col FROM functional_parquet.alltypes t1 CROSS JOIN functional_parquet.alltypes t2 LIMIT 10; ---- CATCH Memory limit exceeded ==== ---- QUERY -# Check that the above query can succeed with more buffers. -SET max_block_mgr_memory=64m; +# Check that the above query can succeed with the minimum buffers (3 buffers for sort, +# 1 buffer for analytic). 
+SET max_block_mgr_memory=32m; SELECT lag(-180, 13) over (ORDER BY t1.int_col ASC, t2.int_col ASC) AS int_col FROM functional_parquet.alltypes t1 CROSS JOIN functional_parquet.alltypes t2 LIMIT 10; ---- TYPES http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee53ddb3/testdata/workloads/functional-query/queries/QueryTest/spilling.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test index f462435..e97afc5 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test +++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test @@ -589,3 +589,42 @@ STRING row_regex: .*SpilledRuns: .* \([1-9][0-9]*\) row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\) ==== +---- QUERY +# IMPALA-1346/IMPALA-1546: fix sorter memory management so that it can complete +# successfully when in same pipeline as a spilling join. +set num_nodes=0; +set mem_limit=200m; +set max_block_mgr_memory=50m; +set disable_outermost_topn=1; +select * from lineitem + inner join orders on l_orderkey = o_orderkey +order by l_linenumber, l_suppkey, l_partkey, l_orderkey +limit 20 +---- RESULTS +4567296,2500,1,1,48.00,67320.00,0.06,0.05,'N','O','1997-05-15','1997-05-19','1997-05-27','DELIVER IN PERSON','REG AIR','ccounts cajole quickly ',4567296,100420,'O',113399.69,'1997-02-21','1-URGENT','Clerk#000000779',0,'ously ironic instructions. 
pa' +5427587,2500,1,1,40.00,56100.00,0.05,0.07,'N','O','1997-04-01','1997-05-22','1997-04-29','TAKE BACK RETURN','RAIL','oxes wake even theodolites: bold requests',5427587,110356,'O',182983.43,'1997-03-10','1-URGENT','Clerk#000000279',0,'osits wake along the ca' +3834597,7500,1,1,3.00,4222.50,0.03,0.01,'N','O','1998-07-03','1998-05-12','1998-07-04','COLLECT COD','AIR','sly final instructions boost about',3834597,29839,'O',147632.36,'1998-03-23','1-URGENT','Clerk#000000284',0,'le carefully blithel' +4888512,10000,1,1,14.00,12740.00,0.03,0.01,'A','F','1993-02-07','1992-12-29','1993-02-19','NONE','SHIP','al braids. unusual, silent sentiments c',4888512,93742,'F',12481.37,'1992-11-21','4-NOT SPECIFIED','Clerk#000000591',0,' requests hinder blithely. closely ironic theodolites cajole among the car' +693345,12497,1,1,14.00,19732.86,0.03,0.07,'N','O','1998-10-12','1998-08-18','1998-10-23','NONE','FOB','. blithely',693345,49472,'O',111825.01,'1998-07-06','4-NOT SPECIFIED','Clerk#000000132',0,'es wake regularly furiously pending orbits. quickly even requests according t' +710784,12497,1,1,11.00,15504.39,0.02,0.07,'R','F','1992-08-21','1992-08-11','1992-09-05','TAKE BACK RETURN','SHIP','haggle furiously special accounts? final th',710784,82009,'F',91959.19,'1992-06-20','1-URGENT','Clerk#000000001',0,'rve quickly after the express theodolites. furiou' +1240672,12497,1,1,41.00,57789.09,0.08,0.05,'N','O','1998-09-18','1998-08-04','1998-09-24','NONE','RAIL','o the furiously unusual requests sleep alo',1240672,59153,'O',55824.25,'1998-05-28','1-URGENT','Clerk#000000657',0,'ular pinto beans are above the furiously regular accounts: furiously even foxe' +2024230,12497,1,1,16.00,22551.84,0.05,0.00,'A','F','1992-04-10','1992-04-14','1992-04-13','NONE','RAIL','pecial theodolites wake slyly. care',2024230,145955,'F',186715.47,'1992-02-22','5-LOW','Clerk#000000904',0,'uses. accounts are furiously. 
fluffily regular ideas haggle b' +3039715,12497,1,1,14.00,19732.86,0.01,0.02,'R','F','1993-03-24','1993-02-12','1993-03-28','TAKE BACK RETURN','AIR','onic requests. furiously spe',3039715,27274,'F',80817.94,'1992-11-23','2-HIGH','Clerk#000000766',0,'lently regular packages believe slyly around the regular, regula' +3105635,12497,1,1,16.00,22551.84,0.03,0.07,'R','F','1993-07-30','1993-08-28','1993-08-19','TAKE BACK RETURN','SHIP','nal, ironic theodolites solve carefully',3105635,23665,'F',256886.23,'1993-06-22','5-LOW','Clerk#000000029',0,'otes-- blithely special accounts cajole slyly furiously silent dugout' +3764485,12497,1,1,22.00,31008.78,0.05,0.08,'R','F','1993-02-07','1993-03-18','1993-02-10','COLLECT COD','AIR',' pinto beans nag',3764485,19375,'F',107449.65,'1992-12-26','5-LOW','Clerk#000000364',0,' carefully regular foxes bo' +4767393,12497,1,1,3.00,4228.47,0.09,0.06,'A','F','1992-09-15','1992-10-02','1992-09-17','DELIVER IN PERSON','FOB',' asymptotes. blithely',4767393,22522,'F',163054.32,'1992-08-10','2-HIGH','Clerk#000000174',0,'unusual foxes after the furiously regular multipliers detect ca' +1955428,14998,1,1,47.00,89910.53,0.07,0.02,'R','F','1993-01-25','1993-04-02','1993-01-31','DELIVER IN PERSON','MAIL','thely regular frays',1955428,41801,'F',134123.84,'1993-01-08','4-NOT SPECIFIED','Clerk#000000422',0,'ld deposits are slyly. quickly bold ideas bo' +4197636,14998,1,1,6.00,11477.94,0.03,0.01,'A','F','1993-11-28','1993-11-27','1993-12-03','TAKE BACK RETURN','SHIP','ly. slyly final ex',4197636,77494,'F',283248.52,'1993-10-18','1-URGENT','Clerk#000000087',0,'final asymptotes? packages doze against the ironic packages. ironic, bold dec' +4899558,17499,1,1,12.00,16997.88,0.00,0.08,'A','F','1992-08-06','1992-08-09','1992-08-14','COLLECT COD','RAIL','ourts would sleep fluffily express accou',4899558,111973,'F',300951.70,'1992-05-21','1-URGENT','Clerk#000000826',0,'eodolites wake ironic sentiments. 
r' +4725760,20000,1,1,48.00,44160.00,0.02,0.05,'R','F','1992-03-06','1992-03-04','1992-03-20','NONE','SHIP','liers. slyly regular request',4725760,45176,'F',202199.68,'1992-01-31','4-NOT SPECIFIED','Clerk#000000853',0,'riously regular accounts. ironic, bold requests was slyly; slyly regula' +49444,22494,1,1,22.00,31162.78,0.06,0.08,'N','O','1997-03-03','1997-04-19','1997-03-25','TAKE BACK RETURN','MAIL','l requests among the blithely fin',49444,11725,'O',273794.80,'1997-01-23','2-HIGH','Clerk#000000814',0,'s. quickly bold packages integrate. furio' +396839,24996,1,1,9.00,17288.91,0.03,0.07,'N','O','1996-06-19','1996-04-27','1996-07-13','COLLECT COD','MAIL',' accounts ',396839,63389,'O',193790.34,'1996-03-11','5-LOW','Clerk#000000436',0,'eans. instructions are quickly--' +1109894,24996,1,1,22.00,42261.78,0.01,0.05,'A','F','1995-01-10','1995-03-01','1995-01-21','TAKE BACK RETURN','RAIL','nusual requests wake. qu',1109894,35489,'F',78070.57,'1994-12-23','2-HIGH','Clerk#000000524',0,' bold deposits engage fluffily among the s' +5825859,24996,1,1,5.00,9604.95,0.02,0.00,'A','F','1995-05-05','1995-03-17','1995-05-21','TAKE BACK RETURN','MAIL','y special a',5825859,13285,'F',90407.81,'1995-01-15','3-MEDIUM','Clerk#000000818',0,'ajole. quickly ironic theodolites ' +---- TYPES +BIGINT,BIGINT,BIGINT,INT,DECIMAL,DECIMAL,DECIMAL,DECIMAL,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,BIGINT,BIGINT,STRING,DECIMAL,STRING,STRING,STRING,INT,STRING +---- RUNTIME_PROFILE +# Verify that the sort and join actually spilled +row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\) +row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\) +====
