Repository: incubator-hawq
Updated Branches:
  refs/heads/master 30b30449a -> c72e58946


HAWQ-1600. Parquet table data vectorized scan


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/cfee2c51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/cfee2c51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/cfee2c51

Branch: refs/heads/master
Commit: cfee2c51d7b28d23c644e903d544b3645c42b9a9
Parents: 30b3044
Author: Weinan Wang <wew...@pivotal.io>
Authored: Fri Mar 30 10:05:56 2018 +0800
Committer: Weinan Wang <wew...@pivotal.io>
Committed: Fri Mar 30 13:38:29 2018 +0800

----------------------------------------------------------------------
 contrib/vexecutor/Makefile             |   2 +-
 contrib/vexecutor/execVQual.c          | 125 ++++++++++++++++++
 contrib/vexecutor/execVQual.h          |  40 ++++++
 contrib/vexecutor/execVScan.c          |  44 ++++++-
 contrib/vexecutor/execVScan.h          |   3 +
 contrib/vexecutor/parquet_reader.c     | 194 ++++++++++++++++++++++++++++
 contrib/vexecutor/parquet_reader.h     |  39 ++++++
 contrib/vexecutor/tuplebatch.c         |   6 +-
 contrib/vexecutor/vadt.c               |  19 ++-
 contrib/vexecutor/vadt.h               |  11 +-
 contrib/vexecutor/vcheck.c             |  12 +-
 contrib/vexecutor/vcheck.h             |   2 +
 contrib/vexecutor/vexecutor.c          |   6 +-
 src/backend/access/parquet/parquetam.c |   4 +-
 14 files changed, 484 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/Makefile
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/Makefile b/contrib/vexecutor/Makefile
index 805485f..0eae623 100644
--- a/contrib/vexecutor/Makefile
+++ b/contrib/vexecutor/Makefile
@@ -17,7 +17,7 @@
 
 
 MODULE_big = vexecutor
-OBJS    = vexecutor.o vadt.o vcheck.o tuplebatch.o execVScan.o
+OBJS    = vexecutor.o vadt.o vcheck.o tuplebatch.o execVScan.o execVQual.o parquet_reader.o
 
 PG_CXXFLAGS = -Wall -O0 -g -std=c++11
 PG_LIBS = $(libpq_pgport) 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/execVQual.c
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/execVQual.c b/contrib/vexecutor/execVQual.c
new file mode 100644
index 0000000..c4ccf9e
--- /dev/null
+++ b/contrib/vexecutor/execVQual.c
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "execVQual.h"
+
+/*
+ * ExecVecVariableList
+ *             Evaluates a simple-Variable-list projection over a tuple batch.
+ *
+ * No values are copied: output column i of the result batch is set to point
+ * at the column vector (vheader) of the source slot's batch that Var i
+ * references, so the result batch shares column storage with its source.
+ *
+ * projInfo: projection whose target list is a plain Var list
+ * tb:       result TupleBatch (the projection slot's PRIVATE_tb)
+ */
+static void
+ExecVecVariableList(ProjectionInfo *projInfo, TupleBatch tb)
+{
+    ExprContext *econtext = projInfo->pi_exprContext;
+    int                   *varSlotOffsets = projInfo->pi_varSlotOffsets;
+    int                   *varNumbers = projInfo->pi_varNumbers;
+    int                    ncols = list_length(projInfo->pi_targetlist);
+    int                    i;
+
+    tb->ncols = ncols;
+
+    for (i = 0; i < ncols; i++)
+    {
+        /* the source slot lives at a byte offset inside the ExprContext */
+        char      *slotptr = ((char *) econtext) + varSlotOffsets[i];
+        TupleTableSlot *varSlot = *((TupleTableSlot **) slotptr);
+        TupleBatch  srcTb = (TupleBatch) varSlot->PRIVATE_tb;
+        int                    varNumber = varNumbers[i] - 1;    /* attno is 1-based */
+
+        tb->datagroup[i] = srcTb->datagroup[varNumber];
+    }
+}
+
+/*
+ * ExecVProject
+ *             Vectorized counterpart of ExecProject.
+ *
+ * Only simple Var-list projections (pi_isVarList) are supported: the result
+ * slot's TupleBatch is filled by ExecVecVariableList and the slot is marked
+ * as holding a virtual tuple.  Any other target list raises an error.
+ *
+ * isDone, if not NULL, is set to ExprSingleResult.
+ * Returns projInfo->pi_slot.
+ */
+TupleTableSlot *
+ExecVProject(ProjectionInfo *projInfo, ExprDoneCond *isDone)
+{
+    TupleTableSlot *slot;
+
+    Assert(projInfo != NULL);
+
+    slot = projInfo->pi_slot;
+
+    /* Clear any former contents of the result slot. */
+    ExecClearTuple(slot);
+
+    /*
+     * Expression evaluation over batches is not implemented.  This is a
+     * per-query limitation, so report it with ERROR (abort the query) rather
+     * than FATAL (terminate the whole backend session).
+     */
+    if (!projInfo->pi_isVarList)
+        elog(ERROR, "does not support expression in projection stmt");
+
+    /* simple Var list: this always succeeds with one result row */
+    if (isDone)
+        *isDone = ExprSingleResult;
+
+    ExecVecVariableList(projInfo, slot->PRIVATE_tb);
+    ExecStoreVirtualTuple(slot);
+
+    return slot;
+}
+
+/*
+ * VirtualNodeProc
+ *      Move the next non-skipped row of the TupleBatch attached to slot into
+ *      slot->PRIVATE_tts_values / slot->PRIVATE_tts_isnull and store it as a
+ *      virtual tuple.  Invoked in the V->N (vectorized to normal) process.
+ *
+ *      Returns true if a row was stored.  Returns false if the slot is empty,
+ *      or if no unskipped row is left in the batch (the slot is then cleared).
+ */
+bool
+VirtualNodeProc(ScanState* state,TupleTableSlot *slot)
+{
+    TupleBatch tb;
+
+    if (TupIsNull(slot))
+        return false;
+
+    tb = slot->PRIVATE_tb;
+    ExecClearTuple(slot);
+
+    /* Test the bound first so skip[] is never read at index nrows. */
+    while (tb->iter < tb->nrows && tb->skip[tb->iter])
+        tb->iter++;
+
+    if (tb->iter >= tb->nrows)
+        return false;
+
+    for (int i = 0; i < tb->ncols; i++)
+    {
+        vheader *header = tb->datagroup[i];
+
+        /* A column the reader never materialized has no vector: emit NULL. */
+        if (header == NULL)
+        {
+            slot->PRIVATE_tts_values[i] = (Datum) 0;
+            slot->PRIVATE_tts_isnull[i] = true;
+            continue;
+        }
+
+        GetVFunc(GetVtype(header->elemtype))->gettypevalue(header, tb->iter,
+                                                           slot->PRIVATE_tts_values + i);
+        slot->PRIVATE_tts_isnull[i] = header->isnull[tb->iter];
+    }
+
+    tb->iter++;
+    ExecStoreVirtualTuple(slot);
+    return true;
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/execVQual.h
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/execVQual.h b/contrib/vexecutor/execVQual.h
new file mode 100644
index 0000000..fb35789
--- /dev/null
+++ b/contrib/vexecutor/execVQual.h
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef EXEC_VQUAL_H
+#define EXEC_VQUAL_H
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "cdb/cdbvars.h"
+#include "cdb/partitionselection.h"
+#include "executor/execdebug.h"
+#include "executor/nodeAgg.h"
+#include "tuplebatch.h"
+#include "vadt.h"
+
+/*
+ * ExecVProject
+ *     Vectorized projection into projInfo->pi_slot's TupleBatch.  Only simple
+ *     Var-list target lists are supported.
+ */
+extern TupleTableSlot *
+ExecVProject(ProjectionInfo *projInfo, ExprDoneCond *isDone);
+
+/*
+ * ExecVQual
+ *     NOTE(review): declared here but not defined in execVQual.c as of this
+ *     patch; do not call it until an implementation exists.
+ */
+extern bool
+ExecVQual(List *qual, ExprContext *econtext);
+
+/*
+ * VirtualNodeProc
+ *     Pop the next non-skipped row of slot's TupleBatch into the slot's
+ *     value/isnull arrays.  Returns false when no such row is left.
+ */
+extern bool
+VirtualNodeProc(ScanState* state,TupleTableSlot *slot);
+
+#endif   /* EXEC_VQUAL_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/execVScan.c
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/execVScan.c b/contrib/vexecutor/execVScan.c
index b0fbd11..44a6854 100644
--- a/contrib/vexecutor/execVScan.c
+++ b/contrib/vexecutor/execVScan.c
@@ -19,6 +19,8 @@
 
 #include "execVScan.h"
 #include "miscadmin.h"
+#include "execVQual.h"
+#include "parquet_reader.h"
 
 static TupleTableSlot*
 ExecVScan(ScanState *node, ExecScanAccessMtd accessMtd);
@@ -38,8 +40,8 @@ getVScanMethod(int tableType)
                     },
                     //PARQUETSCAN
                     {
-                            &ParquetScanNext, &BeginScanParquetRelation, 
&EndScanParquetRelation,
-                            &ReScanParquetRelation, &MarkRestrNotAllowed, 
&MarkRestrNotAllowed
+                            &ParquetVScanNext, &BeginScanParquetRelation, 
&EndScanParquetRelation,
+                            NULL,NULL,NULL
                     }
             };
 
@@ -53,6 +55,35 @@ getVScanMethod(int tableType)
     return &scanMethods[tableType];
 }
 
+/*
+ * ExecTableVScanVirtualLayer
+ *          Produce the next output of a vectorized table scan.
+ *
+ * When the parent node is vectorized, the whole TupleBatch is passed up.
+ * Otherwise the batch is translated into single tuples: VirtualNodeProc pops
+ * one row per call, and when the current batch has no usable row left (all
+ * consumed, or the rest flagged in skip[] by the qualification) the next
+ * batch is fetched with ExecTableVScan.  This repeats until a row is produced
+ * or the scan returns an empty slot, which signals the end of the scan.
+ */
+TupleTableSlot *ExecTableVScanVirtualLayer(ScanState *scanState)
+{
+    VectorizedState* vs = (VectorizedState*)scanState->ps.vectorized;
+    /* NOTE(review): assumes every vectorized scan has a parent carrying a VectorizedState. */
+    VectorizedState* pvs = vs->parent->vectorized;
+    TupleTableSlot* slot;
+
+    /*
+     * Go through ExecTableVScan rather than calling ExecVScan directly: it
+     * begins the scan on SCAN_INIT and resets both batches before reading.
+     */
+    if (pvs->vectorized)
+        return ExecTableVScan(scanState);
+
+    slot = scanState->ps.ps_ProjInfo ? scanState->ps.ps_ResultTupleSlot
+                                     : scanState->ss_ScanTupleSlot;
+
+    /*
+     * A freshly fetched batch may have every row skipped; keep fetching
+     * instead of returning the cleared slot, which the parent would take as
+     * the end of the scan.
+     */
+    while (!VirtualNodeProc(scanState, slot))
+    {
+        slot = ExecTableVScan(scanState);
+        if (TupIsNull(slot))
+            break;
+    }
+
+    return slot;
+}
+
 TupleTableSlot *ExecTableVScan(ScanState *scanState)
 {
     if (scanState->scan_state == SCAN_INIT ||
@@ -61,6 +92,8 @@ TupleTableSlot *ExecTableVScan(ScanState *scanState)
         getVScanMethod(scanState->tableType)->beginScanMethod(scanState);
     }
 
+    tbReset(scanState->ss_ScanTupleSlot->PRIVATE_tb);
+    tbReset(scanState->ps.ps_ResultTupleSlot->PRIVATE_tb);
     TupleTableSlot *slot = 
ExecVScan(scanState,getVScanMethod(scanState->tableType)->accessMethod);
 
     if (TupIsNull(slot) && !scanState->ps.delayEagerFree)
@@ -148,7 +181,10 @@ ExecVScan(ScanState *node, ExecScanAccessMtd accessMtd)
                  * Form a projection tuple, store it in the result tuple slot
                  * and return it.
                  */
-                return ExecProject(projInfo, NULL);
+                ((TupleBatch)projInfo->pi_slot->PRIVATE_tb)->nrows = 
((TupleBatch)slot->PRIVATE_tb)->nrows;
+                memcpy(((TupleBatch)projInfo->pi_slot->PRIVATE_tb)->skip,
+                       ((TupleBatch)slot->PRIVATE_tb)->skip,sizeof(bool) * 
((TupleBatch)slot->PRIVATE_tb)->nrows);
+                return ExecVProject(projInfo, NULL);
             }
             else
             {
@@ -163,5 +199,7 @@ ExecVScan(ScanState *node, ExecScanAccessMtd accessMtd)
          * Tuple fails qual, so free per-tuple memory and try again.
          */
         ResetExprContext(econtext);
+        tbReset(node->ss_ScanTupleSlot->PRIVATE_tb);
+        tbReset(node->ps.ps_ResultTupleSlot->PRIVATE_tb);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/execVScan.h
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/execVScan.h b/contrib/vexecutor/execVScan.h
index 22eee5b..ced3945 100644
--- a/contrib/vexecutor/execVScan.h
+++ b/contrib/vexecutor/execVScan.h
@@ -23,5 +23,8 @@
 #include "postgres.h"
 #include "executor/executor.h"
 
+
+TupleTableSlot *ExecTableVScanVirtualLayer(ScanState *scanState);
+
 TupleTableSlot *ExecTableVScan(ScanState *scanState);
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/parquet_reader.c
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/parquet_reader.c 
b/contrib/vexecutor/parquet_reader.c
new file mode 100644
index 0000000..301bd65
--- /dev/null
+++ b/contrib/vexecutor/parquet_reader.c
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "parquet_reader.h"
+
+#include "executor/executor.h"
+#include "tuplebatch.h"
+#include "vcheck.h"
+
+/* Provided by src/backend/access/parquet/parquetam.c (exported for this reader). */
+extern bool getNextRowGroup(ParquetScanDesc scan);
+
+static int ParquetRowGroupReader_ScanNextTupleBatch(TupleDesc tupDesc,
+                                                    ParquetRowGroupReader *rowGroupReader,
+                                                    int *hawqAttrToParquetColNum,
+                                                    bool *projs,
+                                                    TupleTableSlot *slot);
+
+static void parquet_vgetnext(ParquetScanDesc scan, ScanDirection direction,
+                             TupleTableSlot *slot);
+
+/*
+ * ParquetVScanNext
+ *     Vectorized access method for Parquet tables: reads the next TupleBatch
+ *     into the node's scan slot and hands that slot back (cleared once all
+ *     data has been read).
+ */
+TupleTableSlot *
+ParquetVScanNext(ScanState *scanState)
+{
+       ParquetScanState *node;
+       TupleTableSlot   *scanSlot;
+
+       Assert(IsA(scanState, TableScanState) || IsA(scanState, DynamicTableScanState));
+
+       node = (ParquetScanState *) scanState;
+       Assert(node->opaque != NULL && node->opaque->scandesc != NULL);
+
+       scanSlot = node->ss.ss_ScanTupleSlot;
+       parquet_vgetnext(node->opaque->scandesc, node->ss.ps.state->es_direction, scanSlot);
+
+       return scanSlot;
+}
+
+/*
+ * parquet_vgetnext
+ *     Fill slot with the next TupleBatch of the scan, moving on to the next
+ *     row group whenever the current one is exhausted.  Once every split has
+ *     been read, the slot is cleared instead.  Forward scans only.
+ */
+static void
+parquet_vgetnext(ParquetScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
+{
+       Assert(ScanDirectionIsForward(direction));
+
+       for (;;)
+       {
+               int     nrows;
+
+               if (scan->bufferDone)
+               {
+                       /*
+                        * Keep asking for a row group until one is ready or the
+                        * relation's data (all 'segment' files) is used up.
+                        */
+                       while (!getNextRowGroup(scan))
+                       {
+                               if (scan->pqs_done_all_splits)
+                               {
+                                       ExecClearTuple(slot);
+                                       return;
+                               }
+                       }
+                       scan->bufferDone = false;
+               }
+
+               nrows = ParquetRowGroupReader_ScanNextTupleBatch(scan->pqs_tupDesc,
+                                                                &scan->rowGroupReader,
+                                                                scan->hawqAttrToParquetColChunks,
+                                                                scan->proj,
+                                                                slot);
+               if (nrows > 0)
+                       break;
+
+               /* row group drained: request a new one on the next iteration */
+               scan->bufferDone = true;
+       }
+}
+
+/*
+ * Read nrows values of an attribute backed by a single Parquet column reader
+ * into the column vector header.
+ */
+static void
+vreadSimpleColumn(ParquetColumnReader *reader, vheader *header,
+                  Oid hawqTypeID, Oid hawqVTypeID, int nrows)
+{
+       const vFuncMap *vfunc = GetVFunc(hawqVTypeID);
+
+       for (int j = 0; j < nrows; j++)
+               ParquetColumnReader_readValue(reader, vfunc->gettypeptr(header, j),
+                                             header->isnull + j, hawqTypeID);
+}
+
+/*
+ * Read nrows values of a geometric attribute (spread over several Parquet
+ * column readers) into the column vector header.  The column readers reuse
+ * memory internally, so reading happens in the row group's memory context
+ * instead of the per-tuple one.
+ *
+ * NOTE(review): vcheck.c's funcMap has no geometric vector types; confirm that
+ * such columns can never reach the vectorized reader.
+ */
+static void
+vreadGeometricColumn(MemoryContext rowGroupContext, ParquetColumnReader *reader,
+                     vheader *header, Oid hawqTypeID, Oid hawqVTypeID, int nrows)
+{
+       const vFuncMap *vfunc = GetVFunc(hawqVTypeID);
+       MemoryContext oldContext = MemoryContextSwitchTo(rowGroupContext);
+
+       for (int j = 0; j < nrows; j++)
+       {
+               switch (hawqTypeID)
+               {
+                       case HAWQ_TYPE_POINT:
+                               ParquetColumnReader_readPoint(reader, vfunc->gettypeptr(header, j), header->isnull + j);
+                               break;
+                       case HAWQ_TYPE_PATH:
+                               ParquetColumnReader_readPATH(reader, vfunc->gettypeptr(header, j), header->isnull + j);
+                               break;
+                       case HAWQ_TYPE_LSEG:
+                               ParquetColumnReader_readLSEG(reader, vfunc->gettypeptr(header, j), header->isnull + j);
+                               break;
+                       case HAWQ_TYPE_BOX:
+                               ParquetColumnReader_readBOX(reader, vfunc->gettypeptr(header, j), header->isnull + j);
+                               break;
+                       case HAWQ_TYPE_CIRCLE:
+                               ParquetColumnReader_readCIRCLE(reader, vfunc->gettypeptr(header, j), header->isnull + j);
+                               break;
+                       case HAWQ_TYPE_POLYGON:
+                               ParquetColumnReader_readPOLYGON(reader, vfunc->gettypeptr(header, j), header->isnull + j);
+                               break;
+                       default:
+                               Insist(false);
+                               break;
+               }
+       }
+
+       MemoryContextSwitchTo(oldContext);
+}
+
+/*
+ * ParquetRowGroupReader_ScanNextTupleBatch
+ *     Read up to tb->batchsize rows of the current row group into the
+ *     TupleBatch attached to slot.
+ *
+ * Only projected attributes (projs[i] true) are read; each column vector is
+ * created on first use.  hawqAttrToParquetColNum[i] is the number of Parquet
+ * column readers behind HAWQ attribute i: a count of 1 is read as a simple
+ * value, any other count goes through the geometric-type readers.
+ *
+ * Returns the number of rows placed in the batch, or 0 when the row group has
+ * no row left (ParquetRowGroupReader_FinishedScanRowGroup is then called).
+ */
+static int
+ParquetRowGroupReader_ScanNextTupleBatch(
+       TupleDesc                               tupDesc,
+       ParquetRowGroupReader   *rowGroupReader,
+       int                                             *hawqAttrToParquetColNum,
+       bool                                    *projs,
+       TupleTableSlot                  *slot)
+{
+       Assert(slot);
+
+       if (rowGroupReader->rowRead >= rowGroupReader->rowCount)
+       {
+               ParquetRowGroupReader_FinishedScanRowGroup(rowGroupReader);
+               return 0;
+       }
+
+       int ncol = slot->tts_tupleDescriptor->natts;
+       TupleBatch tb = (TupleBatch) slot->PRIVATE_tb;
+
+       /* a full batch, or whatever is left of the row group */
+       tb->nrows = Min(tb->batchsize, rowGroupReader->rowCount - rowGroupReader->rowRead);
+       rowGroupReader->rowRead += tb->nrows;
+
+       /* colReaderIndex only advances over projected attributes */
+       int colReaderIndex = 0;
+       for (int i = 0; i < tb->ncols; i++)
+       {
+               if (!projs[i])
+                       continue;
+
+               Oid hawqTypeID = tupDesc->attrs[i]->atttypid;
+               Oid hawqVTypeID = GetVtype(hawqTypeID);
+
+               if (!tb->datagroup[i])
+                       tbCreateColumn(tb, i, hawqVTypeID);
+
+               vheader *header = tb->datagroup[i];
+               header->dim = tb->nrows;
+
+               ParquetColumnReader *reader = &rowGroupReader->columnReaders[colReaderIndex];
+
+               if (hawqAttrToParquetColNum[i] == 1)
+                       vreadSimpleColumn(reader, header, hawqTypeID, hawqVTypeID, tb->nrows);
+               else
+                       vreadGeometricColumn(rowGroupReader->memoryContext, reader, header,
+                                            hawqTypeID, hawqVTypeID, tb->nrows);
+
+               colReaderIndex += hawqAttrToParquetColNum[i];
+       }
+
+       TupSetVirtualTupleNValid(slot, ncol);
+       return tb->nrows;
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/parquet_reader.h
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/parquet_reader.h 
b/contrib/vexecutor/parquet_reader.h
new file mode 100644
index 0000000..2a2f1c0
--- /dev/null
+++ b/contrib/vexecutor/parquet_reader.h
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PARQUET_READER_H
+#define PARQUET_READER_H
+
+#include "postgres.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "cdb/cdbparquetfooterprocessor.h"
+#include "cdb/cdbparquetfooterserializer.h"
+#include "access/parquetmetadata_c++/MetadataInterface.h"
+#include "cdb/cdbparquetrowgroup.h"
+#include "utils/memutils.h"
+#include "utils/palloc.h"
+#include "snappy-c.h"
+#include "zlib.h"
+#include "executor/spi.h"
+#include "cdb/cdbparquetam.h"
+#include "nodes/print.h"
+
+/*
+ * ParquetVScanNext
+ *     Vectorized scan access method for Parquet tables: fills the scan slot's
+ *     TupleBatch with the next batch of rows and returns the scan slot.
+ */
+extern TupleTableSlot *ParquetVScanNext(ScanState *node);
+
+#endif   /* PARQUET_READER_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/tuplebatch.c
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/tuplebatch.c b/contrib/vexecutor/tuplebatch.c
index 3cec6ec..cd8859b 100644
--- a/contrib/vexecutor/tuplebatch.c
+++ b/contrib/vexecutor/tuplebatch.c
@@ -68,17 +68,17 @@ void tbCreateColumn(TupleBatch tb,int colid,Oid type)
         return;
     int bs = tb->batchsize;
 
-    GetVFunc(type)->vtbuild((bs));
+    tb->datagroup[colid] = GetVFunc(type)->vtbuild((bs));
 }
 
+/*
+ * tbfreeColumn
+ *     Release the column vector vh[colid] through the vtfree callback
+ *     registered for its vector type.
+ */
 void tbfreeColumn(vheader** vh,int colid)
 {
+    /*
+     * NOTE(review): elemtype appears to hold the element (scalar) type OID,
+     * while GetVFunc is looked up by the vectorized type, hence GetVtype().
+     * Confirm against GetVFunc in vcheck.c.
+     */
-    GetVFunc(vh[colid]->elemtype)->vtfree(&vh[colid]);
+    GetVFunc(GetVtype(vh[colid]->elemtype))->vtfree(&vh[colid]);
 }
 
+/* Size of the column vector vh, as reported by its type's vtsize callback. */
 static size_t vtypeSize(vheader *vh)
 {
-    return GetVFunc(vh->elemtype)->vtsize(vh);
+    return GetVFunc(GetVtype(vh->elemtype))->vtsize(vh);
 }
 
 static size_t

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/vadt.c
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/vadt.c b/contrib/vexecutor/vadt.c
index eaccc33..4e5b766 100644
--- a/contrib/vexecutor/vadt.c
+++ b/contrib/vexecutor/vadt.c
@@ -39,7 +39,7 @@ size_t v##type##Size(vheader *vh) \
 vheader* buildv##type(int n) \
 { \
        vheader* result; \
-       result = (vheader*) palloc0(offsetof(v##type,values) + n * 
sizeof(type)) + sizeof(Datum); \
+       result = (vheader*) palloc0(offsetof(v##type,values) + n * sizeof(type) 
+ sizeof(Datum)) ; \
        result->dim = n; \
        result->elemtype = typeoid; \
        result->isnull = palloc0(sizeof(bool) * n); \
@@ -49,7 +49,7 @@ vheader* buildv##type(int n) \
 
 /* Destroy the vectorized data */
 #define FUNCTION_DESTORY(type, typeoid) \
-void destoryv##type(v##header **header) \
+void destroyv##type(vheader **header) \
 { \
        v##type** ptr = (v##type**) header; \
        pfree((*header)->isnull); \
@@ -357,13 +357,28 @@ v##type##const_type##cmpstr(PG_FUNCTION_ARGS) \
     FUNCTION_CMP(type) \
     FUNCTION_CMP_RCONST(type) 
 
+/*
+ * Return a pointer to element n of the column vector, or NULL when n is
+ * outside [0, dim).
+ */
+#define FUNCTION_GETPTR(type) \
+type* getptrv##type(vheader *header,int n) \
+{\
+       if(n < 0 || n >= header->dim) return NULL; \
+       v##type* ptr = (v##type *) header; \
+       return ptr->values + n; \
+}
 
+/*
+ * Per-element-type Datum conversion used by FUNCTION_GETVALUE.  A plain
+ * assignment to Datum would convert numerically (truncating float4/float8
+ * values) instead of producing the Datum representation callers decode
+ * with DatumGetXXX.
+ */
+#define VTYPE_GET_DATUM_int2(x)   Int16GetDatum(x)
+#define VTYPE_GET_DATUM_int4(x)   Int32GetDatum(x)
+#define VTYPE_GET_DATUM_int8(x)   Int64GetDatum(x)
+#define VTYPE_GET_DATUM_float4(x) Float4GetDatum(x)
+#define VTYPE_GET_DATUM_float8(x) Float8GetDatum(x)
+#define VTYPE_GET_DATUM_bool(x)   BoolGetDatum(x)
+
+/*
+ * Store element n of the column vector into *ptr as a Datum.  *ptr is left
+ * untouched when n is outside [0, dim).
+ */
+#define FUNCTION_GETVALUE(type) \
+void getvaluev##type(vheader *header,int n,Datum *ptr) \
+{ \
+       if(n < 0 || n >= header->dim) return;\
+       *ptr = VTYPE_GET_DATUM_##type(((v##type*)header)->values[n]);\
+}
 
 #define TYPE_DEFINE(type, typeoid) \
     FUNCTION_VTYPESIZE(type) \
     FUNCTION_OP_ALL(type) \
     FUNCTION_BUILD(type, typeoid) \
     FUNCTION_DESTORY(type, typeoid) \
+    FUNCTION_GETPTR(type) \
+    FUNCTION_GETVALUE(type) \
     FUNCTION_SERIALIZATION(type,typeoid) \
 
 /* These MACRO will be expanded when the code is compiled. */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/vadt.h
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/vadt.h b/contrib/vexecutor/vadt.h
index 8b7061a..c8ad3f8 100644
--- a/contrib/vexecutor/vadt.h
+++ b/contrib/vexecutor/vadt.h
@@ -58,8 +58,15 @@ extern vheader* buildv##type(int n);
 
 /* Destroy the vectorized data */
 #define FUNCTION_DESTORY_HEADER(type, typeoid) \
-extern void destoryv##type(vheader **header);
+extern void destroyv##type(vheader **header);
 
+/* Declares getptrv<type>(): address of element idx of a column vector. */
+#define FUNCTION_GETPTR_HEADER(type) \
+extern type *getptrv##type(vheader *vh, int idx);
+
+/* Declares getvaluev<type>(): store element idx of a column vector into *result. */
+#define FUNCTION_GETVALUE_HEADER(type) \
+extern void getvaluev##type(vheader *vh, int idx, Datum *result);
+
 /*
  * IN function for the abstract data types
  * e.g. Datum vint2in(PG_FUNCTION_ARGS)
@@ -186,6 +193,8 @@ extern Datum v##type##deserialization(unsigned char* 
buf,size_t* len); \
 
 #define TYPE_HEADER(type, typeoid) \
     VADT_BUILD(type) \
+    FUNCTION_GETPTR_HEADER(type)\
+    FUNCTION_GETVALUE_HEADER(type) \
     FUNCTION_OP_ALL_HEADER(type) \
     FUNCTION_IN_HEADER(type, typeoid) \
     FUNCTION_OUT_HEADER(type, typeoid) \

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/vcheck.c
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/vcheck.c b/contrib/vexecutor/vcheck.c
index 88de429..7e1a591 100644
--- a/contrib/vexecutor/vcheck.c
+++ b/contrib/vexecutor/vcheck.c
@@ -36,12 +36,12 @@
 
 
 static const vFuncMap funcMap[] = {
-               {INT2OID, &buildvint2, &destoryvint2, &vint2Size,&vint2serialization,&vint2deserialization},
-               {INT4OID, &buildvint4, &destoryvint4, &vint4Size,&vint4serialization,&vint4deserialization},
-               {INT8OID, &buildvint8, &destoryvint8, &vint8Size,&vint8serialization,&vint8deserialization},
-               {FLOAT4OID, &buildvfloat4, &destoryvfloat4, &vfloat4Size,&vfloat4serialization,&vfloat4deserialization},
-               {FLOAT8OID, &buildvfloat8, &destoryvfloat8, &vfloat8Size,&vfloat8serialization,&vfloat8deserialization},
-               {BOOLOID, &buildvbool, &destoryvbool, &vboolSize,&vboolserialization,&vbooldeserialization}
+               /* one entry per supported element type; fields named to match vFuncMap */
+               {.ntype = INT2OID, .vtbuild = &buildvint2, .vtfree = &destroyvint2,
+                .gettypeptr = &getptrvint2, .gettypevalue = &getvaluevint2,
+                .vtsize = &vint2Size, .serialization = &vint2serialization,
+                .deserialization = &vint2deserialization},
+               {.ntype = INT4OID, .vtbuild = &buildvint4, .vtfree = &destroyvint4,
+                .gettypeptr = &getptrvint4, .gettypevalue = &getvaluevint4,
+                .vtsize = &vint4Size, .serialization = &vint4serialization,
+                .deserialization = &vint4deserialization},
+               {.ntype = INT8OID, .vtbuild = &buildvint8, .vtfree = &destroyvint8,
+                .gettypeptr = &getptrvint8, .gettypevalue = &getvaluevint8,
+                .vtsize = &vint8Size, .serialization = &vint8serialization,
+                .deserialization = &vint8deserialization},
+               {.ntype = FLOAT4OID, .vtbuild = &buildvfloat4, .vtfree = &destroyvfloat4,
+                .gettypeptr = &getptrvfloat4, .gettypevalue = &getvaluevfloat4,
+                .vtsize = &vfloat4Size, .serialization = &vfloat4serialization,
+                .deserialization = &vfloat4deserialization},
+               {.ntype = FLOAT8OID, .vtbuild = &buildvfloat8, .vtfree = &destroyvfloat8,
+                .gettypeptr = &getptrvfloat8, .gettypevalue = &getvaluevfloat8,
+                .vtsize = &vfloat8Size, .serialization = &vfloat8serialization,
+                .deserialization = &vfloat8deserialization},
+               {.ntype = BOOLOID, .vtbuild = &buildvbool, .vtfree = &destroyvbool,
+                .gettypeptr = &getptrvbool, .gettypevalue = &getvaluevbool,
+                .vtsize = &vboolSize, .serialization = &vboolserialization,
+                .deserialization = &vbooldeserialization}
 };
 static const int funcMapLen = sizeof(funcMap) /sizeof(vFuncMap);
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/vcheck.h
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/vcheck.h b/contrib/vexecutor/vcheck.h
index 59bc46b..9b3d18c 100644
--- a/contrib/vexecutor/vcheck.h
+++ b/contrib/vexecutor/vcheck.h
@@ -27,6 +27,8 @@ typedef struct vFuncMap
     Oid ntype;
     vheader* (* vtbuild)(int n);
     void (* vtfree)(vheader **vh);
+       Datum (* gettypeptr)(vheader *vh,int n);
+       void (* gettypevalue)(vheader *vh,int n,Datum *ptr);
     size_t (* vtsize)(vheader *vh);
     size_t (*serialization)(vheader* vh, unsigned char* buf);
     Datum (*deserialization)(unsigned char* buf,size_t* len);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/contrib/vexecutor/vexecutor.c
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/vexecutor.c b/contrib/vexecutor/vexecutor.c
index 51cf0f7..4533fdd 100644
--- a/contrib/vexecutor/vexecutor.c
+++ b/contrib/vexecutor/vexecutor.c
@@ -189,7 +189,7 @@ static TupleTableSlot* VExecProcNode(PlanState *node)
         case T_ParquetScanState:
         case T_AppendOnlyScanState:
         case T_TableScanState:
-            result = ExecTableVScan((TableScanState*)node);
+                       result = 
ExecTableVScanVirtualLayer((TableScanState*)node);
             break;
         default:
             break;
@@ -208,10 +208,6 @@ static bool VExecEndNode(PlanState *node)
        {
                case T_AppendOnlyScanState:
                case T_ParquetScanState:
-            tbDestroy((TupleBatch *) (&node->ps_ResultTupleSlot->PRIVATE_tb));
-            tbDestroy((TupleBatch *) (&((TableScanState *) 
node)->ss.ss_ScanTupleSlot->PRIVATE_tb));
-                       ret = true;
-                       break;
                case T_TableScanState:
                        ExecEndTableScan((TableScanState *)node);
                        ret = true;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/cfee2c51/src/backend/access/parquet/parquetam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/parquet/parquetam.c 
b/src/backend/access/parquet/parquetam.c
index bec8422..4967d0f 100644
--- a/src/backend/access/parquet/parquetam.c
+++ b/src/backend/access/parquet/parquetam.c
@@ -53,7 +53,7 @@ static void initscan(ParquetScanDesc scan);
 static bool SetNextFileSegForRead(ParquetScanDesc scan);
 
 /*get next row group to read*/
-static bool getNextRowGroup(ParquetScanDesc scan);
+bool getNextRowGroup(ParquetScanDesc scan);
 
 /*close current scanning file segments*/
 static void CloseScannedFileSeg(ParquetScanDesc scan);
@@ -327,7 +327,7 @@ void parquet_getnext(ParquetScanDesc scan, ScanDirection 
direction,
 /*
  * You can think of this scan routine as get next "executor" Parquet rowGroup.
  */
-static bool getNextRowGroup(ParquetScanDesc scan)
+bool getNextRowGroup(ParquetScanDesc scan)
 {
        if (scan->pqs_need_new_split)
        {

Reply via email to