Repository: incubator-hawq Updated Branches: refs/heads/master 30b30449a -> c72e58946
HAWQ-1600. Parquet table data vectorized scan Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/cfee2c51 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/cfee2c51 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/cfee2c51 Branch: refs/heads/master Commit: cfee2c51d7b28d23c644e903d544b3645c42b9a9 Parents: 30b3044 Author: Weinan Wang <wew...@pivotal.io> Authored: Fri Mar 30 10:05:56 2018 +0800 Committer: Weinan Wang <wew...@pivotal.io> Committed: Fri Mar 30 13:38:29 2018 +0800 ---------------------------------------------------------------------- contrib/vexecutor/Makefile | 2 +- contrib/vexecutor/execVQual.c | 125 ++++++++++++++++++ contrib/vexecutor/execVQual.h | 40 ++++++ contrib/vexecutor/execVScan.c | 44 ++++++- contrib/vexecutor/execVScan.h | 3 + contrib/vexecutor/parquet_reader.c | 194 ++++++++++++++++++++++++++++ contrib/vexecutor/parquet_reader.h | 39 ++++++ contrib/vexecutor/tuplebatch.c | 6 +- contrib/vexecutor/vadt.c | 19 ++- contrib/vexecutor/vadt.h | 11 +- contrib/vexecutor/vcheck.c | 12 +- contrib/vexecutor/vcheck.h | 2 + contrib/vexecutor/vexecutor.c | 6 +- src/backend/access/parquet/parquetam.c | 4 +- 14 files changed, 484 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/Makefile ---------------------------------------------------------------------- diff --git a/contrib/vexecutor/Makefile b/contrib/vexecutor/Makefile index 805485f..0eae623 100644 --- a/contrib/vexecutor/Makefile +++ b/contrib/vexecutor/Makefile @@ -17,7 +17,7 @@ MODULE_big = vexecutor -OBJS = vexecutor.o vadt.o vcheck.o tuplebatch.o execVScan.o +OBJS = vexecutor.o vadt.o vcheck.o tuplebatch.o execVScan.o execVQual.o parquet_reader.o PG_CXXFLAGS = -Wall -O0 -g -std=c++11 PG_LIBS = $(libpq_pgport) 
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/execVQual.c ---------------------------------------------------------------------- diff --git a/contrib/vexecutor/execVQual.c b/contrib/vexecutor/execVQual.c new file mode 100644 index 0000000..c4ccf9e --- /dev/null +++ b/contrib/vexecutor/execVQual.c @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "execVQual.h" + +/* + * ExecVariableList + * Evaluates a simple-Variable-list projection. + * + * Results are stored into the passed values and isnull arrays. + */ +static void +ExecVecVariableList(ProjectionInfo *projInfo, + Datum *values) +{ + ExprContext *econtext = projInfo->pi_exprContext; + int *varSlotOffsets = projInfo->pi_varSlotOffsets; + int *varNumbers = projInfo->pi_varNumbers; + TupleBatch tb = (TupleBatch) values; + int i; + tb->ncols = list_length(projInfo->pi_targetlist); + + /* + * Assign to result by direct extraction of fields from source slots ... a + * mite ugly, but fast ... 
+ */ + for (i = list_length(projInfo->pi_targetlist) - 1; i >= 0; i--) + { + char *slotptr = ((char *) econtext) + varSlotOffsets[i]; + TupleTableSlot *varSlot = *((TupleTableSlot **) slotptr); + int varNumber = varNumbers[i] - 1; + tb->datagroup[i] = ((TupleBatch)varSlot->PRIVATE_tb)->datagroup[varNumber]; + } +} + +TupleTableSlot * +ExecVProject(ProjectionInfo *projInfo, ExprDoneCond *isDone) +{ + TupleTableSlot *slot; + Assert(projInfo != NULL); + + /* + * get the projection info we want + */ + slot = projInfo->pi_slot; + + /* + * Clear any former contents of the result slot. This makes it safe for + * us to use the slot's Datum/isnull arrays as workspace. (Also, we can + * return the slot as-is if we decide no rows can be projected.) + */ + ExecClearTuple(slot); + + /* + * form a new result tuple (if possible); if successful, mark the result + * slot as containing a valid virtual tuple + */ + if (projInfo->pi_isVarList) + { + /* simple Var list: this always succeeds with one result row */ + if (isDone) + *isDone = ExprSingleResult; + + ExecVecVariableList(projInfo,slot->PRIVATE_tb); + ExecStoreVirtualTuple(slot); + } + else + { + elog(FATAL,"does not support expression in projection stmt"); + // if (ExecTargetList(projInfo->pi_targetlist, + // projInfo->pi_exprContext, + // slot_get_values(slot), + // slot_get_isnull(slot), + // (ExprDoneCond *) projInfo->pi_itemIsDone, + // isDone)) + // ExecStoreVirtualTuple(slot); + } + + return slot; +} + +/* + * VirtualNodeProc + * return value indicate whether has a tuple data fill in slot->PRIVATE_tts_values slot + * This function will be invoked in V->N process. 
+ */ +bool +VirtualNodeProc(ScanState* state,TupleTableSlot *slot){ + if(TupIsNull(slot) ) + return false; + + TupleBatch tb = slot->PRIVATE_tb; + ExecClearTuple(slot); + + while (tb->iter < tb->nrows && tb->skip[tb->iter]) /* bound check first: never read skip[nrows] */ + tb->iter++; + + if(tb->iter >= tb->nrows) + return false; + + for(int i = 0;i < tb->ncols;i ++) + { + vheader *header = tb->datagroup[i]; + GetVFunc(GetVtype(header->elemtype))->gettypevalue(header,tb->iter,slot->PRIVATE_tts_values + i); + slot->PRIVATE_tts_isnull[i] = header->isnull[tb->iter]; + } + tb->iter ++; + ExecStoreVirtualTuple(slot); + return true; +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/execVQual.h ---------------------------------------------------------------------- diff --git a/contrib/vexecutor/execVQual.h b/contrib/vexecutor/execVQual.h new file mode 100644 index 0000000..fb35789 --- /dev/null +++ b/contrib/vexecutor/execVQual.h @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +#ifndef __EXEC_VQUAL_H___ +#define __EXEC_VQUAL_H___ +#include "postgres.h" + +#include "access/heapam.h" +#include "cdb/cdbvars.h" +#include "cdb/partitionselection.h" +#include "executor/execdebug.h" +#include "executor/nodeAgg.h" +#include "tuplebatch.h" +#include "vadt.h" + +extern TupleTableSlot * +ExecVProject(ProjectionInfo *projInfo, ExprDoneCond *isDone); + +extern bool +ExecVQual(List *qual, ExprContext *econtext); + +extern bool +VirtualNodeProc(ScanState* state,TupleTableSlot *slot); + +#endif http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/execVScan.c ---------------------------------------------------------------------- diff --git a/contrib/vexecutor/execVScan.c b/contrib/vexecutor/execVScan.c index b0fbd11..44a6854 100644 --- a/contrib/vexecutor/execVScan.c +++ b/contrib/vexecutor/execVScan.c @@ -19,6 +19,8 @@ #include "execVScan.h" #include "miscadmin.h" +#include "execVQual.h" +#include "parquet_reader.h" static TupleTableSlot* ExecVScan(ScanState *node, ExecScanAccessMtd accessMtd); @@ -38,8 +40,8 @@ getVScanMethod(int tableType) }, //PARQUETSCAN { - &ParquetScanNext, &BeginScanParquetRelation, &EndScanParquetRelation, - &ReScanParquetRelation, &MarkRestrNotAllowed, &MarkRestrNotAllowed + &ParquetVScanNext, &BeginScanParquetRelation, &EndScanParquetRelation, + NULL,NULL,NULL } }; @@ -53,6 +55,35 @@ getVScanMethod(int tableType) return &scanMethods[tableType]; } +/* + * ExecTableVScanVirtualLayer + * translate a batch of tuple to single one if scan parent is normal execution node + * + * VirtualNodeProc may not fill tuple data in slot success, due to qualification tag. 
+ * invoke scan table function if no tuple can pop up in TupleBatch + */ +TupleTableSlot *ExecTableVScanVirtualLayer(ScanState *scanState) +{ + VectorizedState* vs = (VectorizedState*)scanState->ps.vectorized; + VectorizedState* pvs = vs->parent->vectorized; + + if(pvs->vectorized) + return ExecTableVScan(scanState); /* go through scan init/reset/teardown, not raw ExecVScan */ + else + { + TupleTableSlot* slot = scanState->ps.ps_ProjInfo ? scanState->ps.ps_ResultTupleSlot : scanState->ss_ScanTupleSlot; + /* keep fetching batches until one yields an unskipped row or the scan is exhausted */ + while(!VirtualNodeProc(scanState,slot)) + { + slot = ExecTableVScan(scanState); + if(TupIsNull(slot)) + break; + } + + return slot; + } +} + TupleTableSlot *ExecTableVScan(ScanState *scanState) { if (scanState->scan_state == SCAN_INIT || @@ -61,6 +92,8 @@ TupleTableSlot *ExecTableVScan(ScanState *scanState) getVScanMethod(scanState->tableType)->beginScanMethod(scanState); } + tbReset(scanState->ss_ScanTupleSlot->PRIVATE_tb); + tbReset(scanState->ps.ps_ResultTupleSlot->PRIVATE_tb); TupleTableSlot *slot = ExecVScan(scanState,getVScanMethod(scanState->tableType)->accessMethod); if (TupIsNull(slot) && !scanState->ps.delayEagerFree) @@ -148,7 +181,10 @@ ExecVScan(ScanState *node, ExecScanAccessMtd accessMtd) * Form a projection tuple, store it in the result tuple slot * and return it. 
*/ - return ExecProject(projInfo, NULL); + ((TupleBatch)projInfo->pi_slot->PRIVATE_tb)->nrows = ((TupleBatch)slot->PRIVATE_tb)->nrows; + memcpy(((TupleBatch)projInfo->pi_slot->PRIVATE_tb)->skip, + ((TupleBatch)slot->PRIVATE_tb)->skip,sizeof(bool) * ((TupleBatch)slot->PRIVATE_tb)->nrows); + return ExecVProject(projInfo, NULL); } else { @@ -163,5 +199,7 @@ ExecVScan(ScanState *node, ExecScanAccessMtd accessMtd) * Tuple fails qual, so free per-tuple memory and try again.
*/ ResetExprContext(econtext); + tbReset(node->ss_ScanTupleSlot->PRIVATE_tb); + tbReset(node->ps.ps_ResultTupleSlot->PRIVATE_tb); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/execVScan.h ---------------------------------------------------------------------- diff --git a/contrib/vexecutor/execVScan.h b/contrib/vexecutor/execVScan.h index 22eee5b..ced3945 100644 --- a/contrib/vexecutor/execVScan.h +++ b/contrib/vexecutor/execVScan.h @@ -23,5 +23,8 @@ #include "postgres.h" #include "executor/executor.h" + +TupleTableSlot *ExecTableVScanVirtualLayer(ScanState *scanState); + TupleTableSlot *ExecTableVScan(ScanState *scanState); #endif http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/parquet_reader.c ---------------------------------------------------------------------- diff --git a/contrib/vexecutor/parquet_reader.c b/contrib/vexecutor/parquet_reader.c new file mode 100644 index 0000000..301bd65 --- /dev/null +++ b/contrib/vexecutor/parquet_reader.c @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#include "parquet_reader.h" + +#include "executor/executor.h" +#include "tuplebatch.h" +#include "vcheck.h" + +extern bool getNextRowGroup(ParquetScanDesc scan); +static int +ParquetRowGroupReader_ScanNextTupleBatch( + TupleDesc tupDesc, + ParquetRowGroupReader *rowGroupReader, + int *hawqAttrToParquetColNum, + bool *projs, + TupleTableSlot *slot); + +static void +parquet_vgetnext(ParquetScanDesc scan, ScanDirection direction, TupleTableSlot *slot); + +TupleTableSlot * +ParquetVScanNext(ScanState *scanState) +{ + Assert(IsA(scanState, TableScanState) || IsA(scanState, DynamicTableScanState)); + ParquetScanState *node = (ParquetScanState *)scanState; + Assert(node->opaque != NULL && node->opaque->scandesc != NULL); + + parquet_vgetnext(node->opaque->scandesc, node->ss.ps.state->es_direction, node->ss.ss_ScanTupleSlot); + return node->ss.ss_ScanTupleSlot; +} + +static void +parquet_vgetnext(ParquetScanDesc scan, ScanDirection direction, TupleTableSlot *slot) +{ + + //AOTupleId aoTupleId; + Assert(ScanDirectionIsForward(direction)); + + for(;;) + { + if(scan->bufferDone) + { + /* + * Get the next row group. We call this function until we + * successfully get a block to process, or finished reading + * all the data (all 'segment' files) for this relation. + */ + while(!getNextRowGroup(scan)) + { + /* have we read all this relation's data. done! */ + if(scan->pqs_done_all_splits) + { + ExecClearTuple(slot); + return /*NULL*/; + } + } + scan->bufferDone = false; + } + + int row_num = ParquetRowGroupReader_ScanNextTupleBatch( + scan->pqs_tupDesc, + &scan->rowGroupReader, + scan->hawqAttrToParquetColChunks, + scan->proj, + slot); + if(row_num > 0) + return; + + /* no more items in the row group, get new buffer */ + scan->bufferDone = true; + } +} + +/* + * Get next tuple batch from current row group into slot. + * + * Return false if current row group has no tuple left, true otherwise. 
+ */ +static int +ParquetRowGroupReader_ScanNextTupleBatch( + TupleDesc tupDesc, + ParquetRowGroupReader *rowGroupReader, + int *hawqAttrToParquetColNum, + bool *projs, + TupleTableSlot *slot) +{ + Assert(slot); + + if (rowGroupReader->rowRead >= rowGroupReader->rowCount) + { + ParquetRowGroupReader_FinishedScanRowGroup(rowGroupReader); + return false; + } + + /* + * get the next item (tuple) from the row group + */ + int ncol = slot->tts_tupleDescriptor->natts; + TupleBatch tb = (TupleBatch )slot->PRIVATE_tb; + + tb->nrows = 0; + if (rowGroupReader->rowRead + tb->batchsize > rowGroupReader->rowCount) { + tb->nrows = rowGroupReader->rowCount-rowGroupReader->rowRead; + rowGroupReader->rowRead = rowGroupReader->rowCount; + } + else { + tb->nrows = tb->batchsize ; + rowGroupReader->rowRead += tb->batchsize; + } + + int colReaderIndex = 0; + for(int i = 0; i < tb->ncols ; i++) + { + if(projs[i] == false) + continue; + + Oid hawqTypeID = tupDesc->attrs[i]->atttypid; + Oid hawqVTypeID = GetVtype(hawqTypeID); + if(!tb->datagroup[i]) + tbCreateColumn(tb,i,hawqVTypeID); + + vheader* header = tb->datagroup[i]; + header->dim = tb->nrows; + + ParquetColumnReader *nextReader = + &rowGroupReader->columnReaders[colReaderIndex]; + + for(int j = 0;j < tb->nrows; j++) + { + + if(hawqAttrToParquetColNum[i] == 1) + { + ParquetColumnReader_readValue(nextReader, GetVFunc(hawqVTypeID)->gettypeptr(header,j), header->isnull + j, hawqTypeID); + } + else + { + /* + * Because there are some memory reused inside the whole column reader, so need + * to switch the context from PerTupleContext to rowgroup->context + */ + MemoryContext oldContext = MemoryContextSwitchTo(rowGroupReader->memoryContext); + + switch(hawqTypeID) + { + case HAWQ_TYPE_POINT: + ParquetColumnReader_readPoint(nextReader, GetVFunc(hawqVTypeID)->gettypeptr(header,j), header->isnull + j); + break; + case HAWQ_TYPE_PATH: + ParquetColumnReader_readPATH(nextReader, GetVFunc(hawqVTypeID)->gettypeptr(header,j), header->isnull + j); 
+ break; + case HAWQ_TYPE_LSEG: + ParquetColumnReader_readLSEG(nextReader, GetVFunc(hawqVTypeID)->gettypeptr(header,j), header->isnull + j); + break; + case HAWQ_TYPE_BOX: + ParquetColumnReader_readBOX(nextReader, GetVFunc(hawqVTypeID)->gettypeptr(header,j), header->isnull + j); + break; + case HAWQ_TYPE_CIRCLE: + ParquetColumnReader_readCIRCLE(nextReader,GetVFunc(hawqVTypeID)->gettypeptr(header,j),header->isnull + j); + break; + case HAWQ_TYPE_POLYGON: + ParquetColumnReader_readPOLYGON(nextReader,GetVFunc(hawqVTypeID)->gettypeptr(header,j),header->isnull + j); + break; + default: + Insist(false); + break; + } + + MemoryContextSwitchTo(oldContext); + } + } + + colReaderIndex += hawqAttrToParquetColNum[i]; + } + + /*construct tuple, and return back*/ + TupSetVirtualTupleNValid(slot, ncol); + return tb->nrows; +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/parquet_reader.h ---------------------------------------------------------------------- diff --git a/contrib/vexecutor/parquet_reader.h b/contrib/vexecutor/parquet_reader.h new file mode 100644 index 0000000..2a2f1c0 --- /dev/null +++ b/contrib/vexecutor/parquet_reader.h @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef __PARQUET_READER__ +#define __PARQUET_READER__ + +#include "postgres.h" +#include "fmgr.h" +#include "funcapi.h" +#include "cdb/cdbparquetfooterprocessor.h" +#include "cdb/cdbparquetfooterserializer.h" +#include "access/parquetmetadata_c++/MetadataInterface.h" +#include "cdb/cdbparquetrowgroup.h" +#include "utils/memutils.h" +#include "utils/palloc.h" +#include "snappy-c.h" +#include "zlib.h" +#include "executor/spi.h" +#include "cdb/cdbparquetam.h" +#include "nodes/print.h" + +TupleTableSlot *ParquetVScanNext(ScanState *node); + +#endif http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/tuplebatch.c ---------------------------------------------------------------------- diff --git a/contrib/vexecutor/tuplebatch.c b/contrib/vexecutor/tuplebatch.c index 3cec6ec..cd8859b 100644 --- a/contrib/vexecutor/tuplebatch.c +++ b/contrib/vexecutor/tuplebatch.c @@ -68,17 +68,17 @@ void tbCreateColumn(TupleBatch tb,int colid,Oid type) return; int bs = tb->batchsize; - GetVFunc(type)->vtbuild((bs)); + tb->datagroup[colid] = GetVFunc(type)->vtbuild((bs)); } void tbfreeColumn(vheader** vh,int colid) { - GetVFunc(vh[colid]->elemtype)->vtfree(&vh[colid]); + GetVFunc(GetVtype(vh[colid]->elemtype))->vtfree(&vh[colid]); } static size_t vtypeSize(vheader *vh) { - return GetVFunc(vh->elemtype)->vtsize(vh); + return GetVFunc(GetVtype(vh->elemtype))->vtsize(vh); } static size_t http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/vadt.c ---------------------------------------------------------------------- diff --git a/contrib/vexecutor/vadt.c b/contrib/vexecutor/vadt.c index eaccc33..4e5b766 100644 --- a/contrib/vexecutor/vadt.c +++ b/contrib/vexecutor/vadt.c @@ -39,7 +39,7 @@ size_t v##type##Size(vheader *vh) \ vheader* buildv##type(int n) \ { \ vheader* result; \ - result = (vheader*) 
palloc0(offsetof(v##type,values) + n * sizeof(type)) + sizeof(Datum); \ + result = (vheader*) palloc0(offsetof(v##type,values) + n * sizeof(type) + sizeof(Datum)) ; \ result->dim = n; \ result->elemtype = typeoid; \ result->isnull = palloc0(sizeof(bool) * n); \ @@ -49,7 +49,7 @@ vheader* buildv##type(int n) \ /* Destroy the vectorized data */ #define FUNCTION_DESTORY(type, typeoid) \ -void destoryv##type(v##header **header) \ +void destroyv##type(vheader **header) \ { \ v##type** ptr = (v##type**) header; \ pfree((*header)->isnull); \ @@ -357,13 +357,28 @@ v##type##const_type##cmpstr(PG_FUNCTION_ARGS) \ FUNCTION_CMP(type) \ FUNCTION_CMP_RCONST(type) +#define FUNCTION_GETPTR(type) \ +type* getptrv##type(vheader *header,int n) \ +{\ + if(n < 0 || n >= header->dim) return NULL; /* valid indices are 0..dim-1 */ \ + v##type* ptr = (v##type *) header; \ + return ptr->values + n; \ +} +#define FUNCTION_GETVALUE(type) \ +void getvaluev##type(vheader *header,int n,Datum *ptr) \ +{ \ + if(n < 0 || n >= header->dim) return;\ + *ptr = ((v##type*)header)->values[n];\ +} #define TYPE_DEFINE(type, typeoid) \ FUNCTION_VTYPESIZE(type) \ FUNCTION_OP_ALL(type) \ FUNCTION_BUILD(type, typeoid) \ FUNCTION_DESTORY(type, typeoid) \ + FUNCTION_GETPTR(type) \ + FUNCTION_GETVALUE(type) \ FUNCTION_SERIALIZATION(type,typeoid) \ /* These MACRO will be expanded when the code is compiled.
*/ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/vadt.h ---------------------------------------------------------------------- diff --git a/contrib/vexecutor/vadt.h b/contrib/vexecutor/vadt.h index 8b7061a..c8ad3f8 100644 --- a/contrib/vexecutor/vadt.h +++ b/contrib/vexecutor/vadt.h @@ -58,8 +58,15 @@ extern vheader* buildv##type(int n); /* Destroy the vectorized data */ #define FUNCTION_DESTORY_HEADER(type, typeoid) \ -extern void destoryv##type(vheader **header); +extern void destroyv##type(vheader **header); +/* Get a column data pointer */ +#define FUNCTION_GETPTR_HEADER(type) \ +extern type* getptrv##type(vheader *header,int n); + +/* Get a column data */ +#define FUNCTION_GETVALUE_HEADER(type) \ +extern void getvaluev##type(vheader *header,int n,Datum *ptr); /* * IN function for the abstract data types * e.g. Datum vint2in(PG_FUNCTION_ARGS) @@ -186,6 +193,8 @@ extern Datum v##type##deserialization(unsigned char* buf,size_t* len); \ #define TYPE_HEADER(type, typeoid) \ VADT_BUILD(type) \ + FUNCTION_GETPTR_HEADER(type)\ + FUNCTION_GETVALUE_HEADER(type) \ FUNCTION_OP_ALL_HEADER(type) \ FUNCTION_IN_HEADER(type, typeoid) \ FUNCTION_OUT_HEADER(type, typeoid) \ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/vcheck.c ---------------------------------------------------------------------- diff --git a/contrib/vexecutor/vcheck.c b/contrib/vexecutor/vcheck.c index 88de429..7e1a591 100644 --- a/contrib/vexecutor/vcheck.c +++ b/contrib/vexecutor/vcheck.c @@ -36,12 +36,12 @@ static const vFuncMap funcMap[] = { - {INT2OID, &buildvint2, &destoryvint2, &vint2Size,&vint2serialization,&vint2deserialization}, - {INT4OID, &buildvint4, &destoryvint4, &vint4Size,&vint4serialization,&vint4deserialization}, - {INT8OID, &buildvint8, &destoryvint8, &vint8Size,&vint8serialization,&vint8deserialization}, - {FLOAT4OID, &buildvfloat4, &destoryvfloat4, &vfloat4Size,&vfloat4serialization,&vfloat4deserialization}, - 
{FLOAT8OID, &buildvfloat8, &destoryvfloat8, &vfloat8Size,&vfloat8serialization,&vfloat8deserialization}, - {BOOLOID, &buildvbool, &destoryvbool, &vboolSize,&vboolserialization,&vbooldeserialization} + {INT2OID, &buildvint2, &destroyvint2, &getptrvint2,&getvaluevint2,&vint2Size,&vint2serialization,&vint2deserialization}, + {INT4OID, &buildvint4, &destroyvint4, &getptrvint4,&getvaluevint4,&vint4Size,&vint4serialization,&vint4deserialization}, + {INT8OID, &buildvint8, &destroyvint8, &getptrvint8,&getvaluevint8,&vint8Size,&vint8serialization,&vint8deserialization}, + {FLOAT4OID, &buildvfloat4, &destroyvfloat4, &getptrvfloat4,&getvaluevfloat4,&vfloat4Size,&vfloat4serialization,&vfloat4deserialization}, + {FLOAT8OID, &buildvfloat8, &destroyvfloat8, &getptrvfloat8,&getvaluevfloat8,&vfloat8Size,&vfloat8serialization,&vfloat8deserialization}, + {BOOLOID, &buildvbool, &destroyvbool, &getptrvbool,&getvaluevbool,&vboolSize,&vboolserialization,&vbooldeserialization} }; static const int funcMapLen = sizeof(funcMap) /sizeof(vFuncMap); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/vcheck.h ---------------------------------------------------------------------- diff --git a/contrib/vexecutor/vcheck.h b/contrib/vexecutor/vcheck.h index 59bc46b..9b3d18c 100644 --- a/contrib/vexecutor/vcheck.h +++ b/contrib/vexecutor/vcheck.h @@ -27,6 +27,8 @@ typedef struct vFuncMap Oid ntype; vheader* (* vtbuild)(int n); void (* vtfree)(vheader **vh); + Datum (* gettypeptr)(vheader *vh,int n); + void (* gettypevalue)(vheader *vh,int n,Datum *ptr); size_t (* vtsize)(vheader *vh); size_t (*serialization)(vheader* vh, unsigned char* buf); Datum (*deserialization)(unsigned char* buf,size_t* len); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/vexecutor.c ---------------------------------------------------------------------- diff --git a/contrib/vexecutor/vexecutor.c b/contrib/vexecutor/vexecutor.c index 51cf0f7..4533fdd 100644 
--- a/contrib/vexecutor/vexecutor.c +++ b/contrib/vexecutor/vexecutor.c @@ -189,7 +189,7 @@ static TupleTableSlot* VExecProcNode(PlanState *node) case T_ParquetScanState: case T_AppendOnlyScanState: case T_TableScanState: - result = ExecTableVScan((TableScanState*)node); + result = ExecTableVScanVirtualLayer((TableScanState*)node); break; default: break; @@ -208,10 +208,6 @@ static bool VExecEndNode(PlanState *node) { case T_AppendOnlyScanState: case T_ParquetScanState: - tbDestroy((TupleBatch *) (&node->ps_ResultTupleSlot->PRIVATE_tb)); - tbDestroy((TupleBatch *) (&((TableScanState *) node)->ss.ss_ScanTupleSlot->PRIVATE_tb)); - ret = true; - break; case T_TableScanState: ExecEndTableScan((TableScanState *)node); ret = true; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/src/backend/access/parquet/parquetam.c ---------------------------------------------------------------------- diff --git a/src/backend/access/parquet/parquetam.c b/src/backend/access/parquet/parquetam.c index bec8422..4967d0f 100644 --- a/src/backend/access/parquet/parquetam.c +++ b/src/backend/access/parquet/parquetam.c @@ -53,7 +53,7 @@ static void initscan(ParquetScanDesc scan); static bool SetNextFileSegForRead(ParquetScanDesc scan); /*get next row group to read*/ -static bool getNextRowGroup(ParquetScanDesc scan); +bool getNextRowGroup(ParquetScanDesc scan); /*close current scanning file segments*/ static void CloseScannedFileSeg(ParquetScanDesc scan); @@ -327,7 +327,7 @@ void parquet_getnext(ParquetScanDesc scan, ScanDirection direction, /* * You can think of this scan routine as get next "executor" Parquet rowGroup. */ -static bool getNextRowGroup(ParquetScanDesc scan) +bool getNextRowGroup(ParquetScanDesc scan) { if (scan->pqs_need_new_split) {