This is an automated email from the ASF dual-hosted git repository.
maxyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git
The following commit(s) were added to refs/heads/main by this push:
new f6ba05ff42 Use interface for storage interactions in Append-optimized
TAM
f6ba05ff42 is described below
commit f6ba05ff42855a5112334aaabe6cfc0eb3046270
Author: reshke <[email protected]>
AuthorDate: Sun Sep 29 20:01:20 2024 +0000
Use interface for storage interactions in Append-optimized TAM
Introduce an append-only SMGR interface and use it for storage
operations.
This interface makes it possible to implement out-of-core extensions that
store data in external storage rather than locally.
---
src/backend/access/aocs/aocs_compaction.c | 4 +-
src/backend/access/aocs/aocsam.c | 33 +++++++++++----
src/backend/access/aocs/aocsam_handler.c | 2 +-
src/backend/access/aocs/test/aocsam_test.c | 14 +++++++
src/backend/access/appendonly/aomd.c | 48 ++++++++++++++--------
.../access/appendonly/appendonly_compaction.c | 4 +-
src/backend/access/appendonly/appendonlyam.c | 12 ++++--
.../access/appendonly/appendonlyam_handler.c | 2 +-
src/backend/cdb/cdbappendonlystorageread.c | 11 +++--
src/backend/cdb/cdbappendonlystoragewrite.c | 12 ++++--
src/backend/cdb/cdbappendonlyxlog.c | 23 +++++++++--
src/backend/cdb/cdbbufferedappend.c | 7 +++-
src/backend/cdb/cdbbufferedread.c | 10 +++--
src/backend/cdb/test/cdbbufferedread_test.c | 6 ++-
src/backend/storage/smgr/smgr.c | 23 +++++++++++
src/backend/utils/datumstream/datumstream.c | 11 +++--
src/include/access/aomd.h | 6 ++-
src/include/cdb/cdbappendonlystorageread.h | 4 +-
src/include/cdb/cdbappendonlystoragewrite.h | 6 ++-
src/include/cdb/cdbbufferedappend.h | 4 +-
src/include/cdb/cdbbufferedread.h | 4 +-
src/include/storage/smgr.h | 17 ++++++++
src/include/utils/datumstream.h | 6 ++-
23 files changed, 205 insertions(+), 64 deletions(-)
diff --git a/src/backend/access/aocs/aocs_compaction.c
b/src/backend/access/aocs/aocs_compaction.c
index 9e5c9414d5..34ac15cfe8 100644
--- a/src/backend/access/aocs/aocs_compaction.c
+++ b/src/backend/access/aocs/aocs_compaction.c
@@ -86,7 +86,7 @@ AOCSCompaction_DropSegmentFile(Relation aorel, int segno,
AOVacuumRelStats *vacr
if (fd >= 0)
{
TruncateAOSegmentFile(fd, aorel, pseudoSegNo, 0,
vacrelstats);
- CloseAOSegmentFile(fd);
+ CloseAOSegmentFile(fd, aorel);
}
else
{
@@ -149,7 +149,7 @@ AOCSSegmentFileTruncateToEOF(Relation aorel, int segno,
AOCSVPInfo *vpinfo, AOVa
if (fd >= 0)
{
TruncateAOSegmentFile(fd, aorel, fileSegNo, segeof,
vacrelstats);
- CloseAOSegmentFile(fd);
+ CloseAOSegmentFile(fd, aorel);
elogif(Debug_appendonly_print_compaction, LOG,
"Successfully truncated AO COL relation
\"%s.%s\", relation id %u, relfilenode %lu column #%d, logical segment #%d
(physical segment file #%d, logical EOF " INT64_FORMAT ")",
diff --git a/src/backend/access/aocs/aocsam.c b/src/backend/access/aocs/aocsam.c
index bf12f52611..b6f20b4232 100644
--- a/src/backend/access/aocs/aocsam.c
+++ b/src/backend/access/aocs/aocsam.c
@@ -75,7 +75,9 @@ aocs_delete_hook_type aocs_delete_hook = NULL;
*/
static void
open_datumstreamread_segfile(
- char *basepath,
RelFileNode node,
+ char *basepath,
+ const struct f_smgr_ao
*smgrAO,
+ RelFileNode node,
AOCSFileSegInfo
*segInfo,
DatumStreamRead *ds,
int colNo)
@@ -118,7 +120,7 @@ open_all_datumstreamread_segfiles(AOCSScanDesc scan,
AOCSFileSegInfo *segInfo)
{
AttrNumber attno = proj_atts[i];
- open_datumstreamread_segfile(basepath, rel->rd_node, segInfo,
ds[attno], attno);
+ open_datumstreamread_segfile(basepath, rel->rd_smgr->smgr_ao,
rel->rd_node, segInfo, ds[attno], attno);
datumstreamread_block(ds[attno], blockDirectory, attno);
AOCSScanDesc_UpdateTotalBytesRead(scan, attno);
@@ -141,6 +143,8 @@ open_ds_write(Relation rel, DatumStreamWrite **ds,
TupleDesc relationTupleDesc,
rnode.node = rel->rd_node;
rnode.backend = rel->rd_backend;
+ RelationOpenSmgr(rel);
+
/* open datum streams. It will open segment file underneath */
for (int i = 0; i < natts; ++i)
{
@@ -181,7 +185,8 @@ open_ds_write(Relation rel, DatumStreamWrite **ds,
TupleDesc relationTupleDesc,
RelationGetRelationName(rel),
/* title */ titleBuf.data,
XLogIsNeeded() && RelationNeedsWAL(rel),
-
&rnode);
+
&rnode,
+
rel->rd_smgr->smgr_ao);
}
}
@@ -231,6 +236,8 @@ open_ds_read(Relation rel, DatumStreamRead **ds, TupleDesc
relationTupleDesc,
for (AttrNumber attno = 0; attno < relationTupleDesc->natts; attno++)
ds[attno] = NULL;
+ RelationOpenSmgr(rel);
+
/* And then initialize the data streams for those columns we need */
for (AttrNumber i = 0; i < num_proj_atts; i++)
{
@@ -272,7 +279,8 @@ open_ds_read(Relation rel, DatumStreamRead **ds, TupleDesc
relationTupleDesc,
attr,
RelationGetRelationName(rel),
/* title */ titleBuf.data,
-
&rel->rd_node);
+
&rel->rd_node,
+
rel->rd_smgr->smgr_ao);
}
}
@@ -1409,7 +1417,7 @@ openFetchSegmentFile(AOCSFetchDesc aocsFetchDesc,
if (logicalEof == 0)
return false;
- open_datumstreamread_segfile(aocsFetchDesc->basepath,
aocsFetchDesc->relation->rd_node,
+ open_datumstreamread_segfile(aocsFetchDesc->basepath,
aocsFetchDesc->relation->rd_smgr->smgr_ao, aocsFetchDesc->relation->rd_node,
fsInfo,
datumStreamFetchDesc->datumStream,
colNo);
@@ -1523,6 +1531,8 @@ aocs_fetch_init(Relation relation,
aocsFetchDesc->datumStreamFetchDesc = (DatumStreamFetchDesc *)
palloc0(relation->rd_att->natts * sizeof(DatumStreamFetchDesc));
+ RelationOpenSmgr(relation);
+
for (colno = 0; colno < relation->rd_att->natts; colno++)
{
@@ -1566,7 +1576,7 @@ aocs_fetch_init(Relation relation,
TupleDescAttr(tupleDesc, colno),
relation->rd_rel->relname.data,
/*
title */ titleBuf.data,
-
&relation->rd_node);
+
&relation->rd_node, relation->rd_smgr->smgr_ao);
}
if (opts[colno])
@@ -1941,13 +1951,16 @@ aocs_begin_headerscan(Relation rel, int colno)
ao_attr.overflowSize = 0;
ao_attr.safeFSWriteSize = 0;
hdesc = palloc(sizeof(AOCSHeaderScanDescData));
+
+ RelationOpenSmgr(rel);
+
AppendOnlyStorageRead_Init(&hdesc->ao_read,
NULL, //current
memory context
opts[colno]->blocksize,
RelationGetRelationName(rel),
"ALTER TABLE ADD
COLUMN scan",
&ao_attr,
- &rel->rd_node);
+ &rel->rd_node,
rel->rd_smgr->smgr_ao);
hdesc->colno = colno;
return hdesc;
}
@@ -2032,6 +2045,9 @@ aocs_addcol_init(Relation rel,
NULL);
iattr = rel->rd_att->natts - num_newcols;
+
+ RelationOpenSmgr(rel);
+
for (i = 0; i < num_newcols; ++i, ++iattr)
{
Form_pg_attribute attr = TupleDescAttr(rel->rd_att, iattr);
@@ -2047,7 +2063,8 @@ aocs_addcol_init(Relation rel,
attr, RelationGetRelationName(rel),
titleBuf.data,
XLogIsNeeded() && RelationNeedsWAL(rel),
-
&rnode);
+
&rnode,
+
rel->rd_smgr->smgr_ao);
}
return desc;
}
diff --git a/src/backend/access/aocs/aocsam_handler.c
b/src/backend/access/aocs/aocsam_handler.c
index b17931eed1..86b55ac1dc 100644
--- a/src/backend/access/aocs/aocsam_handler.c
+++ b/src/backend/access/aocs/aocsam_handler.c
@@ -1378,7 +1378,7 @@ aoco_relation_copy_data(Relation rel, const RelFileNode
*newrnode)
*/
RelationCreateStorage(*newrnode, rel->rd_rel->relpersistence, SMGR_AO,
rel);
- copy_append_only_data(rel->rd_node, *newrnode, rel->rd_backend,
rel->rd_rel->relpersistence);
+ copy_append_only_data(rel->rd_node, *newrnode, rel->rd_smgr, dstrel,
rel->rd_backend, rel->rd_rel->relpersistence);
/*
* For append-optimized tables, no forks other than the main fork should
diff --git a/src/backend/access/aocs/test/aocsam_test.c
b/src/backend/access/aocs/test/aocsam_test.c
index b3c38cd8b7..fddd0a1baf 100644
--- a/src/backend/access/aocs/test/aocsam_test.c
+++ b/src/backend/access/aocs/test/aocsam_test.c
@@ -5,6 +5,7 @@
#include "postgres.h"
#include "utils/memutils.h"
+#include "storage/smgr.h"
#include "../aocsam.c"
@@ -19,8 +20,11 @@ test__aocs_begin_headerscan(void **state)
{
AOCSHeaderScanDesc desc;
RelationData reldata;
+ SMgrRelationData smgrdata;
FormData_pg_class pgclass;
+ memset(&reldata, 0, sizeof(RelationData));
+
reldata.rd_rel = &pgclass;
reldata.rd_id = 12345;
StdRdOptions opt;
@@ -28,6 +32,10 @@ test__aocs_begin_headerscan(void **state)
opt.blocksize = 8192 * 5;
StdRdOptions *opts[1];
+ smgrdata.smgr_ao = smgrAOGetDefault();
+ reldata.rd_smgr = &smgrdata;
+ reldata.rd_backend = InvalidBackendId;
+
opts[0] = &opt;
strncpy(&pgclass.relname.data[0], "mock_relation", 13);
@@ -63,6 +71,7 @@ test__aocs_addcol_init(void **state)
{
AOCSAddColumnDesc desc;
RelationData reldata;
+ SMgrRelationData smgrdata;
int nattr = 5;
StdRdOptions **opts =
(StdRdOptions **) malloc(sizeof(StdRdOptions *) * nattr);
@@ -98,6 +107,8 @@ test__aocs_addcol_init(void **state)
expect_value(create_datumstreamwrite, needsWAL, true);
expect_any(create_datumstreamwrite, rnode);
expect_any(create_datumstreamwrite, rnode);
+ expect_any(create_datumstreamwrite, smgrAO);
+ expect_any(create_datumstreamwrite, smgrAO);
expect_any_count(create_datumstreamwrite, attr, 2);
expect_any_count(create_datumstreamwrite, relname, 2);
expect_any_count(create_datumstreamwrite, title, 2);
@@ -112,6 +123,9 @@ test__aocs_addcol_init(void **state)
memset(reldata.rd_att->attrs, 0, sizeof(Form_pg_attribute *) * nattr);
reldata.rd_att->natts = nattr;
+ smgrdata.smgr_ao = smgrAOGetDefault();
+ reldata.rd_smgr = &smgrdata;
+
expect_value(GetAppendOnlyEntryAttributes, relid, 12345);
expect_any(GetAppendOnlyEntryAttributes, blocksize);
expect_any(GetAppendOnlyEntryAttributes, safefswritesize);
diff --git a/src/backend/access/appendonly/aomd.c
b/src/backend/access/appendonly/aomd.c
index f8114119dd..20d0cb564c 100644
--- a/src/backend/access/appendonly/aomd.c
+++ b/src/backend/access/appendonly/aomd.c
@@ -156,7 +156,10 @@ OpenAOSegmentFile(Relation rel,
File fd;
errno = 0;
- fd = PathNameOpenFile(filepathname, fileFlags);
+
+ RelationOpenSmgr(rel);
+
+ fd = rel->rd_smgr->smgr_ao->smgr_AORelOpenSegFile(filepathname,
fileFlags);
if (fd < 0)
{
if (logicalEof == 0 && errno == ENOENT)
@@ -175,9 +178,12 @@ OpenAOSegmentFile(Relation rel,
* Close an Append Only relation file segment
*/
void
-CloseAOSegmentFile(File fd)
+CloseAOSegmentFile(File fd, Relation rel)
{
- FileClose(fd);
+ Assert(fd > 0);
+ RelationOpenSmgr(rel);
+
+ rel->rd_smgr->smgr_ao->smgr_FileClose(fd);
}
/*
@@ -192,7 +198,9 @@ TruncateAOSegmentFile(File fd, Relation rel, int32
segFileNum, int64 offset, AOV
Assert(fd > 0);
Assert(offset >= 0);
- filesize_before = FileSize(fd);
+ RelationOpenSmgr(rel);
+
+ filesize_before = rel->rd_smgr->smgr_ao->smgr_FileSize(fd);
if (filesize_before < offset)
ereport(ERROR,
(errmsg("\"%s\": file size smaller than logical
eof: %m",
@@ -202,7 +210,7 @@ TruncateAOSegmentFile(File fd, Relation rel, int32
segFileNum, int64 offset, AOV
* Call the 'fd' module with a 64-bit length since AO segment files
* can be multi-gigabyte to the terabytes...
*/
- if (FileTruncate(fd, offset, WAIT_EVENT_DATA_FILE_TRUNCATE) != 0)
+ if (rel->rd_smgr->smgr_ao->smgr_FileTruncate(fd, offset,
WAIT_EVENT_DATA_FILE_TRUNCATE) != 0)
ereport(ERROR,
(errmsg("\"%s\": failed to truncate data after
eof: %m",
relname)));
@@ -387,7 +395,8 @@ mdunlink_ao_perFile(const int segno, void *ctx)
static void
copy_file(char *srcsegpath, char *dstsegpath,
- RelFileNode dst, int segfilenum, bool use_wal)
+ RelFileNode dst, SMgrRelation srcSMGR, SMgrRelation dstSMGR,
+ int segfilenum, bool use_wal)
{
File srcFile;
File dstFile;
@@ -396,7 +405,7 @@ copy_file(char *srcsegpath, char *dstsegpath,
char *buffer = palloc(BLCKSZ);
int dstflags;
- srcFile = PathNameOpenFile(srcsegpath, O_RDONLY | PG_BINARY);
+ srcFile = srcSMGR->smgr_ao->smgr_AORelOpenSegFile(srcsegpath, O_RDONLY
| PG_BINARY);
if (srcFile < 0)
ereport(ERROR,
(errcode_for_file_access(),
@@ -411,13 +420,13 @@ copy_file(char *srcsegpath, char *dstsegpath,
if (segfilenum)
dstflags |= O_CREAT;
- dstFile = PathNameOpenFile(dstsegpath, dstflags);
+ dstFile = dstSMGR->smgr_ao->smgr_AORelOpenSegFile(dstsegpath, dstflags);
if (dstFile < 0)
ereport(ERROR,
(errcode_for_file_access(),
(errmsg("could not create destination file %s:
%m", dstsegpath))));
- left = FileDiskSize(srcFile);
+ left = srcSMGR->smgr_ao->smgr_FileDiskSize(srcFile);
if (left < 0)
ereport(ERROR,
(errcode_for_file_access(),
@@ -431,13 +440,13 @@ copy_file(char *srcsegpath, char *dstsegpath,
CHECK_FOR_INTERRUPTS();
len = Min(left, BLCKSZ);
- if (FileRead(srcFile, buffer, len, offset,
WAIT_EVENT_DATA_FILE_READ) != len)
+ if (srcSMGR->smgr_ao->smgr_FileRead(srcFile, buffer, len,
offset, WAIT_EVENT_DATA_FILE_READ) != len)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read %d bytes from
file \"%s\": %m",
len, srcsegpath)));
- if (FileWrite(dstFile, buffer, len, offset,
WAIT_EVENT_DATA_FILE_WRITE) != len)
+ if (dstSMGR->smgr_ao->smgr_FileWrite(dstFile, buffer, len,
offset, WAIT_EVENT_DATA_FILE_WRITE) != len)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write %d bytes to
file \"%s\": %m",
@@ -449,19 +458,21 @@ copy_file(char *srcsegpath, char *dstsegpath,
left -= len;
}
- if (FileSync(dstFile, WAIT_EVENT_DATA_FILE_IMMEDIATE_SYNC) != 0)
+ if (dstSMGR->smgr_ao->smgr_FileSync(dstFile,
WAIT_EVENT_DATA_FILE_IMMEDIATE_SYNC) != 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not fsync file \"%s\": %m",
dstsegpath)));
- FileClose(srcFile);
- FileClose(dstFile);
+ srcSMGR->smgr_ao->smgr_FileClose(srcFile);
+ dstSMGR->smgr_ao->smgr_FileClose(dstFile);
pfree(buffer);
}
struct copy_append_only_data_callback_ctx {
char *srcPath;
char *dstPath;
+ SMgrRelation srcSMGR;
+ SMgrRelation dstSMGR;
RelFileNode src;
RelFileNode dst;
bool useWal;
@@ -473,6 +484,7 @@ struct copy_append_only_data_callback_ctx {
*/
void
copy_append_only_data(RelFileNode src, RelFileNode dst,
+ SMgrRelation srcSMGR, SMgrRelation dstSMGR,
BackendId backendid, char relpersistence)
{
char *srcPath;
@@ -488,10 +500,12 @@ copy_append_only_data(RelFileNode src, RelFileNode dst,
srcPath = relpathbackend(src, backendid, MAIN_FORKNUM);
dstPath = relpathbackend(dst, backendid, MAIN_FORKNUM);
- copy_file(srcPath, dstPath, dst, 0, useWal);
+ copy_file(srcPath, dstPath, dst, srcSMGR, dstSMGR, 0, useWal);
copyFiles.srcPath = srcPath;
copyFiles.dstPath = dstPath;
+ copyFiles.srcSMGR = srcSMGR;
+ copyFiles.dstSMGR = dstSMGR;
copyFiles.src = src;
copyFiles.dst = dst;
copyFiles.useWal = useWal;
@@ -526,7 +540,7 @@ copy_append_only_data_perFile(const int segno, void *ctx)
return false;
}
sprintf(dstSegPath, "%s.%u", copyFiles->dstPath, segno);
- copy_file(srcSegPath, dstSegPath, copyFiles->dst, segno,
copyFiles->useWal);
+ copy_file(srcSegPath, dstSegPath, copyFiles->dst, copyFiles->srcSMGR,
copyFiles->dstSMGR, segno, copyFiles->useWal);
return true;
}
@@ -595,7 +609,7 @@ truncate_ao_perFile(const int segno, void *ctx)
if (fd >= 0)
{
TruncateAOSegmentFile(fd, aorel, segno, 0, NULL);
- CloseAOSegmentFile(fd);
+ CloseAOSegmentFile(fd, aorel);
}
else
{
diff --git a/src/backend/access/appendonly/appendonly_compaction.c
b/src/backend/access/appendonly/appendonly_compaction.c
index 356a07d153..261d823247 100644
--- a/src/backend/access/appendonly/appendonly_compaction.c
+++ b/src/backend/access/appendonly/appendonly_compaction.c
@@ -94,7 +94,7 @@ AppendOnlyCompaction_DropSegmentFile(Relation aorel, int
segno, AOVacuumRelStats
if (fd >= 0)
{
TruncateAOSegmentFile(fd, aorel, fileSegNo, 0, vacrelstats);
- CloseAOSegmentFile(fd);
+ CloseAOSegmentFile(fd, aorel);
}
else
{
@@ -259,7 +259,7 @@ AppendOnlySegmentFileTruncateToEOF(Relation aorel, int
segno, int64 segeof, AOVa
if (fd >= 0)
{
TruncateAOSegmentFile(fd, aorel, fileSegNo, segeof,
vacrelstats);
- CloseAOSegmentFile(fd);
+ CloseAOSegmentFile(fd, aorel);
elogif(Debug_appendonly_print_compaction, LOG,
"Successfully truncated AO ROW relation \"%s.%s\",
relation id %u, relfilenode %lu (physical segment file #%d, logical EOF "
INT64_FORMAT ")",
diff --git a/src/backend/access/appendonly/appendonlyam.c
b/src/backend/access/appendonly/appendonlyam.c
index 3ae8be2224..5741b17102 100755
--- a/src/backend/access/appendonly/appendonlyam.c
+++ b/src/backend/access/appendonly/appendonlyam.c
@@ -165,6 +165,7 @@ SetNextFileSegForRead(AppendOnlyScanDesc scan)
if (!scan->initedStorageRoutines)
{
PGFunction *fns = NULL;
+ RelationOpenSmgr(reln);
AppendOnlyStorageRead_Init(
&scan->storageRead,
@@ -173,7 +174,7 @@ SetNextFileSegForRead(AppendOnlyScanDesc scan)
NameStr(scan->aos_rd->rd_rel->relname),
scan->title,
&scan->storageAttributes,
-
&scan->aos_rd->rd_node);
+
&scan->aos_rd->rd_node, reln->rd_smgr->smgr_ao);
/*
* There is no guarantee that the current memory context will be
@@ -287,6 +288,7 @@ SetNextFileSegForRead(AppendOnlyScanDesc scan)
Assert(scan->initedStorageRoutines);
+
AppendOnlyStorageRead_OpenFile(
&scan->storageRead,
scan->aos_filenamepath,
@@ -2219,6 +2221,8 @@ appendonly_fetch_init(Relation relation,
aoFetchDesc->lastSequence[segno] =
ReadLastSequence(aoFormData.segrelid, segno);
}
+ RelationOpenSmgr(relation);
+
AppendOnlyStorageRead_Init(
&aoFetchDesc->storageRead,
aoFetchDesc->initContext,
@@ -2226,7 +2230,7 @@ appendonly_fetch_init(Relation relation,
NameStr(aoFetchDesc->relation->rd_rel->relname),
aoFetchDesc->title,
&aoFetchDesc->storageAttributes,
- &relation->rd_node);
+ &relation->rd_node,
relation->rd_smgr->smgr_ao);
fns = get_funcs_for_compression(NameStr(aoFormData.compresstype));
@@ -2758,6 +2762,8 @@ appendonly_insert_init(Relation rel, int segno)
RelationGetRelationName(aoInsertDesc->aoi_rel));
aoInsertDesc->title = titleBuf.data;
+ RelationOpenSmgr(rel);
+
AppendOnlyStorageWrite_Init(
&aoInsertDesc->storageWrite,
NULL,
@@ -2765,7 +2771,7 @@ appendonly_insert_init(Relation rel, int segno)
RelationGetRelationName(aoInsertDesc->aoi_rel),
aoInsertDesc->title,
&aoInsertDesc->storageAttributes,
- XLogIsNeeded()
&& RelationNeedsWAL(aoInsertDesc->aoi_rel));
+ XLogIsNeeded()
&& RelationNeedsWAL(aoInsertDesc->aoi_rel), rel->rd_smgr->smgr_ao);
aoInsertDesc->storageWrite.compression_functions = fns;
aoInsertDesc->storageWrite.compressionState = cs;
diff --git a/src/backend/access/appendonly/appendonlyam_handler.c
b/src/backend/access/appendonly/appendonlyam_handler.c
index 3aae9b21ee..41c7d00157 100644
--- a/src/backend/access/appendonly/appendonlyam_handler.c
+++ b/src/backend/access/appendonly/appendonlyam_handler.c
@@ -1243,7 +1243,7 @@ appendonly_relation_copy_data(Relation rel, const
RelFileNode *newrnode)
*/
RelationCreateStorage(*newrnode, rel->rd_rel->relpersistence, SMGR_AO,
rel);
- copy_append_only_data(rel->rd_node, *newrnode, rel->rd_backend,
rel->rd_rel->relpersistence);
+ copy_append_only_data(rel->rd_node, *newrnode, rel->rd_smgr, dstrel,
rel->rd_backend, rel->rd_rel->relpersistence);
/*
* For append-optimized tables, no forks other than the main fork should
diff --git a/src/backend/cdb/cdbappendonlystorageread.c
b/src/backend/cdb/cdbappendonlystorageread.c
index 76f9b4c8f5..4b354327af 100755
--- a/src/backend/cdb/cdbappendonlystorageread.c
+++ b/src/backend/cdb/cdbappendonlystorageread.c
@@ -63,7 +63,8 @@ AppendOnlyStorageRead_Init(AppendOnlyStorageRead *storageRead,
char *relationName,
char *title,
AppendOnlyStorageAttributes
*storageAttributes,
- RelFileNode *relFileNode)
+ RelFileNode *relFileNode,
+ const struct f_smgr_ao
*smgrAO)
{
uint8 *memory;
int32 memoryLen;
@@ -117,7 +118,8 @@ AppendOnlyStorageRead_Init(AppendOnlyStorageRead
*storageRead,
storageRead->maxBufferLen,
storageRead->largeReadLen,
relationName,
- relFileNode);
+ relFileNode,
+ smgrAO);
elogif(Debug_appendonly_print_scan ||
Debug_appendonly_print_read_block, LOG,
"Append-Only Storage Read initialize for table '%s' "
@@ -135,6 +137,8 @@ AppendOnlyStorageRead_Init(AppendOnlyStorageRead
*storageRead,
MemoryContextSwitchTo(oldMemoryContext);
storageRead->isActive = true;
+
+ storageRead->smgrAO = smgrAO;
}
/*
@@ -244,7 +248,7 @@ AppendOnlyStorageRead_DoOpenFile(AppendOnlyStorageRead
*storageRead,
/*
* Open the file for read.
*/
- file = PathNameOpenFile(filePathName, fileFlags);
+ file = storageRead->smgrAO->smgr_AORelOpenSegFile(filePathName,
fileFlags);
return file;
}
@@ -316,6 +320,7 @@ AppendOnlyStorageRead_OpenFile(AppendOnlyStorageRead
*storageRead,
Assert(storageRead != NULL);
Assert(storageRead->isActive);
Assert(filePathName != NULL);
+ Assert(storageRead->smgrAO);
/*
* The EOF must be greater than 0, otherwise we risk transactionally
diff --git a/src/backend/cdb/cdbappendonlystoragewrite.c
b/src/backend/cdb/cdbappendonlystoragewrite.c
index 16e5569378..f2df686f17 100755
--- a/src/backend/cdb/cdbappendonlystoragewrite.c
+++ b/src/backend/cdb/cdbappendonlystoragewrite.c
@@ -69,7 +69,8 @@ AppendOnlyStorageWrite_Init(AppendOnlyStorageWrite
*storageWrite,
char *relationName,
char *title,
AppendOnlyStorageAttributes *storageAttributes,
- bool needsWAL)
+ bool needsWAL,
+ const struct f_smgr_ao
*smgrAO)
{
uint8 *memory;
int32 memoryLen;
@@ -82,10 +83,14 @@ AppendOnlyStorageWrite_Init(AppendOnlyStorageWrite
*storageWrite,
Assert(relationName != NULL);
Assert(storageAttributes != NULL);
+ Assert(smgrAO != NULL);
+
/* UNDONE: Range check fields in storageAttributes */
MemSet(storageWrite, 0, sizeof(AppendOnlyStorageWrite));
+ storageWrite->smgrAO = smgrAO;
+
storageWrite->maxBufferLen = maxBufferLen;
if (memoryContext == NULL)
@@ -147,7 +152,8 @@ AppendOnlyStorageWrite_Init(AppendOnlyStorageWrite
*storageWrite,
memoryLen,
storageWrite->maxBufferWithCompressionOverrrunLen,
storageWrite->maxLargeWriteLen,
- relationName);
+ relationName,
+ smgrAO);
elogif(Debug_appendonly_print_insert ||
Debug_appendonly_print_append_block, LOG,
"Append-Only Storage Write initialize for table '%s'
(compression = %s, compression level %d, maximum buffer length %d, large write
length %d)",
@@ -319,7 +325,7 @@ AppendOnlyStorageWrite_OpenFile(AppendOnlyStorageWrite
*storageWrite,
errno = 0;
int fileFlags = O_RDWR | PG_BINARY;
- file = PathNameOpenFile(path, fileFlags);
+ file = storageWrite->smgrAO->smgr_AORelOpenSegFile(path, fileFlags);
if (file < 0)
ereport(ERROR,
(errcode_for_file_access(),
diff --git a/src/backend/cdb/cdbappendonlyxlog.c
b/src/backend/cdb/cdbappendonlyxlog.c
index 23cec513ac..68cea8e3fa 100644
--- a/src/backend/cdb/cdbappendonlyxlog.c
+++ b/src/backend/cdb/cdbappendonlyxlog.c
@@ -67,6 +67,13 @@ ao_insert_replay(XLogReaderState *record)
xl_ao_insert *xlrec = (xl_ao_insert *) XLogRecGetData(record);
char *buffer = (char *) xlrec + SizeOfAOInsert;
uint32 len = XLogRecGetDataLen(record) - SizeOfAOInsert;
+ SMgrRelation smgr;
+
+ /*
+ * Open the relation at smgr level to obtain its append-optimized storage
+ * callbacks (smgr_ao), which are used below to open, write and close the file.
+ */
+ smgr = smgropen(xlrec->target.node, InvalidBackendId, SMGR_AO, NULL);
dbPath = GetDatabasePath(xlrec->target.node.dbNode,
xlrec->target.node.spcNode);
@@ -82,14 +89,14 @@ ao_insert_replay(XLogReaderState *record)
/* When writing from the beginning of the file, it might not exist yet.
Create it. */
if (xlrec->target.offset == 0)
fileFlags |= O_CREAT;
- file = PathNameOpenFile(path, fileFlags);
+ file = smgr->smgr_ao->smgr_AORelOpenSegFile(path, fileFlags);
if (file < 0)
{
XLogAOSegmentFile(xlrec->target.node,
xlrec->target.segment_filenum);
return;
}
- written_len = FileWrite(file, buffer, len, xlrec->target.offset,
+ written_len = smgr->smgr_ao->smgr_FileWrite(file, buffer, len,
xlrec->target.offset,
WAIT_EVENT_COPY_FILE_WRITE);
if (written_len < 0 || written_len != len)
{
@@ -104,7 +111,7 @@ ao_insert_replay(XLogReaderState *record)
xlrec->target.segment_filenum,
file);
- FileClose(file);
+ smgr->smgr_ao->smgr_FileClose(file);
}
/*
@@ -130,9 +137,17 @@ ao_truncate_replay(XLogReaderState *record)
char *dbPath;
char path[MAXPGPATH];
File file;
+ SMgrRelation smgr;
xl_ao_truncate *xlrec = (xl_ao_truncate*) XLogRecGetData(record);
+ /*
+ * Open the relation at smgr level to obtain its append-optimized storage
+ * callbacks (smgr_ao), which are used below to open the segment file.
+ */
+ smgr = smgropen(xlrec->target.node, InvalidBackendId, SMGR_AO, NULL);
+
+
dbPath = GetDatabasePath(xlrec->target.node.dbNode,
xlrec->target.node.spcNode);
@@ -143,7 +158,7 @@ ao_truncate_replay(XLogReaderState *record)
pfree(dbPath);
dbPath = NULL;
- file = PathNameOpenFile(path, O_RDWR | PG_BINARY);
+ file = smgr->smgr_ao->smgr_AORelOpenSegFile(path, O_RDWR | PG_BINARY);
if (file < 0)
{
/*
diff --git a/src/backend/cdb/cdbbufferedappend.c
b/src/backend/cdb/cdbbufferedappend.c
index 96a5e7edf4..dac215be58 100644
--- a/src/backend/cdb/cdbbufferedappend.c
+++ b/src/backend/cdb/cdbbufferedappend.c
@@ -58,7 +58,8 @@ BufferedAppendInit(BufferedAppend *bufferedAppend,
int32 memoryLen,
int32 maxBufferWithCompressionOverrrunLen,
int32 maxLargeWriteLen,
- char *relationName)
+ char *relationName,
+ const struct f_smgr_ao *smgrAO)
{
Assert(bufferedAppend != NULL);
Assert(memory != NULL);
@@ -100,6 +101,8 @@ BufferedAppendInit(BufferedAppend *bufferedAppend,
bufferedAppend->file = -1;
bufferedAppend->filePathName = NULL;
bufferedAppend->fileLen = 0;
+
+ bufferedAppend->smgrAO = smgrAO;
}
/*
@@ -158,7 +161,7 @@ BufferedAppendWrite(BufferedAppend *bufferedAppend, bool
needsWAL)
{
int32 byteswritten;
- byteswritten = FileWrite(bufferedAppend->file,
+ byteswritten =
bufferedAppend->smgrAO->smgr_FileWrite(bufferedAppend->file,
(char *)
largeWriteMemory + bytestotal,
bytesleft,
bufferedAppend->largeWritePosition + bytestotal,
diff --git a/src/backend/cdb/cdbbufferedread.c
b/src/backend/cdb/cdbbufferedread.c
index 0012663007..0232490db5 100644
--- a/src/backend/cdb/cdbbufferedread.c
+++ b/src/backend/cdb/cdbbufferedread.c
@@ -55,14 +55,14 @@ BufferedReadMemoryLen(
* determine the amount of memory to supply.
*/
void
-BufferedReadInit(
- BufferedRead *bufferedRead,
+BufferedReadInit(BufferedRead *bufferedRead,
uint8 *memory,
int32 memoryLen,
int32 maxBufferLen,
int32 maxLargeReadLen,
char *relationName,
- RelFileNode *file_node)
+ RelFileNode *file_node,
+ const struct f_smgr_ao *smgr)
{
Assert(bufferedRead != NULL);
Assert(memory != NULL);
@@ -113,6 +113,8 @@ BufferedReadInit(
*/
bufferedRead->haveTemporaryLimitInEffect = false;
bufferedRead->temporaryLimitFileLen = 0;
+
+ bufferedRead->smgrAO = smgr;
}
/*
@@ -178,7 +180,7 @@ BufferedReadIo(
offset = 0;
while (largeReadLen > 0)
{
- int actualLen = FileRead(bufferedRead->file,
+ int actualLen =
bufferedRead->smgrAO->smgr_FileRead(bufferedRead->file,
(char *) largeReadMemory,
largeReadLen,
bufferedRead->fileOff,
diff --git a/src/backend/cdb/test/cdbbufferedread_test.c
b/src/backend/cdb/test/cdbbufferedread_test.c
index 75493ed362..de43ef4a11 100644
--- a/src/backend/cdb/test/cdbbufferedread_test.c
+++ b/src/backend/cdb/test/cdbbufferedread_test.c
@@ -17,12 +17,13 @@ test__BufferedReadInit__IsConsistent(void **state)
int32 maxBufferLen = 128;
int32 maxLargeReadLen = 128;
RelFileNode file_node = {0};
+ const struct f_smgr_ao *smgrAO = smgrAOGetDefault();
memset(bufferedRead, 0 , sizeof(BufferedRead));
/*
* Call the function so as to set the above values.
*/
- BufferedReadInit(bufferedRead, memory, memoryLen, maxBufferLen,
maxLargeReadLen, relname, &file_node);
+ BufferedReadInit(bufferedRead, memory, memoryLen, maxBufferLen,
maxLargeReadLen, relname, &file_node, smgrAO);
/*
* Check for consistency
*/
@@ -47,12 +48,13 @@ test__BufferedReadUseBeforeBuffer__IsNextReadLenZero(void
**state)
int32 nextBufferLen;
int32 maxReadAheadLen = 64;
RelFileNode file_node = {0};
+ const struct f_smgr_ao *smgrAO = smgrAOGetDefault();
memset(bufferedRead, 0 , sizeof(BufferedRead));
/*
* Initialize the buffer
*/
- BufferedReadInit(bufferedRead, memory, memoryLen, maxBufferLen,
maxLargeReadLen, relname, &file_node);
+ BufferedReadInit(bufferedRead, memory, memoryLen, maxBufferLen,
maxLargeReadLen, relname, &file_node, smgrAO);
/*
* filling up the bufferedRead struct
*/
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 11cfbf6e77..4a332b5e61 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -100,6 +100,21 @@ static const f_smgr smgrsw[] = {
}
};
+static const f_smgr_ao smgrswao[] = {
+ /* regular file */
+ {
+ .smgr_FileClose = FileClose,
+ .smgr_FileDiskSize = FileDiskSize,
+ .smgr_FileTruncate = FileTruncate,
+ .smgr_AORelOpenSegFile = PathNameOpenFile,
+ .smgr_FileWrite = FileWrite,
+ .smgr_FileRead = FileRead,
+ .smgr_FileSize = FileSize,
+ .smgr_FileSync = FileSync,
+ },
+};
+
+
static const int NSmgr = lengthof(smgrsw);
/*
@@ -155,6 +170,11 @@ smgrshutdown(int code, Datum arg)
}
}
+const struct f_smgr_ao *
+smgrAOGetDefault(void) {
+ return &smgrswao[0];
+}
+
/*
* smgropen() -- Return an SMgrRelation object, creating it if need be.
*
@@ -204,6 +224,8 @@ smgropen(RelFileNode rnode, BackendId backend, SMgrImpl
which, Relation rel)
dlist_push_tail(&unowned_relns, &reln->node);
reln->smgr = &smgrsw[reln->smgr_which];
+ reln->smgr_ao = &smgrswao[0];
+
/*
* hook for other storage managers.
*/
@@ -211,6 +233,7 @@ smgropen(RelFileNode rnode, BackendId backend, SMgrImpl
which, Relation rel)
(*smgr_hook) (reln, backend, which, rel);
Assert(reln->smgr);
+ Assert(reln->smgr_ao);
(*reln->smgr).smgr_open(reln);
}
diff --git a/src/backend/utils/datumstream/datumstream.c
b/src/backend/utils/datumstream/datumstream.c
index 5588d97820..2fd3ab0775 100644
--- a/src/backend/utils/datumstream/datumstream.c
+++ b/src/backend/utils/datumstream/datumstream.c
@@ -500,7 +500,8 @@ create_datumstreamwrite(
char *relname,
char *title,
bool needsWAL,
- RelFileNodeBackend *rnode)
+ RelFileNodeBackend *rnode,
+ const struct f_smgr_ao *smgrAO)
{
DatumStreamWrite *acc = palloc0(sizeof(DatumStreamWrite));
@@ -569,7 +570,8 @@ create_datumstreamwrite(
relname,
title,
&acc->ao_attr,
- needsWAL);
+ needsWAL,
+ smgrAO);
acc->ao_write.compression_functions = compressionFunctions;
acc->ao_write.compressionState = compressionState;
@@ -645,7 +647,7 @@ create_datumstreamread(
Form_pg_attribute attr,
char *relname,
char *title,
- RelFileNode *relFileNode)
+ RelFileNode *relFileNode, const
struct f_smgr_ao *smgrAO)
{
DatumStreamRead *acc = palloc0(sizeof(DatumStreamRead));
@@ -702,7 +704,8 @@ create_datumstreamread(
relname,
title,
&acc->ao_attr,
- relFileNode);
+ relFileNode,
+ smgrAO);
acc->ao_read.compression_functions = compressionFunctions;
acc->ao_read.compressionState = compressionState;
diff --git a/src/include/access/aomd.h b/src/include/access/aomd.h
index ed3e207adf..9666ad3974 100644
--- a/src/include/access/aomd.h
+++ b/src/include/access/aomd.h
@@ -17,6 +17,7 @@
#include "htup_details.h"
#include "storage/fd.h"
+#include "storage/smgr.h"
#include "utils/rel.h"
struct AOVacuumRelStats;
@@ -40,7 +41,7 @@ extern File OpenAOSegmentFile(Relation rel,
char *filepathname,
int64 logicalEof);
-extern void CloseAOSegmentFile(File fd);
+extern void CloseAOSegmentFile(File fd, Relation rel);
extern void
TruncateAOSegmentFile(File fd,
@@ -55,7 +56,8 @@ extern void
mdunlink_ao(RelFileNodeBackend rnode, ForkNumber forkNumber, bool isRedo);
extern void
-copy_append_only_data(RelFileNode src, RelFileNode dst, BackendId backendid,
char relpersistence);
+copy_append_only_data(RelFileNode src, RelFileNode dst,
+ SMgrRelation srcSMGR, SMgrRelation dstSMGR, BackendId backendid, char
relpersistence);
/*
* return value should be true if the callback was able to find the given
diff --git a/src/include/cdb/cdbappendonlystorageread.h
b/src/include/cdb/cdbappendonlystorageread.h
index 0f2cccfea3..879d054e52 100755
--- a/src/include/cdb/cdbappendonlystorageread.h
+++ b/src/include/cdb/cdbappendonlystorageread.h
@@ -16,6 +16,7 @@
#include "catalog/pg_appendonly.h"
#include "catalog/pg_compression.h"
+#include "storage/smgr.h"
#include "cdb/cdbappendonlystorage.h"
#include "cdb/cdbappendonlystoragelayer.h"
#include "cdb/cdbbufferedread.h"
@@ -191,6 +192,7 @@ typedef struct AppendOnlyStorageRead
* pointers. The array index
* corresponds to COMP_FUNC_* */
+ const struct f_smgr_ao *smgrAO;
} AppendOnlyStorageRead;
extern void AppendOnlyStorageRead_Init(AppendOnlyStorageRead *storageRead,
@@ -198,7 +200,7 @@ extern void
AppendOnlyStorageRead_Init(AppendOnlyStorageRead *storageRead,
int32 maxBufferLen,
char *relationName, char
*title,
AppendOnlyStorageAttributes
*storageAttributes,
- RelFileNode *relFileNode);
+ RelFileNode *relFileNode,
const struct f_smgr_ao *smgrAO);
extern char *AppendOnlyStorageRead_RelationName(AppendOnlyStorageRead
*storageRead);
extern char *AppendOnlyStorageRead_SegmentFileName(AppendOnlyStorageRead
*storageRead);
diff --git a/src/include/cdb/cdbappendonlystoragewrite.h
b/src/include/cdb/cdbappendonlystoragewrite.h
index acbdfe0211..d7493f6103 100755
--- a/src/include/cdb/cdbappendonlystoragewrite.h
+++ b/src/include/cdb/cdbappendonlystoragewrite.h
@@ -176,6 +176,8 @@ typedef struct AppendOnlyStorageWrite
bool needsWAL;
+ const struct f_smgr_ao *smgrAO;
+
} AppendOnlyStorageWrite;
extern void AppendOnlyStorageWrite_Init(AppendOnlyStorageWrite *storageWrite,
@@ -184,7 +186,9 @@ extern void
AppendOnlyStorageWrite_Init(AppendOnlyStorageWrite *storageWrite,
char *relationName,
char *title,
AppendOnlyStorageAttributes *storageAttributes,
-
bool needsWAL);
+
bool needsWAL,
+
const struct f_smgr_ao *smgrAO);
+
extern void AppendOnlyStorageWrite_FinishSession(AppendOnlyStorageWrite
*storageWrite);
extern void
AppendOnlyStorageWrite_TransactionCreateFile(AppendOnlyStorageWrite
*storageWrite,
diff --git a/src/include/cdb/cdbbufferedappend.h
b/src/include/cdb/cdbbufferedappend.h
index 985632a096..27950032c9 100644
--- a/src/include/cdb/cdbbufferedappend.h
+++ b/src/include/cdb/cdbbufferedappend.h
@@ -79,6 +79,7 @@ typedef struct BufferedAppend
int64 fileLen;
int64 fileLen_uncompressed; /* for
calculating compress ratio */
+ const struct f_smgr_ao *smgrAO;
} BufferedAppend;
/*
@@ -102,7 +103,8 @@ extern void BufferedAppendInit(
int32 memoryLen,
int32 maxBufferWithCompressionOverrrunLen,
int32 maxLargeWriteLen,
- char *relationName);
+ char *relationName,
+ const struct f_smgr_ao *smgrAO);
/*
* Takes an open file handle for the next file.
diff --git a/src/include/cdb/cdbbufferedread.h
b/src/include/cdb/cdbbufferedread.h
index 7e17669838..d4824aed40 100644
--- a/src/include/cdb/cdbbufferedread.h
+++ b/src/include/cdb/cdbbufferedread.h
@@ -80,6 +80,7 @@ typedef struct BufferedRead
bool haveTemporaryLimitInEffect;
int64 temporaryLimitFileLen;
+ const struct f_smgr_ao *smgrAO;
} BufferedRead;
/*
@@ -104,7 +105,8 @@ extern void BufferedReadInit(
int32 maxBufferLen,
int32 maxLargeReadLen,
char *relationName,
- RelFileNode *file_node);
+ RelFileNode *file_node,
+ const struct f_smgr_ao *smgr);
/*
* Takes an open file handle for the next file.
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 3751900cd0..38bafde749 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -21,6 +21,7 @@
#include "storage/block.h"
#include "storage/relfilenode.h"
#include "storage/dbdirnode.h"
+#include "storage/fd.h"
#include "utils/relcache.h"
typedef enum SMgrImplementation
@@ -73,6 +74,8 @@ typedef struct SMgrRelationData
char smgr_relpersistence;
/* pointer to storage manager */
const struct f_smgr *smgr;
+ /* pointer to AO storage manager */
+ const struct f_smgr_ao *smgr_ao;
/*
* Fields below here are intended to be private to smgr.c and its
@@ -127,6 +130,18 @@ typedef struct f_smgr
void (*smgr_immedsync) (SMgrRelation reln, ForkNumber
forknum);
} f_smgr;
+/*
+ * f_smgr_ao -- append-optimized storage manager interface.
+ *
+ * A table of file-level callbacks through which the AO/AOCS code performs
+ * its storage I/O.  A "const struct f_smgr_ao *" is kept in
+ * SMgrRelationData (smgr_ao), AppendOnlyStorageRead/Write, BufferedRead and
+ * BufferedAppend, and is passed to the datumstream constructors, so an
+ * extension can supply its own implementation (for example one that keeps
+ * AO data in external rather than local storage).
+ *
+ * NOTE(review): member names and signatures appear to mirror the File*
+ * routines of storage/fd.h (which is why that header is included); confirm
+ * that implementations must follow fd.c's return-value/error conventions
+ * and that wait_event_info is the wait event reported while blocked.
+ * smgrAOGetDefault() presumably returns the built-in table -- confirm in
+ * smgr.c.
+ */
+typedef struct f_smgr_ao {
+ /* on-disk size of an open File; how this differs from smgr_FileSize is not visible here */
+ off_t (*smgr_FileDiskSize) (File file);
+ /* close a File (presumably one returned by smgr_AORelOpenSegFile) */
+ void (*smgr_FileClose) (File file);
+ /* truncate a File at 'offset'; returns an int status */
+ int (*smgr_FileTruncate) (File file, int64 offset,
uint32 wait_event_info);
+ /* open the segment file at filePath with fileFlags (presumably open(2)-style); returns a File */
+ File (*smgr_AORelOpenSegFile) (const char *filePath,
int fileFlags);
+ /* write 'amount' bytes from buffer at 'offset'; returns an int result */
+ int (*smgr_FileWrite) (File file, char *buffer, int
amount, off_t offset, uint32 wait_event_info);
+ /* read 'amount' bytes into buffer from 'offset'; returns an int result */
+ int (*smgr_FileRead) (File file, char *buffer, int
amount, off_t offset, uint32 wait_event_info);
+ /* size of an open File */
+ off_t (*smgr_FileSize) (File file);
+ /* sync a File (presumably fsync-like); returns an int status */
+ int (*smgr_FileSync) (File file, uint32
wait_event_info);
+} f_smgr_ao;
+
+
typedef void (*smgr_init_hook_type) (void);
typedef void (*smgr_hook_type) (SMgrRelation reln, BackendId backend, SMgrImpl
which, Relation rel);
typedef void (*smgr_shutdown_hook_type) (void);
@@ -166,6 +181,8 @@ extern void smgrtruncate(SMgrRelation reln, ForkNumber
*forknum,
extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum);
extern void AtEOXact_SMgr(void);
+extern const struct f_smgr_ao * smgrAOGetDefault(void);
+
/*
* Hook for plugins to collect statistics from storage functions
diff --git a/src/include/utils/datumstream.h b/src/include/utils/datumstream.h
index 5c98fc4f93..a124c62aef 100644
--- a/src/include/utils/datumstream.h
+++ b/src/include/utils/datumstream.h
@@ -263,7 +263,8 @@ extern DatumStreamWrite *create_datumstreamwrite(
char *relname,
char *title,
bool needsWAL,
- RelFileNodeBackend *rnode);
+ RelFileNodeBackend *rnode,
+ const struct f_smgr_ao *smgrAO);
extern DatumStreamRead *create_datumstreamread(
char *compName,
@@ -274,7 +275,8 @@ extern DatumStreamRead *create_datumstreamread(
Form_pg_attribute attr,
char *relname,
char *title,
- RelFileNode *relFileNode);
+ RelFileNode *relFileNode,
+ const struct f_smgr_ao *smgrAO);
extern void datumstreamwrite_open_file(
DatumStreamWrite * ds,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]