Repository: carbondata Updated Branches: refs/heads/master f1d84646a -> 3740535d5
[CARBONDATA-2267] [Presto] Support Reading CarbonData Standard Hive Partition From Presto Integration Support Reading CarbonData Standard Hive Partition From Presto Integration This closes #2139 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3740535d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3740535d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3740535d Branch: refs/heads/master Commit: 3740535d50e787df547dec2d2998591dfd1f9875 Parents: f1d8464 Author: anubhav100 <[email protected]> Authored: Wed Apr 4 13:48:15 2018 +0530 Committer: chenliang613 <[email protected]> Committed: Wed Apr 11 23:10:32 2018 +0800 ---------------------------------------------------------------------- .../presto/CarbondataSplitManager.java | 2 +- .../carbondata/presto/PrestoFilterUtil.java | 84 ++++++++++++++- .../presto/impl/CarbonTableReader.java | 108 +++++++++++++++---- 3 files changed, 171 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/3740535d/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java index db2a5e7..811393f 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java @@ -69,7 +69,7 @@ public class CarbondataSplitManager implements ConnectorSplitManager { CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key); Expression filters = PrestoFilterUtil.parseFilterExpression(layoutHandle.getConstraint()); try { - List<CarbonLocalInputSplit> splits = carbonTableReader.getInputSplits2(cache, filters); + List<CarbonLocalInputSplit> splits = carbonTableReader.getInputSplits2(cache, filters, layoutHandle.getConstraint()); ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder(); for (CarbonLocalInputSplit split : splits) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/3740535d/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java index 6f9b0f3..65f5eb1 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java @@ -19,16 +19,18 @@ package org.apache.carbondata.presto; import java.math.BigDecimal; import java.math.BigInteger; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.scan.expression.ColumnExpression; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.expression.LiteralExpression; @@ -43,6 +45,7 @@ import org.apache.carbondata.core.scan.expression.logical.AndExpression; import org.apache.carbondata.core.scan.expression.logical.OrExpression; import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.predicate.Domain; import com.facebook.presto.spi.predicate.Range; import com.facebook.presto.spi.predicate.TupleDomain; @@ -60,7 +63,10 @@ import com.facebook.presto.spi.type.VarcharType; import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; +import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.google.common.base.Preconditions.checkArgument; +import static java.lang.String.format; +import static java.util.stream.Collectors.toList; /** * PrestoFilterUtil create the carbonData Expression from the presto-domain @@ -69,6 +75,12 @@ public class PrestoFilterUtil { private static Map<Integer, Expression> filterMap = new HashMap<>(); + private final static String HIVE_DEFAULT_DYNAMIC_PARTITION = "__HIVE_DEFAULT_PARTITION__"; + + /** + * @param carbondataColumnHandle + * @return + */ private static DataType Spi2CarbondataTypeMapper(CarbondataColumnHandle carbondataColumnHandle) { Type colType = carbondataColumnHandle.getColumnType(); if (colType == BooleanType.BOOLEAN) return DataTypes.BOOLEAN; @@ -87,6 +99,72 @@ public class PrestoFilterUtil { } /** + * Return partition filters using domain constraints + * @param carbonTable + * @param originalConstraint + * @return + */ + public static List<String> getPartitionFilters(CarbonTable carbonTable, TupleDomain<ColumnHandle> originalConstraint) { + List<ColumnSchema> columnSchemas = carbonTable.getPartitionInfo().getColumnSchemaList(); + List<String> filter = new ArrayList<>(); + for (ColumnHandle columnHandle : originalConstraint.getDomains().get().keySet()) { + CarbondataColumnHandle carbondataColumnHandle = (CarbondataColumnHandle) columnHandle; + List<ColumnSchema> partitionedColumnSchema = columnSchemas.stream().filter( + columnSchema -> carbondataColumnHandle.getColumnName().equals(columnSchema.getColumnName())).collect(toList()); + if(partitionedColumnSchema.size() != 0) { + filter.addAll(createPartitionFilters(originalConstraint, carbondataColumnHandle)); + } + } + return filter; + } + + /** Returns list of partition key and values using domain constraints + * @param originalConstraint + * @param carbonDataColumnHandle + */ + private static List<String> createPartitionFilters(TupleDomain<ColumnHandle> originalConstraint, + CarbondataColumnHandle carbonDataColumnHandle) { + List<String> filter = new ArrayList<>(); + Domain domain = originalConstraint.getDomains().get().get(carbonDataColumnHandle); + if (domain != null && domain.isNullableSingleValue()) { + Object value = domain.getNullableSingleValue(); + Type type = domain.getType(); + if (value == null) { + filter.add(carbonDataColumnHandle.getColumnName() + "=" + HIVE_DEFAULT_DYNAMIC_PARTITION); + } else if(carbonDataColumnHandle.getColumnType() instanceof DecimalType) { + int scale = ((DecimalType) carbonDataColumnHandle.getColumnType()).getScale(); + if (value instanceof Long) { + //create decimal value from Long + BigDecimal decimalValue = new BigDecimal(new BigInteger(String.valueOf(value)), scale); + filter.add(carbonDataColumnHandle.getColumnName() + "=" + decimalValue.toString()); + } else if (value instanceof Slice) { + //create decimal value from Slice + BigDecimal decimalValue = new BigDecimal(Decimals.decodeUnscaledValue((Slice) value), scale); + filter.add(carbonDataColumnHandle.getColumnName() + "=" + decimalValue.toString()); + } + } else if (value instanceof Slice) { + filter.add(carbonDataColumnHandle.getColumnName() + "=" + ((Slice) value).toStringUtf8()); + } else if (value instanceof Long && carbonDataColumnHandle.getColumnType() + .equals(DateType.DATE)) { + Calendar c = Calendar.getInstance(); + c.setTime(new java.sql.Date(0)); + c.add(Calendar.DAY_OF_YEAR, ((Long) value).intValue()); + java.sql.Date date = new java.sql.Date(c.getTime().getTime()); + filter.add(carbonDataColumnHandle.getColumnName() + "=" + date.toString()); + } else if (value instanceof Long && carbonDataColumnHandle.getColumnType() + .equals(TimestampType.TIMESTAMP)) { + String timeStamp = new Timestamp((Long) value).toString(); + filter.add(carbonDataColumnHandle.getColumnName() + "=" + timeStamp.substring(0,timeStamp.indexOf('.'))); + } else if ((value instanceof Boolean) || (value instanceof Double) || (value instanceof Long)) { + filter.add(carbonDataColumnHandle.getColumnName() + "=" + value.toString()); + } else { + throw new PrestoException(NOT_SUPPORTED, format("Unsupported partition key type: %s", type.getDisplayName())); + } + } + return filter; + } + + /** * Convert presto-TupleDomain predication into Carbon scan express condition * * @param originalConstraint presto-TupleDomain @@ -103,6 +181,7 @@ public class PrestoFilterUtil { CarbondataColumnHandle cdch = (CarbondataColumnHandle) c; Type type = cdch.getColumnType(); + DataType coltype = Spi2CarbondataTypeMapper(cdch); Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype); @@ -176,7 +255,7 @@ public class PrestoFilterUtil { ListExpression candidates = null; List<Expression> exs = singleValues.stream() .map((a) -> new LiteralExpression(a, coltype)) - .collect(Collectors.toList()); + .collect(toList()); candidates = new ListExpression(exs); filters.add(new InExpression(colExpression, candidates)); } else if (valueExpressionMap.size() > 0) { @@ -234,7 +313,6 @@ public class PrestoFilterUtil { } else { return rawdata; } - } else if (type.equals(BooleanType.BOOLEAN)) return rawdata; else if (type.equals(DateType.DATE)) { Calendar c = Calendar.getInstance(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/3740535d/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java index fa3fb9c..e8e986a 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Calendar; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; @@ -30,29 +31,41 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.DataMapStoreManager; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.converter.SchemaConverter; import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; +import org.apache.carbondata.core.metadata.schema.PartitionInfo; +import org.apache.carbondata.core.metadata.schema.partition.PartitionType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.reader.ThriftReader; import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.presto.PrestoFilterUtil; import com.facebook.presto.hadoop.$internal.com.google.gson.Gson; import com.facebook.presto.hadoop.$internal.io.netty.util.internal.ConcurrentSet; +import com.facebook.presto.hadoop.$internal.org.apache.commons.collections.CollectionUtils; +import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.TableNotFoundException; import com.facebook.presto.spi.classloader.ThreadContextClassLoader; +import com.facebook.presto.spi.predicate.TupleDomain; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; @@ -102,6 +115,15 @@ public class CarbonTableReader { */ private AtomicReference<HashMap<SchemaTableName, CarbonTableCacheModel>> carbonCache; + private LoadMetadataDetails loadMetadataDetails[]; + + /** + * Logger instance + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(CarbonTableReader.class.getName()); + + @Inject public CarbonTableReader(CarbonTableConfig config) { this.config = requireNonNull(config, "CarbonTableConfig is null"); this.carbonCache = new AtomicReference(new HashMap()); @@ -246,8 +268,8 @@ public class CarbonTableReader { try { if (isKeyExists && !FileFactory.isFileExist( - CarbonTablePath.getSchemaFilePath( - carbonCache.get().get(schemaTableName).carbonTable.getTablePath()), fileType)) { + CarbonTablePath.getSchemaFilePath( + carbonCache.get().get(schemaTableName).carbonTable.getTablePath()), fileType)) { removeTableFromCache(schemaTableName); throw new TableNotFoundException(schemaTableName); } @@ -317,8 +339,8 @@ public class CarbonTableReader { // Step 1: get store path of the table and cache it. // create table identifier. the table id is randomly generated. CarbonTableIdentifier carbonTableIdentifier = - new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(), - UUID.randomUUID().toString()); + new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(), + UUID.randomUUID().toString()); String storePath = config.getStorePath(); String tablePath = storePath + "/" + carbonTableIdentifier.getDatabaseName() + "/" + carbonTableIdentifier.getTableName(); @@ -333,10 +355,10 @@ public class CarbonTableReader { } }; ThriftReader thriftReader = - new ThriftReader(CarbonTablePath.getSchemaFilePath(tablePath), createTBase); + new ThriftReader(CarbonTablePath.getSchemaFilePath(tablePath), createTBase); thriftReader.open(); org.apache.carbondata.format.TableInfo tableInfo = - (org.apache.carbondata.format.TableInfo) thriftReader.read(); + (org.apache.carbondata.format.TableInfo) thriftReader.read(); thriftReader.close(); @@ -364,9 +386,8 @@ public class CarbonTableReader { return result; } - public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel, - Expression filters) { + Expression filters, TupleDomain<ColumnHandle> constraints) throws IOException { List<CarbonLocalInputSplit> result = new ArrayList<>(); if(config.getUnsafeMemoryInMb() != null) { CarbonProperties.getInstance().addProperty( @@ -387,11 +408,25 @@ public class CarbonTableReader { config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName()); config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName()); + JobConf jobConf = new JobConf(config); + List<PartitionSpec> filteredPartitions = new ArrayList(); + + PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName()); + + if(partitionInfo!=null && partitionInfo.getPartitionType()== PartitionType.NATIVE_HIVE) { + try { + loadMetadataDetails= SegmentStatusManager + .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath())); + } catch (IOException exception) { + LOGGER.error(exception.getMessage()); + throw exception; + } + filteredPartitions = findRequiredPartitions(constraints, carbonTable,loadMetadataDetails); + } try { CarbonTableInputFormat.setTableInfo(config, tableInfo); CarbonTableInputFormat carbonTableInputFormat = - createInputFormat(config, carbonTable.getAbsoluteTableIdentifier(), filters); - JobConf jobConf = new JobConf(config); + createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(), filters,filteredPartitions); Job job = Job.getInstance(jobConf); List<InputSplit> splits = carbonTableInputFormat.getSplits(job); CarbonInputSplit carbonInputSplit = null; @@ -400,11 +435,11 @@ public class CarbonTableReader { for (InputSplit inputSplit : splits) { carbonInputSplit = (CarbonInputSplit) inputSplit; result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(), - carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(), - carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()), - carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(), - carbonInputSplit.getDeleteDeltaFiles(), - gson.toJson(carbonInputSplit.getDetailInfo()))); + carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(), + carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()), + carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(), + carbonInputSplit.getDeleteDeltaFiles(), + gson.toJson(carbonInputSplit.getDetailInfo()))); } } @@ -415,14 +450,49 @@ public class CarbonTableReader { return result; } + /** Returns list of partition specs to query based on the domain constraints + * @param constraints + * @param carbonTable + * @throws IOException + */ + private List<PartitionSpec> findRequiredPartitions(TupleDomain<ColumnHandle> constraints, CarbonTable carbonTable, + LoadMetadataDetails[]loadMetadataDetails) throws IOException { + Set<PartitionSpec> partitionSpecs = new HashSet<>(); + List<PartitionSpec> prunePartitions = new ArrayList(); + + for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) { + SegmentFileStore segmentFileStore = null; + try { + segmentFileStore = + new SegmentFileStore(carbonTable.getTablePath(), loadMetadataDetail.getSegmentFile()); + partitionSpecs.addAll(segmentFileStore.getPartitionSpecs()); + + } catch (IOException exception) { + LOGGER.error(exception.getMessage()); + throw exception; + } + } + List<String> partitionValuesFromExpression = + PrestoFilterUtil.getPartitionFilters(carbonTable, constraints); + + List<PartitionSpec> partitionSpecList = partitionSpecs.stream().filter( partitionSpec -> + CollectionUtils.isSubCollection(partitionValuesFromExpression, partitionSpec.getPartitions())).collect(Collectors.toList()); + + prunePartitions.addAll(partitionSpecList); + + return prunePartitions; + } + private CarbonTableInputFormat<Object> createInputFormat( Configuration conf, - AbsoluteTableIdentifier identifier, Expression filterExpression) - throws IOException { + AbsoluteTableIdentifier identifier, Expression filterExpression, List<PartitionSpec> filteredPartitions) + throws IOException { CarbonTableInputFormat format = new CarbonTableInputFormat<Object>(); CarbonTableInputFormat.setTablePath(conf, - identifier.appendWithLocalPrefix(identifier.getTablePath())); + identifier.appendWithLocalPrefix(identifier.getTablePath())); CarbonTableInputFormat.setFilterPredicates(conf, filterExpression); - + if(filteredPartitions.size() != 0) { + CarbonTableInputFormat.setPartitionsToPrune(conf, filteredPartitions); + } return format; }
