yjhjstz commented on code in PR #1241:
URL: https://github.com/apache/cloudberry/pull/1241#discussion_r2237895326


##########
src/backend/access/appendonly/appendonlyam.c:
##########
@@ -1118,6 +1118,305 @@ getNextBlock(AppendOnlyScanDesc scan)
        return true;
 }
 
+static int
+appendonly_locate_target_segment(AppendOnlyScanDesc scan, int64 targrow)
+{
+       int64 rowcount;
+
+       for (int i = scan->aos_segfiles_processed - 1; i < 
scan->aos_total_segfiles; i++)
+       {
+               if (i < 0)
+                       continue;
+
+               rowcount = scan->aos_segfile_arr[i]->total_tupcount;
+               if (rowcount <= 0)
+                       continue;
+
+               if (scan->segfirstrow + rowcount - 1 >= targrow)
+               {
+                       /* found the target segment */
+                       return i;
+               }
+
+               /* continue next segment */
+               scan->segfirstrow += rowcount;
+               scan->segrowsprocessed = 0;
+       }
+
+       /* row is beyond the total number of rows in the relation */
+       return -1;
+}
+
+/*
+ * returns the segfile number in which `targrow` locates  
+ */
+static int
+appendonly_getsegment(AppendOnlyScanDesc scan, int64 targrow)
+{
+       int segidx, segno;
+
+       /* locate the target segment */
+       segidx = appendonly_locate_target_segment(scan, targrow);
+       if (segidx < 0)
+       {
+               CloseScannedFileSeg(scan);
+
+               /* done reading all segfiles */
+               Assert(scan->aos_done_all_segfiles);
+
+               return -1;
+       }
+
+       if (segidx + 1 > scan->aos_segfiles_processed)
+       {
+               /* done current segfile */
+               CloseScannedFileSeg(scan);
+               /*
+                * Adjust aos_segfiles_processed to guide
+                * SetNextFileSegForRead() opening next
+                * right segfile.
+                */
+               scan->aos_segfiles_processed = segidx;
+       }
+
+       segno = scan->aos_segfile_arr[segidx]->segno;
+       Assert(segno > InvalidFileSegNumber && segno <= 
AOTupleId_MaxSegmentFileNum);
+
+       if (scan->aos_need_new_segfile)
+       {
+               if (SetNextFileSegForRead(scan))
+               {
+                       Assert(scan->segrowsprocessed == 0);
+                       scan->needNextBuffer = true;
+               }
+               else
+               {
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INTERNAL_ERROR),
+                                        errmsg("Unexpected behavior, failed to 
open segno %d during scanning AO table %s",
+                                                       segno, 
RelationGetRelationName(scan->aos_rd))));
+               }
+       }
+
+       return segno;
+}
+
+static inline int64
+appendonly_block_remaining_rows(AppendOnlyScanDesc scan)
+{
+       return (scan->executorReadBlock.rowCount - 
scan->executorReadBlock.blockRowsProcessed);
+}
+
+/*
+ * locates the block in which `targrow` exists
+ */
+static void
+appendonly_getblock(AppendOnlyScanDesc scan, int64 targrow, int64 *startrow)
+{
+       AppendOnlyExecutorReadBlock *varblock = &scan->executorReadBlock;
+       int64 rowcount = -1;
+
+       if (!scan->needNextBuffer)
+       {
+               /* we have a current block */
+               rowcount = appendonly_block_remaining_rows(scan);
+               Assert(rowcount >= 0);
+
+               if (*startrow + rowcount - 1 >= targrow)
+               {
+                       /* row lies in current block, nothing to do */
+                       return;
+               }
+               else
+               {
+                       /* skip scanning remaining rows */
+                       *startrow += rowcount;
+                       scan->needNextBuffer = true;
+               }
+       }
+
+       /*
+        * Keep reading block headers until we find the block containing
+        * the target row.
+        */
+       while (true)
+       {
+               elog(DEBUG2, "appendonly_getblock(): [targrow: %ld, currow: 
%ld, diff: %ld, "
+                        "startrow: %ld, rowcount: %ld, segfirstrow: %ld, 
segrowsprocessed: %ld, "
+                        "blockRowsProcessed: %ld, blockRowCount: %d]", 
targrow, *startrow + rowcount - 1,
+                        *startrow + rowcount - 1 - targrow, *startrow, 
rowcount, scan->segfirstrow,
+                        scan->segrowsprocessed, varblock->blockRowsProcessed,
+                        varblock->rowCount);
+
+               if 
(AppendOnlyExecutorReadBlock_GetBlockInfo(&scan->storageRead, varblock))
+               {
+                       /* new block, reset blockRowsProcessed */
+                       varblock->blockRowsProcessed = 0;
+                       rowcount = appendonly_block_remaining_rows(scan);
+                       Assert(rowcount > 0);
+                       if (*startrow + rowcount - 1 >= targrow)
+                       {
+                               
AppendOnlyExecutorReadBlock_GetContents(varblock);
+                               /* got a new buffer to consume */
+                               scan->needNextBuffer = false;
+                               return;
+                       }
+
+                       *startrow += rowcount;
+                       
AppendOnlyExecutionReadBlock_FinishedScanBlock(varblock);
+                       
AppendOnlyStorageRead_SkipCurrentBlock(&scan->storageRead);
+                       /* continue next block */
+               }
+               else
+                       pg_unreachable(); /* unreachable code */
+       }
+}
+
+/*
+ * block directory based get_target_tuple()
+ */
+static bool
+appendonly_blkdirscan_get_target_tuple(AppendOnlyScanDesc scan, int64 targrow, 
TupleTableSlot *slot)
+{
+       int segno, segidx;
+       int64 rownum, rowsprocessed;
+       AOTupleId aotid;
+       AppendOnlyBlockDirectory *blkdir = &scan->aofetch->blockDirectory;
+
+       Assert(scan->blkdirscan != NULL);
+
+       /* locate the target segment */
+       segidx = appendonly_locate_target_segment(scan, targrow);
+       if (segidx < 0)
+               return false;
+
+       scan->aos_segfiles_processed = segidx + 1;
+
+       segno = scan->aos_segfile_arr[segidx]->segno;
+       Assert(segno > InvalidFileSegNumber && segno <= 
AOTupleId_MaxSegmentFileNum);
+
+       /*
+        * Note: It is safe to assume that the scan's segfile array and the
+        * blockdir's segfile array are identical. Otherwise, we should stop
+        * processing and throw an exception to make the error visible.
+        */
+       if (blkdir->segmentFileInfo[segidx]->segno != segno)
+       {
+               ereport(ERROR,
+                               (errcode(ERRCODE_INTERNAL_ERROR),
+                                errmsg("segfile array contents in both scan 
descriptor "
+                                               "and block directory are not 
identical on "
+                                               "append-optimized relation 
'%s'",
+                                               
RelationGetRelationName(blkdir->aoRel))));
+       }
+
+       /*
+        * Set the current segfile info in the blkdir struct, so we can
+        * reuse the (cached) block directory entry during the tuple fetch
+        * operation below. See AppendOnlyBlockDirectory_GetCachedEntry().
+        */
+       blkdir->currentSegmentFileNum = blkdir->segmentFileInfo[segidx]->segno;
+       blkdir->currentSegmentFileInfo = blkdir->segmentFileInfo[segidx];
+
+       /*
+        * "segfirstrow" should be always pointing to the first row of
+        * a new segfile in blkdir based ANALYZE, only locate_target_segment
+        * could update its value.
+        * 
+        * "segrowsprocessed" is used for tracking the position of
+        * processed rows in the current segfile.
+        */
+       rowsprocessed = scan->segfirstrow + scan->segrowsprocessed;
+       /* locate the target row by seqscan block directory */
+       rownum = AOBlkDirScan_GetRowNum(scan->blkdirscan,
+                                                                       segno,
+                                                                       0,
+                                                                       targrow,
+                                                                       
&rowsprocessed);
+
+       elog(DEBUG2, "AOBlkDirScan_GetRowNum(segno: %d, col: %d, targrow: %ld): 
"
+                "[segfirstrow: %ld, segrowsprocessed: %ld, rownum: %ld, 
cached_mpentry_num: %d]",
+                segno, 0, targrow, scan->segfirstrow, scan->segrowsprocessed, 
rownum,
+                blkdir->cached_mpentry_num);
+       
+       if (rownum < 0)
+               return false;
+
+       scan->segrowsprocessed = rowsprocessed - scan->segfirstrow;
+
+       /* form the target tuple TID */
+       AOTupleIdInit(&aotid, segno, rownum);
+
+       /* ensure the target minipage entry was stored in fetch descriptor */
+       Assert(blkdir->cached_mpentry_num != InvalidEntryNum);
+       Assert(blkdir->minipages == &blkdir->minipages[0]);
+
+       /* fetch the target tuple */
+       if(!appendonly_fetch(scan->aofetch, &aotid, slot))
+               return false;
+
+       /* OK to return this tuple */
+       pgstat_count_heap_fetch(scan->aos_rd);
+
+       return true;
+}
+
+/*
+ * Given a specific target row number 'targrow' (in the space of all row numbers
+ * physically present in the table, i.e. across all segfiles), scan and return
+ * the corresponding tuple in 'slot'.
+ *
+ * If the tuple is visible, return true. Otherwise, return false.
+ */
+bool
+appendonly_get_target_tuple(AppendOnlyScanDesc aoscan, int64 targrow, 
TupleTableSlot *slot)
+{
+       AppendOnlyExecutorReadBlock *varblock = &aoscan->executorReadBlock;
+       bool visible;
+       int64 rowsprocessed, rownum;
+       int segno;
+       AOTupleId aotid;
+
+       if (aoscan->blkdirscan != NULL)
+               return appendonly_blkdirscan_get_target_tuple(aoscan, targrow, 
slot);
+
+       segno = appendonly_getsegment(aoscan, targrow);
+       if (segno < 0)
+               return false;
+
+       rowsprocessed = aoscan->segfirstrow + aoscan->segrowsprocessed;
+
+       appendonly_getblock(aoscan, targrow, &rowsprocessed);
+
+       aoscan->segrowsprocessed = rowsprocessed - aoscan->segfirstrow;

Review Comment:
   `appendonly_getblock` may change `rowsprocessed`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org
For additional commands, e-mail: commits-h...@cloudberry.apache.org

Reply via email to