[
https://issues.apache.org/jira/browse/TRAFODION-2259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15546968#comment-15546968
]
ASF GitHub Bot commented on TRAFODION-2259:
-------------------------------------------
Github user DaveBirdsall commented on a diff in the pull request:
https://github.com/apache/incubator-trafodion/pull/743#discussion_r81872504
--- Diff: core/sql/sort/Topn.cpp ---
@@ -0,0 +1,337 @@
+/**********************************************************************
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+**********************************************************************/
+/* -*-C++-*-
+******************************************************************************
+*
+* File: TopN.cpp
+*
+* Description: This file contains the implementation of all member
functions
+* of the class TopN.
+*
+* 1. Sort would initially maintain Top N array of elements to being with.
+* 2. Read records into TopN array.
+* 3. Once TopN array is full, heapify the array into max heap. Top node in
the heap is always the highest node.
+* 4. Subsequent record read either gets discarded( if greater than top
node) or replace top node( if lesser then top node) . if replaced top node,
re-balance the heap.
+* 5. Repeat steps 4 until last record is read.
+* 6. sort the final heap using heap sort.
+*******************************************************************************/
+
+#include <iostream>
+#include <fstream>
+
+#ifndef DEBUG
+#undef NDEBUG
+#define NDEBUG
+#endif
+#include "ex_stdh.h"
+#include "Topn.h"
+#include "ScratchSpace.h"
+#include "logmxevent.h"
+#include "SortUtil.h"
+#include "ex_ex.h"
+#include "ExStats.h"
+
+//------------------------------------------------------------------------
+// Class Constructor.
+//------------------------------------------------------------------------
+TopN::TopN(ULng32 runsize, ULng32 sortmaxmem, ULng32 recsize,
+ NABoolean doNotallocRec, ULng32 keysize,
+ SortScratchSpace* scratch, NABoolean iterSort,
+ CollHeap* heap, SortError* sorterror, Lng32 explainNodeId,
SortUtil* sortutil):
+ SortAlgo(runsize, recsize, doNotallocRec, keysize, scratch,
explainNodeId),
+ loopIndex_(0), heap_(heap), sortError_(sorterror),
+ sortUtil_(sortutil)
+{
+ //runsize is TopN size. Fixed.
+ allocRunSize_ = runsize;
+
+ //Actual run size after all elements read.
+ runSize_ = 0;
+
+ isHeapified_ = FALSE;
+
+ //allocateMemory failureIsFatal is defaulted to TRUE means allocation
failure results in
+ //longjump to jump handler defined in ex_sort.cpp. Only applicable on
NSK.
+ topNKeys_ = (RecKeyBuffer *)heap_->allocateMemory(sizeof(RecKeyBuffer)
* allocRunSize_);
+
+ // Below asserts useful in debug mode. Also asserts if longjmp did not
happen.
+ //ex_assert(rootRecord_!= NULL, "Sort: Initial rootRecord_ allocation
failed");
+ ex_assert(topNKeys_ != NULL, "Sort: Initial topNKeys_ allocation
failed");
+
+ recNum_ = 0;
+ ExOperStats *stat =
sortUtil_->config()->getCallingTcb()->getStatsEntry();
+ if (stat)
+ bmoStats_ = stat->castToExBMOStats();
+ else
+ bmoStats_ = NULL;
+ if (bmoStats_)
+ bmoStats_->updateBMOHeapUsage((NAHeap *)heap_);
+}
+
+
+//------------------------------------------------------------------------
+// Class Destructor: Delete all the heap space pointed by pointers in Qsort
+//------------------------------------------------------------------------
+TopN::~TopN(void)
+{
+
+ for (int i = 0; i < runSize_; i++)
+ topNKeys_[i].rec_->releaseTupp();
+
+
+ if (topNKeys_ != NULL) {
+ NADELETEBASIC(topNKeys_, heap_);
+ topNKeys_ = NULL;
+ }
+ if (bmoStats_)
+ bmoStats_->updateBMOHeapUsage((NAHeap *)heap_);
+
+}
+
+Lng32 TopN::sortSend(void *rec, ULng32 len, void* tupp)
+{
+ //if heap not built means, TopN array has more slots
+ //available to fill.
+ if(! isHeapified_)
+ {
+ ex_assert(loopIndex_ >= 0, "TopN::sortSend: loopIndex_ is < 0");
+ ex_assert(loopIndex_ < allocRunSize_, "TopN::sortSend: loopIndex_ >
allocRunSize_");
+ ex_assert(rec != NULL, "TopN::sortSend: rec is NULL");
+
+ Record * insertRec = new Record(rec, len, tupp, heap_, sortError_);
+
+ topNKeys_[loopIndex_].key_ = insertRec->extractKey(keySize_,
sortUtil_->config()->numberOfBytesForRecordSize());
+ topNKeys_[loopIndex_].rec_ = insertRec;
+ if (++loopIndex_ == allocRunSize_)
+ {
+ //Reaching here means, we have filled up the array.
+ //Now heapify the array to start accepting/eliminating new
elements from now on.
+
+ //Note lookIndex_ contains the current number of filled elements.
+
+ buildHeap();
+ }
+ return SORT_SUCCESS;
+ }
+
+ //Reaching here means, heap is already build.
+ //Check if the new rec belongs to this heap by comparing the
+ //new rec key with the root node of the heap ( root node is always the
greatest).
+ insertRec(rec, len, tupp);
+ return SORT_SUCCESS;
+ }
+
+
+void TopN::buildHeap()
+{
+ if(!isHeapified_)
+ {
+ //loopIndex_ is now <= TopN
+ runSize_ = loopIndex_;
+
+ satisfyHeap();
+
+ isHeapified_ = TRUE;
+ }
+}
+
+//Satisfy Heap makes sure the heap is balanced maxHeap.
+//Note this does not mean heap is sorted. It just makes sure
+//the higher level nodes are greater than lower level nodes.
+//Topmost node or root will be the highest.
+void TopN::satisfyHeap()
+{
+ for (int i = (runSize_/2 ); i >= 0; i--)
+ siftDown(topNKeys_, i, runSize_-1);
+}
+
+
+void TopN::insertRec(void *rec, ULng32 len, void* tupp)
+{
+ ex_assert(isHeapified_, "TopN::insertRec: not heapified");
+
+ int root = 0; //Always, root node is the zero'th element in array.
+
+ Record * insertRec = new Record(rec, len, tupp, heap_, sortError_);
+ insertRecKey_.key_ = insertRec->extractKey(keySize_,
sortUtil_->config()->numberOfBytesForRecordSize());
+ insertRecKey_.rec_ = insertRec;
+
+ if (compare(topNKeys_[root].key_ ,insertRecKey_.key_) ==
KEY1_IS_GREATER)
+ {
+ swap( &topNKeys_[root],&insertRecKey_);
+
+
+ //After swap, satisfy or rebalance the heap.
+ satisfyHeap();
+
+ }
+
+ //Once swapped, or not swapped, delete the unneeded record.
+ //This step is very important to discard the unwanted record.
+ //Note releaseTupp() also deletes the tupp allocated in sql
+ //buffer. This makes space for new records read in.
+ insertRecKey_.rec_->releaseTupp();
+ delete insertRecKey_.rec_;
+
+}
+
+Lng32 TopN::sortSendEnd()
+{
+ Lng32 retcode = SORT_SUCCESS;
+ ex_assert(loopIndex_ >= 0, "TopN::sortSendEnd: loopIndex_ is < 0");
+
+ buildHeap();
+ sortHeap();
+
+ return retcode;
+}
+
+void TopN::sortHeap()
+{
+ for (int i = runSize_-1; i >= 1; i--)
+ {
+ swap(&topNKeys_[0],&topNKeys_[i]);
+ siftDown(topNKeys_, 0, i-1);
+ }
+}
+
+Lng32 TopN::sortReceive(void *rec, ULng32& len)
+{
+ return SORT_FAILURE;
+}
+
+Lng32 TopN::sortReceive(void*& rec, ULng32& len, void*& tupp)
+{
+ //---------------------------------------------------------------
+ // We use Qsort to receive records only in case of internal sort
+ // for merging.
+ //---------------------------------------------------------------
+ if (recNum_ < runSize_) {
+ topNKeys_[recNum_].rec_->getRecordTupp(rec, recSize_, tupp);
+ len = recSize_;
+ recNum_++;
+ }
+ else {
+ len = 0;
+ }
+
+ return SORT_SUCCESS;
+}
+/*
+//----------------------------------------------------------------------
--- End diff --
I guess this commented out code could be removed?
> Sort TopN operator
> ------------------
>
> Key: TRAFODION-2259
> URL: https://issues.apache.org/jira/browse/TRAFODION-2259
> Project: Apache Trafodion
> Issue Type: Improvement
> Components: sql-exe
> Affects Versions: 2.1-incubating
> Reporter: Prashanth Vasudev
> Assignee: Prashanth Vasudev
>
> Sort operator consumes all records before producing sorted records. For
> certain use cases where only Top N records are required, today sort consumes
> all records into memory and overflows( spills ) to disk. This impacts
> performance.
> if topN is pushed down to sort, only required memory can be allocated and
> sort would only hold topN records in memory. Once all the records are read,
> sorted records in topN is returned.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)