Repository: carbondata Updated Branches: refs/heads/master 6ca03f6b7 -> a4c2ef5f8
[CARBONDATA-2656] Presto vector stream readers performance Enhancement eliminate the extra iteration over the carbonColumnVectorImpl object -> vectorArray, by extending it to StreamReaders which will fill up carbon-core vector data (one by one) directly to the block(presto), and on the call of block builder it will return the block to the Presto. This closes #2412 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a4c2ef5f Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a4c2ef5f Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a4c2ef5f Branch: refs/heads/master Commit: a4c2ef5f833373b4d3bfb6dc4a9fb1c166ae0ed4 Parents: 6ca03f6 Author: sv71294 <[email protected]> Authored: Tue Jun 5 17:44:58 2018 +0530 Committer: chenliang613 <[email protected]> Committed: Wed Jul 18 14:36:09 2018 +0800 ---------------------------------------------------------------------- .../carbondata/presto/CarbonVectorBatch.java | 89 +++++--- .../carbondata/presto/CarbondataPageSource.java | 95 ++------ .../presto/CarbondataPageSourceProvider.java | 18 +- .../PrestoCarbonVectorizedRecordReader.java | 25 ++- .../presto/readers/AbstractStreamReader.java | 66 ------ .../presto/readers/BooleanStreamReader.java | 93 +++----- .../readers/DecimalSliceStreamReader.java | 219 +++++-------------- .../presto/readers/DoubleStreamReader.java | 94 +++----- .../presto/readers/IntegerStreamReader.java | 90 +++----- .../presto/readers/LongStreamReader.java | 87 +++----- .../presto/readers/ObjectStreamReader.java | 56 ++--- .../readers/PrestoVectorBlockBuilder.java | 28 +++ .../presto/readers/ShortStreamReader.java | 87 +++----- .../presto/readers/SliceStreamReader.java | 105 ++++----- .../carbondata/presto/readers/StreamReader.java | 43 ---- .../presto/readers/StreamReaders.java | 98 --------- .../presto/readers/TimestampStreamReader.java | 75 ++++--- .../CarbonDictionaryDecodeReadSupport.scala | 6 +- 18 files changed, 461 insertions(+), 913 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java index b6caaa3..6a4cc0d 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java @@ -20,50 +20,81 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Set; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.DecimalType; import org.apache.carbondata.core.metadata.datatype.StructField; import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; +import org.apache.carbondata.presto.readers.BooleanStreamReader; +import org.apache.carbondata.presto.readers.DecimalSliceStreamReader; +import org.apache.carbondata.presto.readers.DoubleStreamReader; +import org.apache.carbondata.presto.readers.IntegerStreamReader; +import org.apache.carbondata.presto.readers.LongStreamReader; +import org.apache.carbondata.presto.readers.ObjectStreamReader; +import org.apache.carbondata.presto.readers.ShortStreamReader; +import org.apache.carbondata.presto.readers.SliceStreamReader; +import org.apache.carbondata.presto.readers.TimestampStreamReader; + +import com.facebook.presto.spi.block.SliceArrayBlock; public class CarbonVectorBatch { - private static final int DEFAULT_BATCH_SIZE = 4 * 1024; + private static final int DEFAULT_BATCH_SIZE = 4 * 1024; - private final StructField[] schema; private final int capacity; - private int numRows; private final CarbonColumnVectorImpl[] columns; - // True if the row is filtered. private final boolean[] filteredRows; - // Column indices that cannot have null values. private final Set<Integer> nullFilteredColumns; - + private int numRows; // Total number of rows that have been filtered. private int numRowsFiltered = 0; - - private CarbonVectorBatch(StructField[] schema, int maxRows) { - this.schema = schema; + private CarbonVectorBatch(StructField[] schema, CarbonDictionaryDecodeReadSupport readSupport, + int maxRows) { this.capacity = maxRows; this.columns = new CarbonColumnVectorImpl[schema.length]; this.nullFilteredColumns = new HashSet<>(); this.filteredRows = new boolean[maxRows]; + Dictionary[] dictionaries = readSupport.getDictionaries(); + DataType[] dataTypes = readSupport.getDataTypes(); for (int i = 0; i < schema.length; ++i) { - StructField field = schema[i]; - columns[i] = new CarbonColumnVectorImpl(maxRows, field.getDataType()); + columns[i] = createDirectStreamReader(maxRows, dataTypes[i], schema[i], dictionaries[i], + readSupport.getSliceArrayBlock(i)); } - } - - public static CarbonVectorBatch allocate(StructField[] schema) { - return new CarbonVectorBatch(schema, DEFAULT_BATCH_SIZE); + public static CarbonVectorBatch allocate(StructField[] schema, + CarbonDictionaryDecodeReadSupport readSupport) { + return new CarbonVectorBatch(schema, readSupport, DEFAULT_BATCH_SIZE); } - public static CarbonVectorBatch allocate(StructField[] schema, int maxRows) { - return new CarbonVectorBatch(schema, maxRows); + private CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType dataType, + StructField field, Dictionary dictionary, SliceArrayBlock dictionarySliceArrayBlock) { + if (dataType == DataTypes.BOOLEAN) { + return new BooleanStreamReader(batchSize, field.getDataType(), dictionary); + } else if (dataType == DataTypes.SHORT) { + return new ShortStreamReader(batchSize, field.getDataType(), dictionary); + } else if (dataType == DataTypes.INT || dataType == DataTypes.DATE) { + return new IntegerStreamReader(batchSize, field.getDataType(), dictionary); + } else if (dataType == DataTypes.TIMESTAMP) { + return new TimestampStreamReader(batchSize, field.getDataType(), dictionary); + } else if (dataType == DataTypes.LONG) { + return new LongStreamReader(batchSize, field.getDataType(), dictionary); + } else if (dataType == DataTypes.DOUBLE) { + return new DoubleStreamReader(batchSize, field.getDataType(), dictionary); + } else if (dataType == DataTypes.STRING) { + return new SliceStreamReader(batchSize, field.getDataType(), dictionarySliceArrayBlock); + } else if (DataTypes.isDecimal(dataType)) { + return new DecimalSliceStreamReader(batchSize, (DecimalType) field.getDataType(), dictionary); + } else { + return new ObjectStreamReader(batchSize, field.getDataType()); + } } + /** * Resets the batch for writing. */ @@ -78,18 +109,19 @@ public class CarbonVectorBatch { this.numRowsFiltered = 0; } - /** * Returns the number of columns that make up this batch. */ - public int numCols() { return columns.length; } + public int numCols() { + return columns.length; + } /** * Sets the number of rows that are valid. Additionally, marks all rows as "filtered" if one or * more of their attributes are part of a non-nullable column. */ public void setNumRows(int numRows) { - assert(numRows <= this.capacity); + assert (numRows <= this.capacity); this.numRows = numRows; for (int ordinal : nullFilteredColumns) { @@ -102,30 +134,33 @@ public class CarbonVectorBatch { } } - /** * Returns the number of rows for read, including filtered rows. */ - public int numRows() { return numRows; } + public int numRows() { + return numRows; + } /** * Returns the number of valid rows. */ public int numValidRows() { - assert(numRowsFiltered <= numRows); + assert (numRowsFiltered <= numRows); return numRows - numRowsFiltered; } /** * Returns the column at `ordinal`. */ - public CarbonColumnVectorImpl column(int ordinal) { return columns[ordinal]; } + public CarbonColumnVectorImpl column(int ordinal) { + return columns[ordinal]; + } /** * Returns the max capacity (in number of rows) for this batch. */ - public int capacity() { return capacity; } - - + public int capacity() { + return capacity; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java index d31010f..ad7006a 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java @@ -19,12 +19,10 @@ package org.apache.carbondata.presto; import java.io.IOException; import java.util.List; -import java.util.stream.Collectors; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.presto.readers.StreamReader; -import org.apache.carbondata.presto.readers.StreamReaders; +import org.apache.carbondata.presto.readers.PrestoVectorBlockBuilder; import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; import com.facebook.presto.hadoop.$internal.com.google.common.base.Throwables; @@ -35,7 +33,6 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.LazyBlock; import com.facebook.presto.spi.block.LazyBlockLoader; -import com.facebook.presto.spi.type.Type; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; @@ -47,25 +44,18 @@ class CarbondataPageSource implements ConnectorPageSource { private static final LogService logger = LogServiceFactory.getLogService(CarbondataPageSource.class.getName()); - private final List<Type> types; + private List<ColumnHandle> columnHandles; private boolean closed; private PrestoCarbonVectorizedRecordReader vectorReader; - private CarbonDictionaryDecodeReadSupport<Object[]> readSupport; - List<ColumnHandle> columnHandles; private long sizeOfData = 0; - private final StreamReader[] readers ; private int batchId; private long nanoStart; private long nanoEnd; - public CarbondataPageSource(CarbonDictionaryDecodeReadSupport readSupport, - PrestoCarbonVectorizedRecordReader vectorizedRecordReader, - List<ColumnHandle> columnHandles ) { + CarbondataPageSource(PrestoCarbonVectorizedRecordReader vectorizedRecordReader, + List<ColumnHandle> columnHandles) { this.columnHandles = columnHandles; - this.types = getColumnTypes(); - this.readSupport = readSupport; vectorReader = vectorizedRecordReader; - this.readers = createStreamReaders(); } @Override public long getCompletedBytes() { @@ -77,10 +67,9 @@ class CarbondataPageSource implements ConnectorPageSource { } @Override public boolean isFinished() { - return closed ; + return closed; } - @Override public Page getNextPage() { if (nanoStart == 0) { nanoStart = System.nanoTime(); @@ -89,13 +78,12 @@ class CarbondataPageSource implements ConnectorPageSource { int batchSize = 0; try { batchId++; - if(vectorReader.nextKeyValue()) { + if (vectorReader.nextKeyValue()) { Object vectorBatch = vectorReader.getCurrentValue(); - if(vectorBatch != null && vectorBatch instanceof CarbonVectorBatch) - { + if (vectorBatch instanceof CarbonVectorBatch) { columnarBatch = (CarbonVectorBatch) vectorBatch; batchSize = columnarBatch.numRows(); - if(batchSize == 0){ + if (batchSize == 0) { close(); return null; } @@ -108,22 +96,16 @@ class CarbondataPageSource implements ConnectorPageSource { return null; } - Block[] blocks = new Block[types.size()]; + Block[] blocks = new Block[columnHandles.size()]; for (int column = 0; column < blocks.length; column++) { - Type type = types.get(column); - readers[column].setBatchSize(columnarBatch.numRows()); - readers[column].setVectorReader(true); - readers[column].setVector(columnarBatch.column(column)); - blocks[column] = new LazyBlock(batchSize, new CarbondataBlockLoader(column, type)); + blocks[column] = new LazyBlock(batchSize, new CarbondataBlockLoader(column)); } Page page = new Page(batchSize, blocks); return page; - } - catch (PrestoException e) { + } catch (PrestoException e) { closeWithSuppression(e); throw e; - } - catch ( RuntimeException | InterruptedException | IOException e) { + } catch (RuntimeException | InterruptedException | IOException e) { closeWithSuppression(e); throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e); } @@ -133,7 +115,7 @@ class CarbondataPageSource implements ConnectorPageSource { return sizeOfData; } - @Override public void close() { + @Override public void close() { // some hive input formats are broken and bad things can happen if you close them multiple times if (closed) { return; @@ -148,13 +130,11 @@ class CarbondataPageSource implements ConnectorPageSource { } - protected void closeWithSuppression(Throwable throwable) - { + private void closeWithSuppression(Throwable throwable) { requireNonNull(throwable, "throwable is null"); try { close(); - } - catch (RuntimeException e) { + } catch (RuntimeException e) { // Self-suppression not permitted logger.error(e, e.getMessage()); if (throwable != e) { @@ -166,61 +146,32 @@ class CarbondataPageSource implements ConnectorPageSource { /** * Lazy Block Implementation for the Carbondata */ - private final class CarbondataBlockLoader - implements LazyBlockLoader<LazyBlock> - { + private final class CarbondataBlockLoader implements LazyBlockLoader<LazyBlock> { private final int expectedBatchId = batchId; private final int columnIndex; - private final Type type; private boolean loaded; - public CarbondataBlockLoader(int columnIndex, Type type) - { + CarbondataBlockLoader(int columnIndex) { this.columnIndex = columnIndex; - this.type = requireNonNull(type, "type is null"); } - @Override - public final void load(LazyBlock lazyBlock) - { + @Override public final void load(LazyBlock lazyBlock) { if (loaded) { return; } checkState(batchId == expectedBatchId); try { - Block block = readers[columnIndex].readBlock(type); + PrestoVectorBlockBuilder blockBuilder = + (PrestoVectorBlockBuilder) vectorReader.getColumnarBatch().column(columnIndex); + blockBuilder.setBatchSize(lazyBlock.getPositionCount()); + Block block = blockBuilder.buildBlock(); sizeOfData += block.getSizeInBytes(); lazyBlock.setBlock(block); - } - catch (IOException e) { + } catch (Exception e) { throw new CarbonDataLoadingException("Error in Reading Data from Carbondata ", e); } loaded = true; } - } - - /** - * Create the Stream Reader for every column based on their type - * This method will be initialized only once based on the types. - * - * @return - */ - private StreamReader[] createStreamReaders( ) { - requireNonNull(types); - StreamReader[] readers = new StreamReader[types.size()]; - for (int i = 0; i < types.size(); i++) { - readers[i] = StreamReaders.createStreamReader(types.get(i), readSupport - .getSliceArrayBlock(i),readSupport.getDictionaries()[i]); - } - return readers; - } - - private List<Type> getColumnTypes() { - return columnHandles.stream().map(a -> ((CarbondataColumnHandle)a).getColumnType()) - .collect(Collectors.toList()); - } - - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java index 4679eac..cc5bf2a 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java @@ -78,13 +78,12 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) { this.queryId = ((CarbondataSplit)split).getQueryId(); CarbonDictionaryDecodeReadSupport readSupport = new CarbonDictionaryDecodeReadSupport(); - PrestoCarbonVectorizedRecordReader carbonRecordReader = createReader(split, columns, readSupport); - return new CarbondataPageSource(readSupport, carbonRecordReader, columns ); + PrestoCarbonVectorizedRecordReader carbonRecordReader = + createReader(split, columns, readSupport); + return new CarbondataPageSource(carbonRecordReader, columns); } - /** - * * @param split * @param columns * @param readSupport @@ -103,7 +102,7 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider CarbonIterator iterator = queryExecutor.execute(queryModel); readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable()); PrestoCarbonVectorizedRecordReader reader = new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel, - (AbstractDetailQueryResultIterator) iterator); + (AbstractDetailQueryResultIterator) iterator, readSupport); reader.setTaskId(carbondataSplit.getIndex()); return reader; } catch (IOException e) { @@ -116,7 +115,6 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider } /** - * * @param carbondataSplit * @param columns * @return @@ -152,9 +150,6 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(carbonInputSplit.getAllSplits()); queryModel.setTableBlockInfos(tableBlockInfoList); - - - return queryModel; } catch (IOException e) { throw new RuntimeException("Unable to get the Query Model ", e); @@ -162,7 +157,6 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider } /** - * * @param conf * @param carbonTable * @param filterExpression @@ -190,9 +184,7 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider return format; } - /** - * * @param columns * @return */ @@ -208,7 +200,6 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider } /** - * * @param carbonSplit * @return */ @@ -222,5 +213,4 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider return tableCacheModel.carbonTable; } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java index 913d423..32e163a 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java @@ -80,11 +80,14 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> { private long queryStartTime; + private CarbonDictionaryDecodeReadSupport readSupport; + public PrestoCarbonVectorizedRecordReader(QueryExecutor queryExecutor, QueryModel queryModel, - AbstractDetailQueryResultIterator iterator) { + AbstractDetailQueryResultIterator iterator, CarbonDictionaryDecodeReadSupport readSupport) { this.queryModel = queryModel; this.iterator = iterator; this.queryExecutor = queryExecutor; + this.readSupport = readSupport; enableReturningBatches(); this.queryStartTime = System.currentTimeMillis(); } @@ -184,17 +187,15 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> { if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) { DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory .getDirectDictionaryGenerator(dim.getDimension().getDataType()); - fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), - generator.getReturnType()); + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), generator.getReturnType()); } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) { - fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), - dim.getDimension().getDataType()); + fields[dim.getOrdinal()] = + new StructField(dim.getColumnName(), dim.getDimension().getDataType()); } else if (dim.getDimension().isComplex()) { - fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), - dim.getDimension().getDataType()); + fields[dim.getOrdinal()] = + new StructField(dim.getColumnName(), dim.getDimension().getDataType()); } else { - fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), - DataTypes.INT); + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), DataTypes.INT); } } @@ -212,7 +213,7 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> { } } - columnarBatch = CarbonVectorBatch.allocate(fields); + columnarBatch = CarbonVectorBatch.allocate(fields, readSupport); CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length]; boolean[] filteredRows = new boolean[columnarBatch.capacity()]; for (int i = 0; i < fields.length; i++) { @@ -221,7 +222,6 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> { carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows); } - private CarbonVectorBatch resultBatch() { if (columnarBatch == null) initBatch(); return columnarBatch; @@ -251,6 +251,9 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> { return false; } + public CarbonVectorBatch getColumnarBatch() { + return columnarBatch; + } public void setTaskId(long taskId) { this.taskId = taskId; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java deleted file mode 100644 index 81a4b4f..0000000 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.presto.readers; - -import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; - -/** - * Abstract class for Stream Readers - */ -public abstract class AbstractStreamReader implements StreamReader { - - protected Object[] streamData; - - protected CarbonColumnVectorImpl columnVector; - - protected boolean isVectorReader; - - protected int batchSize; - - /** - * Setter for StreamData - * @param data - */ - @Override public void setStreamData(Object[] data) { - this.streamData = data; - } - - /** - * Setter for Vector data - * @param vector - */ - @Override public void setVector(CarbonColumnVectorImpl vector) { - this.columnVector = vector; - } - - /** - * Setter for vector Reader - * @param isVectorReader - */ - public void setVectorReader(boolean isVectorReader) { - this.isVectorReader = isVectorReader; - } - - /** - * Setter for BatchSize - * @param batchSize - */ - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java index 0b7206b..17578d7 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java @@ -17,91 +17,64 @@ package org.apache.carbondata.presto.readers; -import java.io.IOException; - import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; import org.apache.carbondata.core.util.DataTypeUtil; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilderStatus; +import com.facebook.presto.spi.type.BooleanType; import com.facebook.presto.spi.type.Type; -public class BooleanStreamReader extends AbstractStreamReader { +public class BooleanStreamReader extends CarbonColumnVectorImpl + implements PrestoVectorBlockBuilder { - private boolean isDictionary; - private Dictionary dictionary; + protected int batchSize; - public BooleanStreamReader() { + protected Type type = BooleanType.BOOLEAN; - } + protected BlockBuilder builder; - public BooleanStreamReader(boolean isDictionary, Dictionary dictionary) { - this.isDictionary = isDictionary; + private Dictionary dictionary; + + public BooleanStreamReader(int batchSize, DataType dataType, Dictionary dictionary) { + super(batchSize, dataType); + this.batchSize = batchSize; + this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); this.dictionary = dictionary; } - public Block readBlock(Type type) throws IOException { - int numberOfRows = 0; - BlockBuilder builder = null; - if (isVectorReader) { - numberOfRows = batchSize; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - if (columnVector != null) { - if (isDictionary) { - populateDictionaryVector(type, numberOfRows, builder); - } else { - if (columnVector.anyNullsSet()) { - handleNullInVector(type, numberOfRows, builder); - } else { - populateVector(type, numberOfRows, builder); - } - } - } - } else { - numberOfRows = streamData.length; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - for (int i = 0; i < numberOfRows; i++) { - type.writeBoolean(builder, byteToBoolean(streamData[i])); - } - } - + @Override public Block buildBlock() { return builder.build(); } - private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - if (columnVector.isNullAt(i)) { - builder.appendNull(); - } else { - type.writeBoolean(builder, byteToBoolean(columnVector.getData(i))); - } + @Override public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + @Override public void putInt(int rowId, int value) { + Object data = DataTypeUtil + .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.BOOLEAN); + if (data != null) { + type.writeBoolean(builder, (boolean) data); + } else { + builder.appendNull(); } } - private void populateVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - type.writeBoolean(builder, byteToBoolean(columnVector.getData(i))); - } + @Override public void putBoolean(int rowId, boolean value) { + type.writeBoolean(builder, value); } - private void populateDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - int value = (int) columnVector.getData(i); - Object data = DataTypeUtil - .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.BOOLEAN); - if (data != null) { - type.writeBoolean(builder,(boolean) data); - } else { - builder.appendNull(); - } - } + @Override public void putNull(int rowId) { + builder.appendNull(); } - private Boolean byteToBoolean(Object value){ - byte byteValue = (byte)value; - return byteValue == 1; + @Override public void reset() { + builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java index 54f2b5f..ed88343 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java @@ -17,13 +17,13 @@ package org.apache.carbondata.presto.readers; -import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; import java.util.Objects; import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; import org.apache.carbondata.core.util.DataTypeUtil; import com.facebook.presto.spi.block.Block; @@ -45,80 +45,66 @@ import static java.math.RoundingMode.HALF_UP; /** * Reader for DecimalValues */ -public class DecimalSliceStreamReader extends AbstractStreamReader { +public class DecimalSliceStreamReader extends CarbonColumnVectorImpl + implements PrestoVectorBlockBuilder { + private final char[] buffer = new char[100]; + protected int batchSize; + protected Type type; + protected BlockBuilder builder; private Dictionary dictionary; - private boolean isDictionary; + public DecimalSliceStreamReader(int batchSize, + org.apache.carbondata.core.metadata.datatype.DecimalType dataType, Dictionary dictionary) { + super(batchSize, dataType); + this.type = DecimalType.createDecimalType(dataType.getPrecision(), dataType.getScale()); + this.batchSize = batchSize; + this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); + this.dictionary = dictionary; + } - private final char[] buffer = new char[100]; + @Override public Block buildBlock() { + return builder.build(); + } - public DecimalSliceStreamReader() { + @Override public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + @Override public void putInt(int rowId, int value) { + DecimalType decimalType = (DecimalType) type; + Object data = DataTypeUtil.getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), + DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale())); + if (Objects.isNull(data)) { + builder.appendNull(); + } else { + decimalBlockWriter((BigDecimal) data); + } } - public DecimalSliceStreamReader(boolean isDictionary, Dictionary dictionary) { - this.dictionary = dictionary; - this.isDictionary = isDictionary; + @Override public void putDecimal(int rowId, BigDecimal value, int precision) { + decimalBlockWriter(value); } - /** - * Create Block for DecimalType - * @param type - * @return - * @throws IOException - */ - public Block readBlock(Type type) throws IOException { - int numberOfRows = 0; - BlockBuilder builder = null; - if (isVectorReader) { - numberOfRows = batchSize; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - if (columnVector != null) { - if (isDictionary) { - if (isShortDecimal(type)) { - populateShortDictionaryVector(type, numberOfRows, builder); - } else { - populateLongDictionaryVector(type, numberOfRows, builder); - } - } else { - if (columnVector.anyNullsSet()) { - handleNullInVector(type, numberOfRows, builder); - } else { - if (isShortDecimal(type)) { - populateShortDecimalVector(type, numberOfRows, builder); - } else { - populateLongDecimalVector(type, numberOfRows, builder); - } - } - } - } + @Override public void putNull(int rowId) { + builder.appendNull(); + } + + @Override public void reset() { + builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); + } + + private void decimalBlockWriter(BigDecimal value) { + if (isShortDecimal(type)) { + long rescaledDecimal = Decimals.rescale(value.unscaledValue().longValue(), value.scale(), + ((DecimalType) type).getScale()); + type.writeLong(builder, rescaledDecimal); } else { - if (streamData != null) { - numberOfRows = streamData.length; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - for (int i = 0; i < numberOfRows; i++) { - Slice slice = getSlice(streamData[i], type); - if (isShortDecimal(type)) { - type.writeLong(builder, parseLong((DecimalType) type, slice, 0, slice.length())); - } else { - type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length())); - } - } - } - } - if (builder == null) { - return null; + Slice slice = getSlice(value, type); + type.writeSlice(builder, parseSlice((DecimalType) type, slice, slice.length())); } - return builder.build(); } - /** - * Function to getSlice from Decimal Object - * @param value - * @param type - * @return - */ private Slice getSlice(Object value, Type type) { if (type instanceof DecimalType) { DecimalType actual = (DecimalType) type; @@ -137,50 +123,20 @@ public class DecimalSliceStreamReader extends AbstractStreamReader { rescale(bigDecimalValue.unscaledValue(), bigDecimalValue.scale(), actual.getScale()); Slice decimalSlice = Decimals.encodeUnscaledValue(unscaledDecimal); return utf8Slice(Decimals.toString(decimalSlice, actual.getScale())); - } - } } else { return utf8Slice(value.toString()); } } - /** - * Function to parse ShortDecimalType as it is internally treated as Long - * @param type - * @param slice - * @param offset - * @param length - * @return - */ - private long parseLong(DecimalType type, Slice slice, int offset, int length) { - BigDecimal decimal = parseBigDecimal(type, slice, offset, length); - return decimal.unscaledValue().longValue(); - } - - /** - * Function for parsing the Slice - * @param type - * @param slice - * @param offset - * @param length - * @return - */ - private Slice parseSlice(DecimalType type, Slice slice, int offset, int length) { - BigDecimal decimal = parseBigDecimal(type, slice, offset, length); + private Slice parseSlice(DecimalType type, Slice slice, int length) { + BigDecimal decimal = parseBigDecimal(type, slice, length); return encodeUnscaledValue(decimal.unscaledValue()); } - /** - * Function for parsing the BigDecimal - * @param type - * @param slice - * @param offset - * @param length - * @return - */ - private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int offset, int length) { + private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int length) { + int offset = 0; checkArgument(length < buffer.length); for (int i = 0; i < length; i++) { buffer[i] = (char) slice.getByte(offset + i); @@ -192,78 +148,5 @@ public class DecimalSliceStreamReader extends AbstractStreamReader { checkState(decimal.precision() <= type.getPrecision(), "Read decimal precision larger than column precision"); return decimal; - } - - private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - if (columnVector.isNullAt(i)) { - builder.appendNull(); - } else { - if (isShortDecimal(type)) { - BigDecimal decimalValue = (BigDecimal)columnVector.getData(i); - long rescaledDecimal = Decimals.rescale(decimalValue.unscaledValue().longValue(), - decimalValue.scale(),((DecimalType) type).getScale()); - type.writeLong(builder, rescaledDecimal); - } else { - Slice slice = getSlice(columnVector.getData(i), type); - type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length())); - } - } - } - } - - private void populateShortDecimalVector(Type type, int numberOfRows, BlockBuilder builder) { - DecimalType decimalType = (DecimalType) type; - for (int i = 0; i < numberOfRows; i++) { - BigDecimal decimalValue = (BigDecimal) columnVector.getData(i); - long rescaledDecimal = Decimals - .rescale(decimalValue.unscaledValue().longValue(), decimalValue.scale(), - decimalType.getScale()); - type.writeLong(builder, rescaledDecimal); - } - } - - private void populateLongDecimalVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - Slice slice = getSlice((columnVector.getData(i)), type); - type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length())); - } - } - - private void populateShortDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) { - DecimalType decimalType = (DecimalType) type; - for (int i = 0; i < numberOfRows; i++) { - int value = (int) columnVector.getData(i); - Object data = DataTypeUtil.getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), - DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale())); - if (Objects.isNull(data)) { - builder.appendNull(); - } else { - BigDecimal decimalValue = (BigDecimal) data; - long rescaledDecimal = Decimals - .rescale(decimalValue.unscaledValue().longValue(), decimalValue.scale(), - decimalType.getScale()); - type.writeLong(builder, rescaledDecimal); - } - } - } - - private void populateLongDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) { - DecimalType decimalType = (DecimalType) type; - for (int i = 0; i < numberOfRows; i++) { - int value = (int) columnVector.getData(i); - Object data = DataTypeUtil.getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), - DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale())); - if (Objects.isNull(data)) { - builder.appendNull(); - } else { - BigDecimal decimalValue = (BigDecimal) data; - Slice slice = getSlice(decimalValue, type); - type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length())); - } - } - } - - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java index 3e7fc59..384112f 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java @@ -17,95 +17,65 @@ package org.apache.carbondata.presto.readers; -import java.io.IOException; - import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; import org.apache.carbondata.core.util.DataTypeUtil; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilderStatus; +import com.facebook.presto.spi.type.DoubleType; import com.facebook.presto.spi.type.Type; /** * Class for Reading the Double value and setting it in Block */ -public class DoubleStreamReader extends AbstractStreamReader { +public class DoubleStreamReader extends CarbonColumnVectorImpl implements PrestoVectorBlockBuilder { + + protected int batchSize; + + protected Type type = DoubleType.DOUBLE; + + protected BlockBuilder builder; - private boolean isDictionary; private Dictionary dictionary; - public DoubleStreamReader() { + public DoubleStreamReader(int batchSize, DataType dataType, Dictionary dictionary) { + super(batchSize, dataType); + this.batchSize = batchSize; + this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); + this.dictionary = dictionary; + } + @Override public Block buildBlock() { + return builder.build(); } - public DoubleStreamReader(boolean isDictionary, Dictionary dictionary) { - this.isDictionary = isDictionary; - this.dictionary = dictionary; + @Override public void setBatchSize(int batchSize) { + this.batchSize = batchSize; } - /** - * Create the DoubleType Block - * - * @param type - * @return - * @throws IOException - */ - public Block readBlock(Type type) throws IOException { - int numberOfRows; - BlockBuilder builder; - if (isVectorReader) { - numberOfRows = batchSize; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - if (columnVector != null) { - if (isDictionary) { - populateDictionaryVector(type, numberOfRows, builder); - } else { - if (columnVector.anyNullsSet()) { - handleNullInVector(type, numberOfRows, builder); - } else { - populateVector(type, numberOfRows, builder); - } - } - } + @Override public void putInt(int rowId, int value) { + Object data = DataTypeUtil + .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.DOUBLE); + if (data != null) { + type.writeDouble(builder, (Double) data); } else { - numberOfRows = streamData.length; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - for (int i = 0; i < numberOfRows; i++) { - type.writeDouble(builder, (Double) streamData[i]); - } + builder.appendNull(); } - - return builder.build(); } - private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - if (columnVector.isNullAt(i)) { - builder.appendNull(); - } else { - type.writeDouble(builder, (Double) columnVector.getData(i)); - } - } + @Override public void putDouble(int rowId, double value) { + type.writeDouble(builder, value); } - private void populateVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - type.writeDouble(builder, (Double) columnVector.getData(i)); - } + @Override public void putNull(int rowId) { + builder.appendNull(); } - private void populateDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - int value = (int) columnVector.getData(i); - Object data = DataTypeUtil - .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.DOUBLE); - if (data != null) { - type.writeDouble(builder, (Double) data); - } else { - builder.appendNull(); - } - } + @Override public void reset() { + builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java index ffe1aef..a3ce908 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java @@ -17,86 +17,64 @@ package org.apache.carbondata.presto.readers; -import java.io.IOException; - import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; import org.apache.carbondata.core.util.DataTypeUtil; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilderStatus; +import com.facebook.presto.spi.type.IntegerType; import com.facebook.presto.spi.type.Type; -public class IntegerStreamReader extends AbstractStreamReader { +public class IntegerStreamReader extends CarbonColumnVectorImpl + implements PrestoVectorBlockBuilder { - private Dictionary dictionaryValues; - private boolean isDictionary; + protected int batchSize; - public IntegerStreamReader() { + protected Type type = IntegerType.INTEGER; - } + protected BlockBuilder builder; - public IntegerStreamReader(boolean isDictionary, Dictionary dictionary) { - this.dictionaryValues = dictionary; - this.isDictionary = isDictionary; + private Dictionary dictionary; + + public IntegerStreamReader(int batchSize, DataType dataType, Dictionary dictionary) { + super(batchSize, dataType); + this.batchSize = batchSize; + this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); + this.dictionary = dictionary; } - public Block readBlock(Type type) throws IOException { - int numberOfRows; - BlockBuilder builder; - if (isVectorReader) { - numberOfRows = batchSize; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - if (columnVector != null) { - if (isDictionary) { - populateDictionaryVector(type, numberOfRows, builder); - } else { - if (columnVector.anyNullsSet()) { - handleNullInVector(type, numberOfRows, builder); - } else { - populateVector(type, numberOfRows, builder); - } - } - } - } else { - numberOfRows = streamData.length; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - for (int i = 0; i < numberOfRows; i++) { - type.writeLong(builder, ((Integer) streamData[i]).longValue()); - } - } + @Override public Block buildBlock() { return builder.build(); } - private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - if (columnVector.isNullAt(i)) { - builder.appendNull(); + @Override public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + @Override public void putInt(int rowId, int value) { + if (dictionary == null) { + type.writeLong(builder, value); + } else { + Object data = DataTypeUtil + .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.INT); + if (data != null) { + type.writeLong(builder, ((Integer) data).longValue()); } else { - type.writeLong(builder, ((Integer) columnVector.getData(i)).longValue()); + builder.appendNull(); } } } - private void populateVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - Integer value = (Integer) columnVector.getData(i); - type.writeLong(builder, value.longValue()); - } + @Override public void putNull(int rowId) { + builder.appendNull(); } - private void populateDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - int value = (int) columnVector.getData(i); - Object data = DataTypeUtil - .getDataBasedOnDataType(dictionaryValues.getDictionaryValueForKey(value), - DataTypes.INT); - if (data != null) { - type.writeLong(builder, ((Integer) data).longValue()); - } else { - builder.appendNull(); - } - } + @Override public void reset() { + builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java index e1000c5..892614d 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java @@ -17,83 +17,62 @@ package org.apache.carbondata.presto.readers; -import java.io.IOException; - import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; import org.apache.carbondata.core.util.DataTypeUtil; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilderStatus; +import com.facebook.presto.spi.type.BigintType; import com.facebook.presto.spi.type.Type; -public class LongStreamReader extends AbstractStreamReader { +public class LongStreamReader extends CarbonColumnVectorImpl implements PrestoVectorBlockBuilder { + + protected int batchSize; + + protected Type type = BigintType.BIGINT; + + protected BlockBuilder builder; - private boolean isDictionary; private Dictionary dictionary; - public LongStreamReader() { + public LongStreamReader(int batchSize, DataType dataType, Dictionary dictionary) { + super(batchSize, dataType); + this.batchSize = batchSize; + this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); + this.dictionary = dictionary; + } + @Override public Block buildBlock() { + return builder.build(); } - public LongStreamReader(boolean isDictionary, Dictionary dictionary) { - this.isDictionary = isDictionary; - this.dictionary = dictionary; + @Override public void setBatchSize(int batchSize) { + this.batchSize = batchSize; } - public Block readBlock(Type type) throws IOException { - int numberOfRows; - BlockBuilder builder; - if (isVectorReader) { - numberOfRows = batchSize; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - if (columnVector != null) { - if (isDictionary) { - populateDictionaryVector(type, numberOfRows, builder); - } - if (columnVector.anyNullsSet()) { - handleNullInVector(type, numberOfRows, builder); - } else { - populateVector(type, numberOfRows, builder); - } - } + @Override public void putInt(int rowId, int value) { + Object data = DataTypeUtil + .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.LONG); + if (data != null) { + type.writeLong(builder, (Long) data); } else { - numberOfRows = streamData.length; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - for (int i = 0; i < numberOfRows; i++) { - type.writeLong(builder, (Long) streamData[i]); - } + builder.appendNull(); } - return builder.build(); } - private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - if (columnVector.isNullAt(i)) { - builder.appendNull(); - } else { - type.writeLong(builder, (Long) columnVector.getData(i)); - } - } + @Override public void putLong(int rowId, long value) { + type.writeLong(builder, value); } - private void populateVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - type.writeLong(builder, (long) columnVector.getData(i)); - } + @Override public void putNull(int rowId) { + builder.appendNull(); } - private void populateDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - int value = (int) columnVector.getData(i); - Object data = DataTypeUtil - .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.LONG); - if (data != null) { - type.writeLong(builder, (Long) data); - } else { - builder.appendNull(); - } - } - } + @Override public void reset() { + builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java index 8952712..e4c9775 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java @@ -17,50 +17,50 @@ package org.apache.carbondata.presto.readers; -import java.io.IOException; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilderStatus; +import com.facebook.presto.spi.type.IntegerType; import com.facebook.presto.spi.type.Type; /** * Class to read the Object Stream */ -public class ObjectStreamReader extends AbstractStreamReader { +public class ObjectStreamReader extends CarbonColumnVectorImpl implements PrestoVectorBlockBuilder { + protected int batchSize; + protected Type type = IntegerType.INTEGER; - public ObjectStreamReader() { + protected BlockBuilder builder; + public ObjectStreamReader(int batchSize, DataType dataType) { + super(batchSize, dataType); + this.batchSize = batchSize; + this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); } - /** - * Function to create the object Block - * @param type - * @return - * @throws IOException - */ - public Block readBlock(Type type) throws IOException { - int numberOfRows = 0; - BlockBuilder builder = null; - if (isVectorReader) { - numberOfRows = batchSize; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - if (columnVector != null) { - for (int i = 0; i < numberOfRows; i++) { - type.writeObject(builder, columnVector.getData(i)); - } - } - } else { - numberOfRows = streamData.length; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - for (int i = 0; i < numberOfRows; i++) { - type.writeObject(builder, streamData[i]); - } - } - + @Override public Block buildBlock() { return builder.build(); } + @Override public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + @Override public void putObject(int rowId, Object value) { + type.writeObject(builder, value); + } + + @Override public void putNull(int rowId) { + builder.appendNull(); + } + + @Override public void reset() { + builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/PrestoVectorBlockBuilder.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/PrestoVectorBlockBuilder.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/PrestoVectorBlockBuilder.java new file mode 100644 index 0000000..001e4c4 --- /dev/null +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/PrestoVectorBlockBuilder.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto.readers; + +import com.facebook.presto.spi.block.Block; + +public interface PrestoVectorBlockBuilder { + + Block buildBlock(); + + void setBatchSize(int batchSize); + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java index 51f1cd5..d207fd9 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java @@ -17,85 +17,62 @@ package org.apache.carbondata.presto.readers; -import java.io.IOException; - import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; import org.apache.carbondata.core.util.DataTypeUtil; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilderStatus; +import com.facebook.presto.spi.type.SmallintType; import com.facebook.presto.spi.type.Type; -public class ShortStreamReader extends AbstractStreamReader { +public class ShortStreamReader extends CarbonColumnVectorImpl implements PrestoVectorBlockBuilder { - private boolean isDictionary; - private Dictionary dictionary; + protected int batchSize; - public ShortStreamReader() { + protected Type type = SmallintType.SMALLINT; - } + protected BlockBuilder builder; - public ShortStreamReader(boolean isDictionary, Dictionary dictionary) { - this.isDictionary = isDictionary; + private Dictionary dictionary; + + public ShortStreamReader(int batchSize, DataType dataType, Dictionary dictionary) { + super(batchSize, dataType); + this.batchSize = batchSize; + this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); this.dictionary = dictionary; } - public Block readBlock(Type type) throws IOException { - int numberOfRows; - BlockBuilder builder; - if (isVectorReader) { - numberOfRows = batchSize; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - if (columnVector != null) { - if (isDictionary) { - populateDictionaryVector(type, numberOfRows, builder); - } else { - if (columnVector.anyNullsSet()) { - handleNullInVector(type, numberOfRows, builder); - } else { - populateVector(type, numberOfRows, builder); - } - } - } - } else { - numberOfRows = streamData.length; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - for (int i = 0; i < numberOfRows; i++) { - type.writeLong(builder, (Short) streamData[i]); - } - } + @Override public Block buildBlock() { return builder.build(); } - private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - if (columnVector.isNullAt(i)) { - builder.appendNull(); - } else { - type.writeLong(builder, ((Short) columnVector.getData(i))); - } - } + @Override public void setBatchSize(int batchSize) { + this.batchSize = batchSize; } - private void populateVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - type.writeLong(builder, (Short) columnVector.getData(i)); + @Override public void putInt(int rowId, int value) { + Object data = DataTypeUtil + .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.SHORT); + if (data != null) { + type.writeLong(builder, (Short) data); + } else { + builder.appendNull(); } } - private void populateDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - int value = (int) columnVector.getData(i); - Object data = DataTypeUtil - .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.SHORT); - if (data != null) { - type.writeLong(builder, (Short) data); - } else { - builder.appendNull(); - } - } + @Override public void putShort(int rowId, short value) { + type.writeLong(builder, value); } + @Override public void putNull(int rowId) { + builder.appendNull(); + } + + @Override public void reset() { + builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java index cce35e0..53ece0b 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java @@ -17,10 +17,8 @@ package org.apache.carbondata.presto.readers; -import java.io.IOException; - -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.BlockBuilder; @@ -28,85 +26,68 @@ import com.facebook.presto.spi.block.BlockBuilderStatus; import com.facebook.presto.spi.block.DictionaryBlock; import com.facebook.presto.spi.block.SliceArrayBlock; import com.facebook.presto.spi.type.Type; +import com.facebook.presto.spi.type.VarcharType; -import static io.airlift.slice.Slices.utf8Slice; import static io.airlift.slice.Slices.wrappedBuffer; /** * This class reads the String data and convert it into Slice Block */ -public class SliceStreamReader extends AbstractStreamReader { +public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoVectorBlockBuilder { - private boolean isDictionary; + protected int batchSize; - private SliceArrayBlock dictionarySliceArrayBlock; + protected Type type = VarcharType.VARCHAR; - public SliceStreamReader() { - } + protected BlockBuilder builder; + int[] values; + private SliceArrayBlock dictionarySliceArrayBlock; - public SliceStreamReader(boolean isDictionary, SliceArrayBlock dictionarySliceArrayBlock) { - this.isDictionary = isDictionary; - this.dictionarySliceArrayBlock = dictionarySliceArrayBlock; + public SliceStreamReader(int batchSize, DataType dataType, + SliceArrayBlock dictionarySliceArrayBlock) { + super(batchSize, dataType); + this.batchSize = batchSize; + if (dictionarySliceArrayBlock == null) { + this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); + } else { + this.dictionarySliceArrayBlock = dictionarySliceArrayBlock; + this.values = new int[batchSize]; + } } - /** - * Function to create the Slice Block - * - * @param type - * @return - * @throws IOException - */ - public Block readBlock(Type type) throws IOException { - int numberOfRows; - BlockBuilder builder; - if (isVectorReader) { - numberOfRows = batchSize; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - if (columnVector != null) { - if (isDictionary) { - int[] values = new int[numberOfRows]; - for (int i = 0; i < numberOfRows; i++) { - if (!columnVector.isNullAt(i)) { - values[i] = (Integer) columnVector.getData(i); - } - } - return new DictionaryBlock(batchSize, dictionarySliceArrayBlock, values); - } else { - if (columnVector.anyNullsSet()) { - handleNullInVector(type, numberOfRows, builder); - } else { - populateVector(type, numberOfRows, builder); - } - } - } + @Override public Block buildBlock() { + if (dictionarySliceArrayBlock == null) { + return builder.build(); } else { - numberOfRows = streamData.length; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - for (int i = 0; i < numberOfRows; i++) { - type.writeSlice(builder, utf8Slice(streamData[i].toString())); - } + return new DictionaryBlock(batchSize, dictionarySliceArrayBlock, values); } - - return builder.build(); } - private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - if (columnVector.isNullAt(i)) { - builder.appendNull(); - } else { - type.writeSlice(builder, wrappedBuffer((byte[]) columnVector.getData(i))); - } - } + @Override public void setBatchSize(int batchSize) { + this.batchSize = batchSize; } - private void populateVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - type.writeSlice(builder, wrappedBuffer((byte[]) columnVector.getData(i))); - } + @Override public void putInt(int rowId, int value) { + values[rowId] = value; } + @Override public void putBytes(int rowId, byte[] value) { + type.writeSlice(builder, wrappedBuffer(value)); + } + @Override public void putBytes(int rowId, int offset, int length, byte[] value) { + byte[] byteArr = new byte[length]; + System.arraycopy(value, offset, byteArr, 0, length); + type.writeSlice(builder, wrappedBuffer(byteArr)); + } + @Override public void putNull(int rowId) { + if (dictionarySliceArrayBlock == null) { + builder.appendNull(); + } + } + @Override public void reset() { + builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java deleted file mode 100644 index c3cd6c0..0000000 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.presto.readers; - -import java.io.IOException; - -import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; - -import com.facebook.presto.spi.block.Block; -import com.facebook.presto.spi.type.Type; - -/** - * Interface for StreamReader - */ -public interface StreamReader { - - Block readBlock(Type type) throws IOException; - - void setStreamData(Object[] data); - - void setVector(CarbonColumnVectorImpl vector); - - void setVectorReader(boolean isVectorReader); - - void setBatchSize(int batchSize); - - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java deleted file mode 100644 index 1ad3d28..0000000 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.presto.readers; - -import org.apache.carbondata.core.cache.dictionary.Dictionary; - -import com.facebook.presto.spi.block.SliceArrayBlock; -import com.facebook.presto.spi.type.DateType; -import com.facebook.presto.spi.type.DecimalType; -import com.facebook.presto.spi.type.IntegerType; -import com.facebook.presto.spi.type.SmallintType; -import com.facebook.presto.spi.type.TimestampType; -import com.facebook.presto.spi.type.Type; -import io.airlift.slice.Slice; - -/** - * This class creates streamReader - * Based on type. - */ -public final class StreamReaders { - /** - * This function select Stream readers based on Type and use it. - * - * @param type - * @param dictionarySliceArrayBlock - * @return StreamReader - */ - public static StreamReader createStreamReader(Type type, - SliceArrayBlock dictionarySliceArrayBlock, Dictionary dictionary) { - Class<?> javaType = type.getJavaType(); - if (dictionary != null) { - if (javaType == long.class) { - if (type instanceof IntegerType || type instanceof DateType) { - return new IntegerStreamReader(true, dictionary); - } else if (type instanceof DecimalType) { - return new DecimalSliceStreamReader(true, dictionary); - } else if (type instanceof SmallintType) { - return new ShortStreamReader(true, dictionary); - } - return new LongStreamReader(true, dictionary); - - } else if (javaType == double.class) { - return new DoubleStreamReader(true, dictionary); - } else if (javaType == Slice.class) { - if (type instanceof DecimalType) { - return new DecimalSliceStreamReader(true, dictionary); - } else { - return new SliceStreamReader(true, dictionarySliceArrayBlock); - } - }else if (javaType == boolean.class) { - return new BooleanStreamReader(true,dictionary); - } else { - return new ObjectStreamReader(); - } - } else { - if (javaType == long.class) { - if (type instanceof IntegerType || type instanceof DateType) { - return new IntegerStreamReader(); - } else if (type instanceof DecimalType) { - return new DecimalSliceStreamReader(); - } else if (type instanceof SmallintType) { - return new ShortStreamReader(); - } else if (type instanceof TimestampType) { - return new TimestampStreamReader(); - } - return new LongStreamReader(); - - } else if (javaType == double.class) { - return new DoubleStreamReader(); - } else if (javaType == Slice.class) { - if (type instanceof DecimalType) { - return new DecimalSliceStreamReader(); - } else { - return new SliceStreamReader(); - } - }else if (javaType == boolean.class) { - return new BooleanStreamReader(); - } else { - return new ObjectStreamReader(); - } - - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java index a22ef29..f52916c 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java @@ -17,60 +17,63 @@ package org.apache.carbondata.presto.readers; -import java.io.IOException; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; +import org.apache.carbondata.core.util.DataTypeUtil; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilderStatus; +import com.facebook.presto.spi.type.TimestampType; import com.facebook.presto.spi.type.Type; -public class TimestampStreamReader extends AbstractStreamReader { +public class TimestampStreamReader extends CarbonColumnVectorImpl + implements PrestoVectorBlockBuilder { - private int TIMESTAMP_DIVISOR = 1000; + protected int batchSize; - public TimestampStreamReader() { + protected Type type = TimestampType.TIMESTAMP; - } + protected BlockBuilder builder; - public Block readBlock(Type type) throws IOException { - int numberOfRows = 0; - BlockBuilder builder = null; - if (isVectorReader) { - numberOfRows = batchSize; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - if (columnVector != null) { - if (columnVector.anyNullsSet()) { - handleNullInVector(type, numberOfRows, builder); - } else { - populateVector(type, numberOfRows, builder); - } - } + private Dictionary dictionary; - } else { - numberOfRows = streamData.length; - builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); - for (int i = 0; i < numberOfRows; i++) { - type.writeLong(builder, (Long) streamData[i]); - } - } + public TimestampStreamReader(int batchSize, DataType dataType, Dictionary dictionary) { + super(batchSize, dataType); + this.batchSize = batchSize; + this.builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); + this.dictionary = dictionary; + } + @Override public Block buildBlock() { return builder.build(); } - private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - if (columnVector.isNullAt(i)) { - builder.appendNull(); - } else { - type.writeLong(builder, (Long)columnVector.getData(i)/ TIMESTAMP_DIVISOR); - } - } + @Override public void setBatchSize(int batchSize) { + this.batchSize = batchSize; } - private void populateVector(Type type, int numberOfRows, BlockBuilder builder) { - for (int i = 0; i < numberOfRows; i++) { - type.writeLong(builder, (Long)columnVector.getData(i)/TIMESTAMP_DIVISOR); + @Override public void putInt(int rowId, int value) { + Object data = DataTypeUtil + .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.LONG); + if (data != null) { + type.writeLong(builder, (Long) data / 1000); + } else { + builder.appendNull(); } } + @Override public void putLong(int rowId, long value) { + type.writeLong(builder, value / 1000); + } + + @Override public void putNull(int rowId) { + builder.appendNull(); + } + + @Override public void reset() { + builder = type.createBlockBuilder(new BlockBuilderStatus(), batchSize); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a4c2ef5f/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala index 82cdf3a..42d7c93 100644 --- a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala +++ b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala @@ -93,7 +93,7 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] { while (chunks.hasNext) { { val value: Array[Byte] = chunks.next - if(count ==1) { + if (count == 1) { sliceArray(count) = null } else { @@ -123,6 +123,10 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] { dictionaries } + def getDataTypes: Array[DataType] = { + dataTypes + } + /** * to book keep the dictionary cache or update access count for each * column involved during decode, to facilitate LRU cache policy if memory
