This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 1516472 [CARBONDATA-3974] Improve partition pruning performance in
presto carbon integration
1516472 is described below
commit 15164725bf20b8d2ef5208ec3ab2e4991de2f64f
Author: ajantha-bhat <[email protected]>
AuthorDate: Fri Sep 4 22:34:19 2020 +0530
[CARBONDATA-3974] Improve partition pruning performance in presto carbon
integration
Why is this PR needed?
a) For a 200K-segment table in the cloud, a presto partition query was taking
more than 5 hours. The reason is that it was reading all segment files for
partition pruning. Now it takes less than a minute!
b) If the query filter is < or >, or contains more than one value,
partition pruning does not work in presto.
c) prestodb profile compilation is broken from previous PR
What changes were proposed in this PR?
a) HiveTableHandle already has the partition spec matching the filters
(it has queried the metastore to get all partitions and pruned them). So, create
the partitionSpec based on that. Also handled for both prestodb and prestosql.
b) #3885 broke prestodb compilation; only prestosql was compiled.
c) #3887 also didn't handle prestodb.
This closes #3913
---
.../apache/carbondata/presto/CarbondataModule.java | 2 +
.../carbondata/presto/CarbondataSplitManager.java | 17 +-
.../carbondata/presto/impl/CarbonTableReader.java | 13 +-
.../presto/readers/ComplexTypeStreamReader.java | 196 +++++++++++++++++++++
.../presto/readers/SliceStreamReader.java | 2 +-
.../carbondata/presto/CarbondataSplitManager.java | 28 ++-
.../carbondata/presto/impl/CarbonTableReader.java | 16 +-
.../carbondata/presto/server/PrestoTestUtil.scala | 117 ++++++++++++
.../carbondata/presto/server/PrestoTestUtil.scala | 117 ++++++++++++
.../PrestoTestNonTransactionalTableFiles.scala | 44 +----
10 files changed, 474 insertions(+), 78 deletions(-)
diff --git
a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataModule.java
b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataModule.java
index 56b52b0..401b5b4 100755
---
a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataModule.java
+++
b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataModule.java
@@ -21,6 +21,8 @@ import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.presto.impl.CarbonTableReader;
import com.facebook.presto.hive.CoercionPolicy;
diff --git
a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java
b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java
index fbb387f..3dccde5 100755
---
a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java
+++
b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -27,11 +27,14 @@ import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
+import java.util.stream.Collectors;
import javax.inject.Inject;
import static java.util.Objects.requireNonNull;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
@@ -48,6 +51,7 @@ import com.facebook.presto.hive.ForHiveClient;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveColumnHandle;
+import com.facebook.presto.hive.HivePartition;
import com.facebook.presto.hive.HiveSplit;
import com.facebook.presto.hive.HiveSplitManager;
import com.facebook.presto.hive.HiveTableLayoutHandle;
@@ -55,6 +59,7 @@ import com.facebook.presto.hive.HiveTransactionHandle;
import com.facebook.presto.hive.NamenodeStats;
import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
import com.facebook.presto.hive.metastore.Table;
+import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
@@ -117,6 +122,16 @@ public class CarbondataSplitManager extends
HiveSplitManager {
// file metastore case tablePath can be null, so get from location
location = table.getStorage().getLocation();
}
+ List<PartitionSpec> filteredPartitions = new ArrayList<>();
+ if (layout.getPartitionColumns().size() > 0 &&
layout.getPartitions().isPresent()) {
+ List<String> colNames =
+ layout.getPartitionColumns().stream().map(x -> ((HiveColumnHandle)
x).getName())
+ .collect(Collectors.toList());
+ for (HivePartition partition : layout.getPartitions().get()) {
+ filteredPartitions.add(new PartitionSpec(colNames,
+ location + CarbonCommonConstants.FILE_SEPARATOR +
partition.getPartitionId()));
+ }
+ }
String queryId = System.nanoTime() + "";
QueryStatistic statistic = new QueryStatistic();
QueryStatisticsRecorder statisticRecorder =
CarbonTimeStatisticsFactory.createDriverRecorder();
@@ -139,7 +154,7 @@ public class CarbondataSplitManager extends
HiveSplitManager {
try {
List<CarbonLocalMultiBlockSplit> splits =
- carbonTableReader.getInputSplits(cache, filters, predicate,
configuration);
+ carbonTableReader.getInputSplits(cache, filters, filteredPartitions,
configuration);
ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
long index = 0;
diff --git
a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java
b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java
index d4d4e88..67b4656 100755
---
a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++
b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -247,7 +247,7 @@ public class CarbonTableReader {
*
* @param tableCacheModel cached table
* @param filters carbonData filters
- * @param constraints presto filters
+ * @param filteredPartitions matched partitionSpec for the filter
* @param config hadoop conf
* @return list of multiblock split
* @throws IOException
@@ -255,7 +255,7 @@ public class CarbonTableReader {
public List<CarbonLocalMultiBlockSplit> getInputSplits(
CarbonTableCacheModel tableCacheModel,
Expression filters,
- TupleDomain<HiveColumnHandle> constraints,
+ List<PartitionSpec> filteredPartitions,
Configuration config) throws IOException {
List<CarbonLocalInputSplit> result = new ArrayList<>();
List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
@@ -272,15 +272,6 @@ public class CarbonTableReader {
CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
JobConf jobConf = new JobConf(config);
- List<PartitionSpec> filteredPartitions = new ArrayList<>();
-
- PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
- LoadMetadataDetails[] loadMetadataDetails = null;
- if (partitionInfo != null && partitionInfo.getPartitionType() ==
PartitionType.NATIVE_HIVE) {
- loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
- CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
- filteredPartitions = findRequiredPartitions(constraints, carbonTable,
loadMetadataDetails);
- }
try {
CarbonTableInputFormat.setTableInfo(config, tableInfo);
CarbonTableInputFormat<Object> carbonTableInputFormat =
diff --git
a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java
b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java
new file mode 100644
index 0000000..090ac39
--- /dev/null
+++
b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import com.facebook.presto.spi.block.ArrayBlock;
+import com.facebook.presto.spi.block.RowBlock;
+import com.facebook.presto.spi.type.*;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import
org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+
+import org.apache.carbondata.presto.CarbonVectorBatch;
+import org.apache.carbondata.presto.ColumnarVectorWrapperDirect;
+
+/**
+ * Class to read the complex type Stream [array/struct/map]
+ */
+
+public class ComplexTypeStreamReader extends CarbonColumnVectorImpl
+ implements PrestoVectorBlockBuilder {
+
+ protected int batchSize;
+
+ protected Type type;
+ protected BlockBuilder builder;
+
+ public ComplexTypeStreamReader(int batchSize, StructField field) {
+ super(batchSize, field.getDataType());
+ this.batchSize = batchSize;
+ this.type = getType(field);
+ List<CarbonColumnVector> childrenList = new ArrayList<>();
+ for (StructField child : field.getChildren()) {
+ childrenList.add(new ColumnarVectorWrapperDirect(Objects.requireNonNull(
+ CarbonVectorBatch.createDirectStreamReader(this.batchSize,
child.getDataType(), child))));
+ }
+ setChildrenVector(childrenList);
+ this.builder = type.createBlockBuilder(null, batchSize);
+ }
+
+ Type getType(StructField field) {
+ DataType dataType = field.getDataType();
+ if (dataType == DataTypes.STRING || dataType == DataTypes.VARCHAR) {
+ return VarcharType.VARCHAR;
+ } else if (dataType == DataTypes.SHORT) {
+ return SmallintType.SMALLINT;
+ } else if (dataType == DataTypes.INT) {
+ return IntegerType.INTEGER;
+ } else if (dataType == DataTypes.LONG) {
+ return BigintType.BIGINT;
+ } else if (dataType == DataTypes.DOUBLE) {
+ return DoubleType.DOUBLE;
+ } else if (dataType == DataTypes.FLOAT) {
+ return RealType.REAL;
+ } else if (dataType == DataTypes.BOOLEAN) {
+ return BooleanType.BOOLEAN;
+ } else if (dataType == DataTypes.BINARY) {
+ return VarbinaryType.VARBINARY;
+ } else if (dataType == DataTypes.DATE) {
+ return DateType.DATE;
+ } else if (dataType == DataTypes.TIMESTAMP) {
+ return TimestampType.TIMESTAMP;
+ } else if (dataType == DataTypes.BYTE) {
+ return TinyintType.TINYINT;
+ } else if (DataTypes.isDecimal(dataType)) {
+ org.apache.carbondata.core.metadata.datatype.DecimalType decimal =
+ (org.apache.carbondata.core.metadata.datatype.DecimalType) dataType;
+ return DecimalType.createDecimalType(decimal.getPrecision(),
decimal.getScale());
+ } else if (DataTypes.isArrayType(dataType)) {
+ return new ArrayType(getType(field.getChildren().get(0)));
+ } else if (DataTypes.isStructType(dataType)) {
+ List<RowType.Field> children = new ArrayList<>();
+ for (StructField child : field.getChildren()) {
+ children.add(new RowType.Field(Optional.of(child.getFieldName()),
getType(child)));
+ }
+ return RowType.from(children);
+ } else {
+ throw new UnsupportedOperationException("Unsupported type: " + dataType);
+ }
+ }
+
+ @Override public Block buildBlock() {
+ return builder.build();
+ }
+
+ @Override public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ public void putComplexObject(List<Integer> offsetVector) {
+ if (type instanceof ArrayType) {
+ // build child block
+ Block childBlock = buildChildBlock(getChildrenVector().get(0));
+ // prepare an offset vector with 0 as initial offset
+ int[] offsetVectorArray = new int[offsetVector.size() + 1];
+ for (int i = 1; i <= offsetVector.size(); i++) {
+ offsetVectorArray[i] = offsetVectorArray[i - 1] + offsetVector.get(i -
1);
+ }
+ // prepare Array block
+ Block arrayBlock = ArrayBlock
+ .fromElementBlock(offsetVector.size(), Optional.empty(),
offsetVectorArray,
+ childBlock);
+ for (int position = 0; position < offsetVector.size(); position++) {
+ type.writeObject(builder, arrayBlock.getObject(position, Block.class));
+ }
+ getChildrenVector().get(0).getColumnVector().reset();
+ } else {
+ // build child blocks
+ List<Block> childBlocks = new ArrayList<>(getChildrenVector().size());
+ for (CarbonColumnVector child : getChildrenVector()) {
+ childBlocks.add(buildChildBlock(child));
+ }
+ // prepare ROW block
+ Block rowBlock = RowBlock
+ .fromFieldBlocks(childBlocks.get(0).getPositionCount(),
Optional.empty(),
+ childBlocks.toArray(new Block[0]));
+ for (int position = 0; position < childBlocks.get(0).getPositionCount();
position++) {
+ type.writeObject(builder, rowBlock.getObject(position, Block.class));
+ }
+ for (CarbonColumnVector child : getChildrenVector()) {
+ child.getColumnVector().reset();
+ }
+ }
+ }
+
+ private Block buildChildBlock(CarbonColumnVector carbonColumnVector) {
+ DataType dataType = carbonColumnVector.getType();
+ carbonColumnVector = carbonColumnVector.getColumnVector();
+ if (dataType == DataTypes.STRING || dataType == DataTypes.BINARY
+ || dataType == DataTypes.VARCHAR) {
+ return ((SliceStreamReader) carbonColumnVector).buildBlock();
+ } else if (dataType == DataTypes.SHORT) {
+ return ((ShortStreamReader) carbonColumnVector).buildBlock();
+ } else if (dataType == DataTypes.INT || dataType == DataTypes.DATE) {
+ return ((IntegerStreamReader) carbonColumnVector).buildBlock();
+ } else if (dataType == DataTypes.LONG) {
+ return ((LongStreamReader) carbonColumnVector).buildBlock();
+ } else if (dataType == DataTypes.DOUBLE) {
+ return ((DoubleStreamReader) carbonColumnVector).buildBlock();
+ } else if (dataType == DataTypes.FLOAT) {
+ return ((FloatStreamReader) carbonColumnVector).buildBlock();
+ } else if (dataType == DataTypes.TIMESTAMP) {
+ return ((TimestampStreamReader) carbonColumnVector).buildBlock();
+ } else if (dataType == DataTypes.BOOLEAN) {
+ return ((BooleanStreamReader) carbonColumnVector).buildBlock();
+ } else if (DataTypes.isDecimal(dataType)) {
+ return ((DecimalSliceStreamReader) carbonColumnVector).buildBlock();
+ } else if (dataType == DataTypes.BYTE) {
+ return ((ByteStreamReader) carbonColumnVector).buildBlock();
+ } else if (DataTypes.isArrayType(dataType) ||
(DataTypes.isStructType(dataType))) {
+ return ((ComplexTypeStreamReader) carbonColumnVector).buildBlock();
+ } else {
+ throw new UnsupportedOperationException("unsupported for type :" +
dataType);
+ }
+ }
+
+ @Override public void putNull(int rowId) {
+ builder.appendNull();
+ }
+
+ @Override public void reset() {
+ builder = type.createBlockBuilder(null, batchSize);
+ }
+
+ @Override public void putNulls(int rowId, int count) {
+ for (int i = 0; i < count; i++) {
+ builder.appendNull();
+ }
+ }
+}
+
diff --git
a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/SliceStreamReader.java
b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/SliceStreamReader.java
index 51deb54..7cadb38 100644
---
a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++
b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -151,7 +151,7 @@ public class SliceStreamReader extends
CarbonColumnVectorImpl implements PrestoV
putNull(rowId);
} else {
if (dictionaryBlock == null) {
- putByteArray(rowId, ByteUtil.toBytes((String) value));
+ putByteArray(rowId, (byte []) value);
} else {
putInt(rowId, (int) value);
}
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
index 76c3fd0..633ad61 100755
---
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -27,11 +27,14 @@ import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
+import java.util.stream.Collectors;
import javax.inject.Inject;
import static java.util.Objects.requireNonNull;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
@@ -49,6 +52,7 @@ import io.prestosql.plugin.hive.ForHive;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.plugin.hive.HiveConfig;
+import io.prestosql.plugin.hive.HivePartition;
import io.prestosql.plugin.hive.HivePartitionManager;
import io.prestosql.plugin.hive.HiveSplit;
import io.prestosql.plugin.hive.HiveSplitManager;
@@ -106,10 +110,8 @@ public class CarbondataSplitManager extends
HiveSplitManager {
public ConnectorSplitSource getSplits(ConnectorTransactionHandle
transactionHandle,
ConnectorSession session, ConnectorTableHandle tableHandle,
SplitSchedulingStrategy splitSchedulingStrategy) {
-
- HiveTableHandle hiveTable = (HiveTableHandle) tableHandle;
- SchemaTableName schemaTableName = hiveTable.getSchemaTableName();
-
+ HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle;
+ SchemaTableName schemaTableName = hiveTableHandle.getSchemaTableName();
carbonTableReader.setPrestoQueryId(session.getQueryId());
// get table metadata
SemiTransactionalHiveMetastore metastore =
@@ -126,6 +128,17 @@ public class CarbondataSplitManager extends
HiveSplitManager {
// file metastore case tablePath can be null, so get from location
location = table.getStorage().getLocation();
}
+ List<PartitionSpec> filteredPartitions = new ArrayList<>();
+ if (hiveTableHandle.getPartitionColumns().size() > 0 &&
hiveTableHandle.getPartitions()
+ .isPresent()) {
+ List<String> colNames =
+
hiveTableHandle.getPartitionColumns().stream().map(HiveColumnHandle::getName)
+ .collect(Collectors.toList());
+ for (HivePartition partition : hiveTableHandle.getPartitions().get()) {
+ filteredPartitions.add(new PartitionSpec(colNames,
+ location + CarbonCommonConstants.FILE_SEPARATOR +
partition.getPartitionId()));
+ }
+ }
String queryId = System.nanoTime() + "";
QueryStatistic statistic = new QueryStatistic();
QueryStatisticsRecorder statisticRecorder =
CarbonTimeStatisticsFactory.createDriverRecorder();
@@ -134,8 +147,7 @@ public class CarbondataSplitManager extends
HiveSplitManager {
statistic = new QueryStatistic();
carbonTableReader.setQueryId(queryId);
- TupleDomain<HiveColumnHandle> predicate =
- (TupleDomain<HiveColumnHandle>)
hiveTable.getCompactEffectivePredicate();
+ TupleDomain<HiveColumnHandle> predicate =
hiveTableHandle.getCompactEffectivePredicate();
Configuration configuration = this.hdfsEnvironment.getConfiguration(
new HdfsEnvironment.HdfsContext(session,
schemaTableName.getSchemaName(),
schemaTableName.getTableName()), new Path(location));
@@ -146,10 +158,8 @@ public class CarbondataSplitManager extends
HiveSplitManager {
carbonTableReader.getCarbonCache(schemaTableName, location,
configuration);
Expression filters = PrestoFilterUtil.parseFilterExpression(predicate);
try {
-
List<CarbonLocalMultiBlockSplit> splits =
- carbonTableReader.getInputSplits(cache, filters, predicate,
configuration);
-
+ carbonTableReader.getInputSplits(cache, filters, filteredPartitions,
configuration);
ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
long index = 0;
for (CarbonLocalMultiBlockSplit split : splits) {
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
index de59a9f..45c82f3 100755
---
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -41,15 +41,12 @@ import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.schema.PartitionInfo;
-import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.reader.ThriftReader;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.statusmanager.FileFormat;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -245,7 +242,7 @@ public class CarbonTableReader {
*
* @param tableCacheModel cached table
* @param filters carbonData filters
- * @param constraints presto filters
+ * @param filteredPartitions matched partitionSpec for the filter
* @param config hadoop conf
* @return list of multiblock split
* @throws IOException
@@ -253,7 +250,7 @@ public class CarbonTableReader {
public List<CarbonLocalMultiBlockSplit> getInputSplits(
CarbonTableCacheModel tableCacheModel,
Expression filters,
- TupleDomain<HiveColumnHandle> constraints,
+ List<PartitionSpec> filteredPartitions,
Configuration config) throws IOException {
List<CarbonLocalInputSplit> result = new ArrayList<>();
List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
@@ -270,15 +267,6 @@ public class CarbonTableReader {
CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
JobConf jobConf = new JobConf(config);
- List<PartitionSpec> filteredPartitions = new ArrayList<>();
-
- PartitionInfo partitionInfo = carbonTable.getPartitionInfo();
- LoadMetadataDetails[] loadMetadataDetails = null;
- if (partitionInfo != null && partitionInfo.getPartitionType() ==
PartitionType.NATIVE_HIVE) {
- loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
- CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
- filteredPartitions = findRequiredPartitions(constraints, carbonTable,
loadMetadataDetails);
- }
try {
CarbonTableInputFormat.setTableInfo(config, tableInfo);
CarbonTableInputFormat<Object> carbonTableInputFormat =
diff --git
a/integration/presto/src/test/prestodb/org/apache/carbondata/presto/server/PrestoTestUtil.scala
b/integration/presto/src/test/prestodb/org/apache/carbondata/presto/server/PrestoTestUtil.scala
new file mode 100644
index 0000000..e31ac70
--- /dev/null
+++
b/integration/presto/src/test/prestodb/org/apache/carbondata/presto/server/PrestoTestUtil.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.presto.server
+
+import com.facebook.presto.jdbc.PrestoArray
+
+object PrestoTestUtil {
+
+ // this method depends on prestodb jdbc PrestoArray class
+ def validateArrayOfPrimitiveTypeData(actualResult: List[Map[String, Any]],
+ longChar: String): Unit = {
+ for (row <- 0 to 1) {
+ val column1 = actualResult(row)("stringfield")
+ if (column1 == "row1") {
+ val column2 = actualResult(row)("arraybyte")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column3 = actualResult(row)("arrayshort")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column4 = actualResult(row)("arrayint")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ assert(column2(0) == null)
+ assert(column3(0) == null)
+ assert(column4(0) == null)
+ } else if (column1 == "row2") {
+ val column2 = actualResult(row)("arrayint")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ if (column2.sameElements(Array(4))) {
+ val column3 = actualResult(row)("arraybyte")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column4 = actualResult(row)("arrayshort")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column5 = actualResult(row)("arraylong")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column6 = actualResult(row)("arrayfloat")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column7 = actualResult(row)("arraydouble")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column8 = actualResult(row)("arraybinary")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column9 = actualResult(row)("arraydate")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column10 = actualResult(row)("arraytimestamp")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column11 = actualResult(row)("arrayboolean")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column12 = actualResult(row)("arrayvarchar")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column13 = actualResult(row)("arraydecimal")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column14 = actualResult(row)("arraystring")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+
+ assert(column3.sameElements(Array(3, 5, 4)))
+ assert(column4.sameElements(Array(4, 5, 6)))
+ assert(column5.sameElements(Array(2L, 59999999L, 99999999999L)))
+ assert(column6.sameElements(Array(5.4646f, 5.55f, 0.055f)))
+ assert(column7.sameElements(Array(5.46464646464, 5.55, 0.055)))
+ assert(column8(0).asInstanceOf[Array[Byte]].length == 118198)
+ assert(column9.sameElements(Array("2019-03-02", "2020-03-02",
"2021-04-02")))
+ assert(column10.sameElements(Array("2019-02-12 03:03:34.000",
+ "2020-02-12 03:03:34.000",
+ "2021-03-12 03:03:34.000")))
+ assert(column11.sameElements(Array(true, false)))
+ assert(column12.sameElements(Array(longChar)))
+ assert(column13.sameElements(Array("999.23", "0.12")))
+ assert(column14.sameElements(Array("japan", "china", "iceland")))
+ }
+ }
+ }
+ }
+}
diff --git
a/integration/presto/src/test/prestosql/org/apache/carbondata/presto/server/PrestoTestUtil.scala
b/integration/presto/src/test/prestosql/org/apache/carbondata/presto/server/PrestoTestUtil.scala
new file mode 100644
index 0000000..63d31cd
--- /dev/null
+++
b/integration/presto/src/test/prestosql/org/apache/carbondata/presto/server/PrestoTestUtil.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.presto.server
+
+import io.prestosql.jdbc.PrestoArray
+
+object PrestoTestUtil {
+
+ // this method depends on prestosql jdbc PrestoArray class
+ def validateArrayOfPrimitiveTypeData(actualResult: List[Map[String, Any]],
+ longChar: String): Unit = {
+ for (row <- 0 to 1) {
+ val column1 = actualResult(row)("stringfield")
+ if (column1 == "row1") {
+ val column2 = actualResult(row)("arraybyte")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column3 = actualResult(row)("arrayshort")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column4 = actualResult(row)("arrayint")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ assert(column2(0) == null)
+ assert(column3(0) == null)
+ assert(column4(0) == null)
+ } else if (column1 == "row2") {
+ val column2 = actualResult(row)("arrayint")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ if (column2.sameElements(Array(4))) {
+ val column3 = actualResult(row)("arraybyte")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column4 = actualResult(row)("arrayshort")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column5 = actualResult(row)("arraylong")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column6 = actualResult(row)("arrayfloat")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column7 = actualResult(row)("arraydouble")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column8 = actualResult(row)("arraybinary")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column9 = actualResult(row)("arraydate")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column10 = actualResult(row)("arraytimestamp")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column11 = actualResult(row)("arrayboolean")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column12 = actualResult(row)("arrayvarchar")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column13 = actualResult(row)("arraydecimal")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+ val column14 = actualResult(row)("arraystring")
+ .asInstanceOf[PrestoArray]
+ .getArray()
+ .asInstanceOf[Array[Object]]
+
+ assert(column3.sameElements(Array(3, 5, 4)))
+ assert(column4.sameElements(Array(4, 5, 6)))
+ assert(column5.sameElements(Array(2L, 59999999L, 99999999999L)))
+ assert(column6.sameElements(Array(5.4646f, 5.55f, 0.055f)))
+ assert(column7.sameElements(Array(5.46464646464, 5.55, 0.055)))
+ assert(column8(0).asInstanceOf[Array[Byte]].length == 118198)
+ assert(column9.sameElements(Array("2019-03-02", "2020-03-02",
"2021-04-02")))
+ assert(column10.sameElements(Array("2019-02-12 03:03:34.000",
+ "2020-02-12 03:03:34.000",
+ "2021-03-12 03:03:34.000")))
+ assert(column11.sameElements(Array(true, false)))
+ assert(column12.sameElements(Array(longChar)))
+ assert(column13.sameElements(Array("999.23", "0.12")))
+ assert(column14.sameElements(Array("japan", "china", "iceland")))
+ }
+ }
+ }
+ }
+}
diff --git
a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
index f5c5629..611f33d 100644
---
a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
+++
b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
@@ -32,10 +32,9 @@ import
org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.datatype.{DataTypes, Field,
StructField}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.presto.server.PrestoServer
+import org.apache.carbondata.presto.server.{PrestoServer, PrestoTestUtil}
import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
-import io.prestosql.jdbc.PrestoArray
class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with
BeforeAndAfterAll with BeforeAndAfterEach {
@@ -617,46 +616,7 @@ class PrestoTestNonTransactionalTableFiles extends
FunSuiteLike with BeforeAndAf
.executeQuery("select * from files5 ")
assert(actualResult.size == 2)
- for( row <- 0 to 1) {
- var column1 = actualResult(row)("stringfield")
- if(column1 == "row1") {
- var column2 =
actualResult(row)("arraybyte").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
- var column3 =
actualResult(row)("arrayshort").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
- var column4 =
actualResult(row)("arrayint").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
- assert(column2(0) == null)
- assert(column3(0) == null)
- assert(column4(0) == null)
- } else if(column1 == "row2") {
- var column2 =
actualResult(row)("arrayint").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
- if (column2.sameElements(Array(4))) {
- var column3 =
actualResult(row)("arraybyte").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
- var column4 =
actualResult(row)("arrayshort").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
- var column5 =
actualResult(row)("arraylong").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
- var column6 =
actualResult(row)("arrayfloat").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
- var column7 =
actualResult(row)("arraydouble").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
- var column8 =
actualResult(row)("arraybinary").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
- var column9 =
actualResult(row)("arraydate").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
- var column10 =
actualResult(row)("arraytimestamp").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
- var column11 =
actualResult(row)("arrayboolean").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
- var column12 =
actualResult(row)("arrayvarchar").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
- var column13 =
actualResult(row)("arraydecimal").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
- var column14 =
actualResult(row)("arraystring").asInstanceOf[PrestoArray].getArray().asInstanceOf[Array[Object]]
-
- assert(column3.sameElements(Array(3,5,4)))
- assert(column4.sameElements(Array(4,5,6)))
- assert(column5.sameElements(Array(2L,59999999L, 99999999999L)))
- assert(column6.sameElements(Array(5.4646f,5.55f,0.055f)))
- assert(column7.sameElements(Array(5.46464646464,5.55,0.055)))
- assert(column8(0).asInstanceOf[Array[Byte]].size == 118198)
-
assert(column9.sameElements(Array("2019-03-02","2020-03-02","2021-04-02")))
- assert(column10.sameElements(Array("2019-02-12
03:03:34.000","2020-02-12 03:03:34.000","2021-03-12 03:03:34.000")))
- assert(column11.sameElements(Array(true,false)))
- assert(column12.sameElements(Array(longChar)))
- assert(column13.sameElements(Array("999.23","0.12")))
- assert(column14.sameElements(Array("japan","china","iceland")))
- }
- }
- }
+ PrestoTestUtil.validateArrayOfPrimitiveTypeData(actualResult, longChar)
FileUtils.deleteDirectory(new File(writerPathComplex))
}