Repository: carbondata Updated Branches: refs/heads/master cac650f01 -> 9c83bd18e
[CARBONDATA-1418] Use CarbonTableInputFormat for creating the Splits and QueryModel Refactored Code to use CarbonTableInputFormat for creating splits This closes #1294 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9c83bd18 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9c83bd18 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9c83bd18 Branch: refs/heads/master Commit: 9c83bd18e82de9734666cab297de7b21b5f517ac Parents: cac650f Author: Bhavya <[email protected]> Authored: Mon Aug 28 23:21:58 2017 +0530 Committer: chenliang613 <[email protected]> Committed: Tue Aug 29 10:47:11 2017 +0800 ---------------------------------------------------------------------- .../carbondata/presto/CarbondataRecordSet.java | 16 +- .../presto/CarbondataRecordSetProvider.java | 265 ++------ .../carbondata/presto/PrestoFilterUtil.java | 239 +++++++ .../presto/impl/CarbonLocalInputSplit.java | 47 +- .../presto/impl/CarbonTableReader.java | 676 +++---------------- 5 files changed, 435 insertions(+), 808 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c83bd18/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java index 435b008..4294403 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java @@ -33,6 +33,8 @@ import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.scan.result.BatchResult; import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.presto.impl.CarbonLocalInputSplit; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorSession; @@ -77,18 +79,12 @@ public class CarbondataRecordSet implements RecordSet { * get data blocks via Carbondata QueryModel API */ @Override public RecordCursor cursor() { - List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>(); - - tableBlockInfoList.add(new TableBlockInfo(split.getLocalInputSplit().getPath().toString(), - split.getLocalInputSplit().getStart(), split.getLocalInputSplit().getSegmentId(), - split.getLocalInputSplit().getLocations().toArray(new String[0]), - split.getLocalInputSplit().getLength(), new BlockletInfos(), - //blockletInfos, - ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion()), null)); + CarbonLocalInputSplit carbonLocalInputSplit = split.getLocalInputSplit(); + List<CarbonInputSplit> splitList = new ArrayList<>(1); + splitList.add(CarbonLocalInputSplit.convertSplit(carbonLocalInputSplit)); + List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList); queryModel.setTableBlockInfos(tableBlockInfoList); - queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); - try { Tuple3[] dict = readSupport http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c83bd18/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java index 8aacf88..0c7b77f 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java @@ -17,45 +17,40 @@ package org.apache.carbondata.presto; -import org.apache.carbondata.core.util.DataTypeConverterImpl; +import javax.inject.Inject; +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.service.impl.PathFactory; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonProjection; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.presto.impl.CarbonLocalInputSplit; import org.apache.carbondata.presto.impl.CarbonTableCacheModel; import org.apache.carbondata.presto.impl.CarbonTableReader; + import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.RecordSet; import com.facebook.presto.spi.connector.ConnectorRecordSetProvider; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; -import com.facebook.presto.spi.predicate.Domain; -import com.facebook.presto.spi.predicate.Range; -import com.facebook.presto.spi.predicate.TupleDomain; -import com.facebook.presto.spi.type.*; import com.google.common.collect.ImmutableList; -import io.airlift.slice.Slice; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.scan.expression.ColumnExpression; -import org.apache.carbondata.core.scan.expression.Expression; -import org.apache.carbondata.core.scan.expression.LiteralExpression; -import org.apache.carbondata.core.scan.expression.conditional.*; -import org.apache.carbondata.core.scan.expression.logical.AndExpression; -import org.apache.carbondata.core.scan.expression.logical.OrExpression; -import org.apache.carbondata.core.scan.filter.SingleTableProvider; -import org.apache.carbondata.core.scan.filter.TableProvider; -import org.apache.carbondata.core.scan.model.CarbonQueryPlan; -import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskType; -import javax.inject.Inject; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -import static org.apache.carbondata.presto.Types.checkType; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; +import static org.apache.carbondata.presto.Types.checkType; public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider { @@ -70,31 +65,19 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider { @Override public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) { - requireNonNull(split, "split is null"); - requireNonNull(columns, "columns is null"); CarbondataSplit carbondataSplit = checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit"); - checkArgument(carbondataSplit.getConnectorId().equals(connectorId), "split is not for this connector"); + checkArgument(carbondataSplit.getConnectorId().equals(connectorId), + "split is not for this connector"); - StringBuffer targetColsBuffer = new StringBuffer(); - String targetCols = ""; + CarbonProjection carbonProjection = new CarbonProjection(); // Convert all columns handles ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder(); for (ColumnHandle handle : columns) { handles.add(checkType(handle, CarbondataColumnHandle.class, "handle")); - targetColsBuffer.append(((CarbondataColumnHandle) handle).getColumnName()).append(","); - } - - // Build column projection(check the column order) - if (targetColsBuffer.length() > 0) { - targetCols = targetColsBuffer.substring(0, targetCols.length() - 1); - } - else - { - targetCols = null; + carbonProjection.addColumn(((CarbondataColumnHandle) handle).getColumnName()); } - //String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList())); CarbonTableCacheModel tableCacheModel = carbonTableReader.getCarbonCache(carbondataSplit.getSchemaTableName()); @@ -104,173 +87,47 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider { // Build Query Model CarbonTable targetTable = tableCacheModel.carbonTable; - CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(targetTable, targetCols); - QueryModel queryModel = - QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), - queryPlan, targetTable, new DataTypeConverterImpl()); - // Push down filter - fillFilter2QueryModel(queryModel, carbondataSplit.getConstraints(), targetTable); - - // Return new record set - return new CarbondataRecordSet(targetTable, session, carbondataSplit, - handles.build(), queryModel); - } - - // Build filter for QueryModel - private void fillFilter2QueryModel(QueryModel queryModel, - TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) { - - //queryModel.setFilterExpressionResolverTree(new FilterResolverIntf()); - - //Build Predicate Expression - ImmutableList.Builder<Expression> filters = ImmutableList.builder(); - - Domain domain = null; - - for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) { - - // Build ColumnExpresstion for Expresstion(Carbondata) - CarbondataColumnHandle cdch = (CarbondataColumnHandle) c; - Type type = cdch.getColumnType(); - - DataType coltype = spi2CarbondataTypeMapper(cdch); - Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype); - - domain = originalConstraint.getDomains().get().get(c); - checkArgument(domain.getType().isOrderable(), "Domain type must be orderable"); - - if (domain.getValues().isNone()) { - } - - if (domain.getValues().isAll()) { - } - - List<Object> singleValues = new ArrayList<>(); - List<Expression> disjuncts = new ArrayList<>(); - for (Range range : domain.getValues().getRanges().getOrderedRanges()) { - if (range.isSingleValue()) { - singleValues.add(range.getLow().getValue()); - } else { - List<Expression> rangeConjuncts = new ArrayList<>(); - if (!range.getLow().isLowerUnbounded()) { - Object value = convertDataByType(range.getLow().getValue(), type); - switch (range.getLow().getBound()) { - case ABOVE: - if (type == TimestampType.TIMESTAMP) { - //todo not now - } else { - GreaterThanExpression greater = new GreaterThanExpression(colExpression, - new LiteralExpression(value, coltype)); - rangeConjuncts.add(greater); - } - break; - case EXACTLY: - GreaterThanEqualToExpression greater = - new GreaterThanEqualToExpression(colExpression, - new LiteralExpression(value, coltype)); - rangeConjuncts.add(greater); - break; - case BELOW: - throw new IllegalArgumentException("Low marker should never use BELOW bound"); - default: - throw new AssertionError("Unhandled bound: " + range.getLow().getBound()); - } - } - if (!range.getHigh().isUpperUnbounded()) { - Object value = convertDataByType(range.getHigh().getValue(), type); - switch (range.getHigh().getBound()) { - case ABOVE: - throw new IllegalArgumentException("High marker should never use ABOVE bound"); - case EXACTLY: - LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, - new LiteralExpression(value, coltype)); - rangeConjuncts.add(less); - break; - case BELOW: - LessThanExpression less2 = - new LessThanExpression(colExpression, new LiteralExpression(value, coltype)); - rangeConjuncts.add(less2); - break; - default: - throw new AssertionError("Unhandled bound: " + range.getHigh().getBound()); - } - } - disjuncts.addAll(rangeConjuncts); - } - } - if (singleValues.size() == 1) { - Expression ex = null; - if (coltype.equals(DataType.STRING)) { - ex = new EqualToExpression(colExpression, - new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype)); - } else if (coltype.equals(DataType.TIMESTAMP) || coltype.equals(DataType.DATE)) { - Long value = (Long) singleValues.get(0) * 1000; - ex = new EqualToExpression(colExpression, - new LiteralExpression(value , coltype)); - } else ex = new EqualToExpression(colExpression, - new LiteralExpression(singleValues.get(0), coltype)); - filters.add(ex); - } else if (singleValues.size() > 1) { - ListExpression candidates = null; - List<Expression> exs = singleValues.stream().map((a) -> { - return new LiteralExpression(convertDataByType(a, type), coltype); - }).collect(Collectors.toList()); - candidates = new ListExpression(exs); - - if (candidates != null) filters.add(new InExpression(colExpression, candidates)); - } else if (disjuncts.size() > 0) { - if (disjuncts.size() > 1) { - Expression finalFilters = new OrExpression(disjuncts.get(0), disjuncts.get(1)); - if (disjuncts.size() > 2) { - for (int i = 2; i < disjuncts.size(); i++) { - filters.add(new AndExpression(finalFilters, disjuncts.get(i))); - } - } - } else if (disjuncts.size() == 1) filters.add(disjuncts.get(0)); - } + QueryModel queryModel = null; + try { + Configuration conf = new Configuration(); + conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, ""); + String carbonTablePath = PathFactory.getInstance() + .getCarbonTablePath(targetTable.getAbsoluteTableIdentifier().getStorePath(), + targetTable.getCarbonTableIdentifier(), null).getPath(); + + conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath); + JobConf jobConf = new JobConf(conf); + CarbonTableInputFormat carbonTableInputFormat = + createInputFormat(jobConf, tableCacheModel.carbonTable, + PrestoFilterUtil.getFilters(targetTable.getFactTableName().hashCode()), + carbonProjection); + TaskAttemptContextImpl hadoopAttemptContext = + new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0)); + CarbonInputSplit carbonInputSplit = + CarbonLocalInputSplit.convertSplit(carbondataSplit.getLocalInputSplit()); + queryModel = carbonTableInputFormat.getQueryModel(carbonInputSplit, hadoopAttemptContext); + } catch (IOException e) { + throw new RuntimeException("Unable to get the Query Model ", e); } - - Expression finalFilters; - List<Expression> tmp = filters.build(); - if (tmp.size() > 1) { - finalFilters = new OrExpression(tmp.get(0), tmp.get(1)); - if (tmp.size() > 2) { - for (int i = 2; i < tmp.size(); i++) { - finalFilters = new OrExpression(finalFilters, tmp.get(i)); - } - } - } else if (tmp.size() == 1) finalFilters = tmp.get(0); - else return; - - TableProvider tableProvider = new SingleTableProvider(carbonTable); - // todo set into QueryModel - CarbonInputFormatUtil.processFilterExpression(finalFilters, carbonTable); - queryModel.setFilterExpressionResolverTree(CarbonInputFormatUtil - .resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier(), tableProvider)); + return new CarbondataRecordSet(targetTable, session, carbondataSplit, handles.build(), + queryModel); } - public static DataType spi2CarbondataTypeMapper(CarbondataColumnHandle carbondataColumnHandle) { - Type colType = carbondataColumnHandle.getColumnType(); - if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN; - else if (colType == SmallintType.SMALLINT) return DataType.SHORT; - else if (colType == IntegerType.INTEGER) return DataType.INT; - else if (colType == BigintType.BIGINT) return DataType.LONG; - else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE; - else if (colType == VarcharType.VARCHAR) return DataType.STRING; - else if (colType == DateType.DATE) return DataType.DATE; - else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP; - else if (colType == DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(), - carbondataColumnHandle.getScale())) return DataType.DECIMAL; - else return DataType.STRING; - } + private CarbonTableInputFormat<Object> createInputFormat(Configuration conf, + CarbonTable carbonTable, Expression filterExpression, CarbonProjection projection) { - public Object convertDataByType(Object rawdata, Type type) { - if (type.equals(IntegerType.INTEGER)) return Integer.valueOf(rawdata.toString()); - else if (type.equals(BigintType.BIGINT)) return (Long) rawdata; - else if (type.equals(VarcharType.VARCHAR)) return ((Slice) rawdata).toStringUtf8(); - else if (type.equals(BooleanType.BOOLEAN)) return (Boolean) (rawdata); + AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); + CarbonTableInputFormat format = new CarbonTableInputFormat<Object>(); + try { + CarbonTableInputFormat + .setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath())); + } catch (IOException e) { + throw new RuntimeException("Unable to create the CarbonTableInputFormat", e); + } + CarbonTableInputFormat.setFilterPredicates(conf, filterExpression); + CarbonTableInputFormat.setColumnProjection(conf, projection); - return rawdata; + return format; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c83bd18/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java new file mode 100644 index 0000000..f665d9d --- /dev/null +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.scan.expression.ColumnExpression; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.expression.LiteralExpression; +import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression; +import org.apache.carbondata.core.scan.expression.conditional.GreaterThanEqualToExpression; +import org.apache.carbondata.core.scan.expression.conditional.GreaterThanExpression; +import org.apache.carbondata.core.scan.expression.conditional.InExpression; +import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression; +import org.apache.carbondata.core.scan.expression.conditional.LessThanExpression; +import org.apache.carbondata.core.scan.expression.conditional.ListExpression; +import org.apache.carbondata.core.scan.expression.logical.AndExpression; +import org.apache.carbondata.core.scan.expression.logical.OrExpression; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.predicate.Domain; +import com.facebook.presto.spi.predicate.Range; +import com.facebook.presto.spi.predicate.TupleDomain; +import com.facebook.presto.spi.type.BigintType; +import com.facebook.presto.spi.type.BooleanType; +import com.facebook.presto.spi.type.DateType; +import com.facebook.presto.spi.type.DecimalType; +import com.facebook.presto.spi.type.DoubleType; +import com.facebook.presto.spi.type.IntegerType; +import com.facebook.presto.spi.type.SmallintType; +import com.facebook.presto.spi.type.TimestampType; +import com.facebook.presto.spi.type.Type; +import com.facebook.presto.spi.type.VarcharType; +import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * PrestoFilterUtil create the carbonData Expression from the presto-domain + */ +public class PrestoFilterUtil { + + private static Map<Integer, Expression> filterMap = new HashMap<>(); + + private static DataType Spi2CarbondataTypeMapper(CarbondataColumnHandle carbondataColumnHandle) { + Type colType = carbondataColumnHandle.getColumnType(); + if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN; + else if (colType == SmallintType.SMALLINT) return DataType.SHORT; + else if (colType == IntegerType.INTEGER) return DataType.INT; + else if (colType == BigintType.BIGINT) return DataType.LONG; + else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE; + else if (colType == VarcharType.VARCHAR) return DataType.STRING; + else if (colType == DateType.DATE) return DataType.DATE; + else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP; + else if (colType == DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(), + carbondataColumnHandle.getScale())) return DataType.DECIMAL; + else return DataType.STRING; + } + + /** + * Convert presto-TupleDomain predication into Carbon scan express condition + * + * @param originalConstraint presto-TupleDomain + * @return + */ + static Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint) { + ImmutableList.Builder<Expression> filters = ImmutableList.builder(); + + Domain domain; + + for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) { + + // Build ColumnExpression for Expression(Carbondata) + CarbondataColumnHandle cdch = (CarbondataColumnHandle) c; + Type type = cdch.getColumnType(); + + DataType coltype = Spi2CarbondataTypeMapper(cdch); + Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype); + + domain = originalConstraint.getDomains().get().get(c); + checkArgument(domain.getType().isOrderable(), "Domain type must be orderable"); + + List<Object> singleValues = new ArrayList<>(); + List<Expression> disjuncts = new ArrayList<>(); + for (Range range : domain.getValues().getRanges().getOrderedRanges()) { + if (range.isSingleValue()) { + Object value = ConvertDataByType(range.getLow().getValue(), type); + singleValues.add(value); + } else { + List<Expression> rangeConjuncts = new ArrayList<>(); + if (!range.getLow().isLowerUnbounded()) { + Object value = ConvertDataByType(range.getLow().getValue(), type); + switch (range.getLow().getBound()) { + case ABOVE: + if (type == TimestampType.TIMESTAMP) { + //todo not now + } else { + GreaterThanExpression greater = new GreaterThanExpression(colExpression, + new LiteralExpression(value, coltype)); + rangeConjuncts.add(greater); + } + break; + case EXACTLY: + GreaterThanEqualToExpression greater = + new GreaterThanEqualToExpression(colExpression, + new LiteralExpression(value, coltype)); + rangeConjuncts.add(greater); + break; + case BELOW: + throw new IllegalArgumentException("Low marker should never use BELOW bound"); + default: + throw new AssertionError("Unhandled bound: " + range.getLow().getBound()); + } + } + if (!range.getHigh().isUpperUnbounded()) { + Object value = ConvertDataByType(range.getHigh().getValue(), type); + switch (range.getHigh().getBound()) { + case ABOVE: + throw new IllegalArgumentException("High marker should never use ABOVE bound"); + case EXACTLY: + LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, + new LiteralExpression(value, coltype)); + rangeConjuncts.add(less); + break; + case BELOW: + LessThanExpression less2 = + new LessThanExpression(colExpression, new LiteralExpression(value, coltype)); + rangeConjuncts.add(less2); + break; + default: + throw new AssertionError("Unhandled bound: " + range.getHigh().getBound()); + } + } + disjuncts.addAll(rangeConjuncts); + } + } + if (singleValues.size() == 1) { + Expression ex; + if (coltype.equals(DataType.STRING)) { + ex = new EqualToExpression(colExpression, + new LiteralExpression(singleValues.get(0), coltype)); + } else if (coltype.equals(DataType.TIMESTAMP) || coltype.equals(DataType.DATE)) { + Long value = (Long) singleValues.get(0); + ex = new EqualToExpression(colExpression, new LiteralExpression(value, coltype)); + } else ex = new EqualToExpression(colExpression, + new LiteralExpression(singleValues.get(0), coltype)); + filters.add(ex); + } else if (singleValues.size() > 1) { + ListExpression candidates = null; + List<Expression> exs = singleValues.stream() + .map((a) -> new LiteralExpression(ConvertDataByType(a, type), coltype)) + .collect(Collectors.toList()); + candidates = new ListExpression(exs); + + filters.add(new InExpression(colExpression, candidates)); + } else if (disjuncts.size() > 0) { + if (disjuncts.size() > 1) { + Expression finalFilters = new OrExpression(disjuncts.get(0), disjuncts.get(1)); + if (disjuncts.size() > 2) { + for (int i = 2; i < disjuncts.size(); i++) { + filters.add(new AndExpression(finalFilters, disjuncts.get(i))); + } + } else { + filters.add(finalFilters); + } + } else if (disjuncts.size() == 1) filters.add(disjuncts.get(0)); + } + } + + Expression finalFilters; + List<Expression> tmp = filters.build(); + if (tmp.size() > 1) { + finalFilters = new AndExpression(tmp.get(0), tmp.get(1)); + if (tmp.size() > 2) { + for (int i = 2; i < tmp.size(); i++) { + finalFilters = new OrExpression(finalFilters, tmp.get(i)); + } + } + } else if (tmp.size() == 1) finalFilters = tmp.get(0); + else return null; + return finalFilters; + } + + private static Object ConvertDataByType(Object rawdata, Type type) { + if (type.equals(IntegerType.INTEGER)) return new Integer((rawdata.toString())); + else if (type.equals(BigintType.BIGINT)) return rawdata; + else if (type.equals(VarcharType.VARCHAR)) { + if (rawdata instanceof Slice) { + return ((Slice) rawdata).toStringUtf8(); + } else { + return rawdata; + } + + } else if (type.equals(BooleanType.BOOLEAN)) return rawdata; + else if (type.equals(DateType.DATE)) { + Calendar c = Calendar.getInstance(); + c.setTime(new Date(0)); + c.add(Calendar.DAY_OF_YEAR, ((Long) rawdata).intValue()); + Date date = c.getTime(); + return date.getTime() * 1000; + } + + return rawdata; + } + + /** + * get the filters from key + */ + static Expression getFilters(Integer key) { + return filterMap.get(key); + } + + static void setFilter(Integer tableId, Expression filter) { + filterMap.put(tableId, filter); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c83bd18/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java index f0a8428..e209b97 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java @@ -19,9 +19,15 @@ package org.apache.carbondata.presto.impl; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; import java.util.List; +import org.apache.carbondata.core.indexstore.BlockletDetailInfo; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; +import org.apache.carbondata.hadoop.CarbonInputSplit; + /** * CarbonLocalInputSplit represents a block, it contains a set of blocklet. */ @@ -34,6 +40,10 @@ public class CarbonLocalInputSplit { private long length; // the length of the block. private List<String> locations;// locations are the locations for different replicas. private short version; + private String[] deleteDeltaFiles; + + + private String detailInfo; /** * Number of BlockLets in a block @@ -68,12 +78,29 @@ public class CarbonLocalInputSplit { return numberOfBlocklets; } + @JsonProperty public String[] getDeleteDeltaFiles() { + return deleteDeltaFiles; + } + + @JsonProperty public String getDetailInfo() { + return detailInfo; + } + + public void setDetailInfo(BlockletDetailInfo blockletDetailInfo) { + Gson gson = new Gson(); + detailInfo = gson.toJson(blockletDetailInfo); + + } + @JsonCreator public CarbonLocalInputSplit(@JsonProperty("segmentId") String segmentId, @JsonProperty("path") String path, @JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("locations") List<String> locations, @JsonProperty("numberOfBlocklets") int numberOfBlocklets/*, @JsonProperty("tableBlockInfo") TableBlockInfo tableBlockInfo*/, - @JsonProperty("version") short version) { + @JsonProperty("version") short version, + @JsonProperty("deleteDeltaFiles") String[] deleteDeltaFiles, + @JsonProperty("detailInfo") String detailInfo + ) { this.path = path; this.start = start; this.length = length; @@ -82,5 +109,23 @@ public class CarbonLocalInputSplit { this.numberOfBlocklets = numberOfBlocklets; //this.tableBlockInfo = tableBlockInfo; this.version = version; + this.deleteDeltaFiles = deleteDeltaFiles; + this.detailInfo = detailInfo; + } + + public static CarbonInputSplit convertSplit(CarbonLocalInputSplit carbonLocalInputSplit) { + CarbonInputSplit inputSplit = new CarbonInputSplit(carbonLocalInputSplit.getSegmentId(), + new Path(carbonLocalInputSplit.getPath()), carbonLocalInputSplit.getStart(), + carbonLocalInputSplit.getLength(), carbonLocalInputSplit.getLocations() + .toArray(new String[carbonLocalInputSplit.getLocations().size()]), + carbonLocalInputSplit.getNumberOfBlocklets(), ColumnarFormatVersion.valueOf(carbonLocalInputSplit.getVersion()), + carbonLocalInputSplit.getDeleteDeltaFiles()); + Gson gson = new Gson(); + BlockletDetailInfo blockletDetailInfo = gson.fromJson(carbonLocalInputSplit.detailInfo, BlockletDetailInfo.class); + inputSplit.setDetailInfo(blockletDetailInfo); + return inputSplit; + } + + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c83bd18/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java index d78f786..5c00026 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java @@ -20,105 +20,76 @@ package org.apache.carbondata.presto.impl; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.DataRefNode; -import org.apache.carbondata.core.datastore.DataRefNodeFinder; -import org.apache.carbondata.core.datastore.IndexKey; -import org.apache.carbondata.core.datastore.SegmentTaskIndexStore; -import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier; -import org.apache.carbondata.core.datastore.block.AbstractIndex; -import org.apache.carbondata.core.datastore.block.BlockletInfos; -import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper; -import org.apache.carbondata.core.datastore.block.TableBlockInfo; -import org.apache.carbondata.core.datastore.exception.IndexBuilderException; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder; -import org.apache.carbondata.core.datastore.impl.btree.BlockBTreeLeafNode; -import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.converter.SchemaConverter; import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.reader.ThriftReader; import org.apache.carbondata.core.scan.expression.Expression; -import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; -import org.apache.carbondata.core.scan.filter.FilterUtil; -import org.apache.carbondata.core.scan.filter.SingleTableProvider; -import org.apache.carbondata.core.scan.filter.TableProvider; -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.service.impl.PathFactory; -import org.apache.carbondata.core.statusmanager.SegmentStatusManager; -import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; -import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.hadoop.CacheClient; -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import com.facebook.presto.hadoop.$internal.com.google.gson.Gson; import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.TableNotFoundException; import com.facebook.presto.spi.classloader.ThreadContextClassLoader; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.Job; import org.apache.thrift.TBase; import static java.util.Objects.requireNonNull; -import com.facebook.presto.spi.TableNotFoundException; -/** CarbonTableReader will be a facade of these utils - * +/** + * CarbonTableReader will be a facade of these utils * 1:CarbonMetadata,(logic table) * 2:FileFactory, (physic table file) * 3:CarbonCommonFactory, (offer some ) * 4:DictionaryFactory, (parse dictionary util) - * * Currently, it is mainly used to parse metadata of tables under * the configured carbondata-store path and filter the relevant * input splits with given query predicates. */ public class CarbonTableReader { + // default PathFilter, accepts files in carbondata format (with .carbondata extension). + private static final PathFilter DefaultFilter = new PathFilter() { + @Override public boolean accept(Path path) { + return CarbonTablePath.isCarbonDataFile(path.getName()); + } + }; private CarbonTableConfig config; - /** * The names of the tables under the schema (this.carbonFileList). */ private List<SchemaTableName> tableList; - /** * carbonFileList represents the store path of the schema, which is configured as carbondata-store * in the CarbonData catalog file ($PRESTO_HOME$/etc/catalog/carbondata.properties). */ private CarbonFile carbonFileList; private FileFactory.FileType fileType; - /** * A cache for Carbon reader, with this cache, * metadata of a table is only read from file system once. @@ -132,11 +103,12 @@ public class CarbonTableReader { /** * For presto worker node to initialize the metadata cache of a table. + * * @param table the name of the table and schema. * @return */ public CarbonTableCacheModel getCarbonCache(SchemaTableName table) { - if (!cc.containsKey(table)) { + if (!cc.containsKey(table) || cc.get(table) == null) { // if this table is not cached, try to read the metadata of the table and cache it. try (ThreadContextClassLoader ignored = new ThreadContextClassLoader( FileFactory.class.getClassLoader())) { @@ -153,31 +125,29 @@ public class CarbonTableReader { parseCarbonMetadata(table); } - if (cc.containsKey(table)) return cc.get(table); - else return null; + if (cc.containsKey(table)) { + return cc.get(table); + } else { + return null; + } } /** * Return the schema names under a schema store path (this.carbonFileList). + * * @return */ public List<String> getSchemaNames() { return updateSchemaList(); } - // default PathFilter, accepts files in carbondata format (with .carbondata extension). - private static final PathFilter DefaultFilter = new PathFilter() { - @Override public boolean accept(Path path) { - return CarbonTablePath.isCarbonDataFile(path.getName()); - } - }; - /** * Get the CarbonFile instance which represents the store path in the configuration, and assign it to * this.carbonFileList. + * * @return */ - public boolean updateCarbonFile() { + private boolean updateCarbonFile() { if (carbonFileList == null) { fileType = FileFactory.getFileType(config.getStorePath()); try { @@ -191,20 +161,20 @@ public class CarbonTableReader { /** * Return the schema names under a schema store path (this.carbonFileList). + * * @return */ - public List<String> updateSchemaList() { + private List<String> updateSchemaList() { updateCarbonFile(); if (carbonFileList != null) { - List<String> schemaList = - Stream.of(carbonFileList.listFiles()).map(a -> a.getName()).collect(Collectors.toList()); - return schemaList; + return Stream.of(carbonFileList.listFiles()).map(CarbonFile::getName).collect(Collectors.toList()); } else return ImmutableList.of(); } /** * Get the names of the tables in the given schema. + * * @param schema name of the schema * @return */ @@ -215,20 +185,23 @@ public class CarbonTableReader { /** * Get the names of the tables in the given schema. + * * @param schemaName name of the schema * @return */ - public Set<String> updateTableList(String schemaName) { - List<CarbonFile> schema = Stream.of(carbonFileList.listFiles()).filter(a -> schemaName.equals(a.getName())) - .collect(Collectors.toList()); + private Set<String> updateTableList(String schemaName) { + List<CarbonFile> schema = + Stream.of(carbonFileList.listFiles()).filter(a -> schemaName.equals(a.getName())) + .collect(Collectors.toList()); if (schema.size() > 0) { - return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName()) + return Stream.of((schema.get(0)).listFiles()).map(CarbonFile::getName) .collect(Collectors.toSet()); } else return ImmutableSet.of(); } /** * Get the CarbonTable instance of the given table. + * * @param schemaTableName name of the given table. * @return */ @@ -250,7 +223,7 @@ public class CarbonTableReader { * and cache all the table names in this.tableList. Notice that whenever this method * is called, it clears this.tableList and populate the list by reading the files. */ - public void updateSchemaTables() { + private void updateSchemaTables() { // update logic determine later if (carbonFileList == null) { updateSchemaList(); @@ -268,6 +241,7 @@ public class CarbonTableReader { /** * Find the table with the given name and build a CarbonTable instance for it. * This method should be called after this.updateSchemaTables(). + * * @param schemaTableName name of the given table. * @return */ @@ -282,10 +256,11 @@ public class CarbonTableReader { /** * Read the metadata of the given table and cache it in this.cc (CarbonTableReader cache). + * * @param table name of the given table. * @return the CarbonTable instance which contains all the needed metadata for a table. */ - public CarbonTable parseCarbonMetadata(SchemaTableName table) { + private CarbonTable parseCarbonMetadata(SchemaTableName table) { CarbonTable result = null; try { CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel()); @@ -295,14 +270,14 @@ public class CarbonTableReader { // Step 1: get store path of the table and cache it. String storePath = config.getStorePath(); - // create table identifier. the table id is randomly generated. + // create table identifier. the table id is randomly generated. cache.carbonTableIdentifier = new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(), UUID.randomUUID().toString()); - // get the store path of the table. - cache.carbonTablePath = PathFactory.getInstance() - .getCarbonTablePath(storePath, cache.carbonTableIdentifier, null); - // cache the table + // get the store path of the table. + cache.carbonTablePath = + PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier, null); + // cache the table cc.put(table, cache); //Step 2: read the metadata (tableInfo) of the table. @@ -323,7 +298,7 @@ public class CarbonTableReader { // Step 3: convert format level TableInfo to code level TableInfo SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); - // wrapperTableInfo is the code level information of a table in carbondata core, different from the Thrift TableInfo. + // wrapperTableInfo is the code level information of a table in carbondata core, different from the Thrift TableInfo. TableInfo wrapperTableInfo = schemaConverter .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(), storePath); @@ -344,542 +319,57 @@ public class CarbonTableReader { return result; } - /** - * Apply filters to the table and get valid input splits of the table. - * @param tableCacheModel the table - * @param filters the filters - * @return - * @throws Exception - */ - public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel, - Expression filters) throws Exception { - - // need apply filters to segment - FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor(); - UpdateVO invalidBlockVOForSegmentId = null; - Boolean IUDTable = false; - - AbsoluteTableIdentifier absoluteTableIdentifier = - tableCacheModel.carbonTable.getAbsoluteTableIdentifier(); - CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath()); - List<String> invalidSegments = new ArrayList<>(); - - // get all valid segments and set them into the configuration - SegmentUpdateStatusManager updateStatusManager = - new SegmentUpdateStatusManager(absoluteTableIdentifier); - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); - SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = - segmentStatusManager.getValidAndInvalidSegments(); - - tableCacheModel.segments = segments.getValidSegments().toArray(new String[0]); - if (segments.getValidSegments().size() == 0) { - return new ArrayList<>(0); - } - - // remove entry in the segment index if there are invalid segments - invalidSegments.addAll(segments.getInvalidSegments()); - if (invalidSegments.size() > 0) { - List<TableSegmentUniqueIdentifier> invalidSegmentsIds = - new ArrayList<>(invalidSegments.size()); - for (String segId : invalidSegments) { - invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segId)); - } - cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds); - } - - TableProvider tableProvider = new SingleTableProvider(tableCacheModel.carbonTable); - - // get filter for segment - CarbonInputFormatUtil.processFilterExpression(filters, tableCacheModel.carbonTable); - FilterResolverIntf filterInterface = CarbonInputFormatUtil - .resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier(), - tableProvider); - IUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0); + public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel, + Expression filters) { List<CarbonLocalInputSplit> result = new ArrayList<>(); - // for each segment fetch blocks matching filter in Driver BTree - for (String segmentNo : tableCacheModel.segments) { - - if (IUDTable) { - // update not being performed on this table. - invalidBlockVOForSegmentId = - updateStatusManager.getInvalidTimestampRange(segmentNo); - } - - try { - List<DataRefNode> dataRefNodes = - getDataBlocksOfSegment(filterExpressionProcessor, absoluteTableIdentifier, - tableCacheModel.carbonTablePath, filterInterface, segmentNo, cacheClient, - updateStatusManager); - for (DataRefNode dataRefNode : dataRefNodes) { - BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode; - TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo(); - - if (IUDTable) { - if (CarbonUtil - .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(), - invalidBlockVOForSegmentId, updateStatusManager)) { - continue; - } - } - result.add(new CarbonLocalInputSplit(segmentNo, tableBlockInfo.getFilePath(), - tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(), - Arrays.asList(tableBlockInfo.getLocations()), - tableBlockInfo.getBlockletInfos().getNoOfBlockLets(), - tableBlockInfo.getVersion().number())); - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - cacheClient.close(); - return result; - } - - /** - * Get all the data blocks of a given segment. - * @param filterExpressionProcessor - * @param absoluteTableIdentifier - * @param tablePath - * @param resolver - * @param segmentId - * @param cacheClient - * @param updateStatusManager - * @return - * @throws IOException - */ - private List<DataRefNode> getDataBlocksOfSegment( - FilterExpressionProcessor filterExpressionProcessor, - AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath, - FilterResolverIntf resolver, String segmentId, CacheClient cacheClient, - SegmentUpdateStatusManager updateStatusManager) throws IOException { - //DriverQueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance(); - //QueryStatistic statistic = new QueryStatistic(); - - // read segment index - Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = - getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient, - updateStatusManager); - - List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>(); - - if (null != segmentIndexMap) { - // build result - for (AbstractIndex abstractIndex : segmentIndexMap.values()) { - List<DataRefNode> filterredBlocks; - // if no filter is given, get all blocks from Btree Index - if (null == resolver) { - filterredBlocks = getDataBlocksOfIndex(abstractIndex); - } else { - // apply filter and get matching blocks - filterredBlocks = filterExpressionProcessor - .getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, abstractIndex, - absoluteTableIdentifier); - } - resultFilterredBlocks.addAll(filterredBlocks); - } - } - //statistic.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis()); - //recorder.recordStatisticsForDriver(statistic, "123456"/*job.getConfiguration().get("query.id")*/); - return resultFilterredBlocks; - } - - private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper, - UpdateVO updateDetails) { - Long refreshedTime = segmentTaskIndexWrapper.getRefreshedTimeStamp(); - Long updateTime = updateDetails.getLatestUpdateTimestamp(); - if (null != refreshedTime && null != updateTime && updateTime > refreshedTime) { - return true; - } - return false; - } - - /** - * Build and load the B-trees of the segment. - * @param absoluteTableIdentifier - * @param tablePath - * @param segmentId - * @param cacheClient - * @param updateStatusManager - * @return - * @throws IOException - */ - private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(/*JobContext job,*/ - AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath, String segmentId, - CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException { - Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null; - SegmentTaskIndexWrapper segmentTaskIndexWrapper = null; - UpdateVO updateDetails = null; - boolean isSegmentUpdated = false; - Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null; - TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier = - new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId); - segmentTaskIndexWrapper = - cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier); - - // Until Updates or Deletes being performed on the table Invalid Blocks will not - // be formed. So it is unnecessary to get the InvalidTimeStampRange. - if (updateStatusManager.getUpdateStatusDetails().length != 0) { - updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId); - } - - if (null != segmentTaskIndexWrapper) { - segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap(); - // IUD operations should be performed on the table in order to mark the segment as Updated. - // For Normal table no need to check for invalided blocks as there will be none of them. - if ((null != updateDetails) && isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) { - taskKeys = segmentIndexMap.keySet(); - isSegmentUpdated = true; - } - } - - // if segment tree is not loaded, load the segment tree - if (segmentIndexMap == null || isSegmentUpdated) { - - List<FileStatus> fileStatusList = new LinkedList<FileStatus>(); - FileSystem fs = - getFileStatusOfSegments(new String[] { segmentId }, tablePath, fileStatusList); - List<InputSplit> splits = getSplit(fileStatusList, fs); + CarbonTable carbonTable = tableCacheModel.carbonTable; + TableInfo tableInfo = tableCacheModel.tableInfo; + Configuration config = new Configuration(); + config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, ""); + String carbonTablePath = PathFactory.getInstance() + .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier().getStorePath(), + carbonTable.getCarbonTableIdentifier(), null).getPath(); + config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath); - List<FileSplit> carbonSplits = new ArrayList<>(); - for (InputSplit inputSplit : splits) { - FileSplit fileSplit = (FileSplit) inputSplit; - String segId = CarbonTablePath.DataPathUtil - .getSegmentId(fileSplit.getPath().toString());//è¿éçseperatoråºè¯¥æä¹å ï¼ï¼ - if (segId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) { - continue; - } - carbonSplits.add(fileSplit); - } - - List<TableBlockInfo> tableBlockInfoList = new ArrayList<>(); - for (FileSplit inputSplit : carbonSplits) { - if ((null == updateDetails) || (isValidBlockBasedOnUpdateDetails( - taskKeys, inputSplit, updateDetails, updateStatusManager, segmentId))) { - - BlockletInfos blockletInfos = new BlockletInfos(0, 0, - 0);//this level we do not need blocklet info!!!! Is this a trick? - tableBlockInfoList.add( - new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(), segmentId, - inputSplit.getLocations(), inputSplit.getLength(), blockletInfos, - ColumnarFormatVersion - .valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION), null/*new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)*/));//è¿éçnullæ¯å¦ä¼å¼å¸¸ï¼ + try { + CarbonTableInputFormat.setTableInfo(config, tableInfo); + CarbonTableInputFormat carbonTableInputFormat = + createInputFormat(config, carbonTable.getAbsoluteTableIdentifier(), filters); + JobConf jobConf = new JobConf(config); + Job job = Job.getInstance(jobConf); + List<InputSplit> splits = carbonTableInputFormat.getSplits(job); + CarbonInputSplit carbonInputSplit = null; + Gson gson = new Gson(); + if (splits != null && splits.size() > 0) { + for (InputSplit inputSplit : splits) { + carbonInputSplit = (CarbonInputSplit) inputSplit; + result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(), + carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(), + carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()), + carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(), + carbonInputSplit.getDeleteDeltaFiles(), + gson.toJson(carbonInputSplit.getDetailInfo()))); } } - Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>(); - segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList); - // get Btree blocks for given segment - tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos); - tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated); - segmentTaskIndexWrapper = - cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier); - segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap(); + } catch (IOException e) { + throw new RuntimeException("Error creating Splits from CarbonTableInputFormat", e); } - return segmentIndexMap; - } - - private boolean isValidBlockBasedOnUpdateDetails( - Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, FileSplit carbonInputSplit, - UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) { - String taskID = null; - if (null != carbonInputSplit) { - if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) { - return false; - } - - if (null == taskKeys) { - return true; - } - - taskID = CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName()); - String bucketNo = - CarbonTablePath.DataFileUtil.getBucketNo(carbonInputSplit.getPath().getName()); - SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder = - new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo); - - String blockTimestamp = carbonInputSplit.getPath().getName() - .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1, - carbonInputSplit.getPath().getName().lastIndexOf('.')); - if (!(updateDetails.getUpdateDeltaStartTimestamp() != null - && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) { - if (!taskKeys.contains(taskBucketHolder)) { - return true; - } - } - } - return false; + return result; } - /** - * Get the input splits of a set of carbondata files. - * @param fileStatusList the file statuses of the set of carbondata files. - * @param targetSystem hdfs FileSystem - * @return - * @throws IOException - */ - private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem) + private CarbonTableInputFormat<Object> createInputFormat( Configuration conf, AbsoluteTableIdentifier identifier, Expression filterExpression) throws IOException { + CarbonTableInputFormat format = new CarbonTableInputFormat<Object>(); + CarbonTableInputFormat.setTablePath(conf, + identifier.appendWithLocalPrefix(identifier.getTablePath())); + CarbonTableInputFormat.setFilterPredicates(conf, filterExpression); - Iterator split = fileStatusList.iterator(); - - List<InputSplit> splits = new ArrayList<>(); - - while (true) { - while (true) { - while (split.hasNext()) { - // file is a carbondata file - FileStatus file = (FileStatus) split.next(); - Path path = file.getPath(); - long length = file.getLen(); - if (length != 0L) { - BlockLocation[] blkLocations; - if (file instanceof LocatedFileStatus) { - blkLocations = ((LocatedFileStatus) file).getBlockLocations(); - } else { - blkLocations = targetSystem.getFileBlockLocations(file, 0L, length); - } - - if (this.isSplitable()) { - long blockSize1 = file.getBlockSize(); - long splitSize = this.computeSplitSize(blockSize1, 1, Long.MAX_VALUE); - - long bytesRemaining; - int blkIndex; - for ( - bytesRemaining = length; - (double) bytesRemaining / (double) splitSize > 1.1D;// when there are more than one splits left. - bytesRemaining -= splitSize) { - blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, - blkLocations[blkIndex].getHosts())); - } - - if (bytesRemaining != 0L) { - blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, - blkLocations[blkIndex].getHosts())); - } - } else { - splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length, - blkLocations[0].getHosts())); - } - } else { - splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length, - new String[0])); - } - } - return splits; - } - } - - } - - private String[] getValidPartitions() { - //TODO: has to Identify partitions by partition pruning - return new String[] { "0" }; - } - - /** - * Get all file statuses of the carbondata files with a segmentId in segmentsToConsider - * under the tablePath, and add them to the result. - * @param segmentsToConsider - * @param tablePath - * @param result - * @return the FileSystem instance been used in this function. - * @throws IOException - */ - private FileSystem getFileStatusOfSegments(String[] segmentsToConsider, CarbonTablePath tablePath, - List<FileStatus> result) throws IOException { - String[] partitionsToConsider = getValidPartitions(); - if (partitionsToConsider.length == 0) { - throw new IOException("No partitions/data found"); - } - - FileSystem fs = null; - - //PathFilter inputFilter = getDataFileFilter(job); - - // get tokens for all the required FileSystem for table path - /*TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { tablePath }, - job.getConfiguration());*/ - - //get all data files of valid partitions and segments - for (int i = 0; i < partitionsToConsider.length; ++i) { - String partition = partitionsToConsider[i]; - - for (int j = 0; j < segmentsToConsider.length; ++j) { - String segmentId = segmentsToConsider[j]; - Path segmentPath = new Path(tablePath.getCarbonDataDirectoryPath(partition, segmentId)); - - try { - Configuration conf = new Configuration(); - fs = segmentPath.getFileSystem(conf); - - RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(segmentPath); - while (iter.hasNext()) { - LocatedFileStatus stat = iter.next(); - if (DefaultFilter.accept(stat.getPath())) { - if (stat.isDirectory()) { - // DefaultFiler accepts carbondata files. - addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter); - } else { - result.add(stat); - } - } - } - } catch (Exception ex) { - System.out.println(ex.toString()); - } - } - } - return fs; - } - - /** - * Get the FileStatus of all carbondata files under the path recursively, - * and add the file statuses into the result - * @param result - * @param fs - * @param path - * @param inputFilter the filter used to determinate whether a path is a carbondata file - * @throws IOException - */ - protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path, - PathFilter inputFilter) throws IOException { - RemoteIterator iter = fs.listLocatedStatus(path); - - while (iter.hasNext()) { - LocatedFileStatus stat = (LocatedFileStatus) iter.next(); - if (inputFilter.accept(stat.getPath())) { - if (stat.isDirectory()) { - this.addInputPathRecursively(result, fs, stat.getPath(), inputFilter); - } else { - result.add(stat); - } - } - } - + return format; } - /** - * Get the data blocks of a b tree. the root node of the b tree is abstractIndex.dataRefNode. - * BTreeNode is a sub class of DataRefNode. - * @param abstractIndex - * @return - */ - private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) { - List<DataRefNode> blocks = new LinkedList<DataRefNode>(); - SegmentProperties segmentProperties = abstractIndex.getSegmentProperties(); - try { - IndexKey startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties); - IndexKey endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties); - - // Add all blocks of btree into result - DataRefNodeFinder blockFinder = - new BTreeDataRefNodeFinder(segmentProperties.getEachDimColumnValueSize(), - segmentProperties.getNumberOfSortColumns(), - segmentProperties.getNumberOfNoDictSortColumns()); - DataRefNode startBlock = - blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), startIndexKey); - DataRefNode endBlock = - blockFinder.findLastDataBlock(abstractIndex.getDataRefNode(), endIndexKey); - while (startBlock != endBlock) { - blocks.add(startBlock); - startBlock = startBlock.getNextDataRefNode(); - } - blocks.add(endBlock); - - } catch (KeyGenException e) { - System.out.println("Could not generate start key" + e.getMessage()); - } - return blocks; - } - - private boolean isSplitable() { - try { - // Don't split the file if it is local file system - if (this.fileType == FileFactory.FileType.LOCAL) { - return false; - } - } catch (Exception e) { - return true; - } - return true; - } - - private long computeSplitSize(long blockSize, long minSize, long maxSize) { - return Math.max(minSize, Math.min(maxSize, blockSize)); - } - - private FileSplit makeSplit(Path file, long start, long length, String[] hosts) { - return new FileSplit(file, start, length, hosts); - } - - private int getBlockIndex(BlockLocation[] blkLocations, long offset) { - for (int i = 0; i < blkLocations.length; i++) { - // is the offset inside this block? - if ((blkLocations[i].getOffset() <= offset) && (offset - < blkLocations[i].getOffset() + blkLocations[i].getLength())) { - return i; - } - } - BlockLocation last = blkLocations[blkLocations.length - 1]; - long fileLength = last.getOffset() + last.getLength() - 1; - throw new IllegalArgumentException("Offset " + offset + - " is outside of file (0.." + - fileLength + ")"); - } - - /** - * get total number of rows. for count(*) - * - * @throws IOException - * @throws IndexBuilderException - */ - public long getRowCount() throws IOException, IndexBuilderException { - long rowCount = 0; - /*AbsoluteTableIdentifier absoluteTableIdentifier = this.carbonTable.getAbsoluteTableIdentifier(); - - // no of core to load the blocks in driver - //addSegmentsIfEmpty(job, absoluteTableIdentifier); - int numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE; - try { - numberOfCores = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT)); - } catch (NumberFormatException e) { - numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE; - } - // creating a thread pool - ExecutorService threadPool = Executors.newFixedThreadPool(numberOfCores); - List<Future<Map<String, AbstractIndex>>> loadedBlocks = - new ArrayList<Future<Map<String, AbstractIndex>>>(); - //for each segment fetch blocks matching filter in Driver BTree - for (String segmentNo : this.segmentList) { - // submitting the task - loadedBlocks - .add(threadPool.submit(new BlocksLoaderThread(*//*job,*//* absoluteTableIdentifier, segmentNo))); - } - threadPool.shutdown(); - try { - threadPool.awaitTermination(1, TimeUnit.HOURS); - } catch (InterruptedException e) { - throw new IndexBuilderException(e); - } - try { - // adding all the rows of the blocks to get the total row - // count - for (Future<Map<String, AbstractIndex>> block : loadedBlocks) { - for (AbstractIndex abstractIndex : block.get().values()) { - rowCount += abstractIndex.getTotalNumberOfRows(); - } - } - } catch (InterruptedException | ExecutionException e) { - throw new IndexBuilderException(e); - }*/ - return rowCount; - } -} +} \ No newline at end of file
