http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelper.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelper.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelper.java
deleted file mode 100644
index 195374a..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelper.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.reader;
-/**
- * Copyright 2005 Bytecode Pty Ltd.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-/**
- * Interface for the ResultSetHelperService.  Allows the user to define their own ResultSetHelper
- * for use in the CSVWriter.
- */
-public interface ResultSetHelper {
-  /**
-   * Returns the column Names from the ResultSet.
-   *
-   * @param rs - ResultSet
-   * @return - string array containing the column names.
-   * @throws SQLException - thrown by the ResultSet.
-   */
-  String[] getColumnNames(ResultSet rs) throws SQLException;
-
-  /**
-   * Returns the column values from the result set.
-   *
-   * @param rs - the ResultSet containing the values.
-   * @return String Array containing the values.
-   * @throws SQLException - thrown by the ResultSet.
-   * @throws IOException  - thrown by the ResultSet.
-   */
-  String[] getColumnValues(ResultSet rs) throws SQLException, IOException;
-
-  /**
-   * Returns the column values from the result set with the values trimmed if desired.
-   *
-   * @param rs   - the ResultSet containing the values.
-   * @param trim - values should have white spaces trimmed.
-   * @return String Array containing the values.
-   * @throws SQLException - thrown by the ResultSet.
-   * @throws IOException  - thrown by the ResultSet.
-   */
-  String[] getColumnValues(ResultSet rs, boolean trim) throws SQLException, IOException;
-
-  /**
-   * Returns the column values from the result set with the values trimmed if desired.
-   * Also format the date and time columns based on the format strings passed in.
-   *
-   * @param rs               - the ResultSet containing the values.
-   * @param trim             - values should have white spaces trimmed.
-   * @param dateFormatString - format String for dates.
-   * @param timeFormatString - format String for timestamps.
-   * @return String Array containing the values.
-   * @throws SQLException - thrown by the ResultSet.
-   * @throws IOException  - thrown by the ResultSet.
-   */
-  String[] getColumnValues(ResultSet rs, boolean trim, String dateFormatString,
-      String timeFormatString) throws SQLException, IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelperService.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelperService.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelperService.java
deleted file mode 100644
index 3d15949..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelperService.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.reader;
-/**
- * Copyright 2005 Bytecode Pty Ltd.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.io.Reader;
-import java.math.BigDecimal;
-import java.sql.Clob;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-/**
- * helper class for processing JDBC ResultSet objects.
- */
-public class ResultSetHelperService implements ResultSetHelper {
-  public static final int CLOBBUFFERSIZE = 2048;
-
-  // note: we want to maintain compatibility with Java 5 VM's
-  // These types don't exist in Java 5
-  static final int NVARCHAR = -9;
-  static final int NCHAR = -15;
-  static final int LONGNVARCHAR = -16;
-  static final int NCLOB = 2011;
-
-  static final String DEFAULT_DATE_FORMAT = "dd-MMM-yyyy";
-  static final String DEFAULT_TIMESTAMP_FORMAT = "dd-MMM-yyyy HH:mm:ss";
-
-  /**
-   * Default Constructor.
-   */
-  public ResultSetHelperService() {
-  }
-
-  private static String read(Clob c) throws SQLException, IOException {
-    StringBuilder sb = new StringBuilder((int) c.length());
-    Reader r = c.getCharacterStream();
-    try {
-      char[] cbuf = new char[CLOBBUFFERSIZE];
-      int n;
-      while ((n = r.read(cbuf, 0, cbuf.length)) != -1) {
-        sb.append(cbuf, 0, n);
-      }
-    } finally {
-      r.close();
-    }
-    return sb.toString();
-
-  }
-
-  /**
-   * Returns the column names from the result set.
-   *
-   * @param rs - ResultSet
-   * @return - a string array containing the column names.
-   * @throws SQLException - thrown by the result set.
-   */
-  public String[] getColumnNames(ResultSet rs) throws SQLException {
-    List<String> names = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    //CHECKSTYLE:OFF    Approval No:Approval-V1R2C10_010
-    ResultSetMetaData metadata = rs.getMetaData();
-    //CHECKSTYLE:ON
-    for (int i = 0; i < metadata.getColumnCount(); i++) {
-      names.add(metadata.getColumnName(i + 1));
-    }
-
-    String[] nameArray = new String[names.size()];
-    return names.toArray(nameArray);
-  }
-
-  /**
-   * Get all the column values from the result set.
-   *
-   * @param rs - the ResultSet containing the values.
-   * @return - String array containing all the column values.
-   * @throws SQLException - thrown by the result set.
-   * @throws IOException  - thrown by the result set.
-   */
-  public String[] getColumnValues(ResultSet rs) throws SQLException, IOException {
-    return this.getColumnValues(rs, false, DEFAULT_DATE_FORMAT, DEFAULT_TIMESTAMP_FORMAT);
-  }
-
-  /**
-   * Get all the column values from the result set.
-   *
-   * @param rs   - the ResultSet containing the values.
-   * @param trim - values should have white spaces trimmed.
-   * @return - String array containing all the column values.
-   * @throws SQLException - thrown by the result set.
-   * @throws IOException  - thrown by the result set.
-   */
-  public String[] getColumnValues(ResultSet rs, boolean trim) throws SQLException, IOException {
-    return this.getColumnValues(rs, trim, DEFAULT_DATE_FORMAT, DEFAULT_TIMESTAMP_FORMAT);
-  }
-
-  /**
-   * Get all the column values from the result set.
-   *
-   * @param rs               - the ResultSet containing the values.
-   * @param trim             - values should have white spaces trimmed.
-   * @param dateFormatString - format String for dates.
-   * @param timeFormatString - format String for timestamps.
-   * @return - String array containing all the column values.
-   * @throws SQLException - thrown by the result set.
-   * @throws IOException  - thrown by the result set.
-   */
-  public String[] getColumnValues(ResultSet rs, boolean trim, String dateFormatString,
-      String timeFormatString) throws SQLException, IOException {
-    List<String> values = new ArrayList<>();
-    ResultSetMetaData metadata = rs.getMetaData();
-
-    for (int i = 0; i < metadata.getColumnCount(); i++) {
-      values.add(getColumnValue(rs, metadata.getColumnType(i + 1), i + 1, trim, dateFormatString,
-          timeFormatString));
-    }
-
-    String[] valueArray = new String[values.size()];
-    return values.toArray(valueArray);
-  }
-
-  /**
-   * changes an object to a String.
-   *
-   * @param obj - Object to format.
-   * @return - String value of an object or empty string if the object is null.
-   */
-  protected String handleObject(Object obj) {
-    return obj == null ? "" : String.valueOf(obj);
-  }
-
-  /**
-   * changes a BigDecimal to String.
-   *
-   * @param decimal - BigDecimal to format
-   * @return String representation of a BigDecimal or empty string if null
-   */
-  protected String handleBigDecimal(BigDecimal decimal) {
-    return decimal == null ? "" : decimal.toString();
-  }
-
-  /**
-   * Retrieves the string representation of an Long value from the result set.
-   *
-   * @param rs          - Result set containing the data.
-   * @param columnIndex - index to the column of the long.
-   * @return - the string representation of the long
-   * @throws SQLException - thrown by the result set on error.
-   */
-  protected String handleLong(ResultSet rs, int columnIndex) throws SQLException {
-    long lv = rs.getLong(columnIndex);
-    return rs.wasNull() ? "" : Long.toString(lv);
-  }
-
-  /**
-   * Retrieves the string representation of an Integer value from the result set.
-   *
-   * @param rs          - Result set containing the data.
-   * @param columnIndex - index to the column of the integer.
-   * @return - string representation of the Integer.
-   * @throws SQLException - returned from the result set on error.
-   */
-  protected String handleInteger(ResultSet rs, int columnIndex) throws SQLException {
-    int i = rs.getInt(columnIndex);
-    return rs.wasNull() ? "" : Integer.toString(i);
-  }
-
-  /**
-   * Retrieves a date from the result set.
-   *
-   * @param rs               - Result set containing the data
-   * @param columnIndex      - index to the column of the date
-   * @param dateFormatString - format for the date
-   * @return - formatted date.
-   * @throws SQLException - returned from the result set on error.
-   */
-  protected String handleDate(ResultSet rs, int columnIndex, String dateFormatString)
-      throws SQLException {
-    java.sql.Date date = rs.getDate(columnIndex);
-    String value = null;
-    if (date != null) {
-      SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString);
-      value = dateFormat.format(date);
-    }
-    return value;
-  }
-
-  /**
-   * Return time read from ResultSet.
-   *
-   * @param time time read from ResultSet
-   * @return String version of time or null if time is null.
-   */
-  protected String handleTime(Time time) {
-    return time == null ? null : time.toString();
-  }
-
-  /**
-   * The formatted timestamp.
-   *
-   * @param timestamp             - timestamp read from resultset
-   * @param timestampFormatString - format string
-   * @return - formatted time stamp.
-   */
-  protected String handleTimestamp(Timestamp timestamp, String timestampFormatString) {
-    SimpleDateFormat timeFormat = new SimpleDateFormat(timestampFormatString);
-    return timestamp == null ? null : timeFormat.format(timestamp);
-  }
-
-  private String getColumnValue(ResultSet rs, int colType, int colIndex, boolean trim,
-      String dateFormatString, String timestampFormatString) throws SQLException, IOException {
-
-    String value = "";
-
-    switch (colType) {
-      case Types.BIT:
-      case Types.JAVA_OBJECT:
-        value = handleObject(rs.getObject(colIndex));
-        break;
-      case Types.BOOLEAN:
-        boolean b = rs.getBoolean(colIndex);
-        value = Boolean.valueOf(b).toString();
-        break;
-      case NCLOB: // todo : use rs.getNClob
-      case Types.CLOB:
-        Clob c = rs.getClob(colIndex);
-        if (c != null) {
-          value = read(c);
-        }
-        break;
-      case Types.BIGINT:
-        value = handleLong(rs, colIndex);
-        break;
-      case Types.DECIMAL:
-      case Types.DOUBLE:
-      case Types.FLOAT:
-      case Types.REAL:
-      case Types.NUMERIC:
-        value = handleBigDecimal(rs.getBigDecimal(colIndex));
-        break;
-      case Types.INTEGER:
-      case Types.TINYINT:
-      case Types.SMALLINT:
-        value = handleInteger(rs, colIndex);
-        break;
-      case Types.DATE:
-        value = handleDate(rs, colIndex, dateFormatString);
-        break;
-      case Types.TIME:
-        value = handleTime(rs.getTime(colIndex));
-        break;
-      case Types.TIMESTAMP:
-        value = handleTimestamp(rs.getTimestamp(colIndex), timestampFormatString);
-        break;
-      case NVARCHAR: // todo : use rs.getNString
-      case NCHAR: // todo : use rs.getNString
-      case LONGNVARCHAR: // todo : use rs.getNString
-      case Types.LONGVARCHAR:
-      case Types.VARCHAR:
-      case Types.CHAR:
-        value = getColumnValue(rs, colIndex, trim);
-        break;
-      default:
-        value = "";
-    }
-
-    if (value == null) {
-      value = "";
-    }
-
-    return value;
-  }
-
-  /**
-   * @param rs
-   * @param colIndex
-   * @param trim
-   * @return
-   * @throws SQLException
-   */
-  public String getColumnValue(ResultSet rs, int colIndex, boolean trim) throws SQLException {
-    String value;
-    String columnValue = rs.getString(colIndex);
-    if (trim && columnValue != null) {
-      value = columnValue.trim();
-    } else {
-      value = columnValue;
-    }
-    return value;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataPartitionRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataPartitionRDD.scala
deleted file mode 100644
index d6932cd..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataPartitionRDD.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.common.logging.impl.StandardLogService
-import org.apache.carbondata.spark.PartitionResult
-import org.apache.carbondata.spark.partition.api.impl.CSVFilePartitioner
-import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.CarbonQueryUtil
-
-
-class CarbonSparkRawDataPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit)
-  extends Partition {
-
-  override val index: Int = idx
-  val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit)
-
-  override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
-
-/**
- * This RDD class is used to  create splits the fact csv store to various partitions as per
- * configuration  and compute each split in the respective node located in the server.
- * .
- */
-class CarbonDataPartitionRDD[K, V](
-    sc: SparkContext,
-    results: PartitionResult[K, V],
-    databaseName: String,
-    tableName: String,
-    sourcePath: String,
-    targetFolder: String,
-    requiredColumns: Array[String],
-    headers: String,
-    delimiter: String,
-    quoteChar: String,
-    escapeChar: String,
-    multiLine: Boolean,
-    partitioner: Partitioner)
-  extends RDD[(K, V)](sc, Nil) with Logging {
-
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
-  override def getPartitions: Array[Partition] = {
-    val splits = CarbonQueryUtil
-      .getPartitionSplits(sourcePath, partitioner.nodeList, partitioner.partitionCount)
-    splits.zipWithIndex.map {s =>
-      new CarbonSparkRawDataPartition(id, s._2, s._1)
-    }
-  }
-
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
-    new Iterator[(K, V)] {
-      val split = theSplit.asInstanceOf[CarbonSparkRawDataPartition]
-      StandardLogService
-        .setThreadName(split.serializableHadoopSplit.value.getPartition.getUniqueID, null)
-      logInfo("Input split: " + split.serializableHadoopSplit.value)
-
-      val csvPart = new CSVFilePartitioner(partitioner.partitionClass, sourcePath)
-      csvPart.splitFile(databaseName, tableName,
-        split.serializableHadoopSplit.value.getPartition.getFilesPath, targetFolder,
-        partitioner.nodeList.toList.asJava, partitioner.partitionCount, partitioner.partitionColumn,
-        requiredColumns, delimiter, quoteChar, headers, escapeChar, multiLine)
-
-      var finished = false
-
-      override def hasNext: Boolean = {
-        if (!finished) {
-          finished = true
-          finished
-        }
-        else {
-          !finished
-        }
-      }
-
-      override def next(): (K, V) = {
-        results.getKey(partitioner.partitionCount, csvPart.isPartialSuccess)
-      }
-    }
-  }
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    val theSplit = split.asInstanceOf[CarbonSparkRawDataPartition]
-    val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    logInfo("Host Name : " + s.head + s.length)
-    s
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 3118d3f..964c955 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -62,37 +62,6 @@ object CarbonDataRDDFactory extends Logging {
 
  val logger = LogServiceFactory.getLogService(CarbonDataRDDFactory.getClass.getName)
 
-  // scalastyle:off
-  def partitionCarbonData(sc: SparkContext,
-      databaseName: String,
-      tableName: String,
-      sourcePath: String,
-      targetFolder: String,
-      requiredColumns: Array[String],
-      headers: String,
-      delimiter: String,
-      quoteChar: String,
-      escapeChar: String,
-      multiLine: Boolean,
-      partitioner: Partitioner): String = {
-    // scalastyle:on
-    val status = new
-        CarbonDataPartitionRDD(sc, new PartitionResultImpl(), databaseName, tableName, sourcePath,
-          targetFolder, requiredColumns, headers, delimiter, quoteChar, escapeChar, multiLine,
-          partitioner
-        ).collect
-    CarbonDataProcessorUtil
-      .renameBadRecordsFromInProgressToNormal("partition/" + databaseName + '/' + tableName)
-    var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-    status.foreach {
-      case (key, value) =>
-        if (value) {
-          loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
-        }
-    }
-    loadStatus
-  }
-
   def mergeCarbonData(
       sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 59db05b..a7d807e 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.control.Breaks.{break, breakable}
 
+import au.com.bytecode.opencsv.CSVReader
 import org.apache.commons.lang3.{ArrayUtils, StringUtils}
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
@@ -38,9 +39,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.load.CarbonLoadModel
-import org.apache.carbondata.spark.partition.reader.{CSVParser, CSVReader}
+import org.apache.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
 import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
 import org.apache.carbondata.spark.util.GlobalDictionaryUtil
 import org.apache.carbondata.spark.util.GlobalDictionaryUtil._
@@ -500,7 +499,7 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
       inputStream = FileFactory.getDataInputStream(preDefDictFilePath,
         FileFactory.getFileType(preDefDictFilePath))
       csvReader = new CSVReader(new InputStreamReader(inputStream, Charset.defaultCharset),
-        CSVReader.DEFAULT_SKIP_LINES, new CSVParser(carbonLoadModel.getCsvDelimiter.charAt(0)))
+        carbonLoadModel.getCsvDelimiter.charAt(0))
       // read the column data to list iterator
       colDictData = csvReader.readAll.iterator
     } catch {
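For context: the hunk above replaces the project's bundled CSVParser/CSVReader pair with opencsv's two-argument CSVReader(Reader, char) constructor. A minimal sketch of the new call pattern, using a hypothetical file path and delimiter in place of the values the RDD obtains from CarbonLoadModel and FileFactory:

    import java.io.{FileInputStream, InputStreamReader}
    import java.nio.charset.Charset

    import au.com.bytecode.opencsv.CSVReader

    object OpenCsvSketch extends App {
      // Hypothetical pre-defined dictionary file; the real RDD reads this path from the load model.
      val preDefDictFilePath = "/tmp/predefined_dict.csv"
      val delimiter: Char = ','
      // Two-argument constructor: a Reader plus the column separator character.
      val csvReader = new CSVReader(
        new InputStreamReader(new FileInputStream(preDefDictFilePath), Charset.defaultCharset),
        delimiter)
      try {
        // readAll() parses the whole file into a java.util.List[Array[String]], one array per record.
        val colDictData = csvReader.readAll.iterator
        while (colDictData.hasNext) {
          println(colDictData.next.mkString("|"))
        }
      } finally {
        csvReader.close()
      }
    }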

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 818aa4a..e714520 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -49,7 +49,6 @@ import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.load.CarbonLoadModel
-import org.apache.carbondata.spark.partition.reader.CSVWriter
 import org.apache.carbondata.spark.rdd._
 
 /**
@@ -58,6 +57,16 @@ import org.apache.carbondata.spark.rdd._
 object GlobalDictionaryUtil extends Logging {
 
   /**
+   * The default separator to use if none is supplied to the constructor.
+   */
+  val DEFAULT_SEPARATOR: Char = ','
+  /**
+   * The default quote character to use if none is supplied to the
+   * constructor.
+   */
+  val DEFAULT_QUOTE_CHARACTER: Char = '"'
+
+  /**
    * find columns which need to generate global dictionary.
    *
    * @param dimensions  dimension list of schema
@@ -354,7 +363,7 @@ object GlobalDictionaryUtil extends Logging {
       })
       .option("delimiter", {
         if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
-          "" + CSVWriter.DEFAULT_SEPARATOR
+          "" + DEFAULT_SEPARATOR
         }
         else {
           carbonLoadModel.getCsvDelimiter
@@ -367,7 +376,7 @@ object GlobalDictionaryUtil extends Logging {
       .option("codec", "gzip")
       .option("quote", {
         if (StringUtils.isEmpty(carbonLoadModel.getQuoteChar)) {
-          "" + CSVWriter. DEFAULT_QUOTE_CHARACTER
+          "" + DEFAULT_QUOTE_CHARACTER
         }
         else {
           carbonLoadModel.getQuoteChar
@@ -592,7 +601,7 @@ object GlobalDictionaryUtil extends Logging {
    */
   private def parseRecord(x: String, accum: Accumulator[Int],
                   csvFileColumns: Array[String]) : (String, String) = {
-    val tokens = x.split("" + CSVWriter.DEFAULT_SEPARATOR)
+    val tokens = x.split("" + DEFAULT_SEPARATOR)
     var columnName: String = ""
     var value: String = ""
     // such as "," , "", throw ex
@@ -713,7 +722,7 @@ object GlobalDictionaryUtil extends Logging {
 
     if (null != readLine) {
      val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
-        "" + CSVWriter.DEFAULT_SEPARATOR
+        "" + DEFAULT_SEPARATOR
       } else {
         carbonLoadModel.getCsvDelimiter
       }
@@ -756,7 +765,7 @@ object GlobalDictionaryUtil extends Logging {
           df.columns
         }
         else {
-          carbonLoadModel.getCsvHeader.split("" + CSVWriter.DEFAULT_SEPARATOR)
+          carbonLoadModel.getCsvHeader.split("" + DEFAULT_SEPARATOR)
         }
         headers = headers.map(headerName => headerName.trim)
         val colDictFilePath = carbonLoadModel.getColDictFilePath
@@ -820,7 +829,7 @@ object GlobalDictionaryUtil extends Logging {
          var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
             getHeaderFormFactFile(carbonLoadModel)
           } else {
-            carbonLoadModel.getCsvHeader.toLowerCase.split("" + CSVWriter.DEFAULT_SEPARATOR)
+            carbonLoadModel.getCsvHeader.toLowerCase.split("" + DEFAULT_SEPARATOR)
           }
           headers = headers.map(headerName => headerName.trim)
           // prune columns according to the CSV file header, dimension columns
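The GlobalDictionaryUtil hunks above inline DEFAULT_SEPARATOR and DEFAULT_QUOTE_CHARACTER instead of referencing the removed CSVWriter constants. A small sketch of how the separator ends up being used to split a header, with an illustrative header string that is not taken from the commit:

    object SeparatorSketch extends App {
      // Mirrors the constants added to GlobalDictionaryUtil; the values match opencsv's defaults.
      val DEFAULT_SEPARATOR: Char = ','
      val DEFAULT_QUOTE_CHARACTER: Char = '"'   // shown for completeness, unused here

      // "" + DEFAULT_SEPARATOR builds the one-character string "," that String.split treats as a regex.
      val csvHeader = "empno,empname,city"      // hypothetical header, for illustration only
      val headers = csvHeader.split("" + DEFAULT_SEPARATOR).map(_.trim)
      headers.foreach(println)                  // prints empno, empname, city on separate lines
    }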

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index 972de05..c9fdd6b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -28,13 +28,12 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.ExtractPythonUDFs
-import org.apache.spark.sql.execution.command.PartitionData
 import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, PreWriteCheck}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.optimizer.CarbonOptimizer
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
+import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.spark.rdd.CarbonDataFrameRDD
@@ -153,45 +152,6 @@ object CarbonContext {
 
   @transient
   val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
-  /**
-   * @param databaseName - Database Name
-   * @param tableName   - Table Name
-   * @param factPath   - Raw CSV data path
-   * @param targetPath - Target path where the file will be split as per partition
-   * @param delimiter  - default file delimiter is comma(,)
-   * @param quoteChar  - default quote character used in Raw CSV file, Default quote
-   *                   character is double quote(")
-   * @param fileHeader - Header should be passed if not available in Raw CSV File, else pass null,
-   *                   Header will be read from CSV
-   * @param escapeChar - This parameter by default will be null, there wont be any validation if
-   *                   default escape character(\) is found on the RawCSV file
-   * @param multiLine  - This parameter will be check for end of quote character if escape character
-   *                   & quote character is set.
-   *                   if set as false, it will check for end of quote character within the line
-   *                   and skips only 1 line if end of quote not found
-   *                   if set as true, By default it will check for 10000 characters in multiple
-   *                   lines for end of quote & skip all lines if end of quote not found.
-   */
-  final def partitionData(
-      databaseName: String = null,
-      tableName: String,
-      factPath: String,
-      targetPath: String,
-      delimiter: String = ",",
-      quoteChar: String = "\"",
-      fileHeader: String = null,
-      escapeChar: String = null,
-      multiLine: Boolean = false)(hiveContext: HiveContext): String = {
-    updateCarbonPorpertiesPath(hiveContext)
-    var databaseNameLocal = databaseName
-    if (databaseNameLocal == null) {
-      databaseNameLocal = "default"
-    }
-    val partitionDataClass = PartitionData(databaseName, tableName, factPath, targetPath, delimiter,
-      quoteChar, fileHeader, escapeChar, multiLine)
-    partitionDataClass.run(hiveContext)
-    partitionDataClass.partitionStatus
-  }
 
   final def updateCarbonPorpertiesPath(hiveContext: HiveContext) {
    val carbonPropertiesFilePath = hiveContext.getConf("carbon.properties.filepath", null)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be1675f4/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index e4a79ab..ac48624 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -1139,28 +1139,6 @@ private[sql] case class LoadTable(
           carbonLoadModel.setColDictFilePath(columnDict)
           carbonLoadModel.setDirectLoad(true)
         }
-        else {
-          val fileType = FileFactory.getFileType(partitionLocation)
-          if (FileFactory.isFileExist(partitionLocation, fileType)) {
-            val file = FileFactory.getCarbonFile(partitionLocation, fileType)
-            CarbonUtil.deleteFoldersAndFiles(file)
-          }
-          partitionLocation += System.currentTimeMillis()
-          FileFactory.mkdirs(partitionLocation, fileType)
-          LOGGER.info("Initiating Data Partitioning for the Table : (" +
-                      dbName + "." + tableName + ")")
-          partitionStatus = CarbonContext.partitionData(
-            dbName,
-            tableName,
-            factPath,
-            partitionLocation,
-            delimiter,
-            quoteChar,
-            fileHeader,
-            escapeChar, multiLine)(sqlContext.asInstanceOf[HiveContext])
-          carbonLoadModel.setFactFilePath(FileUtils.getPaths(partitionLocation))
-          carbonLoadModel.setColDictFilePath(columnDict)
-        }
         GlobalDictionaryUtil
          .generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath)
         CarbonDataRDDFactory
@@ -1213,34 +1191,6 @@ private[sql] case class LoadTable(
 
 }
 
-private[sql] case class PartitionData(databaseName: String, tableName: String, factPath: String,
-    targetPath: String, delimiter: String, quoteChar: String,
-    fileHeader: String, escapeChar: String, multiLine: Boolean)
-  extends RunnableCommand {
-
-  var partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    val identifier = TableIdentifier(tableName, Option(databaseName))
-    val relation = CarbonEnv.getInstance(sqlContext)
-      .carbonCatalog.lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
-    val dimNames = relation.tableMeta.carbonTable
-      .getDimensionByTableName(tableName).asScala.map(_.getColName)
-    val msrNames = relation.tableMeta.carbonTable
-      .getDimensionByTableName(tableName).asScala.map(_.getColName)
-    val targetFolder = targetPath
-    partitionStatus = CarbonDataRDDFactory.partitionCarbonData(
-      sqlContext.sparkContext, databaseName,
-      tableName, factPath, targetFolder, (dimNames ++ msrNames).toArray
-      , fileHeader, delimiter,
-      quoteChar, escapeChar, multiLine, relation.tableMeta.partitioner)
-    if (partitionStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) {
-      logInfo("Bad Record Found while partitioning data")
-    }
-    Seq.empty
-  }
-}
-
 private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: Option[String],
     tableName: String)
   extends RunnableCommand {
