http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/021ebd6c/core/sql/ustat/hs_globals.cpp ---------------------------------------------------------------------- diff --git a/core/sql/ustat/hs_globals.cpp b/core/sql/ustat/hs_globals.cpp index aa1c860..43f582e 100644 --- a/core/sql/ustat/hs_globals.cpp +++ b/core/sql/ustat/hs_globals.cpp @@ -120,6 +120,8 @@ Lng32 setBufferValue(MCWrapper& value, const HSColGroupStruct *mgroup, HSDataBuf template <class T> void createHistogram(HSColGroupStruct *group, Lng32 numIntervals, Int64 estRowCount, NABoolean usingSample, T* dummyPtr); +static Lng32 drop_I(NAString& sampTblName); // drops the "_I" table that accompanies sample table sampTblName + // // Initialize the GLOBAL instances of ISFixedChar and ISVarChar values. // See the "as lightweight as possible" comments in hs_globals.h @@ -1413,7 +1415,8 @@ HSColGroupStruct::HSColGroupStruct() mcis_colsMissingMap(NULL), mcis_memFreed(FALSE), mcis_totalMCmemNeeded(0), mcis_groupHead(TRUE), mcis_next(NULL), mcis_readAsIs (FALSE), delayedRead(FALSE), cbf(NULL), - boundaryValues(NULL), MFVValues(NULL), allKeysInsertedIntoCBF(FALSE) + boundaryValues(NULL), MFVValues(NULL), allKeysInsertedIntoCBF(FALSE), + backwardWarningCount(0) { strcpy(readTime, "0001-01-01 00:00:00"); // default if new #ifdef _TEST_ALLOC_FAILURE @@ -1583,6 +1586,13 @@ NABoolean HSColGroupStruct::allocateISMemory(Int64 rows, // avoid freeing strData when this fn is called to remove the old data array.
void HSColGroupStruct::freeISMemory(NABoolean freeStrData, NABoolean freeMCData) { + HSLogMan *LM = HSLogMan::Instance(); + if (LM->LogNeeded()) + { + sprintf(LM->msg, "Freeing IS memory for column %s", colNames->data()); + LM->Log(LM->msg); + } + // used by MC in-memory since a column might have been processed but kept // in memory to be used by MCs mcis_memFreed = TRUE; @@ -2835,6 +2845,7 @@ HSGlobalsClass::HSGlobalsClass(ComDiagsArea &diags) numPartitions(0), hstogram_table(new(STMTHEAP) NAString(STMTHEAP)), hsintval_table(new(STMTHEAP) NAString(STMTHEAP)), + hsperssamp_table(new(STMTHEAP) NAString(STMTHEAP)), hssample_table(new(STMTHEAP) NAString(STMTHEAP)), statstime(new(STMTHEAP) NAString(STMTHEAP)), @@ -2865,9 +2876,13 @@ HSGlobalsClass::HSGlobalsClass(ComDiagsArea &diags) //iusSampleInMem(NULL), iusSampleDeletedInMem(NULL), iusSampleInsertedInMem(NULL), + sampleIExists_(FALSE), // TRUE while a _I table exists that must still be dropped + PST_IUSrequestedSampleRows_(NULL), + PST_IUSactualSampleRows_(NULL), sampleRateAsPercetageForIUS(0), minRowCtPerPartition_(-1), sample_I_generated(FALSE), + PSRowUpdated(FALSE), // if TRUE, ~HSGlobalsClass() calls end_IUS_work() jitLogThreshold(0), stmtStartTime(0), jitLogOn(FALSE), @@ -2898,6 +2913,16 @@ HSGlobalsClass::HSGlobalsClass(ComDiagsArea &diags) HSGlobalsClass::~HSGlobalsClass() { + // If this was an IUS execution, make sure the row for the source table in + // SB_PERSISTENT_SAMPLES is modified to reflect that an IUS operation is no + // longer in progress. + if (PSRowUpdated) + end_IUS_work(); + + // These are used by end_IUS_work(), so it must be called (above) before they are deleted.
+ NADELETEBASIC(PST_IUSrequestedSampleRows_, STMTHEAP); + NADELETEBASIC(PST_IUSactualSampleRows_, STMTHEAP); + // reset the parser flags that were set in the constructor SQL_EXEC_ResetParserFlagsForExSqlComp_Internal(savedParserFlags); @@ -3035,7 +3060,7 @@ Lng32 HSGlobalsClass::Initialize() inserts, deletes, updates, numPartitions, minRowCtPerPartition_, - optFlags & SAMPLE_REQUESTED); + optFlags & (SAMPLE_REQUESTED | IUS_OPT)); LM->StopTimer(); if (LM->LogNeeded()) { @@ -3403,7 +3428,7 @@ NABoolean HSGlobalsClass::isAuthorized(NABoolean isShowStats) return TRUE; // no privilege support available for hbase and hive tables - assert (objDef->getNATable()); + HS_ASSERT (objDef->getNATable()); if (CmpSeabaseDDL::isHbase(objDef->getCatName()) || isHiveCat(objDef->getCatName())) return TRUE; @@ -3621,14 +3646,104 @@ void HSGlobalsClass::startJitLogging(const char* checkPointName, Int64 elapsedSe } } +// The optimal degree of parallelism for a LOAD or UPSERT is the number of +// partitions of the source table. This forces that by setting the cqd +// PARALLEL_NUM_ESPS. Note that when the default for AGGRESSIVE_ESP_ALLOCATION_PER_CORE +// is permanently changed to 'ON', we may be able to remove this. +// tblDef -- ptr to HSTableDef from which to get the catalog and schema name of +// the source table. +// tblName -- unqualified name of the source table. If NULL, then the source +// table is the one represented by tblDef. +// Returns TRUE if the cqd was successfully set, FALSE otherwise. If TRUE is returned, +// then resetEspParallelism() may be called to reset the cqd.
+static NABoolean setEspParallelism(HSTableDef* tblDef, const char* tblName = NULL) +{ + HSLogMan *LM = HSLogMan::Instance(); + Lng32 retcode = 0; + Lng32 numPartitions = 0; + if (!tblName) + numPartitions = tblDef->getNumPartitions(); // no tblName: use the table described by tblDef + else + { + HSCursor cursor; + NAString numPartitionsQuery; + numPartitionsQuery.append("select t.num_salt_partns from \"_MD_\".OBJECTS O, \"_MD_\".TABLES T where o.catalog_name = '") + .append(tblDef->getCatName()) + .append("' and o.schema_name = '") + .append(tblDef->getSchemaName()) + .append("' and o.object_name = '") + .append(tblName) // NOTE(review): names are spliced into quoted literals unescaped; a name containing ' would break this query -- confirm names cannot contain quotes + .append("' and o.object_uid = t.table_uid"); + retcode = cursor.fetchNumColumn(numPartitionsQuery, &numPartitions, NULL); + if (retcode != 0) + { + if (LM->LogNeeded()) + { + sprintf(LM->msg, "PARALLEL_NUM_ESPS not set; query to get # partitions received sqlcode %d", retcode); + LM->Log(LM->msg); + } + return FALSE; + } + } + NABoolean espCQDUsed = FALSE; + if (numPartitions > 1) + { + char temp[25]; + sprintf(temp, "'%d'", numPartitions); + NAString espsCQD = "CONTROL QUERY DEFAULT PARALLEL_NUM_ESPS "; + espsCQD += temp; + retcode = HSFuncExecQuery(espsCQD); + if (retcode < 0) // only errors (< 0) are failures; a warning still counts as set + { + if (LM->LogNeeded()) + { + sprintf(LM->msg, "cqd statement to set PARALLEL_NUM_ESPS failed with %d", retcode); + LM->Log(LM->msg); + } + } + else + { + espCQDUsed = TRUE; + if (LM->LogNeeded()) + { + sprintf(LM->msg, "PARALLEL_NUM_ESPS was set to %d", numPartitions); + LM->Log(LM->msg); + } + } + } + else if (LM->LogNeeded()) // numPartitions <= 1: the cqd is left unchanged + { + sprintf(LM->msg, "PARALLEL_NUM_ESPS not set; # partitions reported as %d", numPartitions); + LM->Log(LM->msg); + } + + return espCQDUsed; +} + +// Reset the cqd PARALLEL_NUM_ESPS. This is the other bookend for setEspParallelism(), +// which returns TRUE if the cqd is actually set within that function. If so, this +// function can be called to reset it.
+static void resetEspParallelism() +{ + HSLogMan *LM = HSLogMan::Instance(); + Lng32 retcode = HSFuncExecQuery("CONTROL QUERY DEFAULT PARALLEL_NUM_ESPS RESET"); + if (retcode) + { + // A nonzero return is only logged; failing to reset is not treated as fatal. + if (LM->LogNeeded()) + { + // Reuse LM from the top of the function; no shadowing local is needed here. + sprintf(LM->msg, "cqd statement to reset PARALLEL_NUM_ESPS returned %d", retcode); + LM->Log(LM->msg); + } + } +} /**********************************************/ /* METHOD: createSampleOption() */ /* PURPOSE: create text for use in sample */ /* query. */ -/* RETCODE: 0 - on success */ -/* non-zero otherwise */ /* INPUT: sampleType - the sample option type*/ /* set when parsing. */ /* samplePercent - % */ @@ -3636,10 +3751,9 @@ void HSGlobalsClass::startJitLogging(const char* checkPointName, Int64 elapsedSe /* when parsing. */ /* OUTPUT: sampleOpt - sample text option. */ /**********************************************/ -Lng32 createSampleOption(Lng32 sampleType, double samplePercent, NAString &sampleOpt, +void createSampleOption(Lng32 sampleType, double samplePercent, NAString &sampleOpt, Int64 sampleValue1, Int64 sampleValue2) { - Lng32 retcode = 0; char floatStr[30], intStr[30]; switch (sampleType) { @@ -3671,10 +3785,9 @@ Lng32 createSampleOption(Lng32 sampleType, double samplePercent, NAString &sampl sampleOpt += " ROWS "; break; default: // Invalid option. - retcode = -1; + HS_ASSERT(FALSE); break; } - return retcode; } @@ -3690,7 +3803,7 @@ void HSSample::makeTableName(NABoolean isPersSample) NABoolean unpartitionedSample = FALSE; if (objDef->getObjectFormat() == SQLMX) { - //The naming convention used for the temporary sample table is 'SQLMX_' + //The naming convention used for the temporary sample table is 'TRAF_SAMPLE_' //followed by the object_uid of the source table and a portion of //the timestamp.
The object_uid ensures no collisions with update stats //for other tables, while the timestamp chars help avoid collision when @@ -3742,7 +3855,7 @@ void HSSample::makeTableName(NABoolean isPersSample) convertInt64ToAscii(objDef->getObjectUID(), objectIDStr); sprintf(timestampStr, "_%u_%u", (UInt32)tv.tv_sec, (UInt32)tv.tv_usec); - sampleTable += "SQLMX_"; + sampleTable += "TRAF_SAMPLE_"; // naming-convention prefix for temporary sample tables sampleTable += objectIDStr; sampleTable += timestampStr; @@ -3789,7 +3902,6 @@ Lng32 HSSample::make(NABoolean rowCountIsEstimate, // input Int64 &sampleRowCnt, // input/output NABoolean isPersSample, // input. Default value is FALSE NABoolean unpartitionedSample,// input. Default value is TRUE - NABoolean createDandI, Int64 minRowCtPerPartition ) { @@ -3812,10 +3924,10 @@ Lng32 HSSample::make(NABoolean rowCountIsEstimate, // input // Create sample option based on sampling type, using 'samplePercent'. if (hs_globals) - retcode = createSampleOption(sampleType, samplePercent, sampleOption, - hs_globals->sampleValue1, hs_globals->sampleValue2); - else retcode = createSampleOption(sampleType, samplePercent, sampleOption); - HSHandleError(retcode); + createSampleOption(sampleType, samplePercent, sampleOption, + hs_globals->sampleValue1, hs_globals->sampleValue2); + else + createSampleOption(sampleType, samplePercent, sampleOption); //Normally, we want to create an AUDITED scratch table. Although, for //performance and TMF timeouts, we should use a NON-AUDITED table and @@ -3826,7 +3938,7 @@ Lng32 HSSample::make(NABoolean rowCountIsEstimate, // input //AUTOABORT time. This process may take a long time and blow away //your transaction.
LM->StartTimer("Create sample table"); - retcode = create(unpartitionedSample, isPersSample, createDandI); + retcode = create(unpartitionedSample, isPersSample); LM->StopTimer(); if (retcode == -HS_PKEY_FLOAT_ERROR) { // If creation of sample table fails with -1120, then the primary key @@ -3851,19 +3963,7 @@ Lng32 HSSample::make(NABoolean rowCountIsEstimate, // input // For Hive tables the sample table used is a Trafodion table if (hs_globals->isHbaseTable || hs_globals->isHiveTable) { - // The optimal degree of parallelism for the LOAD or UPSERT is - // the number of partitions of the original table. Force that. - // Note that when the default for AGGRESSIVE_ESP_ALLOCATION_PER_CORE - // is permanently changed to 'ON', we may be able to remove this CQD. - if (hs_globals->objDef->getNumPartitions() > 1) - { - char temp[40]; // way more space than needed, but it's safe - sprintf(temp,"'%d'",hs_globals->objDef->getNumPartitions()); - NAString EspsCQD = "CONTROL QUERY DEFAULT PARALLEL_NUM_ESPS "; - EspsCQD += temp; - HSFuncExecQuery(EspsCQD); - EspCQDUsed = TRUE; // remember to reset later - } + EspCQDUsed = setEspParallelism(hs_globals->objDef); // TRUE only if the cqd was actually set; gates the reset below // Set CQDs controlling HBase cache size (number of rows returned by HBase // in a batch) to avoid scanner timeout.
Reset these after the sample query @@ -4020,7 +4120,7 @@ Lng32 HSSample::make(NABoolean rowCountIsEstimate, // input } if (EspCQDUsed) { - HSFuncExecQuery("CONTROL QUERY DEFAULT PARALLEL_NUM_ESPS RESET"); + resetEspParallelism(); } if (retcode) TM->Rollback(); @@ -4038,6 +4138,7 @@ Lng32 HSSample::make(NABoolean rowCountIsEstimate, // input if ((sampleRowCount == 0) && // sample set is empty; (CmpCommon::getDefault(USTAT_USE_BULK_LOAD) == DF_OFF)) { // cannot generate histograms + drop(); // drop the (empty) sample table we created HSFuncMergeDiags(- UERR_SAMPLE_SET_IS_ZERO); retcode = -1; } @@ -4471,6 +4572,7 @@ static void mapInternalSortTypes(HSColGroupStruct *groupList, NABoolean forHive { HSColumnStruct &col = group->colSet[0]; NAString columnName, dblQuote="\""; + group->ISSelectExpn = ""; // start this group's select expression from empty // If retrieving from the Hive backing sample for a table, positional names // are used. This avoids any issues with Trafodion delimited ids that do @@ -4847,7 +4949,7 @@ void HSGlobalsClass::getMemoryRequirements(HSColGroupStruct* group, Int64 rows) if (LM->LogNeeded()) { - sprintf(LM->msg, "Memory estimates for signle group based on " PF64 " rows", rows); + sprintf(LM->msg, "Memory estimates for single group based on " PF64 " rows", rows); LM->Log(LM->msg); } @@ -5103,7 +5205,8 @@ Lng32 HSGlobalsClass::CollectStatistics() // Create a persistent sample table. It will be used for this non-IUS // execution of Update Stats, and updated incrementally for subsequent // IUS operations.
- HSPersSamples *sampleList = HSPersSamples::Instance(objDef->getCatName()); + HSPersSamples *sampleList = HSPersSamples::Instance(objDef->getCatName(), + objDef->getSchemaName()); if (!sampleList) return -1; // sample list didn't exist and failed creation retcode = sampleList->createAndInsert(objDef, @@ -5132,254 +5235,21 @@ Lng32 HSGlobalsClass::CollectStatistics() { return CollectStatisticsWithFastStats(); } - else if ( canDoIUS() ) + else if (canDoIUS()) { - // Make sure the Where clause doesn't contain any constructs we don't allow - // in the context of an IUS statement. - retcode = validateIUSWhereClause(); - HSHandleError(retcode); - - ISMemPercentage_ = (float)CmpCommon::getDefaultNumeric(USTAT_IS_MEMORY_FRACTION); - - char ius_update_history_buffer[129]; - retcode = begin_IUS_work(ius_update_history_buffer); + // Run IUS. Output param 'done' is TRUE if IUS handled everything (return + // now); otherwise fall through to the RUS code below. + NABoolean done = FALSE; + retcode = doIUS(done); HSHandleError(retcode); + if (done) + return retcode; - Int64 currentSampleSize = 0; - Int64 futureSampleSize = 0; - - retcode = computeSampleSizeForIUS(currentSampleSize, futureSampleSize); - HSHandleErrorIUS(retcode); - - // Setup the memory requirement for singlegroup in each memNeeded field, - // Since singleGroup will receive the computed new stats, we compute - // the memory needs by using futureSampleSize. - mapInternalSortTypes(singleGroup); - getMemoryRequirements(singleGroup, futureSampleSize); - - // ===================================================================== - // create the inMemory delete table, the cstr also setups the memory - // requirement utilizing the currentSampleSize as #rows.
- // ===================================================================== - iusSampleDeletedInMem = new(STMTHEAP) - HSInMemoryTable(*hssample_table, - getWherePredicateForIUS(), - currentSampleSize - ); - - // ===================================================================== - // Similarly, create the inMemory insert table. - // ===================================================================== - NAString sampleTable_I(*hssample_table); - sampleTable_I.append("_I"); - - iusSampleInsertedInMem = new(STMTHEAP) - HSInMemoryTable(sampleTable_I, - getWherePredicateForIUS(), - futureSampleSize,// worse case - sampleRateAsPercetageForIUS); - - // ===================================================================== - // Find out which group has persistent CBFs and set the delayedRead flag - // for it. Do it for all groups. - // ===================================================================== - detectPersistentCBFsForIUS(*hssample_table, singleGroup); - - - Int32 colsSelected = 0; - NABoolean ranOutOfMem = FALSE; - - while ( moreColsForIUS() > 0 ) { - - // Select a set of columns for IUS, based on the availability - // of memory. Selected columns will be marked in PENDING state. - // The number of columns selected is returned. - // - // If a particular column has persistent CBF, its column - // structure's delayedRead is set to TRUE.
- // - retcode = selectIUSBatch(currentSampleSize, futureSampleSize, ranOutOfMem, colsSelected); - HSHandleErrorIUS(retcode); - checkTime("after selecting batch of columns for IUS"); - - // - // Require at least one column in the persistent sample table - // to be read into memory in one batch - // - if ( colsSelected == 0 ) { - if ( ranOutOfMem ) { - diagsArea << DgSqlCode(UERR_IUS_NO_SUFFICIENT_MEMORY); - retcode = -1; - HSHandleErrorIUS(retcode); - } else { - if (LM->LogNeeded()) - LM->Log("Empty IUS batch, not because of memory."); - } - } - - - // process column groups that are in PENDING state - // Data from columns with delayedRead set to FALSE (i.e., no - // corresponding persistent CBFs) will be read in. - // - // The rows to be deleted in one shot later on will be read in - // from the sample table. - // - retcode = CollectStatisticsForIUS(currentSampleSize, futureSampleSize); - HSHandleErrorIUS(retcode); - - // - // Fall back to let internal sort based RUS to take care of any - // groups failing to be updated via IUS. These groups are in PENDING - // state. If all groups are processed, no more read and we are done! - // - HSColGroupStruct *group = singleGroup; - Int32 cols = 0; - Int32 colsToRead = 0; - - // First mask out those groups that already have data read in. - // These groups should have delayedRead flag set to FALSE. - // - // We do need to properly merge the data from S(i-1), D and I - // together, so that group->data points at the merged data and - // group->nextdata points at the end+1 of the merged data. - // - // The merge algorithm: - // - // Allocate a temp. buffer of size (|S(i-1)| + |I|) - // For all data item v in S(i-1) and I do - // if ( v in cbf ) { - // append data to the temp. buffer - // remove one instance of v from cbf - // } - // delete group->data - // set group->data = temp. buffer - // set group->nextdata = temp.
buffer's size + 1 - - while (group) { - if (group->state==PENDING) { - - if ( group->delayedRead == FALSE ) { - group->state=SKIP; - } else - colsToRead++; - - cols++; - } - group = group->next; - } - - - if ( cols > 0 ) { - - if (LM->LogNeeded()) - LM->StartTimer("IUS: process failed groups with RUS"); - - // First we need to read in column data if it has not been - // read previously (i.e., those with delayedRead is TRUE, or - // it has persistent CBF). - if ( colsToRead > 0 ) - { - HSCursor cursor; - // Read from the persistent sample table and use smplGroup to - // hold the data read. All columns in PENDING state - // will be read in. - retcode = readColumnsIntoMem(&cursor, currentSampleSize); - - HSHandleErrorIUS(retcode); - checkTime("after reading pending columns from persistent sample table into memory for IUS->RUS reversion"); - - } - - group = singleGroup; - while (group) { - if (group->state==SKIP) { - group->state=PENDING; - } - group = group->next; - } - - // Fill each group (in PENDING state)'s data area with data merged from - // cbf, S(I-1) and I. - retcode = mergeDatasetsForIUS(); - HSHandleErrorIUS(retcode); - checkTime("after merging datasets for IUS->RUS reversion"); - - group = singleGroup; - while (group) { - if (group->state==PENDING) { - - // Delete the histogram allocated for IUS. The RUS step - // below will recompute groupHist. - delete group->groupHist; - group->groupHist = NULL; - } - group = group->next; - } - - - // Remove all persistent CBFs with PENDING state - // because RUS may generates a histogram with different - // #intervals than that recorded in CBF! If the CBFs - // are left undeleted, the encoded intervals (as - // # of buckets) in CBF can be in conflict with - // the actual # of intervals computed by RUS.
- retcode = deletePersistentCBFsForIUS(*hssample_table, singleGroup); - HSHandleErrorIUS(retcode); - - retcode = sortByColInMem(); - HSHandleErrorIUS(retcode); - - retcode = createStats(0 /* dummy argument */); - HSHandleErrorIUS(retcode); - - if (LM->LogNeeded()) - LM->StopTimer(); - } - } - - - - Int32 iusUnprocessed = 0; - // Reverse the NO_STATS state to UNPROCESSED and count - // total unprocessed - HSColGroupStruct *group = singleGroup; - while (group != NULL) { - if (group->state == NO_STATS ) { - group->state = UNPROCESSED; - iusUnprocessed ++; - } else - if (group->state == UNPROCESSED ) - iusUnprocessed ++; - - group = group->next; - } - - - - if ( iusUnprocessed > 0 ) { - // - // Fall back to the RUS code below when there is no sufficient memory to - // process even one batch of columns, or there is no stats as a base to - // compute IUS. - - // Remove all persistent CBFs with PENDING state - // because RUS may generates a histogram with different - // #intervals than that recorded in CBF! If the CBFs - retcode = deletePersistentCBFsForIUS(*hssample_table, singleGroup); - HSHandleErrorIUS(retcode); - - - // setup the sample size - sampleRowCount = futureSampleSize; - - } else { - - if (multiGroup) // don't return if there are MCs to process - sampleRowCount = futureSampleSize; - else - return retcode; - } + // Set sampling parameters to do an RUS corresponding to the existing + // persistent sample.
+ useSampling = TRUE; + externalSampleTable = TRUE; // skips sampleTable.make() below; RUS reads the existing persistent sample + sampleTblPercent = sampleRateAsPercetageForIUS * 100; // used for scaling results } NAString internalSortCQDValue = ActiveSchemaDB()->getDefaults().getValue(USTAT_INTERNAL_SORT); @@ -5389,8 +5259,8 @@ Lng32 HSGlobalsClass::CollectStatistics() // internal sort disabled if (LM->LogNeeded()) LM->Log("Internal sort is disabled"); if (useSampling && !externalSampleTable) - retcode = sampleTable.make(currentRowCountIsEstimate_, - *hssample_table, + retcode = sampleTable.make(currentRowCountIsEstimate_, + *hssample_table, actualRowCount, sampleRowCount); // hssample_table assigned, actualRowCount and sampleRowCount may get adjusted. else if (!externalSampleTable) @@ -5510,10 +5380,9 @@ Lng32 HSGlobalsClass::CollectStatistics() LM->Log("Internal sort: reading sample directly from base table; no sample table created"); *hssample_table = getTableName(user_table->data(), nameSpace); // sampleTblPercent and sampleRowCount may get adjusted. - retcode = createSampleOption(optFlags & SAMPLE_REQUESTED, - sampleTblPercent, *sampleOption, - sampleValue1, sampleValue2); - HSHandleError(retcode); + createSampleOption(optFlags & SAMPLE_REQUESTED, + sampleTblPercent, *sampleOption, + sampleValue1, sampleValue2); sampleTableUsed = FALSE; samplingUsed = TRUE; @@ -5526,8 +5395,8 @@ Lng32 HSGlobalsClass::CollectStatistics() else { if (useSampling && !externalSampleTable) - retcode = sampleTable.make(currentRowCountIsEstimate_, - *hssample_table, + retcode = sampleTable.make(currentRowCountIsEstimate_, + *hssample_table, actualRowCount, sampleRowCount); // hssample_table assigned, actualRowCount and sampleRowCount may get adjusted. else if (!externalSampleTable) @@ -5831,6 +5700,311 @@ Lng32 HSGlobalsClass::CollectStatistics() return retcode; } +// Do the setup work for IUS, and call either doFullIUS() or prepareToUsePersistentSample(), +// depending on whether the USTAT_INCREMENTAL_UPDATE_STATISTICS cqd is ON or +// SAMPLE.
In the latter case prepareToUsePersistentSample() is called to update +// the persistent sample incrementally, and then the normal Update Stats algorithm +// executes using the persistent sample table. +// +// The 'done' parameter is returned with a value of TRUE if stats are completely +// handled by doFullIUS(). If prepareToUsePersistentSample() is called instead, +// or if doFullIUS() is unable to incrementally update the stats for one or more +// columns (e.g., shape test failure), 'done' will be set to FALSE. +Lng32 HSGlobalsClass::doIUS(NABoolean& done) +{ + done = FALSE; // set to TRUE if IUS successfully updates the stats for all columns + Lng32 retcode = 0; + + // Make sure the Where clause doesn't contain any constructs we don't allow + // in the context of an IUS statement. + retcode = validateIUSWhereClause(); + HSHandleError(retcode); + + retcode = begin_IUS_work(); // marks this table's PERSISTENT_SAMPLES row as having an IUS in progress + HSHandleError(retcode); + + Int64 currentSampleSize = 0; + Int64 futureSampleSize = 0; + + retcode = computeSampleSizeForIUS(currentSampleSize, futureSampleSize); + HSHandleErrorIUS(retcode); + + DefaultToken iusOption = CmpCommon::getDefault(USTAT_INCREMENTAL_UPDATE_STATISTICS); // ON: full IUS; SAMPLE: refresh the persistent sample, then RUS + if (iusOption == DF_ON) + return doFullIUS(currentSampleSize, futureSampleSize, done); + else if (iusOption == DF_SAMPLE) + // Leave 'done' FALSE; prepareToUsePersistentSample() updates the persistent sample + // table in preparation for use by RUS. + return prepareToUsePersistentSample(currentSampleSize, futureSampleSize); + else + { + // Unexpected cqd value: an exception will be thrown, and ~HSGlobalsClass calls end_IUS_work() if PSRowUpdated is set. + HS_ASSERT(false); + return -1; // avoid 'no return' warning + } +} + +// Try to incrementally update existing histograms using in-memory tables and +// CBFs. If reversion to RUS is required for one or more columns, the 'done' +// output parameter will be set to FALSE.
+Lng32 HSGlobalsClass::doFullIUS(Int64 currentSampleSize, + Int64 futureSampleSize, + NABoolean& done) +{ + done = FALSE; // unless IUS handles all columns + HSLogMan* LM = HSLogMan::Instance(); + Lng32 retcode = 0; + ISMemPercentage_ = (float)CmpCommon::getDefaultNumeric(USTAT_IS_MEMORY_FRACTION); + + // Set up the memory requirement in each singleGroup memNeeded field. + // Since singleGroup will receive the computed new stats, we compute + // the memory needs by using futureSampleSize. + mapInternalSortTypes(singleGroup); + getMemoryRequirements(singleGroup, futureSampleSize); + + // ===================================================================== + // Create the in-memory delete table; the ctor also sets up the memory + // requirement utilizing the currentSampleSize as #rows. + // ===================================================================== + iusSampleDeletedInMem = new(STMTHEAP) + HSInMemoryTable(*hssample_table, + getWherePredicateForIUS(), + currentSampleSize + ); + + // ===================================================================== + // Similarly, create the inMemory insert table. + // ===================================================================== + NAString sampleTable_I(*hssample_table); + sampleTable_I.append("_I"); + + iusSampleInsertedInMem = new(STMTHEAP) + HSInMemoryTable(sampleTable_I, + getWherePredicateForIUS(), + futureSampleSize,// worst case + sampleRateAsPercetageForIUS); + + // ===================================================================== + // Find out which group has persistent CBFs and set the delayedRead flag + // for it. Do it for all groups. + // ===================================================================== + detectPersistentCBFsForIUS(*hssample_table, singleGroup); + + + Int32 colsSelected = 0; + NABoolean ranOutOfMem = FALSE; + + while ( moreColsForIUS() > 0 ) { + + // Select a set of columns for IUS, based on the availability + // of memory. Selected columns will be marked in PENDING state.
+ // The number of columns selected is returned. + // + // If a particular column has persistent CBF, its column + // structure's delayedRead is set to TRUE. + // + retcode = selectIUSBatch(currentSampleSize, futureSampleSize, ranOutOfMem, colsSelected); + HSHandleErrorIUS(retcode); + checkTime("after selecting batch of columns for IUS"); + + // + // Require at least one column in the persistent sample table + // to be read into memory in one batch + // + if ( colsSelected == 0 ) { + if ( ranOutOfMem ) { + diagsArea << DgSqlCode(UERR_WARNING_IUS_INSUFFICIENT_MEMORY) + << DgInt0(moreColsForIUS()); + break; // Let RUS handle the rest (warning above reports how many columns remain) + } else { + if (LM->LogNeeded()) + LM->Log("Empty IUS batch, not because of memory."); // NOTE(review): the loop continues with nothing selected; confirm moreColsForIUS() still makes progress here + } + } + + + // process column groups that are in PENDING state + // Data from columns with delayedRead set to FALSE (i.e., no + // corresponding persistent CBFs) will be read in. + // + // The rows to be deleted in one shot later on will be read in + // from the sample table. + // + retcode = CollectStatisticsForIUS(currentSampleSize, futureSampleSize); + HSHandleErrorIUS(retcode); + + // + // Fall back to let internal sort based RUS to take care of any + // groups failing to be updated via IUS. These groups are in PENDING + // state. If all groups are processed, no more read and we are done! + // + HSColGroupStruct *group = singleGroup; + Int32 cols = 0; + Int32 colsToRead = 0; + + // First mask out those groups that already have data read in. + // These groups should have delayedRead flag set to FALSE. + // + // We do need to properly merge the data from S(i-1), D and I + // together, so that group->data points at the merged data and + // group->nextdata points at the end+1 of the merged data. 
+ // + // The merge algorithm: + // + // Allocate a temp. buffer of size (|S(i-1)| + |I|) + // For all data item v in S(i-1) and I do + // if ( v in cbf ) { + // append data to the temp.
buffer + // remove one instance of v from cbf + // } + // delete group->data + // set group->data = temp. buffer + // set group->nextdata = temp. buffer's size + 1 + + while (group) { + if (group->state==PENDING) { + + if ( group->delayedRead == FALSE ) { + group->state=SKIP; + } else + colsToRead++; + + cols++; + } + group = group->next; + } + + + if ( cols > 0 ) { + + if (LM->LogNeeded()) + LM->StartTimer("IUS: process failed groups with RUS"); + + // First we need to read in column data if it has not been + // read previously (i.e., those with delayedRead is TRUE, or + // it has persistent CBF). + if ( colsToRead > 0 ) + { + HSCursor cursor; + // Read from the persistent sample table and use smplGroup to + // hold the data read. All columns in PENDING state + // will be read in. + retcode = readColumnsIntoMem(&cursor, currentSampleSize); + + HSHandleErrorIUS(retcode); + checkTime("after reading pending columns from persistent sample table into memory for IUS->RUS reversion"); + + } + + group = singleGroup; + while (group) { + if (group->state==SKIP) { + group->state=PENDING; + } + group = group->next; + } + + // Fill each group (in PENDING state)'s data area with data merged from + // cbf, S(I-1) and I. + retcode = mergeDatasetsForIUS(); + HSHandleErrorIUS(retcode); + checkTime("after merging datasets for IUS->RUS reversion"); + + group = singleGroup; + while (group) { + if (group->state==PENDING) { + + // Delete the histogram allocated for IUS. The RUS step + // below will recompute groupHist. + delete group->groupHist; + group->groupHist = NULL; + } + group = group->next; + } + + + // Remove all persistent CBFs with PENDING state + // because RUS may generate a histogram with different + // #intervals than that recorded in CBF! If the CBFs + // are left undeleted, the encoded intervals (as + // # of buckets) in CBF can be in conflict with + // the actual # of intervals computed by RUS.
+ retcode = deletePersistentCBFsForIUS(*hssample_table, singleGroup, PENDING); + HSHandleErrorIUS(retcode); + + retcode = sortByColInMem(); + HSHandleErrorIUS(retcode); + + retcode = createStats(0 /* dummy argument */); + HSHandleErrorIUS(retcode); + + if (LM->LogNeeded()) + LM->StopTimer(); + } + } // while ( moreColsForIUS() > 0 ) + + // The _I table can be dropped after using it to update the persistent sample + // table, which must be done before doing RUS on any unprocessed columns (RUS + // will use the updated persistent sample). + retcode = UpdateIUSPersistentSampleTable(currentSampleSize, futureSampleSize, sampleRowCount); + HSHandleErrorIUS(retcode); + if (sampleIExists_) { + retcode = drop_I(*hssample_table); + HSHandleErrorIUS(retcode); + sampleIExists_ = FALSE; // only try to drop it once + } + + Int32 iusUnprocessed = 0; + // Reverse the NO_STATS state to UNPROCESSED and count + // total unprocessed + HSColGroupStruct *group = singleGroup; + while (group != NULL) { + if (group->state == NO_STATS ) { + group->state = UNPROCESSED; + iusUnprocessed ++; + } else + if (group->state == UNPROCESSED ) + iusUnprocessed ++; + + group = group->next; + } + + // Leave the 'done' parameter FALSE so we continue with the RUS code upon return + // if there are unprocessed columns (not enough memory or no prior stats as a + // base to compute IUS), or if there are MCs to process. + if ( iusUnprocessed > 0 ) { + // Remove all persistent CBFs with UNPROCESSED state because RUS may generate + // a histogram with different #intervals than that recorded in CBF! If the CBFs + // are left undeleted, the encoded intervals (as # of buckets) in CBF can be in + // conflict with the actual # of intervals computed by RUS.
+ retcode = deletePersistentCBFsForIUS(*hssample_table, singleGroup, UNPROCESSED); + HSHandleErrorIUS(retcode); + } else if (multiGroup) { // not done if there are MCs to process + } else { + done = TRUE; // no need to use RUS code + } + + return retcode; +} + +// This function makes all preparations for doing RUS using an updated IUS +// persistent sample table. The sample table is updated, and obsolete CBFs +// are discarded. +Lng32 HSGlobalsClass::prepareToUsePersistentSample(Int64 currentSampleSize, + Int64 futureSampleSize) +{ + Lng32 retcode = 0; + retcode = UpdateIUSPersistentSampleTable(currentSampleSize, futureSampleSize, sampleRowCount); + HSHandleErrorIUS(retcode); + + // If there are existing CBFs, they will be obsolete once the current operation + // completes. + retcode = deletePersistentCBFsForIUS(*hssample_table, singleGroup, UNPROCESSED); + HSHandleErrorIUS(retcode); + + return retcode; +} + // // A help function to generate a SQL timestamp constant. Example: '2012-01-01 23:59:00' // @@ -5896,7 +6070,7 @@ void genArkcmpInfo(NAString& nidpid) // against a target table. The transaction is established by updating the row in the // PERSISTENT_SAMPLES table about the persistent sample table used by the IUS: // 1. UPDATE_DATE field is populated with the current timestamp; -// 2. IUS_UPDATE_HISTORY field is populated with the SQ node ID and process ID +// 2. UPDATER_INFO field is populated with the SQ node ID and process ID // of the tdm_arkcmp process performing the IUS work // The method will return 0 after the above successful updates, indicating that the IUS // work can proceed. @@ -5909,8 +6083,8 @@ void genArkcmpInfo(NAString& nidpid) // CQD(USTAT_IUS_MAX_TRANSACTION_DURATION)), the ongoing transaction is // considered legitimate, and the current call to the method will return an error // indicating that a concurrent IUS is in progress.
-// The argument ius_update_history_buffer will be filled with the string -// read from the corresponding field IUS_UPDATE_HISTORY. +// ius_update_history_buffer will be filled with the string read from the +// UPDATER_INFO column. // // When P2-P1 > CQD(USTAT_IUS_MAX_TRANSACTION_DURATION), the on-going transaction is // considered over-due and will be discarded. The method proceeds as if there was @@ -5923,14 +6097,17 @@ void genArkcmpInfo(NAString& nidpid) // The CQD USTAT_IUS_MAX_TRANSACTION_DURATION specifies the max transaction // duration allowed with the unit in minutes. The default value is 720 minutes (12 hours). -Lng32 HSGlobalsClass::begin_IUS_work(char* ius_update_history_buffer) +Lng32 HSGlobalsClass::begin_IUS_work() { + sampleIExists_ = FALSE; // keep track of whether a _I table needs to be dropped + #ifdef _DEBUG if (CmpCommon::getDefault(USTAT_IUS_NO_BLOCK) == DF_ON) return 0; -#endif +#endif - HSPersSamples *sampleList = HSPersSamples::Instance(objDef->getCatName()); + HSPersSamples *sampleList = HSPersSamples::Instance(objDef->getCatName(), + objDef->getSchemaName()); if ( !sampleList ) return -1; Int64 updTimestamp = 0; @@ -5938,11 +6115,20 @@ Lng32 HSGlobalsClass::begin_IUS_work(char* ius_update_history_buffer) HSTranMan *TM = HSTranMan::Instance(); TM->Begin("READ AND UPDATE THE UPDATE DATE AND HISTORY from PERSISTENT SAMPLE TABLE"); - + char ius_update_history_buffer[129]; Lng32 retcode = sampleList->readIUSUpdateInfo(objDef, ius_update_history_buffer, &updTimestamp); - if (retcode != 100) // need to carry on if no row for this table - HSHandleError(retcode); + if (retcode == 100) + { + HSFuncMergeDiags(- UERR_IUS_NO_PERSISTENT_SAMPLE, + objDef->getObjectFullName().data()); + retcode = -1; + HSHandleError(retcode); + } + else + { + HSHandleError(retcode); + } time_t t; @@ -5988,19 +6174,23 @@ Lng32 HSGlobalsClass::begin_IUS_work(char* ius_update_history_buffer) retcode = sampleList->updIUSUpdateInfo(objDef, (char*)nid_pid_str.data(), - 
(char*)updTimestampStr.data()); + (char*)updTimestampStr.data(), + 0 /* don't write where condition now */); - if (retcode) - TM->Rollback(); - else - TM->Commit(); + if (retcode == 100) + { + HSFuncMergeDiags(- UERR_IUS_NO_PERSISTENT_SAMPLE, + objDef->getObjectFullName().data()); + retcode = -1; + } HSHandleError(retcode); // - // If we reach here, it means we have successfully stored the bdt and our process info - // into the UPDATE_DATE and IUS_UPDATE_HISTORY column. We can return TRUE. + // If we reach here, we have successfully stored the bdt and our process info + // into the UPDATE_DATE and UPDATER_INFO columns of SB_PERSISTENT_SAMPLES. // + PSRowUpdated = TRUE; return 0; } @@ -6010,16 +6200,25 @@ Lng32 HSGlobalsClass::begin_IUS_work(char* ius_update_history_buffer) // as follows. // // 1. UPDATE_DATE field is reset to a timestamp representing epoch time; -// 2. IUS_UPDATE_HISTORY field is reset to an empty string. +// 2. UPDATER_INFO field is reset to an empty string. Lng32 HSGlobalsClass::end_IUS_work() { + Lng32 retcode = 0; + if (sampleIExists_) + { + retcode = drop_I(*hssample_table); + // ignore retcode; we want to try the rest of this method as well + sampleIExists_ = FALSE; + } + #ifdef _DEBUG if (CmpCommon::getDefault(USTAT_IUS_NO_BLOCK) == DF_ON) return 0; #endif - HSPersSamples *sampleList = HSPersSamples::Instance(objDef->getCatName()); + HSPersSamples *sampleList = HSPersSamples::Instance(objDef->getCatName(), + objDef->getSchemaName()); if ( !sampleList ) return -1; // The epoch time @@ -6032,8 +6231,13 @@ Lng32 HSGlobalsClass::end_IUS_work() NAString updTimestampStr; genSQLTimestampConstant(bdt, updTimestampStr); - Lng32 retcode = - sampleList->updIUSUpdateInfo(objDef, (char*)"", (char*)updTimestampStr.data()); + retcode = + sampleList->updIUSUpdateInfo(objDef, + (char*)"", + (char*)updTimestampStr.data(), + getWherePredicateForIUS(), + PST_IUSrequestedSampleRows_, + PST_IUSactualSampleRows_); HSHandleError(retcode); return 0; @@ -6104,35 
+6308,36 @@ Lng32 HSGlobalsClass::computeSampleSizeForIUS(Int64& currentSampleSize, Int64& f return 0; } +// Before we have the PERSISTENT_DATA table available to us, we will +// save the CBFs as binary files on disk. One CBF maps to one binary file. +// The path of the directory for these files is specified in CQD +// USTAT_IUS_PERSISTENT_CBF_PATH, and the cbf file name is +// sampleTableName + '.' + 'colName'. This function builds the common initial +// text of the path for all columns in the same table, and assigns it to the +// output parameter filePrefix. +void HSGlobalsClass::getCBFFilePrefix(NAString& sampleTableName, NAString& filePrefix) +{ + filePrefix = ActiveSchemaDB()->getDefaults().getValue(USTAT_IUS_PERSISTENT_CBF_PATH); + filePrefix.append("/") + .append(sampleTableName) + .append("."); +} void HSGlobalsClass::detectPersistentCBFsForIUS(NAString& sampleTableName, HSColGroupStruct *group) { - // Before we have the PERSISTENT_DATA table available to us, we will - // save the CBFs as binary files on disk. One CBF maps to one binary file. - // The path of the directory for these files is specified in CQD - // USTAT_IUS_PERSISTENT_CBF_PATH, and the cbf binary file name is - // sampleTableName + '.' 
+ 'colName'; - - NAString path = - ActiveSchemaDB()->getDefaults().getValue(USTAT_IUS_PERSISTENT_CBF_PATH); - - NAString cbfFilePrefix(path); - cbfFilePrefix.append("/"); - cbfFilePrefix.append(sampleTableName); - cbfFilePrefix.append("."); - + NAString cbfFilePrefix; + getCBFFilePrefix(sampleTableName, cbfFilePrefix); struct stat sts; while (group) { - NAString cbfFile(cbfFilePrefix); - cbfFile.append(group->colSet[0].colname->data()); + cbfFile.append(group->cbfFileNameSuffix()); if (stat(cbfFile, &sts) == -1 && errno == ENOENT) group->delayedRead = FALSE; else - group->delayedRead = TRUE; + group->delayedRead = TRUE; group = group->next; } @@ -6197,7 +6402,7 @@ Lng32 HSGlobalsClass::prepareForIUSAlgorithm1(Int64& rows) // Populate the deleted rows into a in-memory table. NAString delQuery; - iusSampleDeletedInMem->generateDeleteQuery(*hssample_table, delQuery); + generateIUSDeleteQuery(*hssample_table, delQuery); @@ -6266,6 +6471,27 @@ Lng32 HSGlobalsClass::prepareForIUSAlgorithm1(Int64& rows) return 0; } +static Lng32 create_I(NAString& sampTblName) +{ + HSLogMan *LM = HSLogMan::Instance(); + if (LM->LogNeeded()) + LM->StartTimer("IUS: create _I table"); + + NAString createI("create table "); + createI += sampTblName; + createI += "_I LIKE "; + createI += sampTblName; + createI += " WITH PARTITIONS"; + Lng32 retcode = HSFuncExecQuery(createI, -UERR_INTERNAL_ERROR, + NULL, "IUS create I", + NULL, NULL, TRUE/*doRetry*/); + if (LM->LogNeeded()) + LM->StopTimer(); + + HSHandleError(retcode); + return retcode; +} + Lng32 HSGlobalsClass::generateSampleI(Int64 currentSampleSize, Int64 futureSampleSize, @@ -6280,11 +6506,11 @@ Lng32 HSGlobalsClass::generateSampleI(Int64 currentSampleSize, if (LM->LogNeeded()) LM->StartTimer("IUS: select-insert data set I"); - // performing: - // - // insert into <sample_I> - // (select * from <sourceTable> where <where> sample); - // + // performing: + // + // upsert using load into <sample_I> + // (select * from <sourceTable> where 
<where> sample); + // NAString sampleTable_I(*hssample_table); sampleTable_I.append("_I"); @@ -6292,18 +6518,20 @@ Lng32 HSGlobalsClass::generateSampleI(Int64 currentSampleSize, HSTranMan *TM = HSTranMan::Instance(); NABoolean transStarted = (TM->Begin("IUS clean data set I") == 0); - if (LM->LogNeeded()) LM->StartTimer("IUS: select-insert to form persistent I"); - - NAString insertSelectIQuery; iusSampleInsertedInMem->generateInsertSelectIQuery(sampleTable_I, *user_table, insertSelectIQuery, currentSampleSize, futureSampleSize, deleteSetSize, actualRowCount); + NABoolean needEspParReset = setEspParallelism(objDef); retcode = HSFuncExecQuery(insertSelectIQuery, -UERR_INTERNAL_ERROR, &xRows, "IUS data set I creation", - NULL, NULL, TRUE/*doRetry*/ ); + NULL, NULL, TRUE/*doRetry*/, + 0, TRUE); // check for MDAM usage + + if (needEspParReset) + resetEspParallelism(); if (retcode) TM->Rollback(); @@ -6323,22 +6551,17 @@ Lng32 HSGlobalsClass::generateSampleI(Int64 currentSampleSize, static Lng32 drop_I(NAString& sampTblName) { - NAString cleanupI("drop table "); + HSLogMan *LM = HSLogMan::Instance(); + if (LM->LogNeeded()) + LM->StartTimer("IUS: drop _I table"); + + NAString cleanupI("drop table if exists "); cleanupI.append(sampTblName).append("_I"); Lng32 retcode = HSFuncExecQuery(cleanupI, -UERR_INTERNAL_ERROR, NULL, "IUS cleanup I", NULL, NULL, TRUE/*doRetry*/); - HSHandleError(retcode); - return retcode; -} - -static Lng32 drop_D(NAString& sampTblName) -{ - NAString cleanupI("drop table "); - cleanupI.append(sampTblName).append("_D"); - Lng32 retcode = HSFuncExecQuery(cleanupI, -UERR_INTERNAL_ERROR, - NULL, "IUS cleanup D", - NULL, NULL, TRUE/*doRetry*/); + if (LM->LogNeeded()) + LM->StopTimer(); HSHandleError(retcode); return retcode; } @@ -6346,49 +6569,54 @@ static Lng32 drop_D(NAString& sampTblName) Lng32 HSGlobalsClass::CollectStatisticsForIUS(Int64 currentSampleSize, Int64 futureSampleSIze) { - Lng32 retcode = 0; - Int64 xRows = 0; - - HSLogMan *LM = 
HSLogMan::Instance(); - - // create help table -I and -D - { - diagsArea << DgSqlCode(-4222) - << DgString0("IUS Sample Table"); - } - + Lng32 retcode = 0; + Int64 xRows = 0; - if (LM->LogNeeded()) - LM->StartTimer("IUS: read in Si"); + HSLogMan *LM = HSLogMan::Instance(); - HSColGroupStruct *group = singleGroup; - while (group) { - if (group->delayedRead && group->state == PENDING) - group->state = SKIP ; // temp. set so that the column - // data is not to be read. - group = group->next; - } + // create help table _I + if (!sampleIExists_) + { + retcode = create_I(*hssample_table); + HSHandleError(retcode); + sampleIExists_ = TRUE; // so we remember to drop it + } + NABoolean havePending = FALSE; + HSColGroupStruct *group = singleGroup; + while (group) + { + if (group->state == PENDING) + { + if (group->delayedRead) + group->state = SKIP; // temp. set so that the column data is not to be read + else + havePending = TRUE; + } + group = group->next; + } - // Populate the selected rows into singleGroup. Use a C scope to allow - // the cursor to be deallocated after the reading of S. - // - // Only read in columns that are in PENDING state - { - HSCursor cursor; - // Read from the persistent sample table and use smplGroup to - // hold the data read. All columns in PENDING state - // will be read in. - retcode = readColumnsIntoMem(&cursor, currentSampleSize); - HSHandleError(retcode); - checkTime("after reading pending columns into memory for IUS"); - } + // Only read in Si if there is at least one column in this batch that doesn't + // already have a persistent CBF. + if (havePending) + { + if (LM->LogNeeded()) + LM->StartTimer("IUS: read in Si"); + // Populate the data areas for the PENDING columns from the persistent + // sample table. 
+ HSCursor cursor; // on block exit, dtor will close/dealloc stmt and descriptors + retcode = readColumnsIntoMem(&cursor, currentSampleSize); + HSHandleError(retcode); + checkTime("after reading pending columns into memory for IUS"); - if (LM->LogNeeded()) - LM->StopTimer(); + if (LM->LogNeeded()) + LM->StopTimer(); + } + else if (LM->LogNeeded()) + LM->Log("IUS: skipped reading in Si; delayedRead true for all columns in batch"); - // restore the state field to PENDING. + // restore the state field to PENDING for any skipped groups. group = singleGroup; while (group) { if (group->delayedRead && group->state == SKIP) @@ -6415,6 +6643,7 @@ Lng32 HSGlobalsClass::CollectStatisticsForIUS(Int64 currentSampleSize, selectDQuery); retcode = iusSampleDeletedInMem->populate(selectDQuery); + HSHandleError(retcode); if (LM->LogNeeded()) LM->StopTimer(); @@ -6466,63 +6695,119 @@ Lng32 HSGlobalsClass::CollectStatisticsForIUS(Int64 currentSampleSize, if (LM->LogNeeded()) LM->StopTimer(); - - // Update the persistent sample table in several steps - { - HSFuncExecQuery("CONTROL QUERY DEFAULT ALLOW_DML_ON_NONAUDITED_TABLE 'ON'"); - - // start a new scope for the trasaction - HSTranController TC("IUS: update PS table", &retcode); - HSHandleError(retcode); + // Write CBFs for groups that are PROCESSED and delayedRead back to disk. + writeCBFstoDiskForIUS(*hssample_table, singleGroup); - // step 1 - delete the affected rows from PS - NAString deleteQuery; - iusSampleDeletedInMem->generateDeleteQuery(*hssample_table, deleteQuery); + return retcode; +} +// Returns the table component of the fully qualified name passed in, using the +// catalog and schema name from tblDef to determine where it starts (the table +// is in the same schema as the one referenced by tblDef). This avoids problems +// in parsing the fully qualified name posed by the possibility of periods within +// delimited identifiers. 
+static const char* extractTblName(const NAString& fullyQualifiedName, + HSTableDef* tblDef) +{ + Lng32 tblNameOffset = tblDef->getCatName().length() + + tblDef->getSchemaName().length() + + 2; // 2 dot separators + return fullyQualifiedName.data() + tblNameOffset; +} + +// Update the persistent sample table and determine its new cardinality. +// 1) Delete rows in the persistent sample satisfying the IUS predicate. +// 2) Insert the rows from <sampleTblName>_I into the persistent sample +// if the _I table was created. If building histograms from scratch +// using the persistent sample rather than incrementally changing them +// (USTAT_INCREMENTAL_UPDATE_STATISTICS is DF_SAMPLE), insert sampled +// rows satisfying the IUS where clause directly from the source table +// In either case, these rows constitute a random sample of rows from +// the source table that satisfy the IUS predicate. +// 3) From the prior cardinality of the sample table (oldSampleSize), subtract +// the number of rows deleted, add the number of rows inserted, and return +// the result in the newSampleSize parameter. +// +// This can't be done as part of end_IUS_work(), because that is called even +// when the IUS fails; its purpose is just to modify the SB_PERSISTENT_SAMPLES +// table to indicate that IUS is no longer in progress on the source table. +// The persistent sample table itself is only modified if IUS is successful. 
+Lng32 HSGlobalsClass::UpdateIUSPersistentSampleTable(Int64 oldSampleSize, + Int64 requestedSampleSize, + Int64& newSampleSize) +{ + Lng32 retcode = 0; + Int64 rowsAffected; + HSLogMan *LM = HSLogMan::Instance(); + newSampleSize = oldSampleSize; // before deleting/adding rows - if (LM->LogNeeded()) { - LM->Log("query to delete from PS:"); - LM->Log(deleteQuery.data()); - } + HSFuncExecQuery("CONTROL QUERY DEFAULT ALLOW_DML_ON_NONAUDITED_TABLE 'ON'"); + HSTranController TC("IUS: update PS table", &retcode); + HSHandleError(retcode); - retcode = HSFuncExecQuery(deleteQuery, -UERR_INTERNAL_ERROR, - &xRows, - "IUS delete from PS where", - NULL, NULL, TRUE/*doRetry*/ ); - HSHandleError(retcode); + // step 1 - delete the affected rows from PS + NAString deleteQuery; + generateIUSDeleteQuery(*hssample_table, deleteQuery); - // step 2 - add all rows from _I to PS - NAString selectInsertQuery; - iusSampleInsertedInMem->generateSelectInsertQuery(*hssample_table, *user_table, selectInsertQuery); + if (LM->LogNeeded()) { + LM->Log("query to delete from PS:"); + LM->Log(deleteQuery.data()); + LM->StartTimer("IUS: execute query to delete from PS"); + } - if (LM->LogNeeded()) { - LM->Log("query to insert into PS:"); - LM->Log(selectInsertQuery.data()); - } - - retcode = HSFuncExecQuery(selectInsertQuery, -UERR_INTERNAL_ERROR, - &xRows, - "IUS insert into PS (select from _I)", - NULL, NULL, TRUE/*doRetry*/ ); - HSHandleError(retcode); + rowsAffected = 0; + retcode = HSFuncExecQuery(deleteQuery, -UERR_INTERNAL_ERROR, + &rowsAffected, + "IUS delete from PS where", + NULL, NULL, TRUE/*doRetry*/ ); + if (LM->LogNeeded()) { + LM->StopTimer(); + sprintf(LM->msg, PF64 " rows deleted from persistent sample table.", rowsAffected); + LM->Log(LM->msg); + } + HSHandleError(retcode); + newSampleSize -= rowsAffected; - // step3 - drop _I table - retcode = drop_I(*hssample_table); - HSHandleError(retcode); + // step 2 - add all rows from _I to PS, or sampled from source table, + // depending on 
USTAT_INCREMENTAL_UPDATE_STATISTICS value. + NAString selectInsertQuery; + generateIUSSelectInsertQuery(*hssample_table, *user_table, selectInsertQuery); - // step4 - drop _D table - retcode = drop_D(*hssample_table); - HSHandleError(retcode); + if (LM->LogNeeded()) { + LM->Log("query to insert into PS:"); + LM->Log(selectInsertQuery.data()); + LM->StartTimer("IUS: execute query to insert into PS"); + } - HSFuncExecQuery("CONTROL QUERY DEFAULT ALLOW_DML_ON_NONAUDITED_TABLE reset"); + rowsAffected = 0; + const char* insSourceTblName = extractTblName(*hssample_table + "_I", objDef); + NABoolean needEspParReset = setEspParallelism(objDef, insSourceTblName); + retcode = HSFuncExecQuery(selectInsertQuery, -UERR_INTERNAL_ERROR, + &rowsAffected, + "IUS insert into PS (select from _I)", + NULL, NULL, TRUE/*doRetry*/, 0, + // check mdam usage if reading incremental sample directly from source table + CmpCommon::getDefault(USTAT_INCREMENTAL_UPDATE_STATISTICS) == DF_SAMPLE); //checkMdam + if (LM->LogNeeded()) { + LM->StopTimer(); + sprintf(LM->msg, PF64 " rows inserted into persistent sample table.", rowsAffected); + LM->Log(LM->msg); } + if (needEspParReset) + resetEspParallelism(); + HSHandleError(retcode); + newSampleSize += rowsAffected; - checkTime("after updating persistent sample table for IUS"); + // Save sample count values to update row in SB_PERSISTENT_SAMPLES table. + PST_IUSrequestedSampleRows_ = new(STMTHEAP) Int64; + *PST_IUSrequestedSampleRows_ = requestedSampleSize; + PST_IUSactualSampleRows_ = new(STMTHEAP) Int64; + *PST_IUSactualSampleRows_ = newSampleSize; - // Write CBFs for groups that are PENDING and delayedRead back to disk. 
- Int32 count = writeCBFstoDiskForIUS(*hssample_table,singleGroup); + HSFuncExecQuery("CONTROL QUERY DEFAULT ALLOW_DML_ON_NONAUDITED_TABLE reset"); + checkTime("after updating persistent sample table for IUS"); return retcode; } @@ -6531,25 +6816,13 @@ Int32 HSGlobalsClass::readCBFsIntoMemForIUS(NAString& sampleTableName, HSColGroupStruct* group ) { - // Before we have the PERSISTENT_DATA table available to us, we will - // save the CBFs as binary files on disk. One CBF maps to one binary file. - // The path of the directory for these files is specified in CQD - // USTAT_IUS_PERSISTENT_CBF_PATH, and the cbf binary file name is - // sampleTableName + '.' + 'colName'; - - NAString path = - ActiveSchemaDB()->getDefaults().getValue(USTAT_IUS_PERSISTENT_CBF_PATH); - - NAString cbfFilePrefix(path); - cbfFilePrefix.append("/"); - cbfFilePrefix.append(sampleTableName); - cbfFilePrefix.append("."); + NAString cbfFilePrefix; + getCBFFilePrefix(sampleTableName, cbfFilePrefix); Lng32 sz; Lng32 bufSz = 0; char* bufptr = NULL; struct stat sts; - char buffer[20]; while (group) { @@ -6560,8 +6833,7 @@ Int32 HSGlobalsClass::readCBFsIntoMemForIUS(NAString& sampleTableName, group->delayedRead = FALSE; NAString cbfFile(cbfFilePrefix); - str_itoa(group->colSet[0].colnum, buffer); - cbfFile.append(buffer); + cbfFile.append(group->cbfFileNameSuffix()); if (stat(cbfFile, &sts) == 0) { if ( bufSz < sts.st_size ) { @@ -6672,10 +6944,9 @@ Int32 HSGlobalsClass::writeCBFstoDiskForIUS(NAString& sampleTableName, HSColGroupStruct* group ) { - // We save the CBFs as binary files on disk. One CBF maps to one binary file. - // The path of the directory for these files is specified in CQD - // USTAT_IUS_PERSISTENT_CBF_PATH, and the cbf binary file name is - // sampleTableName + '.' 
+ 'col_position_in_table'; + HSLogMan *LM = HSLogMan::Instance(); + if (LM->LogNeeded()) + LM->StartTimer("IUS: write CBF files to disk"); NAString path = ActiveSchemaDB()->getDefaults().getValue(USTAT_IUS_PERSISTENT_CBF_PATH); @@ -6688,21 +6959,21 @@ Int32 HSGlobalsClass::writeCBFstoDiskForIUS(NAString& sampleTableName, UInt64 totalSpaceInBlocks = 0; - if ( !getTotalDiskSizeInBlocks(path, totalSpaceInBlocks) ) + if ( !getTotalDiskSizeInBlocks(path, totalSpaceInBlocks) ) { + if (LM->LogNeeded()) + LM->StopTimer(); return 0; + } UInt64 totalAllowedInBlocks = MINOF(totalCBFsizeInMB * 1024 / 2, totalSpaceInBlocks * percentage); - NAString cbfFilePrefix(path); - cbfFilePrefix.append("/"); - cbfFilePrefix.append(sampleTableName); - cbfFilePrefix.append("."); + NAString cbfFilePrefix; + getCBFFilePrefix(sampleTableName, cbfFilePrefix); Lng32 sz; Lng32 bufSz = 0; char* bufptr = NULL; - char buffer[20]; Int32 count = 0; @@ -6711,9 +6982,7 @@ Int32 HSGlobalsClass::writeCBFstoDiskForIUS(NAString& sampleTableName, if ( group->cbf && group->state == PROCESSED ) { NAString cbfFile(cbfFilePrefix); - str_itoa(group->colSet[0].colnum, buffer); - cbfFile.append(buffer); - + cbfFile.append(group->cbfFileNameSuffix()); Lng32 cbfSz = group->cbf->getTotalMemSize(); @@ -6736,8 +7005,8 @@ Int32 HSGlobalsClass::writeCBFstoDiskForIUS(NAString& sampleTableName, char* buffer = bufptr; sz = group->cbf->packIntoBuffer(buffer, FALSE /* no bytes swapping */ ); - assert( sz <= bufSz); - assert( sz <= buffer - bufptr); + HS_ASSERT( sz <= bufSz); + HS_ASSERT( sz <= buffer - bufptr); ssize_t wsz = write(fd, bufptr, sz); @@ -6750,42 +7019,39 @@ Int32 HSGlobalsClass::writeCBFstoDiskForIUS(NAString& sampleTableName, } + // Make sure we don't write it again on next batch. 
+ delete group->cbf; + group->cbf = NULL; } group = group->next; } NADELETEBASIC(bufptr, STMTHEAP); + if (LM->LogNeeded()) + LM->StopTimer(); return count; } Int32 HSGlobalsClass::deletePersistentCBFsForIUS(NAString& sampleTableName, - HSColGroupStruct* group) + HSColGroupStruct* group, + SortState stateToDelete) { - // Before we have the PERSISTENT_DATA table available to us, we will - // save the CBFs as binary files on disk. One CBF maps to one binary file. - // The path of the directory for these files is specified in CQD - // USTAT_IUS_PERSISTENT_CBF_PATH, and the cbf binary file name is - // sampleTableName + '.' + 'colName'; - - NAString path = - ActiveSchemaDB()->getDefaults().getValue(USTAT_IUS_PERSISTENT_CBF_PATH); - - NAString cbfFilePrefix(path); - cbfFilePrefix.append("/"); - cbfFilePrefix.append(sampleTableName); - cbfFilePrefix.append("."); - char buffer[20]; + NAString cbfFilePrefix; + getCBFFilePrefix(sampleTableName, cbfFilePrefix); while (group) { - if ( group->cbf && group->state == PENDING ) { + if ( group->cbf && group->state == stateToDelete ) { NAString cbfFile(cbfFilePrefix); - str_itoa(group->colSet[0].colnum, buffer); - cbfFile.append(buffer); + cbfFile.append(group->cbfFileNameSuffix()); remove(cbfFile.data()); + + // Make sure this unused CBF does not get persisted. 
+ delete group->cbf; + group->cbf = NULL; } group = group->next; @@ -6797,8 +7063,13 @@ Int32 HSGlobalsClass::deletePersistentCBFsForIUS(NAString& sampleTableName, Lng32 HSGlobalsClass::selectIUSBatch(Int64 currentRows, Int64 futureRows, NABoolean& ranOut, Int32& colsSelected) { HSLogMan *LM = HSLogMan::Instance(); + if (LM->LogNeeded()) + LM->StartTimer("IUS: selectIUSBatch()"); colsSelected = 0; + Int32 colsSuggested = 0; // number of cols we try to allocate + iusSampleDeletedInMem->depopulate(); + iusSampleInsertedInMem->depopulate(); Int64 memAllowed = getMaxMemory(); Int64 memLeft = memAllowed; @@ -6806,6 +7077,8 @@ Lng32 HSGlobalsClass::selectIUSBatch(Int64 currentRows, Int64 futureRows, NABool Lng32 retcode = 0; Int64 tableUID = objDef->getObjectUID(); + char UIDStr[30]; + convertInt64ToAscii(tableUID,UIDStr); Lng32 colnum = 0; UInt32 histID = 0; @@ -6814,11 +7087,20 @@ Lng32 HSGlobalsClass::selectIUSBatch(Int64 currentRows, Int64 futureRows, NABool Int64 totalUEC = 0; Int64 v2; - HSCliStatement histCursor(HSCliStatement::CURSOR107_MX_2300, - (char*)hstogram_table->data(), - (char*)&tableUID, - (char*)&colnum); + // SELECT HISTOGRAM_ID, INTERVAL_COUNT, ROWCOUNT, TOTAL_UEC, V2 + // FROM SB_HISTOGRAMS + // WHERE TABLE_UID = tableUID AND COLCOUNT = 1 AND COLUMN_NUMBER = CAST(? AS INTEGER) + + HSErrorCatcher errorCatcher(retcode, - UERR_INTERNAL_ERROR, "HSGlobalsClass::selectIUSBatch", TRUE); + NAString query = "SELECT HISTOGRAM_ID, INTERVAL_COUNT, ROWCOUNT, TOTAL_UEC, V2 FROM "; + query += *hstogram_table; + query += " WHERE TABLE_UID = "; + query += UIDStr; + query += " AND COLCOUNT = 1 AND COLUMN_NUMBER = CAST(? AS INTEGER)"; // single column histograms only + + HSCursor histCursor; + histCursor.prepareQuery(query.data(), 1, 5); // 1 input parameter, 5 output // Memory required by RUS has been estimated in prepareForIUS(). // Here we need to add the extra amount needed by IUS. Do it for each group below. 
@@ -6853,7 +7135,7 @@ Lng32 HSGlobalsClass::selectIUSBatch(Int64 currentRows, Int64 futureRows, NABool } colnum = group->colSet[0].colnum; - retcode = histCursor.open(); + retcode = histCursor.open(1, (void*)&colnum); HSHandleError(retcode); retcode = histCursor.fetch(5, (void*)&histID, (void*)&intvlCount, (void*)&totalRowCount, (void*)&totalUEC, @@ -6927,15 +7209,17 @@ Lng32 HSGlobalsClass::selectIUSBatch(Int64 currentRows, Int64 futureRows, NABool insGroup->state = PENDING; - colsSelected++; + colsSuggested++; memLeft -= totMemNeeded; } else { - // the memory is not enough for the group. Do nothing. - // The group is still in UNPROCESSED state. + // Not enough memory for the group. Leave the group in UNPROCESSED state, + // and get rid of the HSHistogram object we created for it. ranOut = TRUE; + delete group->groupHist; + group->groupHist = NULL; if (LM->LogNeeded()) { - sprintf(LM->msg, "Not enough memory: memLeft=" PF64 "totMemNeeded=", memLeft); + sprintf(LM->msg, "Not enough memory for %s: memLeft=" PF64 " totMemNeeded=", group->colNames->data(), memLeft); formatFixedNumeric((Int64)totMemNeeded, 0, LM->msg+strlen(LM->msg)); LM->Log(LM->msg); sprintf(LM->msg, "group->memNeeded="PF64"", group->memNeeded); @@ -6983,15 +7267,13 @@ Lng32 HSGlobalsClass::selectIUSBatch(Int64 currentRows, Int64 futureRows, NABool insGroup = insGroup->next; } - // Now allocate memory for singleGroup, inMemDelete and inMemInsert table, - // for each column in PENDING state. - allocateMemoryForColumns(singleGroup, futureRows, NULL); - - allocateMemoryForColumns(iusSampleDeletedInMem->getColumns(), - iusSampleDeletedInMem->getNumRows(), NULL); - - allocateMemoryForColumns(iusSampleInsertedInMem->getColumns(), - iusSampleInsertedInMem->getNumRows(), NULL); + // Now allocate memory for singleGroup, inMemDelete and inMemInsert table, + // for each column in PENDING state. 
+ colsSelected = allocateMemoryForIUSColumns(singleGroup, futureRows, + iusSampleDeletedInMem->getColumns(), + iusSampleDeletedInMem->getNumRows(), + iusSampleInsertedInMem->getColumns(), + iusSampleInsertedInMem->getNumRows()); if (LM->LogNeeded()) { @@ -7008,8 +7290,10 @@ Lng32 HSGlobalsClass::selectIUSBatch(Int64 currentRows, Int64 futureRows, NABool } group = group->next; } - sprintf(LM->msg, "return from selectIUSBatch(): count=%d", colsSelected); + sprintf(LM->msg, "return from selectIUSBatch(): columns originally selected = %d, " + "columns able to allocate = %d", colsSuggested, colsSelected); LM->Log(LM->msg); + LM->StopTimer(); } return retcode; @@ -7040,8 +7324,11 @@ Lng32 HSGlobalsClass::incrementHistograms() } else { if ( retcode == 0 ) { group->state = PROCESSED; // IUS successful. - delGroup->state = PROCESSED; // IUS successful. - insGroup->state = PROCESSED; // IUS successful. + group->freeISMemory(); + delGroup->state = PROCESSED; // IUS successful. + delGroup->freeISMemory(); + insGroup->state = PROCESSED; // IUS successful. 
+ insGroup->freeISMemory(); } else HSHandleError(retcode); } @@ -7147,9 +7434,14 @@ Lng32 HSGlobalsClass::initIUSIntervals(HSColGroupStruct* smplGroup, UInt32 histID, Int16 numIntervals) { + HSLogMan *LM = HSLogMan::Instance(); + if (LM->LogNeeded()) + LM->StartTimer("IUS: initIUSIntervals()"); typedef Int16 LenType; Lng32 retcode = 0; Int64 tableUID = objDef->getObjectUID(); + char UIDStr[30]; + convertInt64ToAscii(tableUID,UIDStr); Int64 rowCount; Int16 intvlNum; @@ -7160,10 +7452,26 @@ Lng32 HSGlobalsClass::initIUSIntervals(HSColGroupStruct* smplGroup, NAWchar boundarySpec[HS_MAX_UCS_BOUNDARY_CHAR + 1]; // +1 for 2-byte count NAWchar MFV[HS_MAX_UCS_BOUNDARY_CHAR + 1]; - HSCliStatement intvlCursor(HSCliStatement::SHOWINT_MX_2300, - (char*)hsintval_table->data(), - (char*)&tableUID, - (char*)&histID); + char histIDStr[20]; + sprintf(histIDStr,"%u",histID); + + // SELECT several columns + // FROM SB_HISTOGRAM_INTERVALS + // WHERE TABLE_UID = tableUID AND HISTOGRAM_ID = histID + // ORDER BY INTERVAL_NUMBER + + NAString query = "SELECT INTERVAL_NUMBER, INTERVAL_ROWCOUNT, INTERVAL_UEC," + " INTERVAL_BOUNDARY, STD_DEV_OF_FREQ, V1, V2, V5 FROM "; + query += *hsintval_table; + query += " WHERE TABLE_UID = "; + query += UIDStr; + query += " AND HISTOGRAM_ID = "; + query += histIDStr; + query += " ORDER BY INTERVAL_NUMBER"; + + HSCursor intvlCursor(STMTHEAP,HS_INTERVAL_STMT_ID); + intvlCursor.prepareQuery(query.data(), 0, 8); // no input parameters, 8 output + retcode = intvlCursor.open(); HSHandleError(retcode); @@ -7172,8 +7480,8 @@ Lng32 HSGlobalsClass::initIUSIntervals(HSColGroupStruct* smplGroup, while (TRUE) { - retcode = intvlCursor.fetch(8, (void*)&rowCount, - (void*)&intvlNum, + retcode = intvlCursor.fetch(8, (void*)&intvlNum, + (void*)&rowCount, (void*)&uec, (void*)boundarySpec, (void*)&stddev, (void*)&v1, (void*)&v2, @@ -7237,6 +7545,8 @@ Lng32 HSGlobalsClass::initIUSIntervals(HSColGroupStruct* smplGroup, } + if (LM->LogNeeded()) + LM->StopTimer(); return 
retcode; } @@ -8076,6 +8386,35 @@ NABoolean HSGlobalsClass::allMCGroupsProcessed(NABoolean forIS) } +// This function is called by the HS_ASSERT macro to take care of some things +// before triggering an assertion failure: +// - Log the assertion failure if logging is enabled. +// - Roll back transaction if one is in progress. +// - Put an assertion error in the diagnostics area. This is supposed to be +// done by code executed due to the macro HS_ASSERT invokes for the assertion +// failure, but it does not always work properly. Doing it here prevents it +// from being attempted downstream. +// The parameters are the text of the assertion, and the file and line at which +// it occurred. +void HSGlobalsClass::preAssertionFailure(const char* condition, + const char* fileName, + Lng32 lineNum) +{ + HSTranMan *TM = HSTranMan::Instance(); + HSLogMan *LM = HSLogMan::Instance(); + if (LM->LogNeeded()) + { + sprintf(LM->msg, "***[ERROR] INTERNAL ASSERTION (%s) AT %s:%i", condition, fileName, lineNum); + LM->Log(LM->msg); + } + if (TM->StartedTransaction()) + TM->Rollback(); + diagsArea << DgSqlCode(arkcmpErrorAssert) + << DgString0(condition) + << DgString1(fileName) + << DgInt0(lineNum); +} + /****************************************************************/ /* METHOD: getRetcodeFromDiags() */ @@ -10589,10 +10928,27 @@ Int32 HSGlobalsClass::selectSortBatch(Int64 rows, return count; } +// Reduce the percentage of physical memory to limit internal sort to, following +// an allocation failure that proved our previous estimate too high. We arbitrarily +// reduce it to 90% of what it was. We could get smarter about this, and base the +// reduction on how much of what we recommended was successfully allocated before +// the failure. 
+void HSGlobalsClass::memReduceAllowance() +{ + HSLogMan *LM = HSLogMan::Instance(); + + ISMemPercentage_ *= .9f; + + if (LM->LogNeeded()) + { + sprintf(LM->msg, "Reducing ISMemPercentage_ to %f", ISMemPercentage_); + LM->Log(LM->msg); + } +} + // Takes corrective action when a memory allocation for internal sort could not // be made. Remove the offending column and remaining unallocated columns from -// the current internal sort batch, and reduce the percentage of available memory -// to request. +// the current internal sort batch. // // Parameters: // failedGroup - The single-col group which could not be allocated for. @@ -10610,8 +10966,8 @@ void HSGlobalsClass::memRecover(HSColGroupStruct* failedGroup, if (LM->LogNeeded()) { LM->Log("<<<Recovering from failed memory allocation for internal sort"); - sprintf(LM->msg, "Memory allocation failed for %s", - failedGroup->colSet[0].colname->data()); + sprintf(LM->msg, "Memory allocation failed for %s (" PF64 " rows)", + failedGroup->colSet[0].colname->data(), rows); LM->Log(LM->msg); } @@ -10683,19 +11039,127 @@ void HSGlobalsClass::memRecover(HSColGroupStruct* failedGroup, } while (grp = grp->next); - // Reduce the percentage of physical memory to limit internal sort to, since - // our previous estimate was obviously too high. We arbitrarily reduce it to - // 90% of what it was. We could get smarter about this, and base the reduction - // on how much of what we recommended was successfully allocated before the - // failure. - ISMemPercentage_ *= .9f; + if (LM->LogNeeded()) + LM->Log(">>>Finished recovery from failed memory allocation for internal sort"); +} +// The data read into memory for IUS consists not only of the primary data +// (from the existing persistent sample), but also the data for the rows to +// be removed from the sample, and those to be inserted into the sample. 
The +// allocation of memory for these three sets of data for a column must be +// kept consistent, such that on a given pass, the three data sets for a +// column should either all be in memory, or none of them. +// +// This function attempts to allocate the needed memory for all three data +// sets for a given column selected for an IUS batch. If an allocation failure +// occurs for one of the data sets, it ensures that any prior allocation for +// a corresponding data set is undone, and that the state of corresponding +// groups are all the same (typically removing the column from the current +// batch by changing its state from PENDING to UNPROCESSED). +Int32 HSGlobalsClass::allocateMemoryForIUSColumns(HSColGroupStruct* group, + Int64 rows, + HSColGroupStruct* delGroup, + Int64 delRows, + HSColGroupStruct* insGroup, + Int64 insRows) +{ + HSLogMan *LM = HSLogMan::Instance(); if (LM->LogNeeded()) - { - sprintf(LM->msg, "Reducing ISMemPercentage_ to %f", ISMemPercentage_); - LM->Log(LM->msg); - LM->Log(">>>Finished recovery from failed memory allocation for internal sort"); - } + LM->StartTimer("Allocate storage for IUS columns"); + + Int32 numCols = 0; + HSColGroupStruct* firstPendingGroup = NULL; + + // To simplify the logic of keeping the three groups in sync, place them + // and their row counts in arrays. On each iteration the array elements + // are updated to point to the next group in the respective list. + HSColGroupStruct* groupArr[] = {group, delGroup, insGroup}; + Int64 rowsArr[] = {rows, delRows, insRows}; + + NABoolean gotMemory = TRUE; + + // Create storage for query results. + do + { + if (groupArr[0]->state != PENDING) + { + // Skip column not selected for this batch by advancing all 3 groups. + for (Int16 i=0; i<3; i++) + groupArr[i] = groupArr[i]->next; + continue; + } + + if (!firstPendingGroup) + firstPendingGroup = groupArr[0]; + + // Allocate all memory needed for storing values of each group. 
If unable + // to do so for all groups, make necessary adjustments to group states and + // set flag indicating memory shortfall. + for (Int16 i=0; i<3 && gotMemory; i++) + { + if (!groupArr[i]->allocateISMemory(rowsArr[i])) + { + // Recover from failed allocation (free any partial allocation and + // reset the group's state to UNPROCESSED). Also do this for any + // groups already allocated, e.g., if the allocation fails for + // delGroup, back out the allocation for the primary group for the + // column in question. + Int16 j; + HSColGroupStruct *grpi, *grpj; + for (j=i; j>=0; j--) + { + memRecover(groupArr[j], groupArr[0] == firstPendingGroup, rowsArr[j], NULL); + if (j < i) // free memory for groups in this set + groupArr[j]->freeISMemory(); // that were already allocated + } + + // For groups not allocated yet, don't need to free anything, but + // make sure the states of corresponding groups are the same. + // memRecover() will have changed the PENDING state of some of the + // columns, and if the corresponding delGroup and/or insGroup are + // not changed, they will be part of the queries to read the sample + // decrement/increment, for columns that are not being processed in + // this batch. + for (j=i+1; j<3; j++) + { + grpi = groupArr[i]; + grpj = groupArr[j]; + while (grpi) + { + grpj->state = grpi->state; + grpi = grpi->next; + grpj = grpj->next; + } + } + gotMemory = FALSE; + memReduceAllowance(); + } + else + { + // Allocation was successful. + groupArr[i]->nextData = groupArr[i]->data; + groupArr[i]->mcis_nextData = groupArr[i]->mcis_data; + } + } + + // If the allocation was successful, increment the count of columns for + // which memory was allocated, and advance to the next element in each + // sequence of groups (primary, delete, and insert). If the allocation + // was not successful, the loop will exit (recovery from the allocation + // failure has been performed within the loop). 
+ if (gotMemory) + { + for (Int16 i=0; i<3; i++) + groupArr[i] = groupArr[i]->next; + numCols++; + } + + } while (gotMemory && groupArr[0]); + + if (LM->LogNeeded()) + LM->StopTimer(); + + return numCols; } // Allocates memory needed for internal sort for all columns marked as PENDING. @@ -10731,6 +11195,7 @@ Int32 HSGlobalsClass::allocateMemoryForColumns(HSColGroupStruct* group, if (!group->allocateISMemory(rows)) { memRecover(group, group == firstPendingGroup, rows, mgr); + memReduceAllowance(); break; } @@ -10863,8 +11328,9 @@ Lng32 HSGlobalsClass::readColumnsIntoMem(HSCursor *cursor, Int64 rows) // processInternalSortNulls() to a separate new method in the future. retcode = processInternalSortNulls(cursor->rowsetSize(), singleGroup); HSHandleError(retcode); - retcode = cursor->setRowsetPointers(singleGroup, - (Lng32)MINOF(MAX_ROWSET, rowsLeft)); + Lng32 rowsetSize = (Lng32)MINOF(MAX_ROWSET, rowsLeft); + if (rowsetSize > 0) + retcode = cursor->setRowsetPointers(singleGroup,rowsetSize); } } if (retcode < 0) HSHandleError(retcode) else retcode=0; // Set to 0 for warnings. @@ -11236,6 +11702,31 @@ Int64 placeWidePivot(T* sortArr, Int64 lowInx, Int64 highInx, Int64 pivotInx, return (pivotWidth >= highInx - lowInx ? 
-1 : storePtr - sortArr); } + +template <class T> +void checkForBackwardness(HSGlobalsClass * hsGlobals, HSColGroupStruct *group, T * listitem1, T * listitem2) +{ + if (*listitem1 > *listitem2) + { + group->backwardWarningCount++; + if (group->backwardWarningCount < 5) // report this warning at most 5 times per column + { + // raise a warning that we found data in backwards order, which means + // we might get out-of-order histograms, which is bad + hsGlobals->diagsArea << DgSqlCode(UERR_UNEXPECTED_BACKWARDS_DATA) + << DgString0(group->colSet[0].colname->data()); + } + } +} + +template < > +void checkForBackwardness(HSGlobalsClass * hsGlobals, HSColGroupStruct *group, +MCWrapper * listitem1, MCWrapper * listitem2) +{ + // TODO: write this method when necessary; it's a no-op for now +} + + // The data in the column's data array has been sorted but not grouped. // Iterate over the values, counting duplicates. When a new value is // encountered, create a new group, consisting of a distinct value and @@ -11295,6 +11786,7 @@ do valueCountIndex++; numRows=0; } + checkForBackwardness(hsGlobals,group,listitem1,listitem2); listitem1++; listitem2++; valueIndex++; @@ -11628,6 +12120,60 @@ NAString& HSGlobalsClass::getWherePredicateForIUS() return (*ius_where_condition_text); } +// Return the following string in the queryText parameter: +// delete from <smplTable> where <whereCondition> +void HSGlobalsClass::generateIUSDeleteQuery(const NAString& smplTable, + NAString& queryText) +{ + queryText = "DELETE FROM "; + + queryText.append(smplTable.data()); + + NAString& whereClause = getWherePredicateForIUS(); + if (whereClause.length() > 0) { + queryText.append(" WHERE "); + queryText.append(whereClause); + } +} + +// Create statement to add rows to the IUS persistent sample table. +// upsert using load into into <smplTable>... 
+// +// If doing full IUS, the new sample table rows are already in the temporary +// _I table, and the source for the upsert is +// (select * from <smplTable>_I) +// +// If a limited IUS (update persistent sample table and generate histograms), +// the source for the upsert is the source table with the IUS where predicate +// and sampling rate applied: +// (select * from <sourceTable> where <predicate> sample random <sampleRate> percent) +void HSGlobalsClass::generateIUSSelectInsertQuery(const NAString& smplTable, + const NAString& sourceTable, + NAString& queryText) +{ + queryText.append("UPSERT USING LOAD INTO "); // for algorithm 1 + queryText.append(smplTable.data()); + queryText.append(" (SELECT * FROM "); + + if (CmpCommon::getDefault(USTAT_INCREMENTAL_UPDATE_STATISTICS) == DF_ON) + { + queryText.append(smplTable.data()); + queryText.append("_I)"); + } + else + { + queryText.append(sourceTable.data()); + queryText.append(" where "); + queryText.append(getWherePredicateForIUS()); + NAString sampleOpt; + createSampleOption(SAMPLE_RAND_1, + sampleRateAsPercetageForIUS * 100.0, + sampleOpt, 0, 0); + queryText.append(sampleOpt); + queryText.append(")"); + }
<TRUNCATED>
