[
https://issues.apache.org/jira/browse/TRAFODION-25?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954111#comment-15954111
]
ASF GitHub Bot commented on TRAFODION-25:
-----------------------------------------
Github user DaveBirdsall commented on a diff in the pull request:
https://github.com/apache/incubator-trafodion/pull/1043#discussion_r109515170
--- Diff: core/sql/optimizer/ScmCostMethod.cpp ---
@@ -3845,8 +3845,336 @@
CostMethodHbaseUpdate::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
- // TODO: Write this method; the line below is a stub
- return CostMethod::scmComputeOperatorCostInternal(op,pws,countOfStreams);
+ // TODO: Write this method; the code below is a copy of the Delete
+ // method which we'll use for the moment. This is better than just
+ // a simple constant stub; we will get parallel Update plans with
+ // this code, for example, that we won't get with constant cost.
+
+ // The theory of operation of Update is somewhat different (since it
+ // might, for example, do a Delete + an Insert, or might do an Hbase
+ // Update -- is that decided before we get here?), so this code will
+ // underestimate the cost in general.
+
+ const Context * myContext = pws->getContext();
+
+ cacheParameters(op,myContext);
+ estimateDegreeOfParallelism();
+
+ const InputPhysicalProperty* ippForMe =
+ myContext->getInputPhysicalProperty();
+
+ // -----------------------------------------
+ // Save off estimated degree of parallelism.
+ // -----------------------------------------
+ countOfStreams = countOfStreams_;
+
+ HbaseDelete* delOp = (HbaseDelete *)op; // downcast
+
+ CMPASSERT(partFunc_ != NULL);
+
+ // Later, if and when we start using NodeMaps to track active regions
for
+ // Trafodion tables in HBase (or native HBase tables), we can use the
+ // following to get active partitions.
+ //CostScalar activePartitions =
+ // (CostScalar)
+ // (((NodeMap *)(partFunc_->getNodeMap()))->getNumActivePartitions());
+ // But for now, we do the following:
+ CostScalar activePartitions =
(CostScalar)(partFunc_->getCountOfPartitions());
+
+ const IndexDesc* CIDesc = delOp->getIndexDesc();
+ const CostScalar & recordSizeInKb = CIDesc->getRecordSizeInKb();
+
+ CostScalar tuplesProcessed(csZero);
+ CostScalar tuplesProduced(csZero);
+ CostScalar tuplesSent(csZero); // we use tuplesSent to model sending
rowIDs to Hbase
+ CostScalar randomIOs(csZero);
+ CostScalar sequentialIOs(csZero);
+
+ CostScalar countOfAsynchronousStreams = activePartitions;
+
+ // figure out if the probes are in order - if they are, then when
+ // scanning, I/O will tend to be sequential
+
+ NABoolean probesInOrder = FALSE;
+ if (ippForMe != NULL) // input physical properties exist?
+ {
+ // See if the probes are in order.
+
+ // For delete, a partial order is ok.
+ NABoolean partiallyInOrderOK = TRUE;
+ NABoolean probesForceSynchronousAccess = FALSE;
+ ValueIdList targetSortKey = CIDesc->getOrderOfKeyValues();
+ ValueIdSet sourceCharInputs =
+ delOp->getGroupAttr()->getCharacteristicInputs();
+
+ ValueIdSet targetCharInputs;
+ // The char inputs are still in terms of the source. Map them to the
target.
+ // Note: The source char outputs in the ipp have already been mapped to
+ // the target. CharOutputs are a set, meaning they do not have
duplicates
+ // But we could have cases where two columns of the target are matched
to the
+ // same source column, example: Sol: 10-040416-5166, where we have
+ // INSERT INTO b6table1
+ // ( SELECT f, h_to_f, f, 8.4
+ // FROM btre211
+ // );
+ // Hence we use lists here instead of sets.
+ // Check to see if there are any duplicates in the source
Characteristics inputs
+ // if no, we shall perform set operations, as these are faster
+ ValueIdList bottomValues =
delOp->updateToSelectMap().getBottomValues();
+ ValueIdSet bottomValuesSet(bottomValues);
+ NABoolean useListInsteadOfSet = FALSE;
+
+ CascadesGroup* group1 =
(*CURRSTMT_OPTGLOBALS->memo)[delOp->getGroupId()];
+
+ GenericUpdate* upOperator = (GenericUpdate *)
group1->getFirstLogExpr();
+
+ if (((upOperator->getTableName().getSpecialType() ==
ExtendedQualName::NORMAL_TABLE ) ||
(upOperator->getTableName().getSpecialType() == ExtendedQualName::GHOST_TABLE
)) &&
+ (bottomValuesSet.entries() != bottomValues.entries() ) )
+ {
+
+ ValueIdList targetInputList;
+ // from here get all the bottom values that appear in the
sourceCharInputs
+ bottomValues.findCommonElements(sourceCharInputs );
+ bottomValuesSet = bottomValues;
+
+ // we can use the bottomValues only if these contain some duplicate
columns of
+ // characteristics inputs, otherwise we shall use the
characteristics inputs.
+ if (bottomValuesSet == sourceCharInputs)
+ {
+ useListInsteadOfSet = TRUE;
+ delOp->updateToSelectMap().rewriteValueIdListUpWithIndex(
+ targetInputList,
+ bottomValues);
+ targetCharInputs = targetInputList;
+ }
+ }
+
+ if (!useListInsteadOfSet)
+ {
+ delOp->updateToSelectMap().rewriteValueIdSetUp(
+ targetCharInputs,
+ sourceCharInputs);
+ }
+
+ // If a target key column is covered by a constant on the source side,
+ // then we need to remove that column from the target sort key
+ removeConstantsFromTargetSortKey(&targetSortKey,
+ &(delOp->updateToSelectMap()));
+ NABoolean orderedNJ = TRUE;
+ // Don't call ordersMatch if njOuterOrder_ is null.
+ if (ippForMe->getAssumeSortedForCosting())
+ orderedNJ = FALSE;
+ else
+ // if leading keys are not same then don't try ordered NJ.
+ orderedNJ =
+ isOrderedNJFeasible(*(ippForMe->getNjOuterOrder()), targetSortKey);
+
+ if (orderedNJ AND
+ ordersMatch(ippForMe,
+ CIDesc,
+ &targetSortKey,
+ targetCharInputs,
+ partiallyInOrderOK,
+ probesForceSynchronousAccess))
+ {
+ probesInOrder = TRUE;
+ if (probesForceSynchronousAccess)
+ {
+ // The probes form a complete order across all partitions and
+ // the clustering key and partitioning key are the same. So, the
+ // only asynchronous I/O we will see will be due to ESPs. So,
+ // limit the count of streams in DP2 by the count of streams in
ESP.
+
+ // Get the logPhysPartitioningFunction, which we will use
+ // to get the logical partitioning function. If it's NULL,
+ // it means the table was not partitioned at all, so we don't
+ // need to limit anything since there already is no asynch I/O.
+
+ // TODO: lppf is always null in Trafodion; figure out what to do
instead...
+ const LogPhysPartitioningFunction* lppf =
+ partFunc_->castToLogPhysPartitioningFunction();
+ if (lppf != NULL)
+ {
+ PartitioningFunction* logPartFunc =
+ lppf->getLogPartitioningFunction();
+ // Get the number of ESPs:
+ CostScalar numParts = logPartFunc->getCountOfPartitions();
+
+ countOfAsynchronousStreams = MINOF(numParts,
+ countOfAsynchronousStreams);
+ } // lppf != NULL
+ } // probesForceSynchronousAccess
+ } // probes are in order
+ } // if input physical properties exist
+
+ CostScalar currentCpus =
+
(CostScalar)myContext->getPlan()->getPhysicalProperty()->getCurrentCountOfCPUs();
+ CostScalar activeCpus = MINOF(countOfAsynchronousStreams, currentCpus);
+ CostScalar streamsPerCpu =
+ (countOfAsynchronousStreams / activeCpus).getCeiling();
+
+
+ CostScalar noOfProbesPerPartition(csOne);
+
+ CostScalar numRowsToDelete(csOne);
+ CostScalar numRowsToScan(csOne);
+
+ CostScalar commonComputation;
+
+ // Determine # of rows to scan and to delete
+
+ if (delOp->getSearchKey() && delOp->getSearchKey()->isUnique() &&
+ (noOfProbes_ == 1))
+ {
+ // unique access
+
+ activePartitions = csOne;
+ countOfAsynchronousStreams = csOne;
+ activeCpus = csOne;
+ streamsPerCpu = csOne;
+ numRowsToScan = csOne;
+ // assume the 1 row always satisfies any executor predicates so
+ // we'll always do the Delete
+ numRowsToDelete = csOne;
+ }
+ else
+ {
+ // non-unique access
+
+ numRowsToDelete =
+ ((myRowCount_ / activePartitions).getCeiling()).minCsOne();
+ noOfProbesPerPartition =
+ ((noOfProbes_ / countOfAsynchronousStreams).getCeiling()).minCsOne();
+
+ // need to compute the number of rows that satisfy the key predicates
+ // to compute the I/Os that must be performed
+
+ // need to create a new histogram, since the one from input logical
+ // prop. has the histogram for the table after all executor preds are
+ // applied (i.e. the result cardinality)
+ IndexDescHistograms
histograms(*CIDesc,CIDesc->getIndexKey().entries());
+
+ // retrieve all of the key preds in key column order
+ ColumnOrderList keyPredsByCol(CIDesc->getIndexKey());
+ delOp->getSearchKey()->getKeyPredicatesByColumn(keyPredsByCol);
+
+ if ( NOT allKeyColumnsHaveHistogramStatistics( histograms, CIDesc ) )
+ {
+ // All key columns do not have histogram data, the best we can
+ // do is use the number of rows that satisfy all predicates
+ // (i.e. the number of rows we will be updating)
+ numRowsToScan = numRowsToDelete;
+ }
+ else
+ {
+ numRowsToScan = numRowsToScanWhenAllKeyColumnsHaveHistograms(
+ histograms,
+ keyPredsByCol,
+ activePartitions,
+ CIDesc
+ );
+ if (numRowsToScan < numRowsToDelete) // sanity test
+ {
+ // we will scan at least as many rows as we delete
+ numRowsToScan = numRowsToDelete;
+ }
+ }
+ }
+
+ // Notes: At execution time, several different TCBs can be created
+ // for a delete. We can class them three ways: Unique, Subset, and
+ // Rowset. Representative examples of the three classes are:
+ //
+ // ExHbaseUMDtrafUniqueTaskTcb
+ // ExHbaseUMDtrafSubsetTaskTcb
+ // ExHbaseAccessSQRowsetTcb
+ //
+ // The theory of operation of each of these differs somewhat.
+ //
+ // For the Unique variant, we use an HBase "get" to obtain a row, apply
+ // a predicate to it, then do an HBase "delete" to delete it if the
+ // predicate is true. (If there is no predicate, we'll simply do a
+ // "checkAndDelete" so there would be no "get" cost.)
+ //
+ // For the Subset variant, we use an HBase "scan" to obtain a sequence
+ // of rows, apply a predicate to each, then do an HBase "delete" on
+ // each row that passes the predicate.
+ //
+ // For the Rowset variant, we simply pass all the input keys to
+ // HBase in batches in HBase "deleteRows" calls. (In Explain plans,
+ // this TCB shows up as "trafodion_delete_vsbb", while the first two
+ // show up as "trafodion_delete".) There is no "get" cost. In plans
+ // with this TCB, there is a separate Scan TCB to obtain the keys,
+ // which then flow to this Rowset TCB via a tuple flow or nested join.
+ // (Such a separate Scan might exist with the first two TCBs also,
+ // e.g., when an index is used to decide which rows to delete.)
+ // The messaging cost to HBase is also reduced since multiple delete
+ // keys are sent per HBase interaction.
+ //
+ // Unfortunately the decisions as to which TCB will be used are
+ // currently made in the generator code and so aren't easily
+ // available to us here. For the moment then, we make no attempt
+ // to distinguish a separate "get" cost, nor do we take into account
+ // possible reduced message cost in the Rowset case. Should this
+ // choice be refactored in the future to push it into the Optimizer,
+ // then we can do a better job here. We did attempt to distinguish
+ // the unique case here from the others, but even there our criteria
+ // are not quite the same as in the generator. So at best, this attempt
+ // simply sharpens the cost estimate in this one particular case.
+
+
+ // Compute the I/O cost
+
+ computeIOCostsForCursorOperation(
+ randomIOs /* out */,
+ sequentialIOs /* out */,
+ CIDesc,
+ numRowsToScan,
+ probesInOrder
+ );
+
+ // Compute the tuple cost
+
+ tuplesProduced = numRowsToDelete;
+ tuplesProcessed = numRowsToScan;
+ tuplesSent = numRowsToDelete;
+
+ CostScalar rowSize = delOp->getIndexDesc()->getRecordLength();
+ CostScalar rowSizeFactor = scmRowSizeFactor(rowSize);
+ CostScalar outputRowSize = delOp->getGroupAttr()->getRecordLength();
+ CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
+
+ tuplesProcessed *= rowSizeFactor;
+ tuplesSent *= rowSizeFactor;
+ tuplesProduced *= outputRowSizeFactor;
+
+
+ // ---------------------------------------------------------------------
+ // Synthesize and return cost object.
+ // ---------------------------------------------------------------------
+
+ CostScalar probeRowSize = delOp->getIndexDesc()->getKeyLength();
+ Cost * updateCost =
+ scmCost(tuplesProcessed, tuplesProduced, tuplesSent, randomIOs,
sequentialIOs, noOfProbes_,
+ rowSize, csZero, outputRowSize, probeRowSize);
+
+#ifndef NDEBUG
+if ( CmpCommon::getDefault( OPTIMIZER_PRINT_COST ) == DF_ON )
+ {
+ pfp = stdout;
+ fprintf(pfp, "HbaseDelete::scmComputeOperatorCostInternal()\n");
+ updateCost->getScmCplr().print(pfp);
+ fprintf(pfp, "HBase Delete elapsed time: ");
--- End diff --
Here too.
> Insert, delete and update operators should use specialized cost method
> -----------------------------------------------------------------------
>
> Key: TRAFODION-25
> URL: https://issues.apache.org/jira/browse/TRAFODION-25
> Project: Apache Trafodion
> Issue Type: Bug
> Components: sql-cmp
> Reporter: Qifan Chen
> Assignee: David Wayne Birdsall
> Labels: performance
>
> In Trafodion, insert, delete and update operators do not use a specialized
> cost method that model the true operation involved.
> As a result, these operators can be assigned a zero cost and the query plan
> may not be optimal. In one example, a delete query against a partitioned
> table may get a serial plan.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)