Repository: carbondata Updated Branches: refs/heads/master c40d85478 -> 2b7c8b374
[CARBONDATA-2925] Wrong data displayed for Spark file format if a Carbon file has multiple blocklets Issue: if a Carbon file has multiple blocklets, a select query displays wrong data. Root Cause: only the records of the 1st blocklet are shown; the other blocklets in that block are skipped. This is because the default blocklet ID is 0 and CarbonFileFormat creates BlockletInfo with the default configuration (the blocklet ID is not changed). Solution: set the default blockletId to -1 so that all blocklets are considered. Refer to org.apache.carbondata.core.scan.executor.impl.AbstractQueryExecutor#readAndFillBlockletInfo This closes #2703 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2b7c8b37 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2b7c8b37 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2b7c8b37 Branch: refs/heads/master Commit: 2b7c8b3743de1cccf3a2943ceb345684b7fb8d69 Parents: c40d854 Author: BJangir <[email protected]> Authored: Mon Sep 10 19:35:33 2018 +0530 Committer: ravipesala <[email protected]> Committed: Fri Sep 14 19:35:02 2018 +0530 ---------------------------------------------------------------------- .../core/indexstore/BlockletDetailInfo.java | 3 +- ...tCreateTableUsingSparkCarbonFileFormat.scala | 60 +++++++++++++++++++- 2 files changed, 59 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b7c8b37/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java index 47455c7..973a240 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java +++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java @@ -51,7 +51,8 @@ public class BlockletDetailInfo implements Serializable, Writable { private short versionNumber; - private short blockletId; + // default blockletId should be -1,which means consider all the blocklets in block + private short blockletId = -1; private int[] dimLens; http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b7c8b37/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala index 43f04b8..6a803fc 100644 --- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala +++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala @@ -18,18 +18,24 @@ package org.apache.spark.sql.carbondata.datasource import java.io.File +import java.text.SimpleDateFormat +import java.util.{Date, Random} import org.apache.commons.io.FileUtils import org.apache.commons.lang.RandomStringUtils import org.scalatest.{BeforeAndAfterAll, FunSuite} import org.apache.spark.util.SparkUtil import org.apache.spark.sql.carbondata.datasource.TestUtil.{spark, _} - import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonV3DataFormatConstants} import org.apache.carbondata.core.datastore.filesystem.CarbonFile import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.util.CarbonUtil -import 
org.apache.carbondata.sdk.file.{CarbonWriter, Schema} +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema} +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.Row +import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndAfterAll { @@ -322,6 +328,54 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA assert(new File(filePath).exists()) cleanTestData() } + test("Read data having multi blocklet ") { + buildTestDataMuliBlockLet(700000) + assert(new File(writerPath).exists()) + spark.sql("DROP TABLE IF EXISTS sdkOutputTable") + + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + //data source file format + spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """) + } else { + //data source file format + spark.sql( + s"""CREATE TABLE sdkOutputTable USING carbon LOCATION + |'$writerPath' """.stripMargin) + } + spark.sql("select count(*) from sdkOutputTable").show(false) + val result=checkAnswer(spark.sql("select count(*) from sdkOutputTable"),Seq(Row(700000))) + if(result.isDefined){ + assert(false,result.get) + } + spark.sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + def buildTestDataMuliBlockLet(records :Int): Unit ={ + FileUtils.deleteDirectory(new File(writerPath)) + val fields=new Array[Field](8) + fields(0)=new Field("myid",DataTypes.INT); + fields(1)=new Field("event_id",DataTypes.STRING); + fields(2)=new Field("eve_time",DataTypes.DATE); + fields(3)=new Field("ingestion_time",DataTypes.TIMESTAMP); + fields(4)=new Field("alldate",DataTypes.createArrayType(DataTypes.DATE)); + fields(5)=new Field("subject",DataTypes.STRING); + 
fields(6)=new Field("from_email",DataTypes.STRING); + fields(7)=new Field("sal",DataTypes.DOUBLE); + import scala.collection.JavaConverters._ + try{ + val options=Map("bad_records_action"->"FORCE","complex_delimiter_level_1"->"$").asJava + val writer=CarbonWriter.builder().outputPath(writerPath).withBlockletSize(16).sortBy(Array("myid","ingestion_time","event_id")).withLoadOptions(options).buildWriterForCSVInput(new Schema(fields),spark.sessionState.newHadoopConf()) + val timeF=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + val date_F=new SimpleDateFormat("yyyy-MM-dd") + for(i<-1 to records){ + val time=new Date(System.currentTimeMillis()) + writer.write(Array(""+i,"event_"+i,""+date_F.format(time),""+timeF.format(time),""+date_F.format(time)+"$"+date_F.format(time),"Subject_0","FromEmail",""+new Random().nextDouble())) + } + writer.close() + } + } test("Test with long string columns") { FileUtils.deleteDirectory(new File(writerPath))
