Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3001#discussion_r243176905
--- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/streaming/PrestoCarbonStreamRecordReader.java ---
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.streaming;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
+import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
+import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
+import org.apache.carbondata.core.scan.complextypes.StructQueryType;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.intf.RowImpl;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.format.BlockletHeader;
+import org.apache.carbondata.format.FileHeader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+public class PrestoCarbonStreamRecordReader extends RecordReader<Void, Object> {
+
+ public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size";
+ public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
+
+ // metadata
+ private CarbonTable carbonTable;
+ private CarbonColumn[] storageColumns;
+ private boolean[] isRequired;
+ private DataType[] measureDataTypes;
+ private int dimensionCount;
+ private int measureCount;
+
+ // input
+ private FileSplit fileSplit;
+ private Configuration hadoopConf;
+ private PrestoStreamBlockletReader input;
+ private boolean isFirstRow = true;
+ private QueryModel model;
+
+ // decode data
+ private BitSet allNonNull;
+ private boolean[] isNoDictColumn;
+ private DirectDictionaryGenerator[] directDictionaryGenerators;
+ private CacheProvider cacheProvider;
+ private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
+ private GenericQueryType[] queryTypes;
+ private String compressorName;
+
+ // vectorized reader
+ private boolean isFinished = false;
+
+ // filter
+ private FilterExecuter filter;
+ private boolean[] isFilterRequired;
+ private Object[] filterValues;
+ private RowIntf filterRow;
+ private int[] filterMap;
+
+ // output
+ private CarbonColumn[] projection;
+ private boolean[] isProjectionRequired;
+ private int[] projectionMap;
+ private Object[] outputValues;
+
+ // empty projection and no filter: the data scan can be skipped
+ private boolean skipScanData;
+
+ public PrestoCarbonStreamRecordReader(QueryModel mdl) {
+ this.model = mdl;
+
+ }
+ @Override public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException {
+ // input
+ if (split instanceof CarbonInputSplit) {
+ fileSplit = (CarbonInputSplit) split;
+ } else if (split instanceof CarbonMultiBlockSplit) {
+ fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
+ } else {
+ fileSplit = (FileSplit) split;
+ }
+
+ // metadata
+ hadoopConf = context.getConfiguration();
+ if (model == null) {
+ CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
+ model = format.createQueryModel(split, context);
+ }
+ carbonTable = model.getTable();
+ List<CarbonDimension> dimensions =
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
+ dimensionCount = dimensions.size();
+ List<CarbonMeasure> measures =
+ carbonTable.getMeasureByTableName(carbonTable.getTableName());
+ measureCount = measures.size();
+ List<CarbonColumn> carbonColumnList =
+ carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
+ storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
+ isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
+ directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
+ for (int i = 0; i < storageColumns.length; i++) {
+ if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(storageColumns[i].getDataType());
+ }
+ }
+ measureDataTypes = new DataType[measureCount];
+ for (int i = 0; i < measureCount; i++) {
+ measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
+ }
+
+ // decode data
+ allNonNull = new BitSet(storageColumns.length);
+ projection = model.getProjectionColumns();
+
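+ // mark the storage columns the filter touches; measure ordinals are offset by the max dimension ordinal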
+ isRequired = new boolean[storageColumns.length];
+ boolean[] isFilterDimensions = model.getIsFilterDimensions();
+ boolean[] isFilterMeasures = model.getIsFilterMeasures();
+ isFilterRequired = new boolean[storageColumns.length];
+ filterMap = new int[storageColumns.length];
+ for (int i = 0; i < storageColumns.length; i++) {
+ if (storageColumns[i].isDimension()) {
+ if (isFilterDimensions[storageColumns[i].getOrdinal()]) {
+ isRequired[i] = true;
+ isFilterRequired[i] = true;
+ filterMap[i] = storageColumns[i].getOrdinal();
+ }
+ } else {
+ if (isFilterMeasures[storageColumns[i].getOrdinal()]) {
+ isRequired[i] = true;
+ isFilterRequired[i] = true;
+ filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
+ }
+ }
+ }
+
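+ // map each projected column to its position in the stream storage order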
+ isProjectionRequired = new boolean[storageColumns.length];
+ projectionMap = new int[storageColumns.length];
+ for (int j = 0; j < projection.length; j++) {
+ for (int i = 0; i < storageColumns.length; i++) {
+ if (storageColumns[i].getColName().equals(projection[j].getColName())) {
+ isRequired[i] = true;
+ isProjectionRequired[i] = true;
+ projectionMap[i] = j;
+ break;
+ }
+ }
+ }
+
+ // initialize filter
+ if (null != model.getFilterExpressionResolverTree()) {
+ initializeFilter();
+ } else if (projection.length == 0) {
+ skipScanData = true;
+ }
+
+ }
+
+ private void initializeFilter() {
+ List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
+ .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+ carbonTable.getMeasureByTableName(carbonTable.getTableName()));
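+ // streaming data carries no dictionary cardinality, so assume the maximum for every column when building SegmentProperties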
+ int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
+ for (int i = 0; i < dimLensWithComplex.length; i++) {
+ dimLensWithComplex[i] = Integer.MAX_VALUE;
+ }
+
+ int[] dictionaryColumnCardinality =
+ CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
+ SegmentProperties segmentProperties =
+ new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
+ Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
+
+ FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
+ filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties,
+ complexDimensionInfoMap);
+ // for row filter, we need to update the column index
+ FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
+ carbonTable.getDimensionOrdinalMax());
+
+ }
+
+ private byte[] getSyncMarker(String filePath) throws IOException {
+ CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
+ FileHeader header = headerReader.readHeader();
+ // legacy store does not have this member
+ if (header.isSetCompressor_name()) {
+ compressorName = header.getCompressor_name();
+ } else {
+ compressorName = CompressorFactory.NativeSupportedCompressor.SNAPPY.getName();
+ }
+ return header.getSync_marker();
+ }
+
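+ // lazy setup on the first call to nextKeyValue(): filter row, output buffer, dictionary cache and the stream input positioned at the split start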
+ private void initializeAtFirstRow() throws IOException {
+ filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
+ filterRow = new RowImpl();
+ filterRow.setValues(filterValues);
+
+ outputValues = new Object[projection.length];
+
+ Path file = fileSplit.getPath();
+
+ byte[] syncMarker = getSyncMarker(file.toString());
+
+ FileSystem fs = file.getFileSystem(hadoopConf);
+
+ int bufferSize = Integer.parseInt(hadoopConf.get(READ_BUFFER_SIZE, READ_BUFFER_SIZE_DEFAULT));
+
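+ // open the stream file with the configured buffer size and seek to the start of this split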
+ FSDataInputStream fileIn = fs.open(file, bufferSize);
+ fileIn.seek(fileSplit.getStart());
+ input = new PrestoStreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
+ fileSplit.getStart() == 0, compressorName);
+
+ cacheProvider = CacheProvider.getInstance();
+ cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
+ queryTypes = getComplexDimensions(carbonTable, storageColumns, cache);
+ }
+
+ @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (isFirstRow) {
+ isFirstRow = false;
+ initializeAtFirstRow();
+ }
+ if (isFinished) {
+ return false;
+ }
+
+ return nextRow();
--- End diff --
The stream file is in row format, so it doesn't have a vectorized flow.
I have already removed this file and just reuse the old record reader.
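
For context, a minimal sketch of the "reuse the old record reader" idea, expressed purely in terms of the Hadoop RecordReader API: a thin wrapper that delegates every call to an existing row-based reader instead of re-implementing its decoding. The class name and the delegate field below are illustrative placeholders, not the actual CarbonData or Presto classes involved in this PR.

// Illustrative sketch only: DelegatingStreamRecordReader and its delegate are
// placeholders for discussion, not classes touched by this change.
import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class DelegatingStreamRecordReader extends RecordReader<Void, Object> {

  // the existing row-based reader whose decoding logic is reused as-is
  private final RecordReader<Void, Object> delegate;

  public DelegatingStreamRecordReader(RecordReader<Void, Object> delegate) {
    this.delegate = delegate;
  }

  @Override public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    delegate.initialize(split, context);
  }

  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    // row-format stream files are consumed one row at a time; no vectorized batch is built
    return delegate.nextKeyValue();
  }

  @Override public Void getCurrentKey() throws IOException, InterruptedException {
    return delegate.getCurrentKey();
  }

  @Override public Object getCurrentValue() throws IOException, InterruptedException {
    return delegate.getCurrentValue();
  }

  @Override public float getProgress() throws IOException, InterruptedException {
    return delegate.getProgress();
  }

  @Override public void close() throws IOException {
    delegate.close();
  }
}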
---