Repository: carbondata Updated Branches: refs/heads/master 06ddd82f6 -> f2bb8d380
[CARBONDATA-1323] Presto Optimization for Integration Layer This closes #1190 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f2bb8d38 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f2bb8d38 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f2bb8d38 Branch: refs/heads/master Commit: f2bb8d3804973e02a45fb4f1ac6cb458590af1e5 Parents: 06ddd82 Author: Bhavya Aggarwal <[email protected]> Authored: Fri Jul 21 13:06:46 2017 +0530 Committer: chenliang613 <[email protected]> Committed: Wed Jul 26 22:36:56 2017 +0800 ---------------------------------------------------------------------- integration/presto/pom.xml | 27 +++++++ .../carbondata/presto/CarbondataPageSource.java | 75 +++++++++++++----- .../presto/CarbondataRecordCursor.java | 83 +++++++++----------- .../carbondata/presto/CarbondataRecordSet.java | 42 +++++----- .../presto/CarbonDictionaryDecodeSupport.scala | 65 +++++++++++++++ 5 files changed, 208 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/f2bb8d38/integration/presto/pom.xml ---------------------------------------------------------------------- diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml index 25eb6a7..4619413 100644 --- a/integration/presto/pom.xml +++ b/integration/presto/pom.xml @@ -228,6 +228,33 @@ <skip>true</skip> </configuration> </plugin> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <version>2.15.2</version> + <executions> + <execution> + <id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + <execution> + <id>testCompile</id> + <goals> + <goal>testCompile</goal> + </goals> + <phase>test</phase> + </execution> + <execution> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/f2bb8d38/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java index 7c50c66..c03983e 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java @@ -27,7 +27,10 @@ import com.facebook.presto.spi.Page; import com.facebook.presto.spi.PageBuilder; import com.facebook.presto.spi.RecordCursor; import com.facebook.presto.spi.RecordSet; +import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.BlockBuilder; +import com.facebook.presto.spi.block.LazyBlock; +import com.facebook.presto.spi.block.LazyBlockLoader; import com.facebook.presto.spi.type.DecimalType; import com.facebook.presto.spi.type.Type; import io.airlift.slice.Slice; @@ -51,21 +54,20 @@ public class CarbondataPageSource implements ConnectorPageSource { private final PageBuilder pageBuilder; private boolean closed; private final char[] buffer = new char[100]; + private Block[] blocks; - public CarbondataPageSource(RecordSet recordSet) - { + public CarbondataPageSource(RecordSet recordSet) { this(requireNonNull(recordSet, "recordSet is null").getColumnTypes(), recordSet.cursor()); } - public CarbondataPageSource(List<Type> types, RecordCursor cursor) - { + public CarbondataPageSource(List<Type> types, RecordCursor cursor) { this.cursor = requireNonNull(cursor, "cursor is null"); this.types = unmodifiableList(new ArrayList<>(requireNonNull(types, "types is null"))); this.pageBuilder = new PageBuilder(this.types); + this.blocks = new Block[types.size()]; } - public RecordCursor getCursor() - { + public RecordCursor getCursor() { return cursor; } @@ -86,6 +88,9 @@ public class CarbondataPageSource implements ConnectorPageSource { } @Override public Page getNextPage() { + BlockBuilder output; + Page page; + int size = types.size(); if (!closed) { int i; for (i = 0; i < ROWS_PER_REQUEST; i++) { @@ -98,8 +103,9 @@ public class CarbondataPageSource implements ConnectorPageSource { } pageBuilder.declarePosition(); - for (int column = 0; column < types.size(); column++) { - BlockBuilder output = pageBuilder.getBlockBuilder(column); + + for (int column = 0; column < size; column++) { + output = pageBuilder.getBlockBuilder(column); if (cursor.isNull(column)) { output.appendNull(); } else { @@ -113,8 +119,7 @@ public class CarbondataPageSource implements ConnectorPageSource { type.writeDouble(output, cursor.getDouble(column)); } else if (javaType == Slice.class) { Slice slice = cursor.getSlice(column); - if(type instanceof DecimalType) - { + if (type instanceof DecimalType) { if (isShortDecimal(type)) { type.writeLong(output, parseLong((DecimalType) type, slice, 0, slice.length())); } else { @@ -127,6 +132,8 @@ public class CarbondataPageSource implements ConnectorPageSource { type.writeObject(output, cursor.getObject(column)); } } + blocks[column] = new LazyBlock(output.getPositionCount(), + new CarbonBlockLoader(output.build(), types.get(column))); } } } @@ -135,10 +142,16 @@ public class CarbondataPageSource implements ConnectorPageSource { if (pageBuilder.isEmpty() || (!closed && !pageBuilder.isFull())) { return null; } - Page page = pageBuilder.build(); + + if (blocks != null && blocks.length > 0) { + page = new Page(blocks[0].getPositionCount(), blocks); + } else { + page = pageBuilder.build(); + } + pageBuilder.reset(); return page; - } + } @Override public long getSystemMemoryUsage() { return cursor.getSystemMemoryUsage() + pageBuilder.getSizeInBytes(); @@ -150,29 +163,49 @@ public class CarbondataPageSource implements ConnectorPageSource { } - private long parseLong(DecimalType type, Slice slice, int offset, int length) - { + private long parseLong(DecimalType type, Slice slice, int offset, int length) { BigDecimal decimal = parseBigDecimal(type, slice, offset, length); return decimal.unscaledValue().longValue(); } - - private Slice parseSlice(DecimalType type, Slice slice, int offset, int length) - { + private Slice parseSlice(DecimalType type, Slice slice, int offset, int length) { BigDecimal decimal = parseBigDecimal(type, slice, offset, length); return encodeUnscaledValue(decimal.unscaledValue()); } - private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int offset, int length) - { + private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int offset, int length) { checkArgument(length < buffer.length); for (int i = 0; i < length; i++) { buffer[i] = (char) slice.getByte(offset + i); } BigDecimal decimal = new BigDecimal(buffer, 0, length); - checkState(decimal.scale() <= type.getScale(), "Read decimal value scale larger than column scale"); + checkState(decimal.scale() <= type.getScale(), + "Read decimal value scale larger than column scale"); decimal = decimal.setScale(type.getScale(), HALF_UP); - checkState(decimal.precision() <= type.getPrecision(), "Read decimal precision larger than column precision"); + checkState(decimal.precision() <= type.getPrecision(), + "Read decimal precision larger than column precision"); return decimal; } + + /** + * Using the LazyBlockLoader + */ + private final class CarbonBlockLoader implements LazyBlockLoader<LazyBlock> { + private boolean loaded; + private Block dataBlock; + private Type type; + + public CarbonBlockLoader(Block dataBlock, Type type) { + this.dataBlock = dataBlock; + this.type = type; + } + + @Override public void load(LazyBlock block) { + if (loaded) { + return; + } + block.setBlock(dataBlock); + loaded = true; + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f2bb8d38/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java index 2e97dc0..d6b1422 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java @@ -17,29 +17,24 @@ package org.apache.carbondata.presto; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Timestamp; +import java.util.List; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.metadata.datatype.DataType; + import com.facebook.presto.spi.RecordCursor; import com.facebook.presto.spi.type.DecimalType; import com.facebook.presto.spi.type.Decimals; import com.facebook.presto.spi.type.TimestampType; import com.facebook.presto.spi.type.Type; -import com.facebook.presto.spi.block.Block; -import com.facebook.presto.spi.block.BlockBuilder; -import com.facebook.presto.spi.block.BlockBuilderStatus; - -import com.google.common.base.Strings; import io.airlift.log.Logger; import io.airlift.slice.Slice; -import io.airlift.slice.Slices; -import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; - - -import java.math.BigDecimal; -import java.math.BigInteger; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.List; - +import scala.Int; +import scala.Tuple3; import static com.facebook.presto.spi.type.BooleanType.BOOLEAN; import static com.facebook.presto.spi.type.Decimals.isShortDecimal; @@ -55,22 +50,24 @@ public class CarbondataRecordCursor implements RecordCursor { private static final Logger log = Logger.get(CarbondataRecordCursor.class); private final List<CarbondataColumnHandle> columnHandles; - private List<String> fields; + private Object[] fields; private CarbondataSplit split; private CarbonIterator<Object[]> rowCursor; - private CarbonReadSupport<Object[]> readSupport; + private CarbonDictionaryDecodeReaderSupport readSupport; + private Tuple3<DataType,Dictionary,Int>[] dictionary; private long totalBytes; private long nanoStart; private long nanoEnd; - public CarbondataRecordCursor(CarbonReadSupport<Object[]> readSupport, + public CarbondataRecordCursor(CarbonDictionaryDecodeReaderSupport readSupport, CarbonIterator<Object[]> carbonIterator, List<CarbondataColumnHandle> columnHandles, - CarbondataSplit split) { + CarbondataSplit split, Tuple3<DataType,Dictionary,Int>[] dictionaries ) { this.rowCursor = carbonIterator; this.columnHandles = columnHandles; this.readSupport = readSupport; this.totalBytes = 0; + this.dictionary = dictionaries; } @Override public long getTotalBytes() { @@ -101,20 +98,8 @@ public class CarbondataRecordCursor implements RecordCursor { } if (rowCursor.hasNext()) { - Object[] columns = readSupport.readRow(rowCursor.next()); - fields = new ArrayList<String>(); - if(columns != null && columns.length > 0) - { - for(Object value : columns){ - if(value != null ) - { - fields.add(value.toString()); - } else { - fields.add(null); - } - } - } - totalBytes += columns.length; + fields = readSupport.readRow(rowCursor.next(),dictionary); + totalBytes += fields.length; return true; } return false; @@ -122,22 +107,30 @@ public class CarbondataRecordCursor implements RecordCursor { @Override public boolean getBoolean(int field) { checkFieldType(field, BOOLEAN); - return Boolean.parseBoolean(getFieldValue(field)); + return (Boolean)getFieldValue(field); } @Override public long getLong(int field) { - String timeStr = getFieldValue(field); + Object obj = getFieldValue(field); + Long timeStr = 0L; + if( obj instanceof Integer ){ + timeStr = ((Integer)obj).longValue(); + } else if( obj instanceof Long ) { + timeStr = (Long)obj; + } else { + timeStr = Math.round(Double.parseDouble(obj.toString())); + } Type actual = getType(field); if(actual instanceof TimestampType){ - return new Timestamp(Long.parseLong(timeStr)).getTime()/1000; + return new Timestamp(timeStr).getTime()/1000; } //suppose the - return Math.round(Double.parseDouble(getFieldValue(field))); + return timeStr; } @Override public double getDouble(int field) { checkFieldType(field, DOUBLE); - return Double.parseDouble(getFieldValue(field)); + return (Double)getFieldValue(field); } @Override public Slice getSlice(int field) { @@ -150,8 +143,8 @@ public class CarbondataRecordCursor implements RecordCursor { } else { checkFieldType(field, DecimalType.createDecimalType()); } - String fieldValue = getFieldValue(field); - BigDecimal bigDecimalValue = new BigDecimal(fieldValue); + Object fieldValue = getFieldValue(field); + BigDecimal bigDecimalValue = new BigDecimal(fieldValue.toString()); if (isShortDecimal(decimalType)) { return utf8Slice(Decimals.toString(bigDecimalValue.longValue(), actual.getScale())); } else { @@ -174,7 +167,7 @@ public class CarbondataRecordCursor implements RecordCursor { } } else { checkFieldType(field, VARCHAR); - return utf8Slice(getFieldValue(field)); + return utf8Slice(getFieldValue(field).toString()); } } @@ -184,12 +177,12 @@ public class CarbondataRecordCursor implements RecordCursor { @Override public boolean isNull(int field) { checkArgument(field < columnHandles.size(), "Invalid field index"); - return Strings.isNullOrEmpty(getFieldValue(field)); + return getFieldValue(field) == null; } - String getFieldValue(int field) { + Object getFieldValue(int field) { checkState(fields != null, "Cursor has not been advanced yet"); - return fields.get(field); + return fields[field]; } private void checkFieldType(int field, Type expected) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f2bb8d38/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java index d75cbfb..661e83f 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java @@ -17,9 +17,10 @@ package org.apache.carbondata.presto; -import com.facebook.presto.spi.*; -import com.facebook.presto.spi.predicate.TupleDomain; -import com.facebook.presto.spi.type.Type; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.core.datastore.block.BlockletInfos; import org.apache.carbondata.core.datastore.block.TableBlockInfo; @@ -32,16 +33,20 @@ import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.scan.result.BatchResult; import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator; -import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; -import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; -//import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.RecordCursor; +import com.facebook.presto.spi.RecordSet; +import com.facebook.presto.spi.predicate.TupleDomain; +import com.facebook.presto.spi.type.Type; +import scala.Tuple3; import static org.apache.carbondata.presto.Types.checkType; +//import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl; + public class CarbondataRecordSet implements RecordSet { private CarbonTable carbonTable; @@ -53,7 +58,7 @@ public class CarbondataRecordSet implements RecordSet { private List<CarbondataColumnHandle> columns; private QueryExecutor queryExecutor; - private CarbonReadSupport<Object[]> readSupport; + private CarbonDictionaryDecodeReaderSupport readSupport; public CarbondataRecordSet(CarbonTable carbonTable, ConnectorSession session, ConnectorSplit split, List<CarbondataColumnHandle> columns, QueryModel queryModel) { @@ -63,7 +68,7 @@ public class CarbondataRecordSet implements RecordSet { this.rebuildConstraints = this.split.getRebuildConstraints(); this.queryModel = queryModel; this.columns = columns; - this.readSupport = new DictionaryDecodeReadSupport(); + this.readSupport = new CarbonDictionaryDecodeReaderSupport(); } //todo support later @@ -84,24 +89,25 @@ public class CarbondataRecordSet implements RecordSet { tableBlockInfoList.add(new TableBlockInfo(split.getLocalInputSplit().getPath().toString(), split.getLocalInputSplit().getStart(), split.getLocalInputSplit().getSegmentId(), split.getLocalInputSplit().getLocations().toArray(new String[0]), - split.getLocalInputSplit().getLength(),new BlockletInfos(), + split.getLocalInputSplit().getLength(), new BlockletInfos(), //blockletInfos, - ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion()),null)); + ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion()), null)); queryModel.setTableBlockInfos(tableBlockInfoList); queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); - //queryModel.setQueryId(queryModel.getQueryId() + "_" + split.getLocalInputSplit().getSegmentId()); try { - readSupport + + Tuple3[] dict = readSupport .initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier()); CarbonIterator<Object[]> carbonIterator = new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel)); - RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns, split); + RecordCursor rc = + new CarbondataRecordCursor(readSupport, carbonIterator, columns, split, dict); return rc; } catch (QueryExecutionException e) { - throw new RuntimeException(e.getMessage(), e); - } catch (Exception ex) { + throw new RuntimeException(e.getMessage(), e); + } catch (Exception ex) { throw new RuntimeException(ex.getMessage(), ex); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f2bb8d38/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala new file mode 100644 index 0000000..fbdfebd --- /dev/null +++ b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.presto + +import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType} +import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier} +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.datatype.DataType +import org.apache.carbondata.core.metadata.encoder.Encoding +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn + + +class CarbonDictionaryDecodeReaderSupport[T] { + + def initialize(carbonColumns: Array[CarbonColumn], + absoluteTableIdentifier: AbsoluteTableIdentifier): Array[(DataType, Dictionary, Int)] = { + + carbonColumns.zipWithIndex.filter(dictChecker(_)).map { carbonColumnWithIndex => + val (carbonColumn, index) = carbonColumnWithIndex + val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] = + CacheProvider.getInstance() + .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier + .getStorePath) + val dict: Dictionary = forwardDictionaryCache + .get(new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier, + carbonColumn.getColumnIdentifier, + carbonColumn.getDataType)) + (carbonColumn.getDataType, dict, index) + } + } + + private def dictChecker(carbonColumWithIndex: (CarbonColumn, Int)): Boolean = { + val (carbonColumn, _) = carbonColumWithIndex + if (!carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonColumn.isComplex && + carbonColumn.hasEncoding(Encoding.DICTIONARY)) { + true + } else { + false + } + } + + def readRow(data: Array[Object], + dictionaries: Array[(DataType, Dictionary, Int)]): Array[Object] = { + dictionaries.foreach { (dictionary: (DataType, Dictionary, Int)) => + val (_, dict, position) = dictionary + data(position) = dict.getDictionaryValueForKey(data(position).asInstanceOf[Int]) + } + data + } + +}
