Repository: incubator-carbondata
Updated Branches:
  refs/heads/master a473f553e -> 5279d5b15


add codec and testcase


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/11198c4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/11198c4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/11198c4a

Branch: refs/heads/master
Commit: 11198c4a845da1ca460819da02a01bb7f719f575
Parents: a473f55
Author: jackylk <jacky.li...@huawei.com>
Authored: Tue Sep 13 22:44:47 2016 +0800
Committer: Vimal-Das <vimal...@apache.org>
Committed: Wed Oct 19 11:10:37 2016 +0530

----------------------------------------------------------------------
 .../datastorage/store/impl/FileFactory.java     |   8 ++++++++
 .../carbondata/spark/csv/CarbonTextFile.scala   |   5 ++++-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   4 +++-
 integration/spark/src/test/resources/sample     |   5 +++++
 .../spark/src/test/resources/sample.csv.bz2     | Bin 0 -> 114 bytes
 .../dataload/TestLoadDataGeneral.scala          |  20 ++++++++++++++++++-
 .../csvreaderstep/UnivocityCsvParser.java       |   3 ++-
 7 files changed, 41 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/11198c4a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
index 854ec8d..07e0b90 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
@@ -37,12 +37,14 @@ import org.apache.carbondata.core.datastorage.store.filesystem.LocalCarbonFile;
 import 
org.apache.carbondata.core.datastorage.store.filesystem.ViewFSCarbonFile;
 import org.apache.carbondata.core.util.CarbonUtil;
 
+import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.GzipCodec;
 
 public final class FileFactory {
@@ -127,11 +129,14 @@ public final class FileFactory {
       throws IOException {
     path = path.replace("\\", "/");
     boolean gzip = path.endsWith(".gz");
+    boolean bzip2 = path.endsWith(".bz2");
     InputStream stream;
     switch (fileType) {
       case LOCAL:
         if (gzip) {
           stream = new GZIPInputStream(new FileInputStream(path));
+        } else if (bzip2) {
+          stream = new BZip2CompressorInputStream(new FileInputStream(path));
         } else {
           stream = new FileInputStream(path);
         }
@@ -148,6 +153,9 @@ public final class FileFactory {
         if (gzip) {
           GzipCodec codec = new GzipCodec();
           stream = codec.createInputStream(stream);
+        } else if (bzip2) {
+          BZip2Codec codec = new BZip2Codec();
+          stream = codec.createInputStream(stream);
         }
         break;
       default:

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/11198c4a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
index c703a81..3589823 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
@@ -36,7 +36,10 @@ private[csv] object CarbonTextFile {
     val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
     hadoopConfiguration.setStrings(FileInputFormat.INPUT_DIR, location)
     hadoopConfiguration.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true)
-    hadoopConfiguration.set("io.compression.codecs", 
"org.apache.hadoop.io.compress.GzipCodec")
+    hadoopConfiguration.set("io.compression.codecs",
+      """org.apache.hadoop.io.compress.GzipCodec,
+         org.apache.hadoop.io.compress.DefaultCodec,
+         org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
 
     CarbonDataRDDFactory.configSplitMaxSize(sc, location, hadoopConfiguration)
     new NewHadoopRDD[LongWritable, Text](

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/11198c4a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1bc7ad0..511f452 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -882,7 +882,9 @@ object CarbonDataRDDFactory extends Logging {
           hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePaths)
           hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
           hadoopConfiguration.set("io.compression.codecs",
-            "org.apache.hadoop.io.compress.GzipCodec")
+            """org.apache.hadoop.io.compress.GzipCodec,
+               org.apache.hadoop.io.compress.DefaultCodec,
+               org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
 
           configSplitMaxSize(sqlContext.sparkContext, filePaths, 
hadoopConfiguration)
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/11198c4a/integration/spark/src/test/resources/sample
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/sample b/integration/spark/src/test/resources/sample
new file mode 100644
index 0000000..7c57de7
--- /dev/null
+++ b/integration/spark/src/test/resources/sample
@@ -0,0 +1,5 @@
+id,name,city,age
+1,david,shenzhen,31
+2,eason,shenzhen,27
+3,jarry,wuhan,35
+3,jarry,Bangalore,35

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/11198c4a/integration/spark/src/test/resources/sample.csv.bz2
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/sample.csv.bz2 b/integration/spark/src/test/resources/sample.csv.bz2
new file mode 100644
index 0000000..0c2417d
Binary files /dev/null and b/integration/spark/src/test/resources/sample.csv.bz2 differ

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/11198c4a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 4446b5e..7e102df 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -52,12 +52,30 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
     )
   }
 
+  test("test data loading CSV file without extension name") {
+    val testData = currentDirectory + "/src/test/resources/sample"
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM loadtest"),
+      Seq(Row(8))
+    )
+  }
+
   test("test data loading GZIP compressed CSV file") {
     val testData = currentDirectory + "/src/test/resources/sample.csv.gz"
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
     checkAnswer(
       sql("SELECT COUNT(*) FROM loadtest"),
-      Seq(Row(8))
+      Seq(Row(12))
+    )
+  }
+
+  test("test data loading BZIP2 compressed CSV file") {
+    val testData = currentDirectory + "/src/test/resources/sample.csv.bz2"
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM loadtest"),
+      Seq(Row(16))
     )
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/11198c4a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
index a677a50..2fa5219 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
@@ -161,7 +161,8 @@ public class UnivocityCsvParser {
     String path = 
this.csvParserVo.getBlockDetailsList().get(blockCounter).getFilePath();
     FileType fileType = FileFactory.getFileType(path);
 
-    if (path.endsWith(".gz")) {
+    if (path.endsWith(".gz") ||
+        path.endsWith(".bz2")) {
       DataInputStream dataInputStream = FileFactory.getDataInputStream(path, 
fileType, bufferSize);
       inputStreamReader = new BufferedReader(new 
InputStreamReader(dataInputStream));
     } else {

Reply via email to