[CARBONDATA-1469] Optimizations for Presto Integration. This closes #1348.
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1551a7c7 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1551a7c7 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1551a7c7 Branch: refs/heads/branch-1.2 Commit: 1551a7c7d4046964a299d01a927b2900a84dc2f3 Parents: 0ab928e Author: Bhavya <bha...@knoldus.com> Authored: Mon Sep 11 16:33:07 2017 +0530 Committer: CHEN LIANG <chenliang...@huawei.com> Committed: Tue Sep 12 07:08:37 2017 +0800 ---------------------------------------------------------------------- integration/presto/pom.xml | 536 ++++++++++++------- .../carbondata/presto/PrestoFilterUtil.java | 75 ++- .../readers/DecimalSliceStreamReader.java | 58 +- .../presto/readers/DoubleStreamReader.java | 27 +- .../presto/readers/IntegerStreamReader.java | 28 +- .../presto/readers/LongStreamReader.java | 27 +- .../presto/readers/ShortStreamReader.java | 80 +++ .../presto/readers/StreamReaders.java | 6 + .../presto/readers/TimestampStreamReader.java | 79 +++ 9 files changed, 682 insertions(+), 234 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/pom.xml ---------------------------------------------------------------------- diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml index 562718f..617ce93 100644 --- a/integration/presto/pom.xml +++ b/integration/presto/pom.xml @@ -15,7 +15,9 @@ See the License for the specific language governing permissions and limitations under the License. 
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -37,49 +39,223 @@ <dependencies> <dependency> - <groupId>org.apache.thrift</groupId> - <artifactId>libthrift</artifactId> - <version>0.9.3</version> - </dependency> - - <dependency> <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-core</artifactId> + <artifactId>carbondata-hadoop</artifactId> <version>${project.version}</version> <exclusions> <exclusion> <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.10</artifactId> + <artifactId>spark-network-shuffle_2.11</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sketch_2.11</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>net.java.dev.jets3t</groupId> + <artifactId>jets3t</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>javax.servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </exclusion> + <exclusion> + <groupId>org.antlr</groupId> + <artifactId>antlr4-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>com.esotericsoftware</groupId> + <artifactId>minlog</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.janino</groupId> + <artifactId>janino</artifactId> + </exclusion> + <exclusion> + <groupId>net.jpountz.lz4</groupId> + 
<artifactId>lz4</artifactId> + </exclusion> + <exclusion> + <groupId>net.sf.py4j</groupId> + <artifactId>py4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.spark-project.spark</groupId> + <artifactId>unused</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-tags_2.11</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-column</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + </exclusion> + <exclusion> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + <exclusion> + <groupId>org.glassfish.jersey.containers</groupId> + <artifactId>jersey-container-servlet</artifactId> + </exclusion> + <exclusion> + <groupId>org.glassfish.jersey.containers</groupId> + <artifactId>jersey-container-servlet-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.glassfish.jersey.containers</groupId> + <artifactId>jersey-container-servlet-core</artifactId> </exclusion> - </exclusions> - </dependency> - - - <dependency> - <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-common</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-processing</artifactId> - 
<version>${project.version}</version> - <exclusions> + <exclusion> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + </exclusion> + <exclusion> + <groupId>com.twitter</groupId> + <artifactId>chill_2.11</artifactId> + </exclusion> + <exclusion> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + </exclusion> + <exclusion> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-jvm</artifactId> + </exclusion> + <exclusion> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-json</artifactId> + </exclusion> + <exclusion> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-graphite</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </exclusion> + <exclusion> + <groupId>net.java.dev</groupId> + <artifactId>jets3t</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.xbean</groupId> + <artifactId>xbean-asm5-shaded</artifactId> + </exclusion> <exclusion> <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.10</artifactId> + <artifactId>spark-launcher_2.11</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-network-common_2.11</artifactId> + </exclusion> + <exclusion> + <groupId>com.ning</groupId> + <artifactId>compress-lzf</artifactId> + </exclusion> + <exclusion> + <groupId>org.roaringbitmap</groupId> + <artifactId>RoaringBitmap</artifactId> + </exclusion> + <exclusion> + <groupId>com.thoughtworks.paranamer</groupId> + <artifactId>paranamer</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scalap</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang..modules</groupId> + 
<artifactId>parser-combinators_2.11</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang..modules</groupId> + <artifactId>scala-xml_2.11</artifactId> + </exclusion> + <exclusion> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_2.11</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + <exclusion> + <groupId>net.sf.py4</groupId> + <artifactId>py4j</artifactId> + </exclusion> + <exclusion> + <groupId>net.razorvine</groupId> + <artifactId>pyrolite</artifactId> + </exclusion> + <exclusion> + <groupId>com.clearspring.analytics</groupId> + <artifactId>stream</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.ivy</groupId> + <artifactId>ivy</artifactId> + </exclusion> + <exclusion> + <groupId>oro</groupId> + <artifactId>oro</artifactId> </exclusion> </exclusions> </dependency> <dependency> - <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-hadoop</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> <groupId>io.airlift</groupId> <artifactId>bootstrap</artifactId> <version>0.144</version> @@ -87,6 +263,38 @@ <exclusions> <exclusion> <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + </exclusion> + <exclusion> + <groupId>javax.xml.bind</groupId> + <artifactId>jaxb-api</artifactId> + </exclusion> + <exclusion> + <groupId>aopalliance</groupId> + <artifactId>aopalliance</artifactId> + </exclusion> + <exclusion> + <groupId>org.weakref</groupId> + <artifactId>jmxutils</artifactId> + </exclusion> + <exclusion> + <groupId>cglib</groupId> + <artifactId>cglib-nodep</artifactId> + </exclusion> + 
<exclusion> + <groupId>com.google.code.findbugs</groupId> + <artifactId>annotations</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> </exclusion> </exclusions> @@ -98,21 +306,6 @@ <version>0.144</version> <!--<scope>provided</scope>--> </dependency> - - <dependency> - <groupId>io.airlift</groupId> - <artifactId>log</artifactId> - <version>0.144</version> - <!--<scope>provided</scope>--> - </dependency> - - <dependency> - <groupId>io.airlift</groupId> - <artifactId>slice</artifactId> - <version>0.29</version> - <scope>provided</scope> - </dependency> - <dependency> <groupId>io.airlift</groupId> <artifactId>units</artifactId> @@ -126,19 +319,6 @@ <version>2.6.0</version> <scope>provided</scope> </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>18.0</version> - </dependency> - - <dependency> - <groupId>com.google.inject</groupId> - <artifactId>guice</artifactId> - <version>3.0</version> - </dependency> - <!--presto integrated--> <dependency> <groupId>com.facebook.presto</groupId> @@ -146,152 +326,140 @@ <version>${presto.version}</version> <scope>provided</scope> </dependency> - + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <version>2.5</version> + </dependency> <dependency> <groupId>com.facebook.presto.hadoop</groupId> <artifactId>hadoop-apache2</artifactId> <version>2.7.3-1</version> </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.11</artifactId> - <version>2.1.0</version> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + <version>1.4.1</version> <exclusions> <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> + <groupId>org.tukaani</groupId> + <artifactId>xz</artifactId> </exclusion> </exclusions> </dependency> + <!-- 
https://mvnrepository.com/artifact/org.apache.spark/spark-catalyst_2.10 --> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-catalyst_2.11</artifactId> - <version>2.1.0</version> - </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 --> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.11</artifactId> - <version>2.1.0</version> - <exclusions> - <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </exclusion> - </exclusions> - </dependency> </dependencies> - <build> - <plugins> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>1.8</source> - <target>1.8</target> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <version>2.18</version> - <!-- Note config is repeated in scalatest config --> - <configuration> - <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> - <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine> - <systemProperties> - <java.awt.headless>true</java.awt.headless> - </systemProperties> - <failIfNoTests>false</failIfNoTests> - </configuration> - </plugin> + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.18</version> + <!-- Note config is repeated in scalatest config --> + <configuration> + <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine> + <systemProperties> + <java.awt.headless>true</java.awt.headless> + </systemProperties> + <failIfNoTests>false</failIfNoTests> + </configuration> + 
</plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <version>2.17</version> - <configuration> - <skip>true</skip> - </configuration> - </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-enforcer-plugin</artifactId> - <version>1.4.1</version> - <configuration> - <skip>true</skip> - </configuration> - </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.4.1</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> - <plugin> - <groupId>com.ning.maven.plugins</groupId> - <artifactId>maven-dependency-versions-check-plugin</artifactId> - <configuration> - <skip>true</skip> - <failBuildInCaseOfConflict>false</failBuildInCaseOfConflict> - </configuration> - </plugin> + <plugin> + <groupId>com.ning.maven.plugins</groupId> + <artifactId>maven-dependency-versions-check-plugin</artifactId> + <configuration> + <skip>true</skip> + <failBuildInCaseOfConflict>false</failBuildInCaseOfConflict> + </configuration> + </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <configuration> - <skip>false</skip> - </configuration> - </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <configuration> + <skip>false</skip> + </configuration> + </plugin> - <plugin> - <groupId>com.ning.maven.plugins</groupId> - <artifactId>maven-duplicate-finder-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> + <plugin> + <groupId>com.ning.maven.plugins</groupId> + <artifactId>maven-duplicate-finder-plugin</artifactId> + <configuration> + 
<skip>true</skip> + </configuration> + </plugin> - <plugin> - <groupId>io.takari.maven.plugins</groupId> - <artifactId>presto-maven-plugin</artifactId> - <version>0.1.12</version> - <extensions>true</extensions> - </plugin> + <plugin> + <groupId>io.takari.maven.plugins</groupId> + <artifactId>presto-maven-plugin</artifactId> + <version>0.1.12</version> + <extensions>true</extensions> + </plugin> - <plugin> - <groupId>pl.project13.maven</groupId> - <artifactId>git-commit-id-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - <plugin> - <groupId>org.scala-tools</groupId> - <artifactId>maven-scala-plugin</artifactId> - <version>2.15.2</version> - <executions> - <execution> - <id>compile</id> - <goals> - <goal>compile</goal> - </goals> - <phase>compile</phase> - </execution> - <execution> - <id>testCompile</id> - <goals> - <goal>testCompile</goal> - </goals> - <phase>test</phase> - </execution> - <execution> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> + <plugin> + <groupId>pl.project13.maven</groupId> + <artifactId>git-commit-id-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <version>2.15.2</version> + <executions> + <execution> + <id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + <execution> + <id>testCompile</id> + <goals> + <goal>testCompile</goal> + </goals> + <phase>test</phase> + </execution> + <execution> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java 
---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java index 9a5a5cb..a958e63 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java @@ -17,6 +17,8 @@ package org.apache.carbondata.presto; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; @@ -75,8 +77,8 @@ public class PrestoFilterUtil { else if (colType == VarcharType.VARCHAR) return DataType.STRING; else if (colType == DateType.DATE) return DataType.DATE; else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP; - else if (colType == DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(), - carbondataColumnHandle.getScale())) return DataType.DECIMAL; + else if (colType.equals(DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(), + carbondataColumnHandle.getScale()))) return DataType.DECIMAL; else return DataType.STRING; } @@ -104,13 +106,12 @@ public class PrestoFilterUtil { checkArgument(domain.getType().isOrderable(), "Domain type must be orderable"); List<Object> singleValues = new ArrayList<>(); - List<Expression> disjuncts = new ArrayList<>(); + Map<Object, List<Expression>> valueExpressionMap = new HashMap<>(); for (Range range : domain.getValues().getRanges().getOrderedRanges()) { if (range.isSingleValue()) { Object value = ConvertDataByType(range.getLow().getValue(), type); singleValues.add(value); } else { - List<Expression> rangeConjuncts = new ArrayList<>(); if (!range.getLow().isLowerUnbounded()) { Object value = ConvertDataByType(range.getLow().getValue(), type); switch (range.getLow().getBound()) { @@ -120,14 +121,20 @@ public class 
PrestoFilterUtil { } else { GreaterThanExpression greater = new GreaterThanExpression(colExpression, new LiteralExpression(value, coltype)); - rangeConjuncts.add(greater); + if(valueExpressionMap.get(value) == null) { + valueExpressionMap.put(value, new ArrayList<>()); + } + valueExpressionMap.get(value).add(greater); } break; case EXACTLY: GreaterThanEqualToExpression greater = new GreaterThanEqualToExpression(colExpression, new LiteralExpression(value, coltype)); - rangeConjuncts.add(greater); + if(valueExpressionMap.get(value) == null) { + valueExpressionMap.put(value, new ArrayList<>()); + } + valueExpressionMap.get(value).add(greater); break; case BELOW: throw new IllegalArgumentException("Low marker should never use BELOW bound"); @@ -143,18 +150,23 @@ public class PrestoFilterUtil { case EXACTLY: LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, new LiteralExpression(value, coltype)); - rangeConjuncts.add(less); + if(valueExpressionMap.get(value) == null) { + valueExpressionMap.put(value, new ArrayList<>()); + } + valueExpressionMap.get(value).add(less); break; case BELOW: LessThanExpression less2 = new LessThanExpression(colExpression, new LiteralExpression(value, coltype)); - rangeConjuncts.add(less2); + if(valueExpressionMap.get(value) == null) { + valueExpressionMap.put(value, new ArrayList<>()); + } + valueExpressionMap.get(value).add(less2); break; default: throw new AssertionError("Unhandled bound: " + range.getHigh().getBound()); } } - disjuncts.addAll(rangeConjuncts); } } if (singleValues.size() == 1) { @@ -174,19 +186,34 @@ public class PrestoFilterUtil { .map((a) -> new LiteralExpression(ConvertDataByType(a, type), coltype)) .collect(Collectors.toList()); candidates = new ListExpression(exs); - filters.add(new InExpression(colExpression, candidates)); - } else if (disjuncts.size() > 0) { - if (disjuncts.size() > 1) { - Expression finalFilters = new OrExpression(disjuncts.get(0), disjuncts.get(1)); - if (disjuncts.size() 
> 2) { - for (int i = 2; i < disjuncts.size(); i++) { - filters.add(new AndExpression(finalFilters, disjuncts.get(i))); + } else if (valueExpressionMap.size() > 0) { + List<Expression> valuefilters = new ArrayList<>(); + Expression finalFilters = null; + List<Expression> expressions; + for (Map.Entry<Object, List<Expression>> entry : valueExpressionMap.entrySet()) { + expressions = valueExpressionMap.get(entry.getKey()); + if (expressions.size() == 1) { + finalFilters = expressions.get(0); + } else if (expressions.size() >= 2) { + finalFilters = new OrExpression(expressions.get(0), expressions.get(1)); + for (int i = 2; i < expressions.size(); i++) { + finalFilters = new OrExpression(finalFilters, expressions.get(i)); } - } else { - filters.add(finalFilters); } - } else if (disjuncts.size() == 1) filters.add(disjuncts.get(0)); + valuefilters.add(finalFilters); + } + + if(valuefilters.size() == 1){ + finalFilters = valuefilters.get(0); + } else if (valuefilters.size() >= 2) { + finalFilters = new AndExpression(valuefilters.get(0), valuefilters.get(1)); + for (int i = 2; i < valuefilters.size() ; i++) { + finalFilters = new AndExpression(finalFilters, valuefilters.get(i)); + } + } + + filters.add(finalFilters); } } @@ -196,7 +223,7 @@ public class PrestoFilterUtil { finalFilters = new AndExpression(tmp.get(0), tmp.get(1)); if (tmp.size() > 2) { for (int i = 2; i < tmp.size(); i++) { - finalFilters = new OrExpression(finalFilters, tmp.get(i)); + finalFilters = new AndExpression(finalFilters, tmp.get(i)); } } } else if (tmp.size() == 1) finalFilters = tmp.get(0); @@ -223,6 +250,14 @@ public class PrestoFilterUtil { Date date = c.getTime(); return date.getTime() * 1000; } + else if (type instanceof DecimalType) { + if(rawdata instanceof Double) { + return new BigDecimal((Double) rawdata); + } else if (rawdata instanceof Long) { + return new BigDecimal(new BigInteger(String.valueOf(rawdata)), + ((DecimalType) type).getScale()); + } + } return rawdata; } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java index 89d4e60..6612ab0 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java @@ -66,20 +66,17 @@ public class DecimalSliceStreamReader extends AbstractStreamReader { int scale = ((DecimalType)type).getScale(); int precision = ((DecimalType)type).getPrecision(); if (columnVector != null) { - for(int i = 0; i < numberOfRows ; i++ ){ - if(columnVector.isNullAt(i)) { - builder.appendNull(); + if(columnVector.anyNullsSet()) + { + handleNullInVector(type, numberOfRows, builder, scale, precision); + } else { + if(isShortDecimal(type)) { + populateShortDecimalVector(type, numberOfRows, builder, scale, precision); } else { - Slice slice = - getSlice(columnVector.getDecimal(i, precision, scale).toJavaBigDecimal(), type); - if (isShortDecimal(type)) { - type.writeLong(builder, parseLong((DecimalType) type, slice, 0, slice.length())); - } else { - type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length())); - } + populateLongDecimalVector(type, numberOfRows, builder, scale, precision); } } - } + } } else { if (streamData != null) { @@ -182,4 +179,43 @@ public class DecimalSliceStreamReader extends AbstractStreamReader { return decimal; } + + private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder, int scale, + int precision) { + for (int i = 0; i < numberOfRows; i++) { + if (columnVector.isNullAt(i)) { + builder.appendNull(); 
+ } else { + if (isShortDecimal(type)) { + long rescaledDecimal = Decimals + .rescale(columnVector.getDecimal(i, precision, scale).toLong(), + columnVector.getDecimal(i, precision, scale).scale(), scale); + type.writeLong(builder, rescaledDecimal); + } else { + Slice slice = + getSlice(columnVector.getDecimal(i, precision, scale).toJavaBigDecimal(), type); + type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length())); + } + } + } + } + + private void populateShortDecimalVector(Type type, int numberOfRows, BlockBuilder builder, + int scale, int precision) { + for (int i = 0; i < numberOfRows; i++) { + BigDecimal decimalValue = columnVector.getDecimal(i, precision, scale).toJavaBigDecimal(); + long rescaledDecimal = Decimals.rescale(decimalValue.unscaledValue().longValue(), + decimalValue.scale(), scale); + type.writeLong(builder, rescaledDecimal); + } + } + + private void populateLongDecimalVector(Type type, int numberOfRows, BlockBuilder builder, + int scale, int precision) { + for (int i = 0; i < numberOfRows; i++) { + Slice slice = getSlice(columnVector.getDecimal(i, precision, scale).toJavaBigDecimal(), type); + type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length())); + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java index cacf5ce..2b90a8d 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java @@ -47,12 +47,11 @@ public class DoubleStreamReader extends AbstractStreamReader { 
numberOfRows = batchSize; builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); if (columnVector != null) { - for (int i = 0; i < numberOfRows; i++) { - if (columnVector.isNullAt(i)) { - builder.appendNull(); - } else { - type.writeDouble(builder, columnVector.getDouble(i)); - } + if(columnVector.anyNullsSet()) { + handleNullInVector(type, numberOfRows, builder); + } + else { + populateVector(type, numberOfRows, builder); } } } else { @@ -68,4 +67,20 @@ public class DoubleStreamReader extends AbstractStreamReader { return builder.build(); } + private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + if (columnVector.isNullAt(i)) { + builder.appendNull(); + } else { + type.writeDouble(builder, columnVector.getDouble(i)); + } + } + } + + private void populateVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + type.writeDouble(builder, columnVector.getDouble(i)); + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java index 13280c8..ccc0192 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java @@ -41,13 +41,11 @@ public class IntegerStreamReader extends AbstractStreamReader { numberOfRows = batchSize; builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); if (columnVector != null) { - for(int i = 0; i < numberOfRows ; i++ ){ - if(columnVector.isNullAt(i)){ - 
builder.appendNull(); - } else { - type.writeLong(builder, ((Integer)columnVector.getInt(i)).longValue()); - } - + if(columnVector.anyNullsSet()) { + handleNullInVector(type, numberOfRows, builder); + } + else { + populateVector(type, numberOfRows, builder); } } @@ -64,4 +62,20 @@ public class IntegerStreamReader extends AbstractStreamReader { return builder.build(); } + private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + if (columnVector.isNullAt(i)) { + builder.appendNull(); + } else { + type.writeLong(builder, ((Integer) columnVector.getInt(i)).longValue()); + } + } + } + + private void populateVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + type.writeLong(builder, columnVector.getInt(i)); + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java index 9d602a6..5081b32 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java @@ -37,12 +37,11 @@ public class LongStreamReader extends AbstractStreamReader { numberOfRows = batchSize; builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); if (columnVector != null) { - for (int i = 0; i < numberOfRows; i++) { - if (columnVector.isNullAt(i)) { - builder.appendNull(); - } else { - type.writeLong(builder, columnVector.getLong(i)); - } + if(columnVector.anyNullsSet()) { + handleNullInVector(type, numberOfRows, builder); + } + else { + 
populateVector(type, numberOfRows, builder); } } @@ -59,4 +58,20 @@ public class LongStreamReader extends AbstractStreamReader { return builder.build(); } + private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + if (columnVector.isNullAt(i)) { + builder.appendNull(); + } else { + type.writeLong(builder, columnVector.getLong(i)); + } + } + } + + private void populateVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + type.writeLong(builder, columnVector.getLong(i)); + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java new file mode 100644 index 0000000..59d8e96 --- /dev/null +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.io.IOException;
+
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.Type;
+
+/**
+ * Stream reader that turns short values into a Presto {@link Block}.
+ *
+ * The values come either from the column vector (when {@code isVectorReader}
+ * is set) or from the {@code streamData} row buffer. Those fields, and
+ * {@code batchSize}, are not declared in this class; they are expected to be
+ * provided by {@link AbstractStreamReader}.
+ */
+public class ShortStreamReader extends AbstractStreamReader {
+
+  public ShortStreamReader() {
+  }
+
+  /**
+   * Builds a block holding the current batch of short values.
+   *
+   * In vector mode the block is sized for {@code batchSize} rows and is left
+   * without entries when {@code columnVector} is null. In row mode every
+   * element of {@code streamData} is cast to {@code Short} and written; a null
+   * {@code streamData} yields an empty block.
+   *
+   * @param type type used to create the block builder and to write each value
+   * @return the block built from the values that were read
+   * @throws IOException declared by the signature; nothing visible in this
+   *                     method throws it
+   */
+  public Block readBlock(Type type)
+      throws IOException
+  {
+    int numberOfRows;
+    BlockBuilder builder;
+    if (isVectorReader) {
+      numberOfRows = batchSize;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      if (columnVector != null) {
+        // Only pay for a per-row null check when the vector actually contains nulls.
+        if (columnVector.anyNullsSet()) {
+          handleNullInVector(type, numberOfRows, builder);
+        } else {
+          populateVector(type, numberOfRows, builder);
+        }
+      }
+    } else {
+      // Resolve the row count only after the null check, so a missing row
+      // buffer produces an empty block instead of a NullPointerException.
+      numberOfRows = streamData == null ? 0 : streamData.length;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      for (int i = 0; i < numberOfRows; i++) {
+        type.writeLong(builder, (Short) streamData[i]);
+      }
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Copies the first {@code numberOfRows} entries of the column vector into
+   * {@code builder}, appending a null entry wherever the vector reports null.
+   *
+   * @param type         type used to write each non-null short into the builder
+   * @param numberOfRows number of leading vector positions to copy
+   * @param builder      block builder that receives one entry per row
+   */
+  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int i = 0; i < numberOfRows; i++) {
+      if (columnVector.isNullAt(i)) {
+        builder.appendNull();
+      } else {
+        type.writeLong(builder, columnVector.getShort(i));
+      }
+    }
+  }
+
+  /**
+   * Copies the first {@code numberOfRows} short values of the column vector
+   * into {@code builder} without a per-row null check.
+   *
+   * @param type         type used to write each short into the builder
+   * @param numberOfRows number of leading vector positions to copy
+   * @param builder      block builder that receives one entry per row
+   */
+  private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int i = 0; i < numberOfRows; i++) {
+      type.writeLong(builder, columnVector.getShort(i));
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
----------------------------------------------------------------------
diff --git
a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java index abd8787..86f863a 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java @@ -23,6 +23,8 @@ import com.facebook.presto.spi.block.SliceArrayBlock; import com.facebook.presto.spi.type.DateType; import com.facebook.presto.spi.type.DecimalType; import com.facebook.presto.spi.type.IntegerType; +import com.facebook.presto.spi.type.SmallintType; +import com.facebook.presto.spi.type.TimestampType; import com.facebook.presto.spi.type.Type; import io.airlift.slice.Slice; @@ -44,6 +46,10 @@ public final class StreamReaders { return new IntegerStreamReader(); } else if (type instanceof DecimalType) { return new DecimalSliceStreamReader(); + } else if (type instanceof SmallintType) { + return new ShortStreamReader(); + } else if (type instanceof TimestampType) { + return new TimestampStreamReader(); } return new LongStreamReader(); } else if (javaType == double.class) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java new file mode 100644 index 0000000..8ea3efb --- /dev/null +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.io.IOException;
+
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.Type;
+
+/**
+ * Stream reader that turns timestamp values into a Presto {@link Block}.
+ *
+ * In vector mode every long read from the column vector is divided by
+ * {@code TIMESTAMP_DIVISOR} before it is written. In row mode the elements of
+ * {@code streamData} are written unchanged. {@code isVectorReader},
+ * {@code batchSize}, {@code columnVector} and {@code streamData} are not
+ * declared in this class; they are expected to be provided by
+ * {@link AbstractStreamReader}.
+ */
+public class TimestampStreamReader extends AbstractStreamReader {
+
+  /**
+   * Divisor applied to every value read from the column vector.
+   * NOTE(review): presumably converts the stored unit (microseconds?) to the
+   * unit Presto expects (milliseconds?) - confirm against the writer side.
+   */
+  private static final int TIMESTAMP_DIVISOR = 1000;
+
+  public TimestampStreamReader() {
+  }
+
+  /**
+   * Builds a block holding the current batch of timestamp values.
+   *
+   * In vector mode the block is sized for {@code batchSize} rows and is left
+   * without entries when {@code columnVector} is null. In row mode every
+   * element of {@code streamData} is cast to {@code Long} and written; a null
+   * {@code streamData} yields an empty block.
+   *
+   * @param type type used to create the block builder and to write each value
+   * @return the block built from the values that were read
+   * @throws IOException declared by the signature; nothing visible in this
+   *                     method throws it
+   */
+  public Block readBlock(Type type) throws IOException {
+    int numberOfRows;
+    BlockBuilder builder;
+    if (isVectorReader) {
+      numberOfRows = batchSize;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      if (columnVector != null) {
+        // Only pay for a per-row null check when the vector actually contains nulls.
+        if (columnVector.anyNullsSet()) {
+          handleNullInVector(type, numberOfRows, builder);
+        } else {
+          populateVector(type, numberOfRows, builder);
+        }
+      }
+    } else {
+      // Resolve the row count only after the null check, so a missing row
+      // buffer produces an empty block instead of a NullPointerException.
+      numberOfRows = streamData == null ? 0 : streamData.length;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      // NOTE(review): unlike the vector path, values are not divided by
+      // TIMESTAMP_DIVISOR here - confirm the row buffer already uses the target unit.
+      for (int i = 0; i < numberOfRows; i++) {
+        type.writeLong(builder, (Long) streamData[i]);
+      }
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Copies the first {@code numberOfRows} entries of the column vector into
+   * {@code builder}: a null entry where the vector reports null, otherwise the
+   * stored long divided by {@code TIMESTAMP_DIVISOR}.
+   *
+   * @param type         type used to write each non-null value into the builder
+   * @param numberOfRows number of leading vector positions to copy
+   * @param builder      block builder that receives one entry per row
+   */
+  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int i = 0; i < numberOfRows; i++) {
+      if (columnVector.isNullAt(i)) {
+        builder.appendNull();
+      } else {
+        type.writeLong(builder, columnVector.getLong(i) / TIMESTAMP_DIVISOR);
+      }
+    }
+  }
+
+  /**
+   * Copies the first {@code numberOfRows} longs of the column vector into
+   * {@code builder}, each divided by {@code TIMESTAMP_DIVISOR}, without a
+   * per-row null check.
+   *
+   * @param type         type used to write each value into the builder
+   * @param numberOfRows number of leading vector positions to copy
+   * @param builder      block builder that receives one entry per row
+   */
+  private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int i = 0; i < numberOfRows; i++) {
+      type.writeLong(builder, columnVector.getLong(i) / TIMESTAMP_DIVISOR);
+    }
+  }
+
+}