http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java new file mode 100644 index 0000000..defe766 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.row; + +import org.apache.carbondata.core.indexstore.schema.DataMapSchema; + +/** + * It is just a normal row to store data. Implementation classes could be safe and unsafe. + * TODO move this class a global row and use across loading after DataType is changed class + */ +public abstract class DataMapRow { + + protected DataMapSchema[] schemas; + + public DataMapRow(DataMapSchema[] schemas) { + this.schemas = schemas; + } + + public abstract byte[] getByteArray(int ordinal); + + public abstract DataMapRow getRow(int ordinal); + + public abstract void setRow(DataMapRow row, int ordinal); + + public abstract void setByteArray(byte[] byteArray, int ordinal); + + public abstract int getInt(int ordinal); + + public abstract void setInt(int value, int ordinal); + + public abstract void setByte(byte value, int ordinal); + + public abstract byte getByte(int ordinal); + + public abstract void setShort(short value, int ordinal); + + public abstract short getShort(int ordinal); + + public abstract void setLong(long value, int ordinal); + + public abstract long getLong(int ordinal); + + public abstract void setFloat(float value, int ordinal); + + public abstract float getFloat(int ordinal); + + public abstract void setDouble(double value, int ordinal); + + public abstract double getDouble(int ordinal); + + public int getTotalSizeInBytes() { + int len = 0; + for (int i = 0; i < schemas.length; i++) { + len += getSizeInBytes(i); + } + return len; + } + + public int getSizeInBytes(int ordinal) { + switch (schemas[ordinal].getSchemaType()) { + case FIXED: + return schemas[ordinal].getLength(); + case VARIABLE: + return getByteArray(ordinal).length + 2; + case STRUCT: + return getRow(ordinal).getTotalSizeInBytes(); + default: + throw new UnsupportedOperationException("wrong type"); + } + } + + public int getColumnCount() { + return schemas.length; + } +}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java new file mode 100644 index 0000000..adec346 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.row; + +import org.apache.carbondata.core.indexstore.schema.DataMapSchema; +import org.apache.carbondata.core.metadata.datatype.DataType; + +/** + * Data map row. + */ +public class DataMapRowImpl extends DataMapRow { + + private Object[] data; + + public DataMapRowImpl(DataMapSchema[] schemas) { + super(schemas); + this.data = new Object[schemas.length]; + } + + @Override public byte[] getByteArray(int ordinal) { + return (byte[]) data[ordinal]; + } + + @Override public DataMapRow getRow(int ordinal) { + return (DataMapRow) data[ordinal]; + } + + @Override public void setByteArray(byte[] byteArray, int ordinal) { + assert (schemas[ordinal].getDataType() == DataType.BYTE_ARRAY); + data[ordinal] = byteArray; + } + + @Override public int getInt(int ordinal) { + return (Integer) data[ordinal]; + } + + @Override public void setInt(int value, int ordinal) { + assert (schemas[ordinal].getDataType() == DataType.INT); + data[ordinal] = value; + } + + @Override public void setByte(byte value, int ordinal) { + assert (schemas[ordinal].getDataType() == DataType.BYTE); + data[ordinal] = value; + } + + @Override public byte getByte(int ordinal) { + return (Byte) data[ordinal]; + } + + @Override public void setShort(short value, int ordinal) { + assert (schemas[ordinal].getDataType() == DataType.SHORT); + data[ordinal] = value; + } + + @Override public short getShort(int ordinal) { + return (Short) data[ordinal]; + } + + @Override public void setLong(long value, int ordinal) { + assert (schemas[ordinal].getDataType() == DataType.LONG); + data[ordinal] = value; + } + + @Override public long getLong(int ordinal) { + return (Long) data[ordinal]; + } + + @Override public void setFloat(float value, int ordinal) { + assert (schemas[ordinal].getDataType() == DataType.FLOAT); + data[ordinal] = value; + } + + @Override public float getFloat(int ordinal) { + return (Float) data[ordinal]; + } + + @Override public void setDouble(double value, int ordinal) { + assert (schemas[ordinal].getDataType() == DataType.DOUBLE); + data[ordinal] = value; + } + + @Override public void setRow(DataMapRow row, int ordinal) { + assert (schemas[ordinal].getDataType() == DataType.STRUCT); + data[ordinal] = row; + } + + @Override public double getDouble(int ordinal) { + return (Double) data[ordinal]; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java new file mode 100644 index 0000000..ef78514 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.indexstore.row; + +import org.apache.carbondata.core.indexstore.schema.DataMapSchema; +import org.apache.carbondata.core.memory.MemoryBlock; + +import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET; +import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe; + +/** + * Unsafe implementation of data map row. + */ +public class UnsafeDataMapRow extends DataMapRow { + + private MemoryBlock block; + + private int pointer; + + public UnsafeDataMapRow(DataMapSchema[] schemas, MemoryBlock block, int pointer) { + super(schemas); + this.block = block; + this.pointer = pointer; + } + + @Override public byte[] getByteArray(int ordinal) { + int length; + int position = getPosition(ordinal); + switch (schemas[ordinal].getSchemaType()) { + case VARIABLE: + length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position); + position += 2; + break; + default: + length = schemas[ordinal].getLength(); + } + byte[] data = new byte[length]; + unsafe.copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + position, data, + BYTE_ARRAY_OFFSET, data.length); + return data; + } + + @Override public DataMapRow getRow(int ordinal) { + DataMapSchema[] childSchemas = + ((DataMapSchema.StructDataMapSchema) schemas[ordinal]).getChildSchemas(); + return new UnsafeDataMapRow(childSchemas, block, pointer + getPosition(ordinal)); + } + + @Override public void setByteArray(byte[] byteArray, int ordinal) { + throw new UnsupportedOperationException("Not supported to set on unsafe row"); + } + + @Override public int getInt(int ordinal) { + return unsafe + .getInt(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal)); + } + + @Override public void setInt(int value, int ordinal) { + throw new UnsupportedOperationException("Not supported to set on unsafe row"); + } + + @Override public void setByte(byte value, int ordinal) { + throw new UnsupportedOperationException("Not supported to set on unsafe row"); + } + + @Override public byte getByte(int ordinal) { + return unsafe + .getByte(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal)); + } + + @Override public void setShort(short value, int ordinal) { + throw new UnsupportedOperationException("Not supported to set on unsafe row"); + } + + @Override public short getShort(int ordinal) { + return unsafe + .getShort(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal)); + } + + @Override public void setLong(long value, int ordinal) { + throw new UnsupportedOperationException("Not supported to set on unsafe row"); + } + + @Override public long getLong(int ordinal) { + return unsafe + .getLong(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal)); + } + + @Override public void setFloat(float value, int ordinal) { + throw new UnsupportedOperationException("Not supported to set on unsafe row"); + } + + @Override public float getFloat(int ordinal) { + return unsafe + .getFloat(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal)); + } + + @Override public void setDouble(double value, int ordinal) { + throw new UnsupportedOperationException("Not supported to set on unsafe row"); + } + + @Override public double getDouble(int ordinal) { + return unsafe + .getDouble(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal)); + } + + @Override public void setRow(DataMapRow row, int ordinal) { + throw new UnsupportedOperationException("Not supported to set on unsafe row"); + } + + private int getPosition(int ordinal) { + int position = 0; + for (int i = 0; i < ordinal; i++) { + position += getSizeInBytes(i); + } + return position; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java new file mode 100644 index 0000000..80c68ac --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.schema; + +import org.apache.carbondata.core.metadata.datatype.DataType; + +/** + * It just have 2 types right now, either fixed or variable. + */ +public abstract class DataMapSchema { + + protected DataType dataType; + + public DataMapSchema(DataType dataType) { + this.dataType = dataType; + } + + /** + * Either fixed or variable length. + * + * @return + */ + public DataType getDataType() { + return dataType; + } + + /** + * Gives length in case of fixed schema other wise returns length + * + * @return + */ + public abstract int getLength(); + + /** + * schema type + * @return + */ + public abstract DataMapSchemaType getSchemaType(); + + /* + * It has always fixed length, length cannot be updated later. + * Usage examples : all primitive types like short, int etc + */ + public static class FixedDataMapSchema extends DataMapSchema { + + private int length; + + public FixedDataMapSchema(DataType dataType) { + super(dataType); + } + + public FixedDataMapSchema(DataType dataType, int length) { + super(dataType); + this.length = length; + } + + @Override public int getLength() { + if (length == 0) { + return dataType.getSizeInBytes(); + } else { + return length; + } + } + + @Override public DataMapSchemaType getSchemaType() { + return DataMapSchemaType.FIXED; + } + } + + public static class VariableDataMapSchema extends DataMapSchema { + + public VariableDataMapSchema(DataType dataType) { + super(dataType); + } + + @Override public int getLength() { + return dataType.getSizeInBytes(); + } + + @Override public DataMapSchemaType getSchemaType() { + return DataMapSchemaType.VARIABLE; + } + } + + public static class StructDataMapSchema extends DataMapSchema { + + private DataMapSchema[] childSchemas; + + public StructDataMapSchema(DataType dataType, DataMapSchema[] childSchemas) { + super(dataType); + this.childSchemas = childSchemas; + } + + @Override public int getLength() { + return dataType.getSizeInBytes(); + } + + public DataMapSchema[] getChildSchemas() { + return childSchemas; + } + + @Override public DataMapSchemaType getSchemaType() { + return DataMapSchemaType.STRUCT; + } + } + + public enum DataMapSchemaType { + FIXED, VARIABLE, STRUCT + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java new file mode 100644 index 0000000..9d77010 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.schema; + +/** + * Types of filters of select query + */ +public enum FilterType { + EQUALTO, GREATER_THAN, LESS_THAN, GREATER_THAN_EQUAL, LESS_THAN_EQUAL, LIKE +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java index bfa9d7e..f81f805 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java @@ -17,16 +17,22 @@ package org.apache.carbondata.core.metadata.blocklet; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk; import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex; +import org.apache.hadoop.io.Writable; + /** * class to store the information about the blocklet */ -public class BlockletInfo implements Serializable { +public class BlockletInfo implements Serializable, Writable { /** * serialization id @@ -189,4 +195,49 @@ public class BlockletInfo implements Serializable { this.numberOfPages = numberOfPages; } + @Override public void write(DataOutput output) throws IOException { + output.writeLong(dimensionOffset); + output.writeLong(measureOffsets); + int dsize = dimensionChunkOffsets != null ? dimensionChunkOffsets.size() : 0; + output.writeShort(dsize); + for (int i = 0; i < dsize; i++) { + output.writeLong(dimensionChunkOffsets.get(i)); + } + for (int i = 0; i < dsize; i++) { + output.writeInt(dimensionChunksLength.get(i)); + } + int mSize = measureChunkOffsets != null ? measureChunkOffsets.size() : 0; + output.writeShort(mSize); + for (int i = 0; i < mSize; i++) { + output.writeLong(measureChunkOffsets.get(i)); + } + for (int i = 0; i < mSize; i++) { + output.writeInt(measureChunksLength.get(i)); + } + } + + @Override public void readFields(DataInput input) throws IOException { + dimensionOffset = input.readLong(); + measureOffsets = input.readLong(); + short dimensionChunkOffsetsSize = input.readShort(); + dimensionChunkOffsets = new ArrayList<>(dimensionChunkOffsetsSize); + for (int i = 0; i < dimensionChunkOffsetsSize; i++) { + dimensionChunkOffsets.add(input.readLong()); + } + dimensionChunksLength = new ArrayList<>(dimensionChunkOffsetsSize); + for (int i = 0; i < dimensionChunkOffsetsSize; i++) { + dimensionChunksLength.add(input.readInt()); + } + + short measureChunkOffsetsSize = input.readShort(); + measureChunkOffsets = new ArrayList<>(measureChunkOffsetsSize); + for (int i = 0; i < measureChunkOffsetsSize; i++) { + measureChunkOffsets.add(input.readLong()); + } + measureChunksLength = new ArrayList<>(measureChunkOffsetsSize); + for (int i = 0; i < measureChunkOffsetsSize; i++) { + measureChunksLength.add(input.readInt()); + } + + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java index cd86a07..ae99ed8 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java @@ -16,6 +16,7 @@ */ package org.apache.carbondata.core.metadata.index; +import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex; /** @@ -45,6 +46,11 @@ public class BlockIndexInfo { private BlockletIndex blockletIndex; /** + * to store blocklet info like offsets and lengths of each column. + */ + private BlockletInfo blockletInfo; + + /** * Constructor * * @param numberOfRows number of rows @@ -61,6 +67,20 @@ public class BlockIndexInfo { } /** + * + * @param numberOfRows + * @param fileName + * @param offset + * @param blockletIndex + * @param blockletInfo + */ + public BlockIndexInfo(long numberOfRows, String fileName, long offset, + BlockletIndex blockletIndex, BlockletInfo blockletInfo) { + this(numberOfRows, fileName, offset, blockletIndex); + this.blockletInfo = blockletInfo; + } + + /** * @return the numberOfRows */ public long getNumberOfRows() { @@ -87,4 +107,11 @@ public class BlockIndexInfo { public BlockletIndex getBlockletIndex() { return blockletIndex; } + + /** + * @return BlockletInfo + */ + public BlockletInfo getBlockletInfo() { + return blockletInfo; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index ff54673..e0ee5bb 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -21,8 +21,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -41,6 +43,7 @@ import org.apache.carbondata.core.datastore.block.AbstractIndex; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier; +import org.apache.carbondata.core.indexstore.blockletindex.IndexWrapper; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; @@ -116,23 +119,40 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { // so block will be loaded in sorted order this will be required for // query execution Collections.sort(queryModel.getTableBlockInfos()); - // get the table blocks - CacheProvider cacheProvider = CacheProvider.getInstance(); - BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache = - (BlockIndexStore) cacheProvider - .createCache(CacheType.EXECUTOR_BTREE, queryModel.getTable().getStorePath()); - // remove the invalid table blocks, block which is deleted or compacted - cache.removeTableBlocks(queryModel.getInvalidSegmentIds(), - queryModel.getAbsoluteTableIdentifier()); - List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = - prepareTableBlockUniqueIdentifier(queryModel.getTableBlockInfos(), - queryModel.getAbsoluteTableIdentifier()); - cache.removeTableBlocksIfHorizontalCompactionDone(queryModel); - queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers); - queryStatistic - .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis()); - queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic); + if (queryModel.getTableBlockInfos().get(0).getDetailInfo() != null) { + List<AbstractIndex> indexList = new ArrayList<>(); + Map<String, List<TableBlockInfo>> listMap = new LinkedHashMap<>(); + for (TableBlockInfo blockInfo: queryModel.getTableBlockInfos()) { + List<TableBlockInfo> tableBlockInfos = listMap.get(blockInfo.getFilePath()); + if (tableBlockInfos == null) { + tableBlockInfos = new ArrayList<>(); + listMap.put(blockInfo.getFilePath(), tableBlockInfos); + } + tableBlockInfos.add(blockInfo); + } + for (List<TableBlockInfo> tableBlockInfos: listMap.values()) { + indexList.add(new IndexWrapper(tableBlockInfos)); + } + queryProperties.dataBlocks = indexList; + } else { + // get the table blocks + CacheProvider cacheProvider = CacheProvider.getInstance(); + BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache = + (BlockIndexStore) cacheProvider + .createCache(CacheType.EXECUTOR_BTREE, queryModel.getTable().getStorePath()); + // remove the invalid table blocks, block which is deleted or compacted + cache.removeTableBlocks(queryModel.getInvalidSegmentIds(), + queryModel.getAbsoluteTableIdentifier()); + List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = + prepareTableBlockUniqueIdentifier(queryModel.getTableBlockInfos(), + queryModel.getAbsoluteTableIdentifier()); + cache.removeTableBlocksIfHorizontalCompactionDone(queryModel); + queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers); + queryStatistic + .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis()); + queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic); + } // calculating the total number of aggeragted columns int aggTypeCount = queryModel.getQueryMeasures().size(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java index 8704496..a874835 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java @@ -156,7 +156,7 @@ public class IncludeFilterExecuterImpl implements FilterExecuter { int columnIndex = dimColumnEvaluatorInfo.getColumnIndex(); int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex); - boolean isScanRequired = + boolean isScanRequired = blockIndex >= blkMaxVal.length || isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex], filterValues); if (isScanRequired) { bitSet.set(0); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java index 6823531..c2e077e 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java @@ -287,7 +287,7 @@ public class RangeValueFilterExecuterImpl extends ValueBasedFilterExecuterImpl { BitSet bitSet = new BitSet(1); byte[][] filterValues = this.filterRangesValues; int columnIndex = this.dimColEvaluatorInfo.getColumnIndex(); - boolean isScanRequired = + boolean isScanRequired = columnIndex >= blockMinValue.length || isScanRequired(blockMinValue[columnIndex], blockMaxValue[columnIndex], filterValues); if (isScanRequired) { bitSet.set(0); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java index be82be7..73352cb 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java @@ -79,7 +79,7 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) { BitSet bitSet = new BitSet(1); - boolean isScanRequired = + boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length || isScanRequired(blockMaxValue[dimensionBlocksIndex[0]], filterRangeValues); if (isScanRequired) { bitSet.set(0); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java index 53da6c5..6e8e188 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java @@ -81,7 +81,7 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) { BitSet bitSet = new BitSet(1); - boolean isScanRequired = + boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length || isScanRequired(blockMaxValue[dimensionBlocksIndex[0]], filterRangeValues); if (isScanRequired) { bitSet.set(0); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java index d694960..d6f7c86 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java @@ -81,7 +81,7 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) { BitSet bitSet = new BitSet(1); - boolean isScanRequired = + boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length || isScanRequired(blockMinValue[dimensionBlocksIndex[0]], filterRangeValues); if (isScanRequired) { bitSet.set(0); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java index b3dd921..597ba52 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java @@ -82,7 +82,7 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) { BitSet bitSet = new BitSet(1); - boolean isScanRequired = + boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length || isScanRequired(blockMinValue[dimensionBlocksIndex[0]], filterRangeValues); if (isScanRequired) { bitSet.set(0); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java index fdb5483..ff4f5dd 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java @@ -165,6 +165,9 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje new BlocksChunkHolder(blockExecutionInfo.getTotalNumberDimensionBlock(), blockExecutionInfo.getTotalNumberOfMeasureBlock(), fileReader); blocksChunkHolder.setDataBlock(dataBlockIterator.next()); + if (blocksChunkHolder.getDataBlock().getColumnsMaxValue() == null) { + return blocksChunkHolder; + } if (blockletScanner.isScanRequired(blocksChunkHolder)) { return blocksChunkHolder; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java index 92e9594..95030d3 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java @@ -32,6 +32,7 @@ import org.apache.carbondata.core.datastore.FileHolder; import org.apache.carbondata.core.datastore.block.AbstractIndex; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNodeWrapper; import org.apache.carbondata.core.mutate.DeleteDeltaVo; import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; @@ -127,20 +128,27 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato // set the deleted row to block execution info blockInfo.setDeletedRecordsMap(deletedRowsMap); } - DataRefNode startDataBlock = finder - .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey()); - while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) { - startDataBlock = startDataBlock.getNextDataRefNode(); - } - long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan(); - //if number of block is less than 0 then take end block. - if (numberOfBlockToScan <= 0) { - DataRefNode endDataBlock = finder - .findLastDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getEndKey()); - numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1; + DataRefNode dataRefNode = blockInfo.getDataBlock().getDataRefNode(); + if (dataRefNode instanceof BlockletDataRefNodeWrapper) { + BlockletDataRefNodeWrapper wrapper = (BlockletDataRefNodeWrapper) dataRefNode; + blockInfo.setFirstDataBlock(wrapper); + blockInfo.setNumberOfBlockToScan(wrapper.numberOfNodes()); + + } else { + DataRefNode startDataBlock = + finder.findFirstDataBlock(dataRefNode, blockInfo.getStartKey()); + while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) { + startDataBlock = startDataBlock.getNextDataRefNode(); + } + long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan(); + //if number of block is less than 0 then take end block. + if (numberOfBlockToScan <= 0) { + DataRefNode endDataBlock = finder.findLastDataBlock(dataRefNode, blockInfo.getEndKey()); + numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1; + } + blockInfo.setFirstDataBlock(startDataBlock); + blockInfo.setNumberOfBlockToScan(numberOfBlockToScan); } - blockInfo.setFirstDataBlock(startDataBlock); - blockInfo.setNumberOfBlockToScan(numberOfBlockToScan); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java index 97b1a1f..34c7709 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java +++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java @@ -122,6 +122,57 @@ public abstract class AbstractDataFileFooterConverter { } /** + * Below method will be used to get the index info from index file + * + * @param filePath file path of the index file + * @return list of index info + * @throws IOException problem while reading the index file + */ + public List<DataFileFooter> getIndexInfo(String filePath) throws IOException { + CarbonIndexFileReader indexReader = new CarbonIndexFileReader(); + List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>(); + String parentPath = filePath.substring(0, filePath.lastIndexOf("/")); + try { + // open the reader + indexReader.openThriftReader(filePath); + // get the index header + org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader(); + List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>(); + List<org.apache.carbondata.format.ColumnSchema> table_columns = + readIndexHeader.getTable_columns(); + for (int i = 0; i < table_columns.size(); i++) { + columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i))); + } + // get the segment info + SegmentInfo segmentInfo = getSegmentInfo(readIndexHeader.getSegment_info()); + BlockletIndex blockletIndex = null; + DataFileFooter dataFileFooter = null; + // read the block info from file + while (indexReader.hasNext()) { + BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo(); + blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index()); + dataFileFooter = new DataFileFooter(); + TableBlockInfo tableBlockInfo = new TableBlockInfo(); + tableBlockInfo.setBlockOffset(readBlockIndexInfo.getOffset()); + tableBlockInfo.setVersion( + ColumnarFormatVersion.valueOf((short) readIndexHeader.getVersion())); + int blockletSize = getBlockletSize(readBlockIndexInfo); + tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize); + tableBlockInfo.setFilePath(parentPath + "/" + readBlockIndexInfo.file_name); + dataFileFooter.setBlockletIndex(blockletIndex); + dataFileFooter.setColumnInTable(columnSchemaList); + dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows()); + dataFileFooter.setBlockInfo(new BlockInfo(tableBlockInfo)); + dataFileFooter.setSegmentInfo(segmentInfo); + dataFileFooters.add(dataFileFooter); + } + } finally { + indexReader.closeThriftReader(); + } + return dataFileFooters; + } + + /** * the methods returns the number of blocklets in a block * * @param readBlockIndexInfo @@ -148,6 +199,8 @@ public abstract class AbstractDataFileFooterConverter { public abstract DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo) throws IOException; + public abstract List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException; + /** * Below method will be used to get blocklet index for data file meta * http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index b9c164a..f62f3d8 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -54,10 +54,13 @@ import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO; +import org.apache.carbondata.core.indexstore.BlockletDetailInfo; import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.ValueEncoderMeta; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.blocklet.SegmentInfo; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.TableInfo; @@ -926,10 +929,26 @@ public final class CarbonUtil { * Below method will be used to read the data file matadata */ public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo) throws IOException { - AbstractDataFileFooterConverter fileFooterConverter = - DataFileFooterConverterFactory.getInstance() - .getDataFileFooterConverter(tableBlockInfo.getVersion()); - return fileFooterConverter.readDataFileFooter(tableBlockInfo); + BlockletDetailInfo detailInfo = tableBlockInfo.getDetailInfo(); + if (detailInfo == null) { + AbstractDataFileFooterConverter fileFooterConverter = + DataFileFooterConverterFactory.getInstance() + .getDataFileFooterConverter(tableBlockInfo.getVersion()); + return fileFooterConverter.readDataFileFooter(tableBlockInfo); + } else { + DataFileFooter fileFooter = new DataFileFooter(); + fileFooter.setSchemaUpdatedTimeStamp(detailInfo.getSchemaUpdatedTimeStamp()); + ColumnarFormatVersion version = + ColumnarFormatVersion.valueOf(detailInfo.getVersionNumber()); + AbstractDataFileFooterConverter dataFileFooterConverter = + DataFileFooterConverterFactory.getInstance().getDataFileFooterConverter(version); + fileFooter.setColumnInTable(dataFileFooterConverter.getSchema(tableBlockInfo)); + SegmentInfo segmentInfo = new SegmentInfo(); + segmentInfo.setColumnCardinality(detailInfo.getDimLens()); + segmentInfo.setNumberOfColumns(detailInfo.getRowCount()); + fileFooter.setSegmentInfo(segmentInfo); + return fileFooter; + } } /** @@ -1567,24 +1586,23 @@ public final class CarbonUtil { } /** - * @param tableInfo * @param invalidBlockVOForSegmentId * @param updateStatusMngr * @return */ - public static boolean isInvalidTableBlock(TableBlockInfo tableInfo, + public static boolean isInvalidTableBlock(String segmentId, String filePath, UpdateVO invalidBlockVOForSegmentId, SegmentUpdateStatusManager updateStatusMngr) { - if (!updateStatusMngr.isBlockValid(tableInfo.getSegmentId(), - CarbonTablePath.getCarbonDataFileName(tableInfo.getFilePath()) + CarbonTablePath + if (!updateStatusMngr.isBlockValid(segmentId, + CarbonTablePath.getCarbonDataFileName(filePath) + CarbonTablePath .getCarbonDataExtension())) { return true; } if (null != invalidBlockVOForSegmentId) { - Long blockTimeStamp = Long.parseLong(tableInfo.getFilePath() - .substring(tableInfo.getFilePath().lastIndexOf('-') + 1, - tableInfo.getFilePath().lastIndexOf('.'))); + Long blockTimeStamp = Long.parseLong(filePath + .substring(filePath.lastIndexOf('-') + 1, + filePath.lastIndexOf('.'))); if ((blockTimeStamp > invalidBlockVOForSegmentId.getFactTimestamp() && ( invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp() != null && blockTimeStamp < invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp()))) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java index 0f82b95..3ac6987 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java @@ -121,4 +121,8 @@ public class DataFileFooterConverter extends AbstractDataFileFooterConverter { blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows()); return blockletInfo; } + + @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException { + return null; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java index 4882b0f..8cd437f 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java @@ -140,4 +140,7 @@ public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter { return numberOfDimensionColumns; } + @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException { + return null; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java index 143c1b1..ccb8b29 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java @@ -85,6 +85,17 @@ public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter { return dataFileFooter; } + @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException { + CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(tableBlockInfo.getFilePath()); + FileHeader fileHeader = carbonHeaderReader.readHeader(); + List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>(); + List<org.apache.carbondata.format.ColumnSchema> table_columns = fileHeader.getColumn_schema(); + for (int i = 0; i < table_columns.size(); i++) { + columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i))); + } + return columnSchemaList; + } + /** * Below method is to convert the blocklet info of the thrift to wrapper * blocklet info http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/format/src/main/thrift/carbondata_index.thrift ---------------------------------------------------------------------- diff --git a/format/src/main/thrift/carbondata_index.thrift b/format/src/main/thrift/carbondata_index.thrift index c055031..4df085a 100644 --- a/format/src/main/thrift/carbondata_index.thrift +++ b/format/src/main/thrift/carbondata_index.thrift @@ -41,4 +41,5 @@ struct BlockIndex{ 2: required string file_name; // Block file name 3: required i64 offset; // Offset of the footer 4: required carbondata.BlockletIndex block_index; // Blocklet index + 5: optional carbondata.BlockletInfo3 blocklet_info; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java index 1673193..81226a2 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java @@ -21,7 +21,14 @@ import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.lang.reflect.Constructor; -import java.util.*; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.DataRefNode; @@ -375,8 +382,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { if (isIUDTable) { // In case IUD is not performed in this table avoid searching for // invalidated blocks. - if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, invalidBlockVOForSegmentId, - updateStatusManager)) { + if (CarbonUtil + .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(), + invalidBlockVOForSegmentId, updateStatusManager)) { continue; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java index 631bc2c..56bade7 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java @@ -29,6 +29,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.BlockletInfos; import org.apache.carbondata.core.datastore.block.Distributable; import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.indexstore.BlockletDetailInfo; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.util.ByteUtil; @@ -77,6 +78,8 @@ public class CarbonInputSplit extends FileSplit */ private String[] deleteDeltaFiles; + private BlockletDetailInfo detailInfo; + public CarbonInputSplit() { segmentId = null; taskId = "0"; @@ -138,10 +141,12 @@ public class CarbonInputSplit extends FileSplit BlockletInfos blockletInfos = new BlockletInfos(split.getNumberOfBlocklets(), 0, split.getNumberOfBlocklets()); try { - tableBlockInfoList.add( + TableBlockInfo blockInfo = new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(), split.getLocations(), split.getLength(), blockletInfos, split.getVersion(), - split.getDeleteDeltaFiles())); + split.getDeleteDeltaFiles()); + blockInfo.setDetailInfo(split.getDetailInfo()); + tableBlockInfoList.add(blockInfo); } catch (IOException e) { throw new RuntimeException("fail to get location of split: " + split, e); } @@ -153,9 +158,12 @@ public class CarbonInputSplit extends FileSplit BlockletInfos blockletInfos = new BlockletInfos(inputSplit.getNumberOfBlocklets(), 0, inputSplit.getNumberOfBlocklets()); try { - return new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(), - inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(), - blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles()); + TableBlockInfo blockInfo = + new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(), + inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(), + blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles()); + blockInfo.setDetailInfo(inputSplit.getDetailInfo()); + return blockInfo; } catch (IOException e) { throw new RuntimeException("fail to get location of split: " + inputSplit, e); } @@ -180,6 +188,11 @@ public class CarbonInputSplit extends FileSplit for (int i = 0; i < numberOfDeleteDeltaFiles; i++) { deleteDeltaFiles[i] = in.readUTF(); } + boolean detailInfoExists = in.readBoolean(); + if (detailInfoExists) { + detailInfo = new BlockletDetailInfo(); + detailInfo.readFields(in); + } } @Override public void write(DataOutput out) throws IOException { @@ -197,6 +210,10 @@ public class CarbonInputSplit extends FileSplit out.writeUTF(deleteDeltaFiles[i]); } } + out.writeBoolean(detailInfo != null); + if (detailInfo != null) { + detailInfo.write(out); + } } public List<String> getInvalidSegments() { @@ -310,4 +327,16 @@ public class CarbonInputSplit extends FileSplit public String[] getDeleteDeltaFiles() { return deleteDeltaFiles; } + + public void setDeleteDeltaFiles(String[] deleteDeltaFiles) { + this.deleteDeltaFiles = deleteDeltaFiles; + } + + public BlockletDetailInfo getDetailInfo() { + return detailInfo; + } + + public void setDetailInfo(BlockletDetailInfo detailInfo) { + this.detailInfo = detailInfo; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index ae9c676..e73c04a 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -18,152 +18,556 @@ package org.apache.carbondata.hadoop.api; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier; +import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.indexstore.DataMapStoreManager; +import org.apache.carbondata.core.indexstore.DataMapType; +import org.apache.carbondata.core.indexstore.TableDataMap; +import org.apache.carbondata.core.keygenerator.KeyGenException; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; +import org.apache.carbondata.core.metadata.schema.PartitionInfo; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.mutate.SegmentUpdateDetails; +import org.apache.carbondata.core.mutate.UpdateVO; +import org.apache.carbondata.core.mutate.data.BlockMappingVO; import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.CarbonQueryPlan; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.partition.PartitionUtil; +import org.apache.carbondata.core.scan.partition.Partitioner; +import org.apache.carbondata.core.stats.QueryStatistic; +import org.apache.carbondata.core.stats.QueryStatisticsConstants; +import org.apache.carbondata.core.stats.QueryStatisticsRecorder; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonStorePath; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; import org.apache.carbondata.hadoop.CarbonProjection; -import org.apache.carbondata.hadoop.internal.CarbonInputSplit; -import org.apache.carbondata.hadoop.internal.segment.Segment; -import org.apache.carbondata.hadoop.internal.segment.SegmentManager; -import org.apache.carbondata.hadoop.internal.segment.SegmentManagerFactory; +import org.apache.carbondata.hadoop.CarbonRecordReader; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.hadoop.util.SchemaReader; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.InvalidPathException; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.security.TokenCache; +import org.apache.hadoop.util.StringUtils; /** * Input format of CarbonData file. + * * @param <T> */ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { + // comma separated list of input segment numbers + public static final String INPUT_SEGMENT_NUMBERS = + "mapreduce.input.carboninputformat.segmentnumbers"; + // comma separated list of input files + public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files"; + private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class); private static final String FILTER_PREDICATE = "mapreduce.input.carboninputformat.filter.predicate"; + private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection"; + private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table"; + private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport"; - private SegmentManager segmentManager; + /** + * It is optional, if user does not set then it reads from store + * + * @param configuration + * @param carbonTable + * @throws IOException + */ + public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable) + throws IOException { + if (null != carbonTable) { + configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable)); + } + } - public CarbonTableInputFormat() { - this.segmentManager = SegmentManagerFactory.getGlobalSegmentManager(); + public static CarbonTable getCarbonTable(Configuration configuration) throws IOException { + String carbonTableStr = configuration.get(CARBON_TABLE); + if (carbonTableStr == null) { + populateCarbonTable(configuration); + // read it from schema file in the store + carbonTableStr = configuration.get(CARBON_TABLE); + return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr); + } + return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr); } - @Override - public RecordReader<Void, T> createRecordReader(InputSplit split, - TaskAttemptContext context) throws IOException, InterruptedException { - switch (((CarbonInputSplit)split).formatType()) { - case COLUMNAR: - // TODO: create record reader for columnar format - break; - default: - throw new RuntimeException("Unsupported format type"); + /** + * this method will read the schema from the physical file and populate into CARBON_TABLE + * + * @param configuration + * @throws IOException + */ + private static void populateCarbonTable(Configuration configuration) throws IOException { + String dirs = configuration.get(INPUT_DIR, ""); + String[] inputPaths = StringUtils.split(dirs); + if (inputPaths.length == 0) { + throw new InvalidPathException("No input paths specified in job"); } - return null; + AbsoluteTableIdentifier absoluteTableIdentifier = + AbsoluteTableIdentifier.fromTablePath(inputPaths[0]); + // read the schema file to get the absoluteTableIdentifier having the correct table id + // persisted in the schema + CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier); + setCarbonTable(configuration, carbonTable); } - @Override - public List<InputSplit> getSplits(JobContext job) throws IOException { + public static void setTablePath(Configuration configuration, String tablePath) + throws IOException { + configuration.set(FileInputFormat.INPUT_DIR, tablePath); + } - // work as following steps: - // get all current valid segment - // for each segment, get all input split + /** + * It sets unresolved filter expression. + * + * @param configuration + * @param filterExpression + */ + public static void setFilterPredicates(Configuration configuration, Expression filterExpression) { + if (filterExpression == null) { + return; + } + try { + String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression); + configuration.set(FILTER_PREDICATE, filterString); + } catch (Exception e) { + throw new RuntimeException("Error while setting filter expression to Job", e); + } + } - List<InputSplit> output = new LinkedList<>(); - Expression filter = getFilter(job.getConfiguration()); - Segment[] segments = segmentManager.getAllValidSegments(); - FilterResolverIntf filterResolver = CarbonInputFormatUtil.resolveFilter(filter, null); - for (Segment segment: segments) { - List<InputSplit> splits = segment.getSplits(job, filterResolver); - output.addAll(splits); + public static void setColumnProjection(Configuration configuration, CarbonProjection projection) { + if (projection == null || projection.isEmpty()) { + return; + } + String[] allColumns = projection.getAllColumns(); + StringBuilder builder = new StringBuilder(); + for (String column : allColumns) { + builder.append(column).append(","); } - return output; + String columnString = builder.toString(); + columnString = columnString.substring(0, columnString.length() - 1); + configuration.set(COLUMN_PROJECTION, columnString); } - /** - * set the table path into configuration - * @param conf configuration of the job - * @param tablePath table path string - */ - public void setTablePath(Configuration conf, String tablePath) { + public static String getColumnProjection(Configuration configuration) { + return configuration.get(COLUMN_PROJECTION); + } + + public static void setCarbonReadSupport(Configuration configuration, + Class<? extends CarbonReadSupport> readSupportClass) { + if (readSupportClass != null) { + configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName()); + } + } + private static CarbonTablePath getTablePath(AbsoluteTableIdentifier absIdentifier) { + return CarbonStorePath.getCarbonTablePath(absIdentifier); } /** - * return the table path in the configuration - * @param conf configuration of the job - * @return table path string + * Set list of segments to access */ - public String getTablePath(Configuration conf) { - return null; + public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) { + configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments)); } /** - * set projection columns into configuration - * @param conf configuration of the job - * @param projection projection + * Set list of files to access */ - public void setProjection(Configuration conf, CarbonProjection projection) { + public static void setFilesToAccess(Configuration configuration, List<String> validFiles) { + configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles)); + } + private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) + throws IOException { + return getCarbonTable(configuration).getAbsoluteTableIdentifier(); } /** - * return the projection in the configuration - * @param conf configuration of the job - * @return projection + * {@inheritDoc} + * Configurations FileInputFormat.INPUT_DIR + * are used to get table path to read. + * + * @param job + * @return List<InputSplit> list of CarbonInputSplit + * @throws IOException */ - public CarbonProjection getProjection(Configuration conf) { - return null; + @Override public List<InputSplit> getSplits(JobContext job) throws IOException { + AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); + TableDataMap blockletMap = + DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET); + List<String> invalidSegments = new ArrayList<>(); + List<UpdateVO> invalidTimestampsList = new ArrayList<>(); + List<String> validSegments = Arrays.asList(getSegmentsToAccess(job)); + // get all valid segments and set them into the configuration + if (validSegments.size() == 0) { + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); + SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = + segmentStatusManager.getValidAndInvalidSegments(); + SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier); + validSegments = segments.getValidSegments(); + if (validSegments.size() == 0) { + return new ArrayList<>(0); + } + + // remove entry in the segment index if there are invalid segments + invalidSegments.addAll(segments.getInvalidSegments()); + for (String invalidSegmentId : invalidSegments) { + invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId)); + } + if (invalidSegments.size() > 0) { + List<TableSegmentUniqueIdentifier> invalidSegmentsIds = + new ArrayList<>(invalidSegments.size()); + for (String segId : invalidSegments) { + invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(identifier, segId)); + } + blockletMap.clear(invalidSegments); + } + } + + // process and resolve the expression + Expression filter = getFilterPredicates(job.getConfiguration()); + CarbonTable carbonTable = getCarbonTable(job.getConfiguration()); + // this will be null in case of corrupt schema file. + if (null == carbonTable) { + throw new IOException("Missing/Corrupt schema file for table."); + } + + CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); + + // prune partitions for filter query on partition table + BitSet matchedPartitions = null; + if (null != filter) { + PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName()); + if (null != partitionInfo) { + Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo); + matchedPartitions = new FilterExpressionProcessor() + .getFilteredPartitions(filter, partitionInfo, partitioner); + if (matchedPartitions.cardinality() == 0) { + // no partition is required + return new ArrayList<InputSplit>(); + } + if (matchedPartitions.cardinality() == partitioner.numPartitions()) { + // all partitions are required, no need to prune partitions + matchedPartitions = null; + } + } + } + + FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier); + + // do block filtering and get split + List<InputSplit> splits = getSplits(job, filterInterface, validSegments, matchedPartitions); + // pass the invalid segment to task side in order to remove index entry in task side + if (invalidSegments.size() > 0) { + for (InputSplit split : splits) { + ((org.apache.carbondata.hadoop.CarbonInputSplit) split).setInvalidSegments(invalidSegments); + ((org.apache.carbondata.hadoop.CarbonInputSplit) split) + .setInvalidTimestampRange(invalidTimestampsList); + } + } + return splits; } /** - * set filter expression into the configuration - * @param conf configuration of the job - * @param filter filter expression + * {@inheritDoc} + * Configurations FileInputFormat.INPUT_DIR, CarbonInputFormat.INPUT_SEGMENT_NUMBERS + * are used to get table path to read. + * + * @return + * @throws IOException */ - public void setFilter(Configuration conf, Expression filter) { + private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver, + List<String> validSegments, BitSet matchedPartitions) throws IOException { + + List<InputSplit> result = new LinkedList<InputSplit>(); + UpdateVO invalidBlockVOForSegmentId = null; + Boolean isIUDTable = false; + + AbsoluteTableIdentifier absoluteTableIdentifier = + getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier(); + SegmentUpdateStatusManager updateStatusManager = + new SegmentUpdateStatusManager(absoluteTableIdentifier); + + isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0); + + //for each segment fetch blocks matching filter in Driver BTree + List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment = + getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions, + validSegments); + for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) { + + // Get the UpdateVO for those tables on which IUD operations being performed. + if (isIUDTable) { + invalidBlockVOForSegmentId = + updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId()); + } + if (isIUDTable) { + // In case IUD is not performed in this table avoid searching for + // invalidated blocks. + if (CarbonUtil + .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(), + invalidBlockVOForSegmentId, updateStatusManager)) { + continue; + } + } + String[] deleteDeltaFilePath = null; + try { + deleteDeltaFilePath = + updateStatusManager.getDeleteDeltaFilePath(inputSplit.getPath().toString()); + } catch (Exception e) { + throw new IOException(e); + } + inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath); + result.add(inputSplit); + } + return result; + } + + protected Expression getFilterPredicates(Configuration configuration) { try { - String filterString = ObjectSerializationUtil.convertObjectToString(filter); - conf.set(FILTER_PREDICATE, filterString); - } catch (Exception e) { - throw new RuntimeException("Error while setting filter expression to Job", e); + String filterExprString = configuration.get(FILTER_PREDICATE); + if (filterExprString == null) { + return null; + } + Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString); + return (Expression) filter; + } catch (IOException e) { + throw new RuntimeException("Error while reading filter expression", e); } } /** - * return filter expression in the configuration - * @param conf configuration of the job - * @return filter expression + * get data blocks of given segment */ - public Expression getFilter(Configuration conf) { - Object filter; - String filterExprString = conf.get(FILTER_PREDICATE); - if (filterExprString == null) { - return null; + private List<org.apache.carbondata.hadoop.CarbonInputSplit> getDataBlocksOfSegment(JobContext job, + AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver, + BitSet matchedPartitions, List<String> segmentIds) throws IOException { + + QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder(); + QueryStatistic statistic = new QueryStatistic(); + + // get tokens for all the required FileSystem for table path + TokenCache.obtainTokensForNamenodes(job.getCredentials(), + new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration()); + + TableDataMap blockletMap = DataMapStoreManager.getInstance() + .getDataMap(absoluteTableIdentifier, "blocklet", DataMapType.BLOCKLET); + List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver); + + List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>(); + for (Blocklet blocklet : prunedBlocklets) { + int taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo( + CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath().toString())); + + // matchedPartitions variable will be null in two cases as follows + // 1. the table is not a partition table + // 2. the table is a partition table, and all partitions are matched by query + // for partition table, the task id of carbaondata file name is the partition id. + // if this partition is not required, here will skip it. + if (matchedPartitions == null || matchedPartitions.get(taskId)) { + resultFilterredBlocks.add(convertToCarbonInputSplit(blocklet)); + } } + statistic + .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis()); + recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id")); + return resultFilterredBlocks; + } + + private org.apache.carbondata.hadoop.CarbonInputSplit convertToCarbonInputSplit(Blocklet blocklet) + throws IOException { + blocklet.updateLocations(); + org.apache.carbondata.hadoop.CarbonInputSplit split = + org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(), + new FileSplit(blocklet.getPath(), 0, blocklet.getLength(), blocklet.getLocations()), + ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber())); + split.setDetailInfo(blocklet.getDetailInfo()); + return split; + } + + @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + Configuration configuration = taskAttemptContext.getConfiguration(); + QueryModel queryModel = getQueryModel(inputSplit, taskAttemptContext); + CarbonReadSupport<T> readSupport = getReadSupportClass(configuration); + return new CarbonRecordReader<T>(queryModel, readSupport); + } + + public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException { + Configuration configuration = taskAttemptContext.getConfiguration(); + CarbonTable carbonTable = getCarbonTable(configuration); + // getting the table absoluteTableIdentifier from the carbonTable + // to avoid unnecessary deserialization + AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); + + // query plan includes projection column + String projection = getColumnProjection(configuration); + CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection); + QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable); + + // set the filter to the query model in order to filter blocklet before scan + Expression filter = getFilterPredicates(configuration); + CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); + FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier); + queryModel.setFilterExpressionResolverTree(filterIntf); + + // update the file level index store if there are invalid segment + if (inputSplit instanceof CarbonMultiBlockSplit) { + CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit; + List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments(); + if (invalidSegments.size() > 0) { + queryModel.setInvalidSegmentIds(invalidSegments); + } + List<UpdateVO> invalidTimestampRangeList = + split.getAllSplits().get(0).getInvalidTimestampRange(); + if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) { + queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList); + } + } + return queryModel; + } + + public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) { + String readSupportClass = configuration.get(CARBON_READ_SUPPORT); + //By default it uses dictionary decoder read class + CarbonReadSupport<T> readSupport = null; + if (readSupportClass != null) { + try { + Class<?> myClass = Class.forName(readSupportClass); + Constructor<?> constructor = myClass.getConstructors()[0]; + Object object = constructor.newInstance(); + if (object instanceof CarbonReadSupport) { + readSupport = (CarbonReadSupport) object; + } + } catch (ClassNotFoundException ex) { + LOG.error("Class " + readSupportClass + "not found", ex); + } catch (Exception ex) { + LOG.error("Error while creating " + readSupportClass, ex); + } + } else { + readSupport = new DictionaryDecodeReadSupport<>(); + } + return readSupport; + } + + @Override protected boolean isSplitable(JobContext context, Path filename) { try { - filter = ObjectSerializationUtil.convertStringToObject(filterExprString); - } catch (IOException e) { - throw new RuntimeException("Error while reading filter expression", e); + // Don't split the file if it is local file system + FileSystem fileSystem = filename.getFileSystem(context.getConfiguration()); + if (fileSystem instanceof LocalFileSystem) { + return false; + } + } catch (Exception e) { + return true; + } + return true; + } + + /** + * required to be moved to core + * + * @return updateExtension + */ + private String getUpdateExtension() { + // TODO: required to modify when supporting update, mostly will be update timestamp + return "update"; + } + + /** + * return valid segment to access + */ + private String[] getSegmentsToAccess(JobContext job) { + String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, ""); + if (segmentString.trim().isEmpty()) { + return new String[0]; } - assert (filter instanceof Expression); - return (Expression) filter; + return segmentString.split(","); } /** - * Optional API. It can be used by query optimizer to select index based on filter - * in the configuration of the job. After selecting index internally, index' name will be set - * in the configuration. + * Get the row count of the Block and mapping of segment and Block count. * - * The process of selection is simple, just use the default index. Subclass can provide a more - * advanced selection logic like cost based. - * @param conf job configuration + * @param job + * @param identifier + * @return + * @throws IOException + * @throws KeyGenException */ - public void selectIndex(Configuration conf) { - // set the default index in configuration + public BlockMappingVO getBlockRowCount(JobContext job, AbsoluteTableIdentifier identifier) + throws IOException, KeyGenException { + TableDataMap blockletMap = + DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET); + SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier); + SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = + new SegmentStatusManager(identifier).getValidAndInvalidSegments(); + Map<String, Long> blockRowCountMapping = + new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + Map<String, Long> segmentAndBlockCountMapping = + new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + List<Blocklet> blocklets = blockletMap.prune(validAndInvalidSegments.getValidSegments(), null); + for (Blocklet blocklet : blocklets) { + String blockName = blocklet.getPath().toString(); + blockName = CarbonTablePath.getCarbonDataFileName(blockName); + blockName = blockName + CarbonTablePath.getCarbonDataExtension(); + + long rowCount = blocklet.getDetailInfo().getRowCount(); + + String key = CarbonUpdateUtil.getSegmentBlockNameKey(blocklet.getSegmentId(), blockName); + + // if block is invalid then dont add the count + SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key); + + if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) { + Long blockCount = blockRowCountMapping.get(key); + if (blockCount == null) { + blockCount = 0L; + Long count = segmentAndBlockCountMapping.get(blocklet.getSegmentId()); + if (count == null) { + count = 0L; + } + segmentAndBlockCountMapping.put(blocklet.getSegmentId(), count + 1); + } + blockCount += rowCount; + blockRowCountMapping.put(key, blockCount); + } + } + return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java index 8270304..8269757 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java @@ -34,7 +34,7 @@ import org.apache.carbondata.core.scan.model.CarbonQueryPlan; import org.apache.carbondata.core.scan.model.QueryDimension; import org.apache.carbondata.core.scan.model.QueryMeasure; import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.hadoop.CarbonInputFormat; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; @@ -77,9 +77,10 @@ public class CarbonInputFormatUtil { return plan; } - public static <V> CarbonInputFormat<V> createCarbonInputFormat(AbsoluteTableIdentifier identifier, + public static <V> CarbonTableInputFormat<V> createCarbonInputFormat( + AbsoluteTableIdentifier identifier, Job job) throws IOException { - CarbonInputFormat<V> carbonInputFormat = new CarbonInputFormat<>(); + CarbonTableInputFormat<V> carbonInputFormat = new CarbonTableInputFormat<>(); FileInputFormat.addInputPath(job, new Path(identifier.getTablePath())); return carbonInputFormat; }
