Repository: incubator-carbondata Updated Branches: refs/heads/master a473f553e -> 5279d5b15
Add BZip2 compression codec support for CSV data loading, with test cases Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/11198c4a Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/11198c4a Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/11198c4a Branch: refs/heads/master Commit: 11198c4a845da1ca460819da02a01bb7f719f575 Parents: a473f55 Author: jackylk <jacky.li...@huawei.com> Authored: Tue Sep 13 22:44:47 2016 +0800 Committer: Vimal-Das <vimal...@apache.org> Committed: Wed Oct 19 11:10:37 2016 +0530 ---------------------------------------------------------------------- .../datastorage/store/impl/FileFactory.java | 8 ++++++++ .../carbondata/spark/csv/CarbonTextFile.scala | 5 ++++- .../spark/rdd/CarbonDataRDDFactory.scala | 4 +++- integration/spark/src/test/resources/sample | 5 +++++ .../spark/src/test/resources/sample.csv.bz2 | Bin 0 -> 114 bytes .../dataload/TestLoadDataGeneral.scala | 20 ++++++++++++++++++- .../csvreaderstep/UnivocityCsvParser.java | 3 ++- 7 files changed, 41 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/11198c4a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java index 854ec8d..07e0b90 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java @@ -37,12 +37,14 @@ import org.apache.carbondata.core.datastorage.store.filesystem.LocalCarbonFile; import 
org.apache.carbondata.core.datastorage.store.filesystem.ViewFSCarbonFile; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.GzipCodec; public final class FileFactory { @@ -127,11 +129,14 @@ public final class FileFactory { throws IOException { path = path.replace("\\", "/"); boolean gzip = path.endsWith(".gz"); + boolean bzip2 = path.endsWith(".bz2"); InputStream stream; switch (fileType) { case LOCAL: if (gzip) { stream = new GZIPInputStream(new FileInputStream(path)); + } else if (bzip2) { + stream = new BZip2CompressorInputStream(new FileInputStream(path)); } else { stream = new FileInputStream(path); } @@ -148,6 +153,9 @@ public final class FileFactory { if (gzip) { GzipCodec codec = new GzipCodec(); stream = codec.createInputStream(stream); + } else if (bzip2) { + BZip2Codec codec = new BZip2Codec(); + stream = codec.createInputStream(stream); } break; default: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/11198c4a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala index c703a81..3589823 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala @@ -36,7 +36,10 @@ private[csv] object CarbonTextFile { val 
hadoopConfiguration = new Configuration(sc.hadoopConfiguration) hadoopConfiguration.setStrings(FileInputFormat.INPUT_DIR, location) hadoopConfiguration.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true) - hadoopConfiguration.set("io.compression.codecs", "org.apache.hadoop.io.compress.GzipCodec") + hadoopConfiguration.set("io.compression.codecs", + """org.apache.hadoop.io.compress.GzipCodec, + org.apache.hadoop.io.compress.DefaultCodec, + org.apache.hadoop.io.compress.BZip2Codec""".stripMargin) CarbonDataRDDFactory.configSplitMaxSize(sc, location, hadoopConfiguration) new NewHadoopRDD[LongWritable, Text]( http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/11198c4a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 1bc7ad0..511f452 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -882,7 +882,9 @@ object CarbonDataRDDFactory extends Logging { hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePaths) hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true") hadoopConfiguration.set("io.compression.codecs", - "org.apache.hadoop.io.compress.GzipCodec") + """org.apache.hadoop.io.compress.GzipCodec, + org.apache.hadoop.io.compress.DefaultCodec, + org.apache.hadoop.io.compress.BZip2Codec""".stripMargin) configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/11198c4a/integration/spark/src/test/resources/sample ---------------------------------------------------------------------- diff --git 
a/integration/spark/src/test/resources/sample b/integration/spark/src/test/resources/sample new file mode 100644 index 0000000..7c57de7 --- /dev/null +++ b/integration/spark/src/test/resources/sample @@ -0,0 +1,5 @@ +id,name,city,age +1,david,shenzhen,31 +2,eason,shenzhen,27 +3,jarry,wuhan,35 +3,jarry,Bangalore,35 http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/11198c4a/integration/spark/src/test/resources/sample.csv.bz2 ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/resources/sample.csv.bz2 b/integration/spark/src/test/resources/sample.csv.bz2 new file mode 100644 index 0000000..0c2417d Binary files /dev/null and b/integration/spark/src/test/resources/sample.csv.bz2 differ http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/11198c4a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala index 4446b5e..7e102df 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala @@ -52,12 +52,30 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll { ) } + test("test data loading CSV file without extension name") { + val testData = currentDirectory + "/src/test/resources/sample" + sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest") + checkAnswer( + sql("SELECT COUNT(*) FROM loadtest"), + Seq(Row(8)) + ) + } + test("test data loading GZIP compressed CSV file") { val testData = 
currentDirectory + "/src/test/resources/sample.csv.gz" sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest") checkAnswer( sql("SELECT COUNT(*) FROM loadtest"), - Seq(Row(8)) + Seq(Row(12)) + ) + } + + test("test data loading BZIP2 compressed CSV file") { + val testData = currentDirectory + "/src/test/resources/sample.csv.bz2" + sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest") + checkAnswer( + sql("SELECT COUNT(*) FROM loadtest"), + Seq(Row(16)) ) } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/11198c4a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java index a677a50..2fa5219 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java +++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java @@ -161,7 +161,8 @@ public class UnivocityCsvParser { String path = this.csvParserVo.getBlockDetailsList().get(blockCounter).getFilePath(); FileType fileType = FileFactory.getFileType(path); - if (path.endsWith(".gz")) { + if (path.endsWith(".gz") || + path.endsWith(".bz2")) { DataInputStream dataInputStream = FileFactory.getDataInputStream(path, fileType, bufferSize); inputStreamReader = new BufferedReader(new InputStreamReader(dataInputStream)); } else {