Repository: carbondata
Updated Branches:
  refs/heads/master f1d84646a -> 3740535d5


[CARBONDATA-2267] [Presto] Support Reading CarbonData Standard Hive Partition 
From Presto Integration

Support Reading CarbonData Standard Hive Partition From Presto Integration

This closes #2139


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3740535d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3740535d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3740535d

Branch: refs/heads/master
Commit: 3740535d50e787df547dec2d2998591dfd1f9875
Parents: f1d8464
Author: anubhav100 <[email protected]>
Authored: Wed Apr 4 13:48:15 2018 +0530
Committer: chenliang613 <[email protected]>
Committed: Wed Apr 11 23:10:32 2018 +0800

----------------------------------------------------------------------
 .../presto/CarbondataSplitManager.java          |   2 +-
 .../carbondata/presto/PrestoFilterUtil.java     |  84 ++++++++++++++-
 .../presto/impl/CarbonTableReader.java          | 108 +++++++++++++++----
 3 files changed, 171 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3740535d/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
 
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
index db2a5e7..811393f 100755
--- 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ 
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -69,7 +69,7 @@ public class CarbondataSplitManager implements 
ConnectorSplitManager {
     CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key);
     Expression filters = 
PrestoFilterUtil.parseFilterExpression(layoutHandle.getConstraint());
     try {
-      List<CarbonLocalInputSplit> splits = 
carbonTableReader.getInputSplits2(cache, filters);
+      List<CarbonLocalInputSplit> splits = 
carbonTableReader.getInputSplits2(cache, filters, layoutHandle.getConstraint());
 
       ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
       for (CarbonLocalInputSplit split : splits) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3740535d/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
----------------------------------------------------------------------
diff --git 
a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
 
b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
index 6f9b0f3..65f5eb1 100644
--- 
a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
+++ 
b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
@@ -19,16 +19,18 @@ package org.apache.carbondata.presto;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
@@ -43,6 +45,7 @@ import 
org.apache.carbondata.core.scan.expression.logical.AndExpression;
 import org.apache.carbondata.core.scan.expression.logical.OrExpression;
 
 import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.predicate.Domain;
 import com.facebook.presto.spi.predicate.Range;
 import com.facebook.presto.spi.predicate.TupleDomain;
@@ -60,7 +63,10 @@ import com.facebook.presto.spi.type.VarcharType;
 import com.google.common.collect.ImmutableList;
 import io.airlift.slice.Slice;
 
+import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
 
 /**
  * PrestoFilterUtil create the carbonData Expression from the presto-domain
@@ -69,6 +75,12 @@ public class PrestoFilterUtil {
 
   private static Map<Integer, Expression> filterMap = new HashMap<>();
 
+  private final static String HIVE_DEFAULT_DYNAMIC_PARTITION = 
"__HIVE_DEFAULT_PARTITION__";
+
+  /**
+   * @param carbondataColumnHandle
+   * @return
+   */
   private static DataType Spi2CarbondataTypeMapper(CarbondataColumnHandle 
carbondataColumnHandle) {
     Type colType = carbondataColumnHandle.getColumnType();
     if (colType == BooleanType.BOOLEAN) return DataTypes.BOOLEAN;
@@ -87,6 +99,72 @@ public class PrestoFilterUtil {
   }
 
   /**
+   * Return partition filters using domain constraints
+   * @param carbonTable
+   * @param originalConstraint
+   * @return
+   */
+  public static List<String> getPartitionFilters(CarbonTable carbonTable, 
TupleDomain<ColumnHandle> originalConstraint) {
+    List<ColumnSchema> columnSchemas = 
carbonTable.getPartitionInfo().getColumnSchemaList();
+    List<String> filter = new ArrayList<>();
+    for (ColumnHandle columnHandle : 
originalConstraint.getDomains().get().keySet()) {
+      CarbondataColumnHandle carbondataColumnHandle = (CarbondataColumnHandle) 
columnHandle;
+      List<ColumnSchema> partitionedColumnSchema = 
columnSchemas.stream().filter(
+          columnSchema -> 
carbondataColumnHandle.getColumnName().equals(columnSchema.getColumnName())).collect(toList());
+      if(partitionedColumnSchema.size() != 0) {
+        filter.addAll(createPartitionFilters(originalConstraint, 
carbondataColumnHandle));
+      }
+    }
+    return filter;
+  }
+
+  /**
+   * Returns the partition filter (in {@code column=value} form) for a single column,
+   * derived from the domain the constraint holds for that column.
+   *
+   * @param originalConstraint     presto-TupleDomain holding the per-column domains
+   * @param carbonDataColumnHandle column whose domain is turned into a filter
+   * @return a list with one filter when the column's domain is a (nullable) single
+   *         value; an empty list when the constraint has no domain for the column or
+   *         the domain is not a single value
+   * @throws PrestoException with NOT_SUPPORTED when the single value has a type that
+   *         cannot be rendered as a partition value
+   */
+  private static List<String> createPartitionFilters(TupleDomain<ColumnHandle> originalConstraint,
+      CarbondataColumnHandle carbonDataColumnHandle) {
+    List<String> filter = new ArrayList<>();
+    // getDomains() is an Optional; guard it instead of calling get() blindly.
+    if (!originalConstraint.getDomains().isPresent()) {
+      return filter;
+    }
+    Domain domain = originalConstraint.getDomains().get().get(carbonDataColumnHandle);
+    if (domain == null || !domain.isNullableSingleValue()) {
+      return filter;
+    }
+
+    Object value = domain.getNullableSingleValue();
+    Type type = domain.getType();
+    Type columnType = carbonDataColumnHandle.getColumnType();
+    String partitionValue;
+    if (value == null) {
+      // A null single value is matched against the default-partition marker.
+      partitionValue = HIVE_DEFAULT_DYNAMIC_PARTITION;
+    } else if (columnType instanceof DecimalType) {
+      int scale = ((DecimalType) columnType).getScale();
+      if (value instanceof Long) {
+        // create decimal value from Long (unscaled value + column scale)
+        partitionValue = new BigDecimal(new BigInteger(String.valueOf(value)), scale).toString();
+      } else if (value instanceof Slice) {
+        // create decimal value from Slice (encoded unscaled value + column scale)
+        partitionValue =
+            new BigDecimal(Decimals.decodeUnscaledValue((Slice) value), scale).toString();
+      } else {
+        // Previously this case silently produced no filter; report it like every
+        // other unsupported value type.
+        throw new PrestoException(NOT_SUPPORTED,
+            format("Unsupported partition key type: %s", type.getDisplayName()));
+      }
+    } else if (value instanceof Slice) {
+      partitionValue = ((Slice) value).toStringUtf8();
+    } else if (value instanceof Long && columnType.equals(DateType.DATE)) {
+      // The value is added as a day count to the epoch. LocalDate does that without
+      // consulting the JVM default time zone, which the Calendar/java.sql.Date route
+      // did (yielding the previous day on JVMs west of UTC). Output stays yyyy-MM-dd.
+      partitionValue = java.time.LocalDate.ofEpochDay((Long) value).toString();
+    } else if (value instanceof Long && columnType.equals(TimestampType.TIMESTAMP)) {
+      // Timestamp.toString() always ends in a fractional part; strip it.
+      String timeStamp = new Timestamp((Long) value).toString();
+      partitionValue = timeStamp.substring(0, timeStamp.indexOf('.'));
+    } else if ((value instanceof Boolean) || (value instanceof Double)
+        || (value instanceof Long)) {
+      partitionValue = value.toString();
+    } else {
+      throw new PrestoException(NOT_SUPPORTED,
+          format("Unsupported partition key type: %s", type.getDisplayName()));
+    }
+    filter.add(carbonDataColumnHandle.getColumnName() + "=" + partitionValue);
+    return filter;
+  }
+
+  /**
    * Convert presto-TupleDomain predication into Carbon scan express condition
    *
    * @param originalConstraint presto-TupleDomain
@@ -103,6 +181,7 @@ public class PrestoFilterUtil {
       CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
       Type type = cdch.getColumnType();
 
+
       DataType coltype = Spi2CarbondataTypeMapper(cdch);
       Expression colExpression = new ColumnExpression(cdch.getColumnName(), 
coltype);
 
@@ -176,7 +255,7 @@ public class PrestoFilterUtil {
         ListExpression candidates = null;
         List<Expression> exs = singleValues.stream()
             .map((a) -> new LiteralExpression(a, coltype))
-            .collect(Collectors.toList());
+            .collect(toList());
         candidates = new ListExpression(exs);
         filters.add(new InExpression(colExpression, candidates));
       } else if (valueExpressionMap.size() > 0) {
@@ -234,7 +313,6 @@ public class PrestoFilterUtil {
       } else {
         return rawdata;
       }
-
     } else if (type.equals(BooleanType.BOOLEAN)) return rawdata;
     else if (type.equals(DateType.DATE)) {
       Calendar c = Calendar.getInstance();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3740535d/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git 
a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
 
b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index fa3fb9c..e8e986a 100755
--- 
a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ 
b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -30,29 +31,41 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.reader.ThriftReader;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.PrestoFilterUtil;
 
 import com.facebook.presto.hadoop.$internal.com.google.gson.Gson;
 import 
com.facebook.presto.hadoop.$internal.io.netty.util.internal.ConcurrentSet;
+import 
com.facebook.presto.hadoop.$internal.org.apache.commons.collections.CollectionUtils;
+import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.SchemaTableName;
 import com.facebook.presto.spi.TableNotFoundException;
 import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.predicate.TupleDomain;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
@@ -102,6 +115,15 @@ public class CarbonTableReader {
    */
   private AtomicReference<HashMap<SchemaTableName, CarbonTableCacheModel>> 
carbonCache;
 
+  private LoadMetadataDetails loadMetadataDetails[];
+
+  /**
+   * Logger instance
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonTableReader.class.getName());
+
+
   @Inject public CarbonTableReader(CarbonTableConfig config) {
     this.config = requireNonNull(config, "CarbonTableConfig is null");
     this.carbonCache = new AtomicReference(new HashMap());
@@ -246,8 +268,8 @@ public class CarbonTableReader {
     try {
       if (isKeyExists
           && !FileFactory.isFileExist(
-              CarbonTablePath.getSchemaFilePath(
-                  
carbonCache.get().get(schemaTableName).carbonTable.getTablePath()), fileType)) {
+          CarbonTablePath.getSchemaFilePath(
+              
carbonCache.get().get(schemaTableName).carbonTable.getTablePath()), fileType)) {
         removeTableFromCache(schemaTableName);
         throw new TableNotFoundException(schemaTableName);
       }
@@ -317,8 +339,8 @@ public class CarbonTableReader {
       // Step 1: get store path of the table and cache it.
       // create table identifier. the table id is randomly generated.
       CarbonTableIdentifier carbonTableIdentifier =
-              new CarbonTableIdentifier(table.getSchemaName(), 
table.getTableName(),
-                      UUID.randomUUID().toString());
+          new CarbonTableIdentifier(table.getSchemaName(), 
table.getTableName(),
+              UUID.randomUUID().toString());
       String storePath = config.getStorePath();
       String tablePath = storePath + "/" + 
carbonTableIdentifier.getDatabaseName() + "/"
           + carbonTableIdentifier.getTableName();
@@ -333,10 +355,10 @@ public class CarbonTableReader {
         }
       };
       ThriftReader thriftReader =
-              new ThriftReader(CarbonTablePath.getSchemaFilePath(tablePath), 
createTBase);
+          new ThriftReader(CarbonTablePath.getSchemaFilePath(tablePath), 
createTBase);
       thriftReader.open();
       org.apache.carbondata.format.TableInfo tableInfo =
-              (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          (org.apache.carbondata.format.TableInfo) thriftReader.read();
       thriftReader.close();
 
 
@@ -364,9 +386,8 @@ public class CarbonTableReader {
     return result;
   }
 
-
   public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel 
tableCacheModel,
-                                                     Expression filters)  {
+      Expression filters, TupleDomain<ColumnHandle> constraints) throws 
IOException {
     List<CarbonLocalInputSplit> result = new ArrayList<>();
     if(config.getUnsafeMemoryInMb() != null) {
       CarbonProperties.getInstance().addProperty(
@@ -387,11 +408,25 @@ public class CarbonTableReader {
     config.set(CarbonTableInputFormat.DATABASE_NAME, 
carbonTable.getDatabaseName());
     config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
 
+    JobConf jobConf = new JobConf(config);
+    List<PartitionSpec> filteredPartitions = new ArrayList();
+
+    PartitionInfo partitionInfo = 
carbonTable.getPartitionInfo(carbonTable.getTableName());
+
+    if(partitionInfo!=null && partitionInfo.getPartitionType()== 
PartitionType.NATIVE_HIVE) {
+      try {
+        loadMetadataDetails= SegmentStatusManager
+            
.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+      } catch (IOException exception) {
+        LOGGER.error(exception.getMessage());
+        throw exception;
+      }
+      filteredPartitions = findRequiredPartitions(constraints, 
carbonTable,loadMetadataDetails);
+    }
     try {
       CarbonTableInputFormat.setTableInfo(config, tableInfo);
       CarbonTableInputFormat carbonTableInputFormat =
-              createInputFormat(config, 
carbonTable.getAbsoluteTableIdentifier(), filters);
-      JobConf jobConf = new JobConf(config);
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(), 
filters,filteredPartitions);
       Job job = Job.getInstance(jobConf);
       List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
       CarbonInputSplit carbonInputSplit = null;
@@ -400,11 +435,11 @@ public class CarbonTableReader {
         for (InputSplit inputSplit : splits) {
           carbonInputSplit = (CarbonInputSplit) inputSplit;
           result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
-                  carbonInputSplit.getPath().toString(), 
carbonInputSplit.getStart(),
-                  carbonInputSplit.getLength(), 
Arrays.asList(carbonInputSplit.getLocations()),
-                  carbonInputSplit.getNumberOfBlocklets(), 
carbonInputSplit.getVersion().number(),
-                  carbonInputSplit.getDeleteDeltaFiles(),
-                  gson.toJson(carbonInputSplit.getDetailInfo())));
+              carbonInputSplit.getPath().toString(), 
carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), 
Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), 
carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(),
+              gson.toJson(carbonInputSplit.getDetailInfo())));
         }
       }
 
@@ -415,14 +450,49 @@ public class CarbonTableReader {
     return result;
   }
 
+  /**
+   * Returns the list of partition specs to query, based on the domain constraints.
+   * The partition specs of every given load are collected from its segment file and
+   * then narrowed to those whose partition entries contain every partition filter
+   * derived from the constraints.
+   *
+   * @param constraints         presto-TupleDomain the partition filters are derived from
+   * @param carbonTable         table whose segment files are read
+   * @param loadMetadataDetails load entries whose segment files are inspected
+   * @return a new mutable list of the matching partition specs
+   * @throws IOException if a segment file cannot be read
+   */
+  private List<PartitionSpec> findRequiredPartitions(TupleDomain<ColumnHandle> constraints,
+      CarbonTable carbonTable, LoadMetadataDetails[] loadMetadataDetails) throws IOException {
+    Set<PartitionSpec> partitionSpecs = new HashSet<>();
+
+    for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
+      // NOTE(review): presumably a load without a segment file has no partition
+      // metadata to contribute - confirm; it is skipped rather than handed to
+      // SegmentFileStore as a null file name.
+      if (loadMetadataDetail.getSegmentFile() == null) {
+        continue;
+      }
+      try {
+        SegmentFileStore segmentFileStore =
+            new SegmentFileStore(carbonTable.getTablePath(), loadMetadataDetail.getSegmentFile());
+        partitionSpecs.addAll(segmentFileStore.getPartitionSpecs());
+      } catch (IOException exception) {
+        // Say which segment file failed; the bare exception message gave no context.
+        LOGGER.error("Unable to read partition specs from segment file "
+            + loadMetadataDetail.getSegmentFile() + ": " + exception.getMessage());
+        throw exception;
+      }
+    }
+
+    List<String> partitionValuesFromExpression =
+        PrestoFilterUtil.getPartitionFilters(carbonTable, constraints);
+
+    // Keep a spec only when every requested filter appears among its partitions.
+    // With no filters at all, every spec qualifies.
+    return new ArrayList<>(partitionSpecs.stream()
+        .filter(partitionSpec -> CollectionUtils.isSubCollection(
+            partitionValuesFromExpression, partitionSpec.getPartitions()))
+        .collect(Collectors.toList()));
+  }
+
   private CarbonTableInputFormat<Object>  createInputFormat( Configuration 
conf,
-       AbsoluteTableIdentifier identifier, Expression filterExpression)
-          throws IOException {
+      AbsoluteTableIdentifier identifier, Expression filterExpression, 
List<PartitionSpec> filteredPartitions)
+      throws IOException {
     CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
     CarbonTableInputFormat.setTablePath(conf,
-            identifier.appendWithLocalPrefix(identifier.getTablePath()));
+        identifier.appendWithLocalPrefix(identifier.getTablePath()));
     CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
-
+    if(filteredPartitions.size() != 0) {
+      CarbonTableInputFormat.setPartitionsToPrune(conf, filteredPartitions);
+    }
     return format;
   }
 

Reply via email to