http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
index 9ccc02c..755a7df 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -68,9 +68,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends 
FunSuite with BeforeAndA
   //getCanonicalPath gives path with \, but the code expects /.
   writerPath = writerPath.replace("\\", "/");
 
-  val filePath = writerPath + "/Fact/Part0/Segment_null/"
-
-  def buildTestData(persistSchema:Boolean) = {
+  def buildTestData(): Any = {
 
     FileUtils.deleteDirectory(new File(writerPath))
 
@@ -83,17 +81,9 @@ class TestCreateTableUsingSparkCarbonFileFormat extends 
FunSuite with BeforeAndA
       .toString()
 
     try {
-      val builder = CarbonWriter.builder().isTransactionalTable(true)
+      val builder = CarbonWriter.builder()
       val writer =
-        if (persistSchema) {
-          builder.persistSchemaFile(true)
-          
builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), 
spark
-            .sparkContext.hadoopConfiguration)
-        } else {
-          
builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), 
spark
-            .sparkContext.hadoopConfiguration)
-        }
-
+        
builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build()
       var i = 0
       while (i < 100) {
         writer.write(Array[String]("robot" + i, String.valueOf(i), 
String.valueOf(i.toDouble / 2)))
@@ -126,19 +116,19 @@ class TestCreateTableUsingSparkCarbonFileFormat extends 
FunSuite with BeforeAndA
 
   //TO DO, need to remove segment dependency and tableIdentifier Dependency
   test("read carbondata files (sdk Writer Output) using the 
SparkCarbonFileFormat ") {
-    buildTestData(false)
-    assert(new File(filePath).exists())
+    buildTestData()
+    assert(new File(writerPath).exists())
     spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
 
     //data source file format
     if (SparkUtil.isSparkVersionEqualTo("2.1")) {
       //data source file format
-      spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH 
'$filePath') """)
+      spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH 
'$writerPath') """)
     } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
-           |'$filePath' """.stripMargin)
+           |'$writerPath' """.stripMargin)
     } else{
       // TO DO
     }
@@ -169,55 +159,55 @@ class TestCreateTableUsingSparkCarbonFileFormat extends 
FunSuite with BeforeAndA
 
     spark.sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files
-    assert(new File(filePath).exists())
+    assert(new File(writerPath).exists())
     cleanTestData()
   }
 
   test("Running SQL directly and read carbondata files (sdk Writer Output) 
using the SparkCarbonFileFormat ") {
-    buildTestData(false)
-    assert(new File(filePath).exists())
+    buildTestData()
+    assert(new File(writerPath).exists())
     spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
 
     //data source file format
     if (SparkUtil.isSparkVersionEqualTo("2.1")) {
       //data source file format
-      spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH 
'$filePath') """)
+      spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH 
'$writerPath') """)
     } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
-           |'$filePath' """.stripMargin)
+           |'$writerPath' """.stripMargin)
     } else {
       // TO DO
     }
 
-    val directSQL = spark.sql(s"""select * FROM  
carbon.`$filePath`""".stripMargin)
+    val directSQL = spark.sql(s"""select * FROM  
carbon.`$writerPath`""".stripMargin)
     directSQL.show(false)
     TestUtil.checkAnswer(spark.sql("select * from sdkOutputTable"), directSQL)
 
     spark.sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files
-    assert(new File(filePath).exists())
+    assert(new File(writerPath).exists())
     cleanTestData()
   }
 
   // TODO: Make the sparkCarbonFileFormat to work without index file
   test("Read sdk writer output file without Carbondata file should fail") {
-    buildTestData(false)
+    buildTestData()
     deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
-    assert(new File(filePath).exists())
+    assert(new File(writerPath).exists())
     spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
 
     val exception = intercept[Exception] {
       //    data source file format
       if (SparkUtil.isSparkVersionEqualTo("2.1")) {
         //data source file format
-        spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH 
'$filePath') """)
+        spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH 
'$writerPath') """)
       } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
         //data source file format
         spark.sql(
           s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
-             |'$filePath' """.stripMargin)
+             |'$writerPath' """.stripMargin)
       } else{
         // TO DO
       }
@@ -226,28 +216,28 @@ class TestCreateTableUsingSparkCarbonFileFormat extends 
FunSuite with BeforeAndA
       .contains("CarbonData file is not present in the table location"))
 
     // drop table should not delete the files
-    assert(new File(filePath).exists())
+    assert(new File(writerPath).exists())
     cleanTestData()
   }
 
 
   test("Read sdk writer output file without any file should fail") {
-    buildTestData(false)
+    buildTestData()
     deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
     deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
-    assert(new File(filePath).exists())
+    assert(new File(writerPath).exists())
     spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
 
     val exception = intercept[Exception] {
       //data source file format
       if (SparkUtil.isSparkVersionEqualTo("2.1")) {
         //data source file format
-        spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH 
'$filePath') """)
+        spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH 
'$writerPath') """)
       } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
         //data source file format
         spark.sql(
           s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
-             |'$filePath' """.stripMargin)
+             |'$writerPath' """.stripMargin)
       } else{
         // TO DO
       }
@@ -258,13 +248,13 @@ class TestCreateTableUsingSparkCarbonFileFormat extends 
FunSuite with BeforeAndA
       .contains("CarbonData file is not present in the table location"))
 
     // drop table should not delete the files
-    assert(new File(filePath).exists())
+    assert(new File(writerPath).exists())
     cleanTestData()
   }
 
   test("Read sdk writer output file withSchema") {
-    buildTestData(true)
-    assert(new File(filePath).exists())
+    buildTestData()
+    assert(new File(writerPath).exists())
     spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
 
     //data source file format
@@ -272,12 +262,12 @@ class TestCreateTableUsingSparkCarbonFileFormat extends 
FunSuite with BeforeAndA
 
     if (SparkUtil.isSparkVersionEqualTo("2.1")) {
       //data source file format
-      spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH 
'$filePath') """)
+      spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH 
'$writerPath') """)
     } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
-           |'$filePath' """.stripMargin)
+           |'$writerPath' """.stripMargin)
     } else{
       // TO DO
     }
@@ -309,24 +299,24 @@ class TestCreateTableUsingSparkCarbonFileFormat extends 
FunSuite with BeforeAndA
     spark.sql("DROP TABLE sdkOutputTable")
 
     // drop table should not delete the files
-    assert(new File(filePath).exists())
+    assert(new File(writerPath).exists())
     cleanTestData()
   }
 
   test("Read sdk writer output file without index file should not fail") {
-    buildTestData(false)
+    buildTestData()
     deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
-    assert(new File(filePath).exists())
+    assert(new File(writerPath).exists())
     spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
 
     if (SparkUtil.isSparkVersionEqualTo("2.1")) {
       //data source file format
-      spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH 
'$filePath') """)
+      spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH 
'$writerPath') """)
     } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
-           |'$filePath' """.stripMargin)
+           |'$writerPath' """.stripMargin)
     } else{
       // TO DO
     }
@@ -336,7 +326,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends 
FunSuite with BeforeAndA
 
     spark.sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files
-    assert(new File(filePath).exists())
+    assert(new File(writerPath).exists())
     cleanTestData()
   }
   test("Read data having multi blocklet and validate min max flag") {
@@ -388,7 +378,9 @@ class TestCreateTableUsingSparkCarbonFileFormat extends 
FunSuite with BeforeAndA
     val emailDataBlocklet2 = "Email for testing min max for allowed chars"
     try{
       val 
options=Map("bad_records_action"->"FORCE","complex_delimiter_level_1"->"$").asJava
-      val 
writer=CarbonWriter.builder().outputPath(writerPath).withBlockletSize(16).sortBy(Array("myid","ingestion_time","event_id")).withLoadOptions(options).buildWriterForCSVInput(new
 Schema(fields),spark.sessionState.newHadoopConf())
+      val writer = 
CarbonWriter.builder().outputPath(writerPath).withBlockletSize(16)
+        .sortBy(Array("myid", "ingestion_time", 
"event_id")).withLoadOptions(options)
+        .withCsvInput(new Schema(fields)).build()
       val timeF=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
       val date_F=new SimpleDateFormat("yyyy-MM-dd")
       for(i<- 1 to recordsInBlocklet1){
@@ -445,8 +437,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends 
FunSuite with BeforeAndA
       .append("]")
       .toString()
     val builder = CarbonWriter.builder()
-    val writer = builder.outputPath(writerPath)
-      .buildWriterForCSVInput(Schema.parseJson(schema), 
spark.sessionState.newHadoopConf())
+    val writer = 
builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build()
     for (i <- 0 until 3) {
       // write a varchar with 75,000 length
       writer.write(Array[String](s"name_$i", 
RandomStringUtils.randomAlphabetic(75000), i.toString))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 3ae3d59..af3480f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
-import java.util.UUID;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -34,7 +33,6 @@ import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
@@ -51,7 +49,7 @@ public class CarbonReaderBuilder {
   private String[] projectionColumns;
   private Expression filterExpression;
   private String tableName;
-  private boolean isTransactionalTable;
+  private Configuration hadoopConf;
 
   /**
    * Construct a CarbonReaderBuilder with table path and table name
@@ -79,21 +77,6 @@ public class CarbonReaderBuilder {
   }
 
   /**
-   * Configure the transactional status of table
-   * If set to false, then reads the carbondata and carbonindex files from a 
flat folder structure.
-   * If set to true, then reads the carbondata and carbonindex files from 
segment folder structure.
-   * Default value is false
-   *
-   * @param isTransactionalTable whether is transactional table or not
-   * @return CarbonReaderBuilder object
-   */
-  public CarbonReaderBuilder isTransactionalTable(boolean 
isTransactionalTable) {
-    Objects.requireNonNull(isTransactionalTable);
-    this.isTransactionalTable = isTransactionalTable;
-    return this;
-  }
-
-  /**
    * Configure the filter expression for carbon reader
    *
    * @param filterExpression filter expression
@@ -106,72 +89,19 @@ public class CarbonReaderBuilder {
   }
 
   /**
-   * Set the access key for S3
-   *
-   * @param key   the string of access key for different S3 type,like: 
fs.s3a.access.key
-   * @param value the value of access key
-   * @return CarbonWriterBuilder object
-   */
-  public CarbonReaderBuilder setAccessKey(String key, String value) {
-    FileFactory.getConfiguration().set(key, value);
-    return this;
-  }
-
-  /**
-   * Set the access key for S3.
-   *
-   * @param value the value of access key
-   * @return CarbonWriterBuilder object
-   */
-  public CarbonReaderBuilder setAccessKey(String value) {
-    return setAccessKey(Constants.ACCESS_KEY, value);
-  }
-
-  /**
-   * Set the secret key for S3
-   *
-   * @param key   the string of secret key for different S3 type,like: 
fs.s3a.secret.key
-   * @param value the value of secret key
-   * @return CarbonWriterBuilder object
-   */
-  public CarbonReaderBuilder setSecretKey(String key, String value) {
-    FileFactory.getConfiguration().set(key, value);
-    return this;
-  }
-
-  /**
-   * Set the secret key for S3
-   *
-   * @param value the value of secret key
-   * @return CarbonWriterBuilder object
-   */
-  public CarbonReaderBuilder setSecretKey(String value) {
-    return setSecretKey(Constants.SECRET_KEY, value);
-  }
-
-  /**
-   * Set the endpoint for S3
+   * To support hadoop configuration
    *
-   * @param key   the string of endpoint for different S3 type,like: 
fs.s3a.endpoint
-   * @param value the value of endpoint
-   * @return CarbonWriterBuilder object
+   * @param conf hadoop configuration support, can set s3a AK,SK,end point and 
other conf with this
+   * @return updated CarbonReaderBuilder
    */
-  public CarbonReaderBuilder setEndPoint(String key, String value) {
-    FileFactory.getConfiguration().set(key, value);
+  public CarbonReaderBuilder withHadoopConf(Configuration conf) {
+    if (conf != null) {
+      this.hadoopConf = conf;
+    }
     return this;
   }
 
   /**
-   * Set the endpoint for S3
-   *
-   * @param value the value of endpoint
-   * @return CarbonWriterBuilder object
-   */
-  public CarbonReaderBuilder setEndPoint(String value) {
-    return setEndPoint(Constants.ENDPOINT, value);
-  }
-
-  /**
    * Build CarbonReader
    *
    * @param <T>
@@ -179,22 +109,19 @@ public class CarbonReaderBuilder {
    * @throws IOException
    * @throws InterruptedException
    */
-  public <T> CarbonReader<T> build(Configuration configuration)
+  public <T> CarbonReader<T> build()
       throws IOException, InterruptedException {
-    // DB name is not applicable for SDK reader as, table will be never 
registered.
+    if (hadoopConf == null) {
+      hadoopConf = FileFactory.getConfiguration();
+    }
     CarbonTable table;
-    if (isTransactionalTable) {
-      table = CarbonTable
-          .buildFromTablePath(tableName, "default", tablePath, 
UUID.randomUUID().toString());
+    if (filterExpression != null) {
+      table = CarbonTable.buildTable(tablePath, tableName, hadoopConf);
     } else {
-      if (filterExpression != null) {
-        table = CarbonTable.buildTable(tablePath, tableName, configuration);
-      } else {
-        table = CarbonTable.buildDummyTable(tablePath);
-      }
+      table = CarbonTable.buildDummyTable(tablePath);
     }
     final CarbonFileInputFormat format = new CarbonFileInputFormat();
-    final Job job = new Job(configuration);
+    final Job job = new Job(hadoopConf);
     format.setTableInfo(job.getConfiguration(), table.getTableInfo());
     format.setTablePath(job.getConfiguration(), table.getTablePath());
     format.setTableName(job.getConfiguration(), table.getTableName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 02434fc..87930f6 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -33,26 +33,19 @@ import 
org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.converter.SchemaConverter;
-import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.MapType;
 import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.writer.ThriftWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.s3a.Constants;
 
 /**
  * Builder for {@link CarbonWriter}
@@ -63,15 +56,20 @@ public class CarbonWriterBuilder {
   private Schema schema;
   private String path;
   private String[] sortColumns;
-  private boolean persistSchemaFile;
   private int blockletSize;
   private int blockSize;
-  private boolean isTransactionalTable;
   private long timestamp;
   private Map<String, String> options;
   private String taskNo;
   private int localDictionaryThreshold;
   private boolean isLocalDictionaryEnabled;
+  private short numOfThreads;
+  private Configuration hadoopConf;
+  private enum WRITER_TYPE {
+    CSV, AVRO, JSON
+  }
+
+  private WRITER_TYPE writerType;
 
   /**
    * Sets the output path of the writer builder
@@ -114,102 +112,6 @@ public class CarbonWriterBuilder {
     return this;
   }
 
-
-
-  /**
-   * If set, create a schema file in metadata folder.
-   * @param persist is a boolean value, If set to true, creates a schema file 
in metadata folder.
-   * By default set to false. will not create metadata folder
-   * @return updated CarbonWriterBuilder
-   */
-  public CarbonWriterBuilder persistSchemaFile(boolean persist) {
-    this.persistSchemaFile = persist;
-    return this;
-  }
-
-  /**
-   * If set false, writes the carbondata and carbonindex files in a flat 
folder structure
-   * @param isTransactionalTable is a boolelan value
-   * If set to false, then writes the carbondata and carbonindex files
-   * in a flat folder structure.
-   * If set to true, then writes the carbondata and carbonindex files
-   * in segment folder structure.
-   * By default set to false.
-   * @return updated CarbonWriterBuilder
-   */
-  public CarbonWriterBuilder isTransactionalTable(boolean 
isTransactionalTable) {
-    Objects.requireNonNull(isTransactionalTable, "Transactional Table should 
not be null");
-    this.isTransactionalTable = isTransactionalTable;
-    return this;
-  }
-
-  /**
-   * Set the access key for S3
-   *
-   * @param key   the string of access key for different S3 type,like: 
fs.s3a.access.key
-   * @param value the value of access key
-   * @return CarbonWriterBuilder
-   */
-  public CarbonWriterBuilder setAccessKey(String key, String value) {
-    FileFactory.getConfiguration().set(key, value);
-    return this;
-  }
-
-  /**
-   * Set the access key for S3.
-   *
-   * @param value the value of access key
-   * @return CarbonWriterBuilder
-   */
-  public CarbonWriterBuilder setAccessKey(String value) {
-    return setAccessKey(Constants.ACCESS_KEY, value);
-  }
-
-  /**
-   * Set the secret key for S3
-   *
-   * @param key   the string of secret key for different S3 type,like: 
fs.s3a.secret.key
-   * @param value the value of secret key
-   * @return CarbonWriterBuilder
-   */
-  public CarbonWriterBuilder setSecretKey(String key, String value) {
-    FileFactory.getConfiguration().set(key, value);
-    return this;
-  }
-
-  /**
-   * Set the secret key for S3
-   *
-   * @param value the value of secret key
-   * @return CarbonWriterBuilder
-   */
-  public CarbonWriterBuilder setSecretKey(String value) {
-    return setSecretKey(Constants.SECRET_KEY, value);
-  }
-
-  /**
-   * Set the endpoint for S3
-   *
-   * @param key   the string of endpoint for different S3 type,like: 
fs.s3a.endpoint
-   * @param value the value of endpoint
-   * @return CarbonWriterBuilder
-   */
-  public CarbonWriterBuilder setEndPoint(String key, String value) {
-    FileFactory.getConfiguration().set(key, value);
-    return this;
-  }
-
-  /**
-   * Set the endpoint for S3
-   *
-   * @param value the value of endpoint
-   * @return CarbonWriterBuilder
-   */
-  public CarbonWriterBuilder setEndPoint(String value) {
-    FileFactory.getConfiguration().set(Constants.ENDPOINT, value);
-    return this;
-  }
-
   /**
    * to set the timestamp in the carbondata and carbonindex index files
    * @param timestamp is a timestamp to be used in the carbondata and 
carbonindex index files.
@@ -286,6 +188,7 @@ public class CarbonWriterBuilder {
    * c. local_dictionary_threshold -- positive value, default is 10000
    * d. local_dictionary_enable -- true / false. Default is false
    * e. sort_columns -- comma separated column. "c1,c2". Default all 
dimensions are sorted.
+   *                    If empty string "" is passed. No columns are sorted
    * j. sort_scope -- "local_sort", "no_sort", "batch_sort". default value is 
"local_sort"
    * k. long_string_columns -- comma separated string columns which are more 
than 32k length.
    *                           default value is null.
@@ -316,7 +219,12 @@ public class CarbonWriterBuilder {
         
this.enableLocalDictionary((entry.getValue().equalsIgnoreCase("true")));
       } else if (entry.getKey().equalsIgnoreCase("sort_columns")) {
         //sort columns
-        String[] sortColumns = entry.getValue().split(",");
+        String[] sortColumns;
+        if (entry.getValue().trim().isEmpty()) {
+          sortColumns = new String[0];
+        } else {
+          sortColumns = entry.getValue().split(",");
+        }
         this.sortBy(sortColumns);
       } else if (entry.getKey().equalsIgnoreCase("sort_scope")) {
         this.withSortScope(entry);
@@ -328,6 +236,36 @@ public class CarbonWriterBuilder {
   }
 
   /**
+   * To make sdk writer thread safe.
+   *
+   * @param numOfThreads should number of threads in which writer is called in 
multi-thread scenario
+   *                     default sdk writer is not thread safe.
+   *                     can use one writer instance in one thread only.
+   * @return updated CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder withThreadSafe(short numOfThreads) {
+    if (numOfThreads < 1) {
+      throw new IllegalArgumentException("number of threads cannot be lesser 
than 1. "
+          + "suggest to keep two times the number of cores available");
+    }
+    this.numOfThreads = numOfThreads;
+    return this;
+  }
+
+  /**
+   * To support hadoop configuration
+   *
+   * @param conf hadoop configuration support, can set s3a AK,SK,end point and 
other conf with this
+   * @return updated CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder withHadoopConf(Configuration conf) {
+    if (conf != null) {
+      this.hadoopConf = conf;
+    }
+    return this;
+  }
+
+  /**
    * To set the carbondata file size in MB between 1MB-2048MB
    * @param blockSize is size in MB between 1MB to 2048 MB
    * default value is 1024 MB
@@ -379,138 +317,79 @@ public class CarbonWriterBuilder {
   }
 
   /**
-   * This writer is not thread safe,
-   * use buildThreadSafeWriterForCSVInput in multi thread environment
-   * Build a {@link CarbonWriter}, which accepts row in CSV format
-   * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
-   * @return CSVCarbonWriter
-   * @throws IOException
-   * @throws InvalidLoadOptionException
-   */
-  public CarbonWriter buildWriterForCSVInput(Schema schema, Configuration 
configuration)
-      throws IOException, InvalidLoadOptionException {
-    Objects.requireNonNull(schema, "schema should not be null");
-    Objects.requireNonNull(path, "path should not be null");
-    this.schema = schema;
-    CarbonLoadModel loadModel = buildLoadModel(schema);
-    return new CSVCarbonWriter(loadModel, configuration);
-  }
-
-  /**
+   * to build a {@link CarbonWriter}, which accepts row in CSV format
    *
-   * Build a {@link CarbonWriter}, which accepts row in CSV format
    * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
-   * @param numOfThreads number of threads() in which .write will be called.
-   * @return CSVCarbonWriter
-   * @throws IOException
-   * @throws InvalidLoadOptionException
+   * @return CarbonWriterBuilder
    */
-  public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short 
numOfThreads,
-      Configuration configuration) throws IOException, 
InvalidLoadOptionException {
+  public CarbonWriterBuilder withCsvInput(Schema schema) {
     Objects.requireNonNull(schema, "schema should not be null");
-    Objects.requireNonNull(numOfThreads, "numOfThreads should not be null");
-    Objects.requireNonNull(path, "path should not be null");
     this.schema = schema;
-    if (numOfThreads <= 0) {
-      throw new IllegalArgumentException(" numOfThreads must be greater than 
0");
-    }
-    CarbonLoadModel loadModel = buildLoadModel(schema);
-    loadModel.setSdkWriterCores(numOfThreads);
-    return new CSVCarbonWriter(loadModel, configuration);
-  }
-
-  /**
-   * This writer is not thread safe,
-   * use buildThreadSafeWriterForAvroInput in multi thread environment
-   * Build a {@link CarbonWriter}, which accepts Avro object
-   * @param avroSchema avro Schema object {org.apache.avro.Schema}
-   * @return AvroCarbonWriter
-   * @throws IOException
-   * @throws InvalidLoadOptionException
-   */
-  public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema 
avroSchema,
-      Configuration configuration) throws IOException, 
InvalidLoadOptionException {
-    this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);
-    Objects.requireNonNull(schema, "schema should not be null");
-    Objects.requireNonNull(path, "path should not be null");
-    CarbonLoadModel loadModel = buildLoadModel(schema);
-    // AVRO records are pushed to Carbon as Object not as Strings. This was 
done in order to
-    // handle multi level complex type support. As there are no conversion 
converter step is
-    // removed from the load. LoadWithoutConverter flag is going to point to 
the Loader Builder
-    // which will skip Conversion Step.
-    loadModel.setLoadWithoutConverterStep(true);
-    return new AvroCarbonWriter(loadModel, configuration);
+    this.writerType = WRITER_TYPE.CSV;
+    return this;
   }
 
   /**
-   * Build a {@link CarbonWriter}, which accepts Avro object
+   * to build a {@link CarbonWriter}, which accepts Avro object
+   *
    * @param avroSchema avro Schema object {org.apache.avro.Schema}
-   * @param numOfThreads number of threads() in which .write will be called.
-   * @return AvroCarbonWriter
-   * @throws IOException
-   * @throws InvalidLoadOptionException
+   * @return CarbonWriterBuilder
    */
-  public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema 
avroSchema,
-      short numOfThreads, Configuration configuration)
-      throws IOException, InvalidLoadOptionException {
+  public CarbonWriterBuilder withAvroInput(org.apache.avro.Schema avroSchema) {
+    Objects.requireNonNull(avroSchema, "Avro schema should not be null");
     this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);
-    Objects.requireNonNull(schema, "schema should not be null");
-    Objects.requireNonNull(path, "path should not be null");
-    Objects.requireNonNull(numOfThreads, "numOfThreads should not be null");
-    if (numOfThreads <= 0) {
-      throw new IllegalArgumentException(" numOfThreads must be greater than 
0");
-    }
-    CarbonLoadModel loadModel = buildLoadModel(schema);
-    // AVRO records are pushed to Carbon as Object not as Strings. This was 
done in order to
-    // handle multi level complex type support. As there are no conversion 
converter step is
-    // removed from the load. LoadWithoutConverter flag is going to point to 
the Loader Builder
-    // which will skip Conversion Step.
-    loadModel.setLoadWithoutConverterStep(true);
-    loadModel.setSdkWriterCores(numOfThreads);
-    return new AvroCarbonWriter(loadModel, configuration);
+    this.writerType = WRITER_TYPE.AVRO;
+    return this;
   }
 
   /**
-   * This writer is not thread safe,
-   * use buildThreadSafeWriterForJsonInput in multi thread environment
-   * Build a {@link CarbonWriter}, which accepts Json object
+   * to build a {@link CarbonWriter}, which accepts Json object
+   *
    * @param carbonSchema carbon Schema object
-   * @return JsonCarbonWriter
-   * @throws IOException
-   * @throws InvalidLoadOptionException
+   * @return CarbonWriterBuilder
    */
-  public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema, 
Configuration configuration)
-      throws IOException, InvalidLoadOptionException {
+  public CarbonWriterBuilder withJsonInput(Schema carbonSchema) {
     Objects.requireNonNull(carbonSchema, "schema should not be null");
-    Objects.requireNonNull(path, "path should not be null");
     this.schema = carbonSchema;
-    CarbonLoadModel loadModel = buildLoadModel(carbonSchema);
-    loadModel.setJsonFileLoad(true);
-    return new JsonCarbonWriter(loadModel, configuration);
+    this.writerType = WRITER_TYPE.JSON;
+    return this;
   }
 
   /**
-   * Can use this writer in multi-thread instance.
+   * Build a {@link CarbonWriter}
+   * This writer is not thread safe,
+   * use withThreadSafe() configuration in multi thread environment
    *
-   * Build a {@link CarbonWriter}, which accepts Json object
-   * @param carbonSchema carbon Schema object
-   * @param numOfThreads number of threads() in which .write will be called.
-   * @return JsonCarbonWriter
+   * @return CarbonWriter {AvroCarbonWriter/CSVCarbonWriter/JsonCarbonWriter 
based on Input Type }
    * @throws IOException
    * @throws InvalidLoadOptionException
    */
-  public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema 
carbonSchema, short numOfThreads,
-      Configuration configuration) throws IOException, 
InvalidLoadOptionException {
-    Objects.requireNonNull(carbonSchema, "schema should not be null");
+  public CarbonWriter build() throws IOException, InvalidLoadOptionException {
     Objects.requireNonNull(path, "path should not be null");
-    if (numOfThreads <= 0) {
-      throw new IllegalArgumentException(" numOfThreads must be greater than 
0");
+    if (this.writerType == null) {
+      throw new IOException(
+          "Writer type is not set, use withCsvInput() or withAvroInput() or 
withJsonInput()  "
+              + "API based on input");
     }
-    this.schema = carbonSchema;
     CarbonLoadModel loadModel = buildLoadModel(schema);
-    loadModel.setJsonFileLoad(true);
     loadModel.setSdkWriterCores(numOfThreads);
-    return new JsonCarbonWriter(loadModel, configuration);
+    if (hadoopConf == null) {
+      hadoopConf = FileFactory.getConfiguration();
+    }
+    if (this.writerType == WRITER_TYPE.AVRO) {
+      // AVRO records are pushed to Carbon as Object not as Strings. This was 
done in order to
+      // handle multi level complex type support. As there are no conversion 
converter step is
+      // removed from the load. LoadWithoutConverter flag is going to point to 
the Loader Builder
+      // which will skip Conversion Step.
+      loadModel.setLoadWithoutConverterStep(true);
+      return new AvroCarbonWriter(loadModel, hadoopConf);
+    } else if (this.writerType == WRITER_TYPE.JSON) {
+      loadModel.setJsonFileLoad(true);
+      return new JsonCarbonWriter(loadModel, hadoopConf);
+    } else {
+      // CSV
+      return new CSVCarbonWriter(loadModel, hadoopConf);
+    }
   }
 
   private void setCsvHeader(CarbonLoadModel model) {
@@ -542,10 +421,6 @@ public class CarbonWriterBuilder {
     this.schema = updateSchemaFields(carbonSchema, longStringColumns);
     // build CarbonTable using schema
     CarbonTable table = buildCarbonTable();
-    if (persistSchemaFile) {
-      // we are still using the traditional carbon table folder structure
-      persistSchemaFile(table, CarbonTablePath.getSchemaFilePath(path));
-    }
     // build LoadModel
     return buildLoadModel(table, timestamp, taskNo, options);
   }
@@ -614,18 +489,13 @@ public class CarbonWriterBuilder {
     tableSchemaBuilder.setSortColumns(Arrays.asList(sortColumnsSchemaList));
     String tableName;
     String dbName;
-    if (isTransactionalTable) {
-      tableName = "_tempTable";
-      dbName = "_tempDB";
-    } else {
-      dbName = "";
-      tableName = "_tempTable_" + String.valueOf(timestamp);
-    }
+    dbName = "";
+    tableName = "_tempTable_" + String.valueOf(timestamp);
     TableSchema schema = tableSchemaBuilder.build();
     schema.setTableName(tableName);
     CarbonTable table =
         
CarbonTable.builder().tableName(schema.getTableName()).databaseName(dbName).tablePath(path)
-            
.tableSchema(schema).isTransactionalTable(isTransactionalTable).build();
+            .tableSchema(schema).isTransactionalTable(false).build();
     return table;
   }
 
@@ -708,36 +578,6 @@ public class CarbonWriterBuilder {
   }
 
   /**
-   * Save the schema of the {@param table} to {@param persistFilePath}
-   * @param table table object containing schema
-   * @param persistFilePath absolute file path with file name
-   */
-  private void persistSchemaFile(CarbonTable table, String persistFilePath) 
throws IOException {
-    TableInfo tableInfo = table.getTableInfo();
-    String schemaMetadataPath = 
CarbonTablePath.getFolderContainingFile(persistFilePath);
-    CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
-    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-    org.apache.carbondata.format.TableInfo thriftTableInfo =
-        schemaConverter.fromWrapperToExternalTableInfo(
-            tableInfo,
-            tableInfo.getDatabaseName(),
-            tableInfo.getFactTable().getTableName());
-    org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
-        new org.apache.carbondata.format.SchemaEvolutionEntry(
-            tableInfo.getLastUpdatedTime());
-    
thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history()
-        .add(schemaEvolutionEntry);
-    FileFactory.FileType fileType = 
FileFactory.getFileType(schemaMetadataPath);
-    if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
-      FileFactory.mkdirs(schemaMetadataPath, fileType);
-    }
-    ThriftWriter thriftWriter = new ThriftWriter(persistFilePath, false);
-    thriftWriter.open();
-    thriftWriter.write(thriftTableInfo);
-    thriftWriter.close();
-  }
-
-  /**
    * Build a {@link CarbonLoadModel}
    */
   private CarbonLoadModel buildLoadModel(CarbonTable table, long timestamp, 
String taskNo,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/store/sdk/src/main/java/org/apache/carbondata/sdk/file/TestUtil.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/TestUtil.java 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/TestUtil.java
index f4c2408..dc6e012 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/TestUtil.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/TestUtil.java
@@ -29,7 +29,6 @@ import 
org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
@@ -68,25 +67,19 @@ public class TestUtil {
     }
   }
 
-  static void writeFilesAndVerify(Schema schema, String path) {
-    writeFilesAndVerify(schema, path, null);
-  }
-
   static void writeFilesAndVerify(Schema schema, String path, String[] 
sortColumns) {
     writeFilesAndVerify(
-        100, schema, path, sortColumns, false, -1, -1, true);
+        100, schema, path, sortColumns, -1, -1);
   }
 
-  public static void writeFilesAndVerify(
-      int rows, Schema schema, String path, boolean persistSchema) {
+  public static void writeFilesAndVerify(int rows, Schema schema, String path) 
{
     writeFilesAndVerify(
-        rows, schema, path, null, persistSchema, -1, -1, true);
+        rows, schema, path, null, -1, -1);
   }
 
-  public static void writeFilesAndVerify(Schema schema, String path, boolean 
persistSchema,
-      boolean isTransactionalTable) {
+  public static void writeFilesAndVerify(Schema schema, String path) {
     writeFilesAndVerify(
-        100, schema, path, null, persistSchema, -1, -1, isTransactionalTable);
+        100, schema, path, null, -1, -1);
   }
 
   /**
@@ -100,7 +93,7 @@ public class TestUtil {
    */
   public static void writeFilesAndVerify(
       int rows, Schema schema, String path, boolean persistSchema, boolean 
isTransactionalTable) {
-    writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, 
isTransactionalTable);
+    writeFilesAndVerify(rows, schema, path, null, -1, -1);
   }
 
   /**
@@ -109,23 +102,17 @@ public class TestUtil {
    * @param schema schema of the file
    * @param path local write path
    * @param sortColumns sort columns
-   * @param persistSchema true if want to persist schema file
    * @param blockletSize blockletSize in the file, -1 for default size
    * @param blockSize blockSize in the file, -1 for default size
-   * @param isTransactionalTable set to true if this is written for 
Transactional Table.
    */
   public static void writeFilesAndVerify(int rows, Schema schema, String path, 
String[] sortColumns,
-      boolean persistSchema, int blockletSize, int blockSize, boolean 
isTransactionalTable) {
+      int blockletSize, int blockSize) {
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder()
-          .isTransactionalTable(isTransactionalTable)
           .outputPath(path);
       if (sortColumns != null) {
         builder = builder.sortBy(sortColumns);
       }
-      if (persistSchema) {
-        builder = builder.persistSchemaFile(true);
-      }
       if (blockletSize != -1) {
         builder = builder.withBlockletSize(blockletSize);
       }
@@ -133,7 +120,7 @@ public class TestUtil {
         builder = builder.withBlockSize(blockSize);
       }
 
-      CarbonWriter writer = builder.buildWriterForCSVInput(schema, 
configuration);
+      CarbonWriter writer = builder.withCsvInput(schema).build();
 
       for (int i = 0; i < rows; i++) {
         writer.write(new String[]{
@@ -145,20 +132,7 @@ public class TestUtil {
       throw new RuntimeException(e);
     }
 
-    File segmentFolder = null;
-    if (isTransactionalTable) {
-      segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
-      if (!segmentFolder.exists()) {
-        throw new RuntimeException("Test failed: file not exists");
-      }
-    } else {
-      segmentFolder = new File(path);
-      if (!segmentFolder.exists()) {
-        throw new RuntimeException("Test failed: file not exists");
-      }
-    }
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+    File[] dataFiles = new File(path).listFiles(new FileFilter() {
       @Override public boolean accept(File pathname) {
         return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
 
b/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
index e43f750..ed2be51 100644
--- 
a/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
+++ 
b/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
@@ -22,8 +22,11 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -42,12 +45,18 @@ abstract class MetaCachedCarbonStore implements CarbonStore 
{
     if (cache.containsKey(path)) {
       return cache.get(path);
     }
-    org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
-        .readSchemaFile(CarbonTablePath.getSchemaFilePath(path));
-    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-    TableInfo tableInfo1 = 
schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", "");
-    tableInfo1.setTablePath(path);
-    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo1);
+    String schemaPath = CarbonTablePath.getSchemaFilePath(path);
+    TableInfo tableInfo;
+    if (!FileFactory.isFileExist(schemaPath, 
FileFactory.getFileType(schemaPath))) {
+      tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(path), 
false);
+    } else {
+      org.apache.carbondata.format.TableInfo tableInfoFormat;
+      tableInfoFormat = 
CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(path));
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      tableInfo = 
schemaConverter.fromExternalToWrapperTableInfo(tableInfoFormat, "", "", "");
+      tableInfo.setTablePath(path);
+    }
+    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
     cache.put(path, table);
     return table;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
index abaa7f1..53ed90f 100644
--- 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
+++ 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.sdk.file;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -31,15 +30,12 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.SchemaReader;
-import 
org.apache.carbondata.core.metadata.schema.table.DiskBasedDMSchemaStorageProvider;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.avro.generic.GenericData;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.CharEncoding;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -88,8 +84,8 @@ public class AvroCarbonWriterTest {
     // conversion to GenericData.Record
     GenericData.Record record = TestUtil.jsonToAvro(json, avroSchema);
     try {
-      CarbonWriter writer = 
CarbonWriter.builder().outputPath(path).isTransactionalTable(true)
-          .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema), 
TestUtil.configuration);
+      CarbonWriter writer = CarbonWriter.builder().outputPath(path)
+          .withAvroInput(new Schema.Parser().parse(avroSchema)).build();
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -100,10 +96,7 @@ public class AvroCarbonWriterTest {
       Assert.fail(e.getMessage());
     }
 
-    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, 
"null"));
-    Assert.assertTrue(segmentFolder.exists());
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+    File[] dataFiles = new File(path).listFiles(new FileFilter() {
       @Override public boolean accept(File pathname) {
         return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
       }
@@ -156,8 +149,8 @@ public class AvroCarbonWriterTest {
     try {
       CarbonWriter writer = CarbonWriter.builder()
           .outputPath(path)
-          .isTransactionalTable(true)
-          .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema), 
TestUtil.configuration);
+          
+          .withAvroInput(new Schema.Parser().parse(avroSchema)).build();
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -168,10 +161,7 @@ public class AvroCarbonWriterTest {
       Assert.fail(e.getMessage());
     }
 
-    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, 
"null"));
-    Assert.assertTrue(segmentFolder.exists());
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+    File[] dataFiles = new File(path).listFiles(new FileFilter() {
       @Override public boolean accept(File pathname) {
         return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
       }
@@ -246,11 +236,7 @@ public class AvroCarbonWriterTest {
     GenericData.Record record = TestUtil.jsonToAvro(json, mySchema);
 
     try {
-      CarbonWriter writer = CarbonWriter.builder()
-          .outputPath(path)
-          .isTransactionalTable(true)
-          .buildWriterForAvroInput(nn, TestUtil.configuration);
-
+      CarbonWriter writer = 
CarbonWriter.builder().outputPath(path).withAvroInput(nn).build();
       for (int i = 0; i < 100; i++) {
         writer.write(record);
       }
@@ -260,10 +246,7 @@ public class AvroCarbonWriterTest {
       Assert.fail(e.getMessage());
     }
 
-    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, 
"null"));
-    Assert.assertTrue(segmentFolder.exists());
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+    File[] dataFiles = new File(path).listFiles(new FileFilter() {
       @Override public boolean accept(File pathname) {
         return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
       }
@@ -307,11 +290,7 @@ public class AvroCarbonWriterTest {
     GenericData.Record record = TestUtil.jsonToAvro(json, mySchema);
 
     try {
-      CarbonWriter writer = CarbonWriter.builder()
-          .outputPath(path)
-          .isTransactionalTable(true)
-          .buildWriterForAvroInput(nn, TestUtil.configuration);
-
+      CarbonWriter writer = 
CarbonWriter.builder().outputPath(path).withAvroInput(nn).build();
       for (int i = 0; i < 100; i++) {
         writer.write(record);
       }
@@ -321,10 +300,7 @@ public class AvroCarbonWriterTest {
       Assert.fail(e.getMessage());
     }
 
-    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, 
"null"));
-    Assert.assertTrue(segmentFolder.exists());
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+    File[] dataFiles = new File(path).listFiles(new FileFilter() {
       @Override public boolean accept(File pathname) {
         return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
       }
@@ -337,17 +313,14 @@ public class AvroCarbonWriterTest {
 
 
   private void WriteAvroComplexData(String mySchema, String json, String[] 
sortColumns)
-      throws UnsupportedEncodingException, IOException, 
InvalidLoadOptionException {
+      throws IOException, InvalidLoadOptionException {
 
     // conversion to GenericData.Record
     Schema nn = new Schema.Parser().parse(mySchema);
     GenericData.Record record = TestUtil.jsonToAvro(json, mySchema);
     try {
-      CarbonWriter writer = CarbonWriter.builder()
-          .outputPath(path)
-          .isTransactionalTable(true).sortBy(sortColumns)
-          .buildWriterForAvroInput(nn, TestUtil.configuration);
-
+      CarbonWriter writer =
+          
CarbonWriter.builder().outputPath(path).sortBy(sortColumns).withAvroInput(nn).build();
       for (int i = 0; i < 100; i++) {
         writer.write(record);
       }
@@ -396,10 +369,7 @@ public class AvroCarbonWriterTest {
 
     WriteAvroComplexData(mySchema, json, null);
 
-    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, 
"null"));
-    Assert.assertTrue(segmentFolder.exists());
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+    File[] dataFiles = new File(path).listFiles(new FileFilter() {
       @Override public boolean accept(File pathname) {
         return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
       }
@@ -460,11 +430,11 @@ public class AvroCarbonWriterTest {
     Field[] field = new Field[2];
     field[0] = new Field("name", DataTypes.STRING);
     field[1] = new Field("name", DataTypes.STRING);
-    CarbonWriterBuilder writer = 
CarbonWriter.builder().isTransactionalTable(false)
+    CarbonWriterBuilder writer = CarbonWriter.builder()
         .uniqueIdentifier(System.currentTimeMillis()).outputPath(path);
 
     try {
-      writer.buildWriterForCSVInput(new 
org.apache.carbondata.sdk.file.Schema(field), TestUtil.configuration);
+      writer.withCsvInput(new 
org.apache.carbondata.sdk.file.Schema(field)).build();
       Assert.fail();
     } catch (Exception e) {
       assert(e.getMessage().contains("Duplicate column name found in table 
schema"));
@@ -477,14 +447,14 @@ public class AvroCarbonWriterTest {
     Field[] field = new Field[2];
     field[0] = new Field("name", DataTypes.STRING);
     field[1] = new Field("date", DataTypes.DATE);
-    CarbonWriterBuilder writer = 
CarbonWriter.builder().isTransactionalTable(false)
+    CarbonWriterBuilder writer = CarbonWriter.builder()
         .uniqueIdentifier(System.currentTimeMillis()).outputPath(path);
 
     try {
       Map<String, String> loadOptions = new HashMap<String, String>();
       loadOptions.put("bad_records_action", "fail");
       CarbonWriter carbonWriter =
-          
writer.isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForCSVInput(new
 org.apache.carbondata.sdk.file.Schema(field), TestUtil.configuration);
+          writer.withLoadOptions(loadOptions).withCsvInput(new 
org.apache.carbondata.sdk.file.Schema(field)).build();
       carbonWriter.write(new String[] { "k", "20-02-2233" });
       carbonWriter.close();
       Assert.fail();
@@ -510,15 +480,15 @@ public class AvroCarbonWriterTest {
     // conversion to GenericData.Record
     GenericData.Record record = TestUtil.jsonToAvro(json, avroSchema);
     try {
-      CarbonWriter writer = 
CarbonWriter.builder().outputPath(path).isTransactionalTable(true)
-          .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema), 
TestUtil.configuration);
+      CarbonWriter writer = CarbonWriter.builder().outputPath(path)
+          .withAvroInput(new Schema.Parser().parse(avroSchema)).build();
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
       }
       writer.close();
       TableInfo tableInfo = 
SchemaReader.inferSchema(AbsoluteTableIdentifier.from(path, "",
-          ""), true, TestUtil.configuration);
+          ""), false);
       List<String> dataTypes = new ArrayList<>();
       for(ColumnSchema columnSchema: 
tableInfo.getFactTable().getListOfColumns()) {
         dataTypes.add(columnSchema.getDataType().toString());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index 5bc453b..ba6d772 100644
--- 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.StructField;
-import org.apache.carbondata.core.metadata.datatype.StructType;
 import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
@@ -123,11 +122,8 @@ public class CSVCarbonWriterTest {
     fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
 
     try {
-      CarbonWriterBuilder builder = CarbonWriter.builder()
-          .isTransactionalTable(true)
-          .outputPath(path);
-
-      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), 
TestUtil.configuration);
+      CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path);
+      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build();
 
       for (int i = 0; i < 100; i++) {
         String[] row = new String[]{
@@ -148,10 +144,7 @@ public class CSVCarbonWriterTest {
       Assert.fail(e.getMessage());
     }
 
-    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, 
"null"));
-    Assert.assertTrue(segmentFolder.exists());
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+    File[] dataFiles = new File(path).listFiles(new FileFilter() {
       @Override public boolean accept(File pathname) {
         return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
       }
@@ -171,7 +164,7 @@ public class CSVCarbonWriterTest {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, 
false, 1, 100, false);
+    TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, 
1, 100);
 
     // TODO: implement reader to verify the number of blocklet in the file
 
@@ -187,10 +180,8 @@ public class CSVCarbonWriterTest {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, 
false, 2, 2, true);
-
-    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, 
"null"));
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+    TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, 
2, 2);
+    File[] dataFiles = new File(path).listFiles(new FileFilter() {
       @Override public boolean accept(File pathname) {
         return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
       }
@@ -222,23 +213,6 @@ public class CSVCarbonWriterTest {
     // TODO: test write data with partition
   }
 
-  @Test
-  public void testSchemaPersistence() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
-
-    String schemaFile = CarbonTablePath.getSchemaFilePath(path);
-    Assert.assertTrue(new File(schemaFile).exists());
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
   @Test(expected = IOException.class)
   public void testWhenWriterthrowsError() throws IOException{
     CarbonWriter carbonWriter = null;
@@ -249,8 +223,8 @@ public class CSVCarbonWriterTest {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
     try {
-      carbonWriter = CarbonWriter.builder().isTransactionalTable(false).
-          outputPath(path).buildWriterForCSVInput(new Schema(fields), 
TestUtil.configuration);
+      carbonWriter = CarbonWriter.builder().
+          outputPath(path).withCsvInput(new Schema(fields)).build();
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
       Assert.assertTrue(false);
@@ -269,8 +243,8 @@ public class CSVCarbonWriterTest {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
     try {
-      carbonWriter = CarbonWriter.builder().isTransactionalTable(false).
-          outputPath(path).buildWriterForCSVInput(new Schema(fields), 
TestUtil.configuration);
+      carbonWriter = CarbonWriter.builder().
+          outputPath(path).withCsvInput(new Schema(fields)).build();
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
       Assert.assertTrue(false);
@@ -293,10 +267,10 @@ public class CSVCarbonWriterTest {
 
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder()
-          .isTransactionalTable(true).taskNo(5)
+          .taskNo(5)
           .outputPath(path);
 
-      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), 
TestUtil.configuration);
+      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build();
 
       for (int i = 0; i < 2; i++) {
         String[] row = new String[]{
@@ -307,10 +281,7 @@ public class CSVCarbonWriterTest {
       }
       writer.close();
 
-      File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, 
"null"));
-      Assert.assertTrue(segmentFolder.exists());
-
-      File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      File[] dataFiles = new File(path).listFiles(new FileFilter() {
         @Override public boolean accept(File pathname) {
           return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
         }
@@ -338,8 +309,7 @@ public class CSVCarbonWriterTest {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(1000000, new Schema(fields), path, new 
String[]{"name"},
-        true, 3, 8, false);
+    TestUtil.writeFilesAndVerify(1000000, new Schema(fields), path, new 
String[]{"name"}, 3, 8);
 
     // read footer and verify number of blocklets
     CarbonFile folder = FileFactory.getCarbonFile(path);
@@ -374,8 +344,8 @@ public class CSVCarbonWriterTest {
     fields[2] = new Field("doubleField", DataTypes.DOUBLE);
 
     try {
-      CarbonWriterBuilder builder = 
CarbonWriter.builder().isTransactionalTable(true).taskNo(5).outputPath(path);
-      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), 
TestUtil.configuration);
+      CarbonWriterBuilder builder = 
CarbonWriter.builder().taskNo(5).outputPath(path);
+      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build();
       for (int i = 0; i < 15; i++) {
         String[] row = new String[] { "robot" + (i % 10), String.valueOf(i + 
"." + i),
             String.valueOf(i + "." + i) };
@@ -383,7 +353,7 @@ public class CSVCarbonWriterTest {
       }
       writer.close();
       TableInfo tableInfo = 
SchemaReader.inferSchema(AbsoluteTableIdentifier.from(path, "",
-          ""), true, TestUtil.configuration);
+          ""), false);
       List<String> dataTypes = new ArrayList<>();
       for(ColumnSchema columnSchema: 
tableInfo.getFactTable().getListOfColumns()) {
           dataTypes.add(columnSchema.getDataType().toString());
@@ -409,15 +379,15 @@ public class CSVCarbonWriterTest {
     fields[1] = new Field("byteField", DataTypes.BYTE);
 
     try {
-      CarbonWriterBuilder builder = 
CarbonWriter.builder().isTransactionalTable(true).taskNo(5).outputPath(path);
-      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), 
TestUtil.configuration);
+      CarbonWriterBuilder builder = 
CarbonWriter.builder().taskNo(5).outputPath(path);
+      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build();
       for (int i = 0; i < 15; i++) {
         String[] row = new String[] { "robot" + (i % 10),  "" + i };
         writer.write(row);
       }
       writer.close();
       TableInfo tableInfo = 
SchemaReader.inferSchema(AbsoluteTableIdentifier.from(path, "",
-          ""), true, TestUtil.configuration);
+          ""), false);
       List<String> dataTypes = new ArrayList<>();
       for(ColumnSchema columnSchema: 
tableInfo.getFactTable().getListOfColumns()) {
         dataTypes.add(columnSchema.getDataType().toString());
@@ -443,15 +413,15 @@ public class CSVCarbonWriterTest {
     fields[2] = new Field("floatField", DataTypes.FLOAT);
 
     try {
-      CarbonWriterBuilder builder = 
CarbonWriter.builder().isTransactionalTable(true).taskNo(5).outputPath(path);
-      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), 
TestUtil.configuration);
+      CarbonWriterBuilder builder = 
CarbonWriter.builder().taskNo(5).outputPath(path);
+      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build();
       for (int i = 0; i < 15; i++) {
         String[] row = new String[] { "robot" + (i % 10), "" + i, i + "." + i 
};
         writer.write(row);
       }
       writer.close();
       CarbonReader carbonReader =
-          new CarbonReaderBuilder(path, 
"table1").build(TestUtil.configuration);
+          new CarbonReaderBuilder(path, "table1").build();
       for (int i = 0; i < 15; i++) {
         Object[] actualRow = (Object[]) carbonReader.readNextRow();
         String[] expectedRow = new String[] { "robot" + (i % 10), "" + i, i + 
"." + i };
@@ -483,9 +453,8 @@ public class CSVCarbonWriterTest {
     Field structType = new Field("structField", "struct", 
Arrays.asList(fields));
 
     try {
-      CarbonWriterBuilder builder = 
CarbonWriter.builder().isTransactionalTable(true).taskNo(5).outputPath(path);
-      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(new 
Field[] {structType}),
-          TestUtil.configuration);
+      CarbonWriterBuilder builder = 
CarbonWriter.builder().taskNo(5).outputPath(path);
+      CarbonWriter writer = builder.withCsvInput(new Schema(new Field[] 
{structType})).build();
       for (int i = 0; i < 15; i++) {
         String[] row = new String[] { "robot" + (i % 10)+"$" + i+ "$" + i + 
"." + i };
         writer.write(row);
@@ -523,9 +492,8 @@ public class CSVCarbonWriterTest {
     Field structType2 = new Field("bytearray", "array", 
Arrays.asList(fields2));
 
     try {
-      CarbonWriterBuilder builder = 
CarbonWriter.builder().isTransactionalTable(true).taskNo(5).outputPath(path);
-      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(new 
Field[] {structType1, structType2}),
-          TestUtil.configuration);
+      CarbonWriterBuilder builder = 
CarbonWriter.builder().taskNo(5).outputPath(path);
+      CarbonWriter writer = builder.withCsvInput(new Schema(new Field[] 
{structType1, structType2})).build();
       for (int i = 0; i < 15; i++) {
         String[] row = new String[] { "1.0$2.0$3.0", "1$2$3" };
         writer.write(row);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a2a94d0/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
deleted file mode 100644
index a7ec6cd..0000000
--- 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.sdk.file;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-
-import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import 
org.apache.carbondata.core.metadata.schema.table.DiskBasedDMSchemaStorageProvider;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test suite for {@link CSVCarbonWriter}
- */
-public class CSVNonTransactionalCarbonWriterTest {
-  @Before
-  public void cleanFile() {
-    String path = null;
-    try {
-      path = new 
File(CSVNonTransactionalCarbonWriterTest.class.getResource("/").getPath() + 
"../")
-          .getCanonicalPath().replaceAll("\\\\", "/");
-    } catch (IOException e) {
-      assert (false);
-    }
-    CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, 
path);
-    assert (TestUtil.cleanMdtFile());
-  }
-
-  @After
-  public void verifyDMFile() {
-    assert (!TestUtil.verifyMdtFile());
-  }
-
-  @Test
-  public void testWriteFiles() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(new Schema(fields), path);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void testWriteFilesJsonSchema() throws IOException {
-    String path = "./testWriteFilesJsonSchema";
-    FileUtils.deleteDirectory(new File(path));
-
-    String schema = new StringBuilder()
-        .append("[ \n")
-        .append("   {\"name\":\"string\"},\n")
-        .append("   {\"age\":\"int\"},\n")
-        .append("   {\"height\":\"double\"}\n")
-        .append("]")
-        .toString();
-
-    writeFilesAndVerify(Schema.parseJson(schema), path);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  private void writeFilesAndVerify(Schema schema, String path) {
-    writeFilesAndVerify(schema, path, null);
-  }
-
-  private void writeFilesAndVerify(Schema schema, String path, String[] 
sortColumns) {
-    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
-  }
-
-  private void writeFilesAndVerify(Schema schema, String path, boolean 
persistSchema) {
-    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
-  }
-
-  /**
-   * Invoke CarbonWriter API to write carbon files and assert the file is 
rewritten
-   * @param rows number of rows to write
-   * @param schema schema of the file
-   * @param path local write path
-   * @param sortColumns sort columns
-   * @param persistSchema true if want to persist schema file
-   * @param blockletSize blockletSize in the file, -1 for default size
-   * @param blockSize blockSize in the file, -1 for default size
-   */
-  private void writeFilesAndVerify(int rows, Schema schema, String path, 
String[] sortColumns,
-      boolean persistSchema, int blockletSize, int blockSize) {
-    try {
-      CarbonWriterBuilder builder = CarbonWriter.builder()
-          .isTransactionalTable(false)
-          .uniqueIdentifier(System.currentTimeMillis())
-          .taskNo(System.nanoTime())
-          .outputPath(path);
-      if (sortColumns != null) {
-        builder = builder.sortBy(sortColumns);
-      }
-      if (persistSchema) {
-        builder = builder.persistSchemaFile(true);
-      }
-      if (blockletSize != -1) {
-        builder = builder.withBlockletSize(blockletSize);
-      }
-      if (blockSize != -1) {
-        builder = builder.withBlockSize(blockSize);
-      }
-
-      CarbonWriter writer = builder.buildWriterForCSVInput(schema, 
TestUtil.configuration);
-
-      for (int i = 0; i < rows; i++) {
-        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), 
String.valueOf((double) i / 2)});
-      }
-      writer.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail(e.getMessage());
-    } catch (InvalidLoadOptionException l) {
-      l.printStackTrace();
-      Assert.fail(l.getMessage());
-    }
-
-    File segmentFolder = new File(path);
-    Assert.assertTrue(segmentFolder.exists());
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
-      @Override public boolean accept(File pathname) {
-        return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
-      }
-    });
-    Assert.assertNotNull(dataFiles);
-    Assert.assertTrue(dataFiles.length > 0);
-  }
-  
-
-  @Test
-  public void testAllPrimitiveDataType() throws IOException {
-    // TODO: write all data type and read by CarbonRecordReader to verify the 
content
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[9];
-    fields[0] = new Field("stringField", DataTypes.STRING);
-    fields[1] = new Field("intField", DataTypes.INT);
-    fields[2] = new Field("shortField", DataTypes.SHORT);
-    fields[3] = new Field("longField", DataTypes.LONG);
-    fields[4] = new Field("doubleField", DataTypes.DOUBLE);
-    fields[5] = new Field("boolField", DataTypes.BOOLEAN);
-    fields[6] = new Field("dateField", DataTypes.DATE);
-    fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
-    fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
-
-    try {
-      CarbonWriterBuilder builder = CarbonWriter.builder()
-          .uniqueIdentifier(System.currentTimeMillis())
-          .isTransactionalTable(false)
-          .taskNo(System.nanoTime())
-          .outputPath(path);
-
-      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), 
TestUtil.configuration);
-
-      for (int i = 0; i < 100; i++) {
-        String[] row = new String[]{
-            "robot" + (i % 10),
-            String.valueOf(i),
-            String.valueOf(i),
-            String.valueOf(Long.MAX_VALUE - i),
-            String.valueOf((double) i / 2),
-            String.valueOf(true),
-            "2019-03-02",
-            "2019-02-12 03:03:34"
-        };
-        writer.write(row);
-      }
-      writer.close();
-    } catch (Exception e) {
-      e.printStackTrace();
-      Assert.fail(e.getMessage());
-    }
-
-    File segmentFolder = new File(path);
-    Assert.assertTrue(segmentFolder.exists());
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
-      @Override public boolean accept(File pathname) {
-        return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
-      }
-    });
-    Assert.assertNotNull(dataFiles);
-    Assert.assertTrue(dataFiles.length > 0);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void test2Blocklet() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 
100);
-
-    // TODO: implement reader to verify the number of blocklet in the file
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void test2Block() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 
2);
-
-    File segmentFolder = new File(path);
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
-      @Override public boolean accept(File pathname) {
-        return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
-      }
-    });
-    Assert.assertNotNull(dataFiles);
-    Assert.assertEquals(2, dataFiles.length);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void testSortColumns() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(new Schema(fields), path, new String[]{"name"});
-
-    // TODO: implement reader and verify the data is sorted
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void testPartitionOutput() {
-    // TODO: test write data with partition
-  }
-
-  @Test
-  public void testSchemaPersistence() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(new Schema(fields), path, true);
-
-    String schemaFile = CarbonTablePath.getSchemaFilePath(path);
-    Assert.assertTrue(new File(schemaFile).exists());
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-}

Reply via email to