Repository: carbondata
Updated Branches:
  refs/heads/master 7bfe4afe4 -> 74f5d67c0


[CARBONDATA-2209] Fixed rename table with partitions not working issue and 
batch_sort and no_sort with partition table issue

This closes #2006


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/74f5d67c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/74f5d67c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/74f5d67c

Branch: refs/heads/master
Commit: 74f5d67c0f79ba2a45ed97339179333a7cd37279
Parents: 7bfe4af
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Tue Feb 27 16:38:09 2018 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Fri Mar 2 21:19:57 2018 +0530

----------------------------------------------------------------------
 .../apache/carbondata/core/datamap/Segment.java |  21 ++
 .../blockletindex/SegmentIndexFileStore.java    |  13 +-
 .../core/metadata/SegmentFileStore.java         |  33 +++-
 .../core/util/path/CarbonTablePath.java         |   7 +
 .../core/writer/CarbonIndexFileMergeWriter.java | 191 ++++++++++++++-----
 .../CarbonIndexFileMergeTestCase.scala          |  15 +-
 .../StandardPartitionGlobalSortTestCase.scala   |  86 +++++++++
 .../StandardPartitionTableQueryTestCase.scala   |  21 ++
 .../schema/CarbonAlterTableRenameCommand.scala  |  58 +++++-
 .../sql/execution/strategy/DDLStrategy.scala    |   8 +
 .../spark/sql/hive/CarbonSessionState.scala     |  12 +-
 .../spark/sql/hive/CarbonSessionState.scala     |  10 +
 .../loading/DataLoadProcessBuilder.java         |   8 +-
 .../processing/util/CarbonLoaderUtil.java       |  87 +++++++++
 14 files changed, 509 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index c47f16c..a2a2a41 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -21,6 +21,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
 /**
  * Represents one load of carbondata
  */
@@ -76,6 +80,23 @@ public class Segment implements Serializable {
     return new Segment(segmentId, null);
   }
 
+  /**
+   * Read the table status and get the segment corresponding to segmentNo
+   * @param segmentNo
+   * @param tablePath
+   * @return
+   */
+  public static Segment getSegment(String segmentNo, String tablePath) {
+    LoadMetadataDetails[] loadMetadataDetails =
+        
SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath));
+    for (LoadMetadataDetails details: loadMetadataDetails) {
+      if (details.getLoadName().equals(segmentNo)) {
+        return new Segment(details.getLoadName(), details.getSegmentFile());
+      }
+    }
+    return null;
+  }
+
   @Override public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 4883d94..9364a7a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -96,6 +96,7 @@ public class SegmentIndexFileStore {
   public void readAllIIndexOfSegment(SegmentFileStore.SegmentFile segmentFile, 
String tablePath,
       SegmentStatus status, boolean ignoreStatus) throws IOException {
     List<CarbonFile> carbonIndexFiles = new ArrayList<>();
+    Set<String> indexFiles = new HashSet<>();
     if (segmentFile == null) {
       return;
     }
@@ -107,11 +108,21 @@ public class SegmentIndexFileStore {
         if (locations.getValue().isRelative()) {
           location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + 
location;
         }
+        String mergeFileName = locations.getValue().getMergeFileName();
+        if (mergeFileName != null) {
+          CarbonFile mergeFile = FileFactory
+              .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + 
mergeFileName);
+          if (mergeFile.exists() && 
!indexFiles.contains(mergeFile.getAbsolutePath())) {
+            carbonIndexFiles.add(mergeFile);
+            indexFiles.add(mergeFile.getAbsolutePath());
+          }
+        }
         for (String indexFile : locations.getValue().getFiles()) {
           CarbonFile carbonFile = FileFactory
               .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + 
indexFile);
-          if (carbonFile.exists()) {
+          if (carbonFile.exists() && 
!indexFiles.contains(carbonFile.getAbsolutePath())) {
             carbonIndexFiles.add(carbonFile);
+            indexFiles.add(carbonFile.getAbsolutePath());
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index f2548b5..2d31b4e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -195,7 +195,7 @@ public class SegmentFileStore {
    * @param partitionSpecs
    */
   public static SegmentFile getSegmentFileForPhysicalDataPartitions(String 
tablePath,
-      List<PartitionSpec> partitionSpecs) {
+      List<PartitionSpec> partitionSpecs) throws IOException {
     SegmentFile segmentFile = null;
     for (PartitionSpec spec : partitionSpecs) {
       String location = spec.getLocation().toString();
@@ -220,6 +220,9 @@ public class SegmentFileStore {
         folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
         for (CarbonFile file : listFiles) {
           if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+            List<String> indexFiles =
+                new 
SegmentIndexFileStore().getIndexFilesFromMergeFile(file.getAbsolutePath());
+            folderDetails.getFiles().addAll(indexFiles);
             folderDetails.setMergeFileName(file.getName());
           } else {
             folderDetails.getFiles().add(file.getName());
@@ -302,6 +305,10 @@ public class SegmentFileStore {
     readIndexFiles(SegmentStatus.SUCCESS, false);
   }
 
+  public SegmentFile getSegmentFile() {
+    return segmentFile;
+  }
+
   /**
    * Reads all index files as per the status of the file. In case of 
@ignoreStatus is true it just
    * reads all index files
@@ -377,6 +384,30 @@ public class SegmentFileStore {
   }
 
   /**
+   * Gets all carbon index files from this segment
+   * @return
+   */
+  public List<CarbonFile> getIndexCarbonFiles() {
+    Map<String, String> indexFiles = getIndexFiles();
+    Set<String> files = new HashSet<>();
+    for (Map.Entry<String, String> entry: indexFiles.entrySet()) {
+      Path path = new Path(entry.getKey());
+      files.add(entry.getKey());
+      if (entry.getValue() != null) {
+        files.add(new Path(path.getParent(), entry.getValue()).toString());
+      }
+    }
+    List<CarbonFile> carbonFiles = new ArrayList<>();
+    for (String indexFile : files) {
+      CarbonFile carbonFile = FileFactory.getCarbonFile(indexFile);
+      if (carbonFile.exists()) {
+        carbonFiles.add(carbonFile);
+      }
+    }
+    return carbonFiles;
+  }
+
+  /**
    * Drops the partition related files from the segment file of the segment 
and writes
    * to a new file. First iterator over segment file and check the path it 
needs to be dropped.
    * And update the status with delete if it found.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index fa742d8..b5fe5ea 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -170,6 +170,13 @@ public class CarbonTablePath extends Path {
   }
 
   /**
+   * Return table status file path based on `tablePath`
+   */
+  public static String getTableStatusPath(String tablePath) {
+    return getMetadataPath(tablePath) + File.separator + TABLE_STATUS_FILE;
+  }
+
+  /**
    * @param columnId unique column identifier
    * @return absolute path of dictionary meta file
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
 
b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 01f96ba..bc150e5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -17,18 +17,26 @@
 package org.apache.carbondata.core.writer;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.MergedBlockIndex;
 import org.apache.carbondata.format.MergedBlockIndexHeader;
 
+import org.apache.hadoop.fs.Path;
+
 public class CarbonIndexFileMergeWriter {
 
   /**
@@ -38,7 +46,7 @@ public class CarbonIndexFileMergeWriter {
 
   /**
    * Merge all the carbonindex files of segment to a  merged file
-   * @param segmentPath
+   * @param tablePath
    * @param indexFileNamesTobeAdded while merging it comsiders only these 
files.
    *                                If null then consider all
    * @param readFileFooterFromCarbonDataFile flag to read file footer 
information from carbondata
@@ -46,77 +54,152 @@ public class CarbonIndexFileMergeWriter {
    *                                         which do not store the blocklet 
info to current version
    * @throws IOException
    */
-  private void mergeCarbonIndexFilesOfSegment(String segmentPath,
-      List<String> indexFileNamesTobeAdded, boolean 
readFileFooterFromCarbonDataFile)
-      throws IOException {
-    CarbonFile[] indexFiles = 
SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
+  private SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String 
segmentId,
+      String tablePath, List<String> indexFileNamesTobeAdded,
+      boolean readFileFooterFromCarbonDataFile) throws IOException {
+    Segment segment = Segment.getSegment(segmentId, tablePath);
+    String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
+    CarbonFile[] indexFiles;
+    SegmentFileStore sfs = null;
+    if (segment != null && segment.getSegmentFileName() != null) {
+      sfs = new SegmentFileStore(tablePath, segment.getSegmentFileName());
+      List<CarbonFile> indexCarbonFiles = sfs.getIndexCarbonFiles();
+      indexFiles = indexCarbonFiles.toArray(new 
CarbonFile[indexCarbonFiles.size()]);
+    } else {
+      indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
+    }
     if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != 
null) {
-      SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
-      if (readFileFooterFromCarbonDataFile) {
-        // this case will be used in case of upgrade where old store will not 
have the blocklet
-        // info in the index file and therefore blocklet info need to be read 
from the file footer
-        // in the carbondata file
-        fileStore.readAllIndexAndFillBolckletInfo(segmentPath);
+      if (sfs == null) {
+        return mergeNormalSegment(indexFileNamesTobeAdded, 
readFileFooterFromCarbonDataFile,
+            segmentPath, indexFiles);
       } else {
-        fileStore.readAllIIndexOfSegment(segmentPath);
+        return mergePartitionSegment(indexFileNamesTobeAdded, sfs, indexFiles);
       }
-      Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
-      MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader();
-      MergedBlockIndex mergedBlockIndex = new MergedBlockIndex();
-      List<String> fileNames = new ArrayList<>(indexMap.size());
-      List<ByteBuffer> data = new ArrayList<>(indexMap.size());
-      for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) {
-        if (indexFileNamesTobeAdded == null ||
-            indexFileNamesTobeAdded.contains(entry.getKey())) {
-          fileNames.add(entry.getKey());
-          data.add(ByteBuffer.wrap(entry.getValue()));
-        }
+    }
+    return null;
+  }
+
+
+  private SegmentIndexFIleMergeStatus mergeNormalSegment(List<String> 
indexFileNamesTobeAdded,
+      boolean readFileFooterFromCarbonDataFile, String segmentPath, 
CarbonFile[] indexFiles)
+      throws IOException {
+    SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
+    if (readFileFooterFromCarbonDataFile) {
+      // this case will be used in case of upgrade where old store will not 
have the blocklet
+      // info in the index file and therefore blocklet info need to be read 
from the file footer
+      // in the carbondata file
+      fileStore.readAllIndexAndFillBolckletInfo(segmentPath);
+    } else {
+      fileStore.readAllIIndexOfSegment(segmentPath);
+    }
+    Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
+    writeMergeIndexFile(indexFileNamesTobeAdded, segmentPath, indexMap);
+    for (CarbonFile indexFile : indexFiles) {
+      indexFile.delete();
+    }
+    return null;
+  }
+
+  private SegmentIndexFIleMergeStatus mergePartitionSegment(List<String> 
indexFileNamesTobeAdded,
+      SegmentFileStore sfs, CarbonFile[] indexFiles) throws IOException {
+    SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
+    fileStore
+        .readAllIIndexOfSegment(sfs.getSegmentFile(), sfs.getTablePath(), 
SegmentStatus.SUCCESS,
+            true);
+    Map<String, byte[]> indexMap = fileStore.getCarbonIndexMapWithFullPath();
+    Map<String, Map<String, byte[]>> indexLocationMap = new HashMap<>();
+    for (Map.Entry<String, byte[]> entry: indexMap.entrySet()) {
+      Path path = new Path(entry.getKey());
+      Map<String, byte[]> map = 
indexLocationMap.get(path.getParent().toString());
+      if (map == null) {
+        map = new HashMap<>();
+        indexLocationMap.put(path.getParent().toString(), map);
       }
-      if (fileNames.size() > 0) {
-        openThriftWriter(
-            segmentPath + "/" + System.currentTimeMillis() + 
CarbonTablePath.MERGE_INDEX_FILE_EXT);
-        indexHeader.setFile_names(fileNames);
-        mergedBlockIndex.setFileData(data);
-        writeMergedBlockIndexHeader(indexHeader);
-        writeMergedBlockIndex(mergedBlockIndex);
-        close();
+      map.put(path.getName(), entry.getValue());
+    }
+    for (Map.Entry<String, Map<String, byte[]>> entry : 
indexLocationMap.entrySet()) {
+      String mergeIndexFile =
+          writeMergeIndexFile(indexFileNamesTobeAdded, entry.getKey(), 
entry.getValue());
+      for (Map.Entry<String, SegmentFileStore.FolderDetails> segentry : 
sfs.getLocationMap()
+          .entrySet()) {
+        String location = segentry.getKey();
+        if (segentry.getValue().isRelative()) {
+          location = sfs.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR 
+ location;
+        }
+        if (new Path(entry.getKey()).equals(new Path(location))) {
+          segentry.getValue().setMergeFileName(mergeIndexFile);
+          break;
+        }
       }
-      for (CarbonFile indexFile : indexFiles) {
-        indexFile.delete();
+    }
+
+    List<String> filesTobeDeleted = new ArrayList<>();
+    for (CarbonFile file : indexFiles) {
+      filesTobeDeleted.add(file.getAbsolutePath());
+    }
+    return new SegmentIndexFIleMergeStatus(sfs.getSegmentFile(), 
filesTobeDeleted);
+  }
+
+  private String writeMergeIndexFile(List<String> indexFileNamesTobeAdded, 
String segmentPath,
+      Map<String, byte[]> indexMap) throws IOException {
+    MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader();
+    MergedBlockIndex mergedBlockIndex = new MergedBlockIndex();
+    List<String> fileNames = new ArrayList<>(indexMap.size());
+    List<ByteBuffer> data = new ArrayList<>(indexMap.size());
+    for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) {
+      if (indexFileNamesTobeAdded == null ||
+          indexFileNamesTobeAdded.contains(entry.getKey())) {
+        fileNames.add(entry.getKey());
+        data.add(ByteBuffer.wrap(entry.getValue()));
       }
     }
+    if (fileNames.size() > 0) {
+      String mergeIndexName = System.currentTimeMillis() + 
CarbonTablePath.MERGE_INDEX_FILE_EXT;
+      openThriftWriter(segmentPath + "/" + mergeIndexName);
+      indexHeader.setFile_names(fileNames);
+      mergedBlockIndex.setFileData(data);
+      writeMergedBlockIndexHeader(indexHeader);
+      writeMergedBlockIndex(mergedBlockIndex);
+      close();
+      return mergeIndexName;
+    }
+    return null;
   }
 
   /**
    * Merge all the carbonindex files of segment to a  merged file
    *
-   * @param segmentPath
+   * @param segmentId
    * @param indexFileNamesTobeAdded
    * @throws IOException
    */
-  public void mergeCarbonIndexFilesOfSegment(String segmentPath,
-      List<String> indexFileNamesTobeAdded) throws IOException {
-    mergeCarbonIndexFilesOfSegment(segmentPath, indexFileNamesTobeAdded, 
false);
+  public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String 
segmentId,
+      String tablePath, List<String> indexFileNamesTobeAdded) throws 
IOException {
+    return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, 
indexFileNamesTobeAdded, false);
   }
 
   /**
    * Merge all the carbonindex files of segment to a  merged file
-   * @param segmentPath
+   *
+   * @param segmentId
    * @throws IOException
    */
-  public void mergeCarbonIndexFilesOfSegment(String segmentPath) throws 
IOException {
-    mergeCarbonIndexFilesOfSegment(segmentPath, null, false);
+  public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String 
segmentId,
+      String tablePath) throws IOException {
+    return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, false);
   }
 
   /**
    * Merge all the carbonindex files of segment to a  merged file
-   * @param segmentPath
+   *
+   * @param segmentId
    * @param readFileFooterFromCarbonDataFile
    * @throws IOException
    */
-  public void mergeCarbonIndexFilesOfSegment(String segmentPath,
-      boolean readFileFooterFromCarbonDataFile) throws IOException {
-    mergeCarbonIndexFilesOfSegment(segmentPath, null, 
readFileFooterFromCarbonDataFile);
+  public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String 
segmentId,
+      String tablePath, boolean readFileFooterFromCarbonDataFile) throws 
IOException {
+    return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null,
+        readFileFooterFromCarbonDataFile);
   }
 
   private boolean isCarbonIndexFilePresent(CarbonFile[] indexFiles) {
@@ -166,4 +249,24 @@ public class CarbonIndexFileMergeWriter {
     thriftWriter.close();
   }
 
+  public static class SegmentIndexFIleMergeStatus implements Serializable {
+
+    private SegmentFileStore.SegmentFile segmentFile;
+
+    private List<String> filesTobeDeleted;
+
+    public SegmentIndexFIleMergeStatus(SegmentFileStore.SegmentFile 
segmentFile,
+        List<String> filesTobeDeleted) {
+      this.segmentFile = segmentFile;
+      this.filesTobeDeleted = filesTobeDeleted;
+    }
+
+    public SegmentFileStore.SegmentFile getSegmentFile() {
+      return segmentFile;
+    }
+
+    public List<String> getFilesTobeDeleted() {
+      return filesTobeDeleted;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index 895b0b5..7608318 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -62,9 +62,8 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge 
OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","indexmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), 
false)
+      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     assert(getIndexFileCount("default_indexmerge", "0") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""),
       sql("""Select count(*) from indexmerge"""))
@@ -88,9 +87,9 @@ class CarbonIndexFileMergeTestCase
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
     val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), 
false)
+      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), 
false)
+      .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
@@ -114,9 +113,9 @@ class CarbonIndexFileMergeTestCase
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
     val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), 
false)
+      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), 
false)
+      .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
@@ -144,7 +143,7 @@ class CarbonIndexFileMergeTestCase
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
     val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      
.mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false)
+      .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
@@ -174,7 +173,7 @@ class CarbonIndexFileMergeTestCase
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
     val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      
.mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false)
+      .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "2") == 100)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index ff062cd..b511ee8 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -927,6 +927,92 @@ class StandardPartitionGlobalSortTestCase extends 
QueryTest with BeforeAndAfterA
     assert(exMessage.getMessage.contains("day is not a valid partition column 
in table default.partitionnocolumn"))
   }
 
+  test("data loading with default partition in static partition table with 
batchsort") {
+    sql("DROP TABLE IF EXISTS partitiondefaultbatchsort")
+    sql(
+      """
+        | CREATE TABLE partitiondefaultbatchsort (empno int, designation 
String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectenddate Timestamp,attendance int,
+        |  utilization int, doj Timestamp, empname String)
+        | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
+        | STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('SORT_SCOPE'='BATCH_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
partitiondefaultbatchsort OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(*) from partitiondefaultbatchsort"), 
Seq(Row(10)))
+  }
+
+  test("data loading with default partition in static partition table with 
nosort") {
+    sql("DROP TABLE IF EXISTS partitiondefaultnosort")
+    sql(
+      """
+        | CREATE TABLE partitiondefaultnosort (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectenddate Timestamp,attendance int,
+        |  utilization int, doj Timestamp, empname String)
+        | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
+        | STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('SORT_SCOPE'='NO_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
partitiondefaultnosort OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(*) from partitiondefaultnosort"), 
Seq(Row(10)))
+  }
+
+  test("data loading with default partition in static partition table with 
rename") {
+    sql("DROP TABLE IF EXISTS partitiondefaultrename")
+    sql("DROP TABLE IF EXISTS partitiondefaultrename_new")
+    sql(
+      """
+        | CREATE TABLE partitiondefaultrename (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectenddate Timestamp,attendance int,
+        |  utilization int, doj Timestamp, empname String)
+        | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
+        | STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
partitiondefaultrename OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(*) from partitiondefaultrename"), 
Seq(Row(10)))
+    sql(s"alter table partitiondefaultrename rename to 
partitiondefaultrename_new")
+    checkAnswer(sql("select count(*) from partitiondefaultrename_new"), 
Seq(Row(10)))
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
partitiondefaultrename_new OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(*) from partitiondefaultrename_new"), 
Seq(Row(20)))
+  }
+
+  test("data loading with default partition in static partition table with 
rename first") {
+    sql("DROP TABLE IF EXISTS partitiondefaultrenamefirst")
+    sql("DROP TABLE IF EXISTS partitiondefaultrenamefirst_new")
+    sql(
+      """
+        | CREATE TABLE partitiondefaultrenamefirst (empno int, designation 
String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectenddate Timestamp,attendance int,
+        |  utilization int, doj Timestamp, empname String)
+        | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
+        | STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"alter table partitiondefaultrenamefirst rename to 
partitiondefaultrenamefirst_new")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
partitiondefaultrenamefirst_new OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(*) from partitiondefaultrenamefirst_new"), 
Seq(Row(10)))
+  }
+
+  test("data loading for global partition table for two partition column with 
no columns in csv") {
+    sql("DROP TABLE IF EXISTS partitiontwonocolumns")
+    sql(
+      """
+        | CREATE TABLE partitiontwonocolumns (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int,doj Timestamp, empname String)
+        | PARTITIONED BY (newcol1 date, newcol2 int)
+        | STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
partitiontwonocolumns partition(newcol1='2016-08-09', newcol2='20') 
OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from 
partitiontwonocolumns order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable order by 
empno"))
+
+    checkAnswer(sql("select distinct cast(newcol1 as string) from 
partitiontwonocolumns"), Seq(Row("2016-08-09")))
+  }
+
   override def afterAll = {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 58eb9f9..163e662 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -300,6 +300,27 @@ test("Creation of partition table should fail if the 
colname in table schema and
     FileFactory.deleteAllCarbonFilesOfDir(file)
   }
 
+  // Verifies that ALTER TABLE ... PARTITION ... SET LOCATION on an already loaded
+  // static partition of this table is rejected with an exception.
+  test("set partition location with static column partition with load 
command") {
+    sql("drop table if exists staticpartitionsetloc")
+    sql(
+      """
+        | CREATE TABLE staticpartitionsetloc (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,projectcode int,
+        |  utilization int,salary int,projectenddate Date,doj Timestamp)
+        | PARTITIONED BY (empname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    // Directory the test tries to move the partition to.
+    val location = metastoredb +"/" +"ravi1"
+    // Load into the static partition empname='ravi' before attempting the relocation.
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
staticpartitionsetloc partition(empname='ravi') OPTIONS('DELIMITER'= ',', 
'QUOTECHAR'= '"')""")
+    // The SET LOCATION statement is expected to throw.
+    intercept[Exception] {
+      sql(s"""alter table staticpartitionsetloc partition (empname='ravi') set 
location '$location'""")
+    }
+    // Clean up any carbon files under the target directory.
+    val file = FileFactory.getCarbonFile(location)
+    FileFactory.deleteAllCarbonFilesOfDir(file)
+  }
+
   test("add external partition with static column partition with load command 
with diffrent schema") {
 
     sql(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index c8f64e1..40b5cfc 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.sql.execution.command.schema
 
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.execution.command.{AlterTableRenameModel, 
MetadataCommand}
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.util.AlterTableUtil
@@ -119,6 +121,12 @@ private[sql] case class CarbonAlterTableRenameCommand(
       metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
       val hiveClient = 
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
         .getClient()
+      var partitions: Seq[CatalogTablePartition] = Seq.empty
+      if (carbonTable.isHivePartitionTable) {
+        partitions =
+          sparkSession.sessionState.catalog.listPartitions(
+            TableIdentifier(oldTableName, Some(oldDatabaseName)))
+      }
       sparkSession.catalog.refreshTable(TableIdentifier(oldTableName,
         Some(oldDatabaseName)).quotedString)
       hiveClient.runSqlHive(
@@ -127,6 +135,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
           s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
           s"('tableName'='$newTableName', " +
           s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+
       // changed the rename order to deal with situation when carbon table and 
hive table
       // will point to the same tablePath
       if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
@@ -138,6 +147,27 @@ private[sql] case class CarbonAlterTableRenameCommand(
           sys.error(s"Folder rename failed for table 
$oldDatabaseName.$oldTableName")
         }
       }
+      val updatedParts = updatePartitionLocations(
+        partitions,
+        oldTablePath.getPath,
+        newTablePath,
+        sparkSession)
+
+      val newIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName))
+      val catalogTable = 
sparkSession.sessionState.catalog.getTableMetadata(newIdentifier)
+      // Update the storage location with new path
+      sparkSession.sessionState.catalog.alterTable(
+        catalogTable.copy(storage = sparkSession.sessionState.catalog.
+          asInstanceOf[CarbonSessionCatalog].updateStorageLocation(
+          new Path(newTablePath),
+          catalogTable.storage)))
+      if (updatedParts.nonEmpty) {
+        // Update the new updated partitions specs with new location.
+        sparkSession.sessionState.catalog.alterPartitions(
+          newIdentifier,
+          updatedParts)
+      }
+
       newTablePath = metastore.updateTableSchemaForAlter(newTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
         tableInfo,
@@ -151,8 +181,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
         sparkSession)
       OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, 
operationContext)
 
-      sparkSession.catalog.refreshTable(TableIdentifier(newTableName,
-        Some(oldDatabaseName)).quotedString)
+      sparkSession.catalog.refreshTable(newIdentifier.quotedString)
       carbonTableLockFilePath = newTablePath
       LOGGER.audit(s"Table $oldTableName has been successfully renamed to 
$newTableName")
       LOGGER.info(s"Table $oldTableName has been successfully renamed to 
$newTableName")
@@ -187,6 +216,31 @@ private[sql] case class CarbonAlterTableRenameCommand(
     Seq.empty
   }
 
+  /**
+   * Re-points partitions that are stored under the old table path to the same relative
+   * directory under the new table path.
+   *
+   * A partition is rewritten only when the old table path occurs in its location and the
+   * match ends on a path boundary (end of the location or a '/'). A plain substring match
+   * would also rewrite unrelated locations that merely share a name prefix with the table,
+   * e.g. an external partition under ".../t1_ext" while renaming ".../t1". Only the matched
+   * occurrence is replaced; the rest of the location is kept as it is.
+   *
+   * @param partitions   partitions of the table being renamed
+   * @param oldTablePath table path before the rename
+   * @param newTablePath table path after the rename
+   * @param sparkSession session whose catalog is used to build the updated storage format
+   * @return the partitions in their original order; partitions without a location or with a
+   *         location outside the old table path are returned unchanged
+   */
+  private def updatePartitionLocations(
+      partitions: Seq[CatalogTablePartition],
+      oldTablePath: String,
+      newTablePath: String,
+      sparkSession: SparkSession): Seq[CatalogTablePartition] = {
+    // Resolved lazily so that nothing is cast when no partition needs to be rewritten.
+    lazy val catalog = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+    partitions.map { part =>
+      if (part.storage.locationUri.isEmpty || oldTablePath.isEmpty) {
+        part
+      } else {
+        val location = new Path(part.location).toString
+        val start = location.indexOf(oldTablePath)
+        val end = start + oldTablePath.length
+        // The location may carry a scheme/authority in front of the table path, so the match
+        // is not required to start at index 0, but it has to end on a path boundary.
+        val underOldTable = start >= 0 &&
+          (oldTablePath.endsWith("/") || end == location.length || location.charAt(end) == '/')
+        if (underOldTable) {
+          val relocated = location.substring(0, start) + newTablePath + location.substring(end)
+          part.copy(storage = catalog.updateStorageLocation(new Path(relocated), part.storage))
+        } else {
+          part
+        }
+      }
+    }
+  }
+
   private def renameBadRecords(
       oldTableName: String,
       newTableName: String,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index f69ccc1..dcbce84 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -265,6 +265,14 @@ class DDLStrategy(sparkSession: SparkSession) extends 
SparkStrategy {
         RefreshCarbonTableCommand(tableIdentifier.database,
           tableIdentifier.table).run(sparkSession)
         ExecutedCommandExec(RefreshTable(tableIdentifier)) :: Nil
+      case alterSetLoc@AlterTableSetLocationCommand(tableName, _, _) =>
+        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .tableExists(tableName)(sparkSession)
+        if (isCarbonTable) {
+          throw new UnsupportedOperationException("Set partition location is 
not supported")
+        } else {
+          ExecutedCommandExec(alterSetLoc) :: Nil
+        }
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
 
b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 1b7f0cb..ba2fe947 100644
--- 
a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ 
b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -17,8 +17,9 @@
 package org.apache.spark.sql.hive
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, 
FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, 
SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
BoundReference, Expression, InterpretedPredicate, PredicateSubquery, 
ScalarSubquery}
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
@@ -182,6 +183,15 @@ class CarbonSessionCatalog(
       allPartitions
     }
   }
+
+  /**
+   * Returns a copy of the given storage format that points to the supplied path.
+   * The location is stored as the string form of the path.
+   */
+  def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat): CatalogStorageFormat = {
+    val newLocation = Some(path.toString)
+    storage.copy(locationUri = newLocation)
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
 
b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index a119bda..e82b485 100644
--- 
a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ 
b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import scala.collection.generic.SeqFactory
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
@@ -171,6 +172,15 @@ class CarbonSessionCatalog(
       partitionFilters,
       sparkSession.sessionState.conf.sessionLocalTimeZone)
   }
+
+  /**
+   * Returns a copy of the given storage format that points to the supplied path.
+   * The location is stored as the URI form of the path.
+   */
+  def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat): CatalogStorageFormat = {
+    val newLocation = Some(path.toUri)
+    storage.copy(locationUri = newLocation)
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index f5b29e7..82f4c9b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -57,15 +57,15 @@ public final class DataLoadProcessBuilder {
       CarbonIterator[] inputIterators) throws Exception {
     CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, 
storeLocation);
     SortScopeOptions.SortScope sortScope = 
CarbonDataProcessorUtil.getSortScope(configuration);
-    if ((!configuration.isSortTable() || 
sortScope.equals(SortScopeOptions.SortScope.NO_SORT))
-        && !loadModel.isPartitionLoad()) {
+    if (loadModel.isPartitionLoad()) {
+      return buildInternalForPartitionLoad(inputIterators, configuration, 
sortScope);
+    } else if (!configuration.isSortTable() ||
+        sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
       return buildInternalForNoSort(inputIterators, configuration);
     } else if (configuration.getBucketingInfo() != null) {
       return buildInternalForBucketing(inputIterators, configuration);
     } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
       return buildInternalForBatchSort(inputIterators, configuration);
-    } else if (loadModel.isPartitionLoad()) {
-      return buildInternalForPartitionLoad(inputIterators, configuration, 
sortScope);
     } else {
       return buildInternal(inputIterators, configuration);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 32c72da..23f9aa8 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
@@ -57,6 +58,7 @@ import 
org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.merger.NodeBlockRelation;
 import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
@@ -328,6 +330,60 @@ public final class CarbonLoaderUtil {
     return status;
   }
 
+  /**
+   * Records a segment file name against the given segment in the table status file.
+   * <p>
+   * The table status lock is requested with the configured retry count and timeout. When it
+   * is obtained, the load details are read, the entry whose load name equals
+   * {@code segmentId} gets the new segment file, and the details are written back.
+   * The lock is released in all cases before returning.
+   *
+   * @param tablePath   path of the table whose table status file is updated
+   * @param segmentId   load name of the segment to update
+   * @param segmentFile segment file name to set on the matching segment entry
+   * @return {@code true} if the lock was obtained and the table status file was written,
+   *         {@code false} if the lock could not be obtained
+   * @throws IOException if updating the table status file fails
+   */
+  public static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile)
+      throws IOException {
+    boolean status = false;
+    String tableStatusPath = CarbonTablePath.getTableStatusPath(tablePath);
+    String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        AbsoluteTableIdentifier.from(tablePath, null, null);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+    int retryCount = CarbonLockUtil
+        .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+            CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+    int maxTimeout = CarbonLockUtil
+        .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+            CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+    try {
+      if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
+        LOGGER.info("Acquired lock for table path " + tablePath + " for table status updation");
+        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+            SegmentStatusManager.readLoadMetadata(metadataPath);
+
+        for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) {
+          // only the entry of the requested segment receives the new segment file
+          if (segmentId.equals(detail.getLoadName())) {
+            detail.setSegmentFile(segmentFile);
+            break;
+          }
+        }
+
+        SegmentStatusManager
+            .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+        status = true;
+      } else {
+        LOGGER.error(
+            "Not able to acquire the lock for Table status updation for table path " + tablePath);
+      }
+    } finally {
+      // unlock() is called even when the lock was not obtained, as in the original flow
+      if (carbonLock.unlock()) {
+        LOGGER.info("Table unlocked successfully after table status updation " + tablePath);
+      } else {
+        LOGGER.error(
+            "Unable to unlock Table lock for table " + tablePath + " during table status updation");
+      }
+    }
+    return status;
+  }
+
   private static void addToStaleFolders(CarbonTablePath carbonTablePath,
       List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws 
IOException {
     String path = carbonTablePath.getCarbonDataDirectoryPath("0", 
entry.getLoadName());
@@ -950,4 +1006,35 @@ public final class CarbonLoaderUtil {
     loadMetadataDetails.setIndexSize(String.valueOf(indexSize));
     return dataSize + indexSize;
   }
+
+  /**
+   * Merges the index files within a segment of a partitioned table.
+   * <p>
+   * When the merge produces a result, a new segment file named
+   * {@code <segmentId>_<timestamp><SEGMENT_EXT>} is written and registered in the table
+   * status. The files reported as obsolete by the merge are deleted only after the table
+   * status has been updated; if that update fails they are kept, because the table status
+   * then still refers to the previous segment file.
+   *
+   * @param segmentId id of the segment whose index files are merged
+   * @param tablePath path of the table that owns the segment
+   * @param uniqueId  id returned unchanged when no new segment file gets registered
+   * @return the timestamp based id of the newly registered segment file, or the given
+   *         {@code uniqueId} when nothing was merged or the table status was not updated
+   * @throws IOException if writing the segment file, updating the table status or deleting
+   *                     the obsolete files fails
+   */
+  public static String mergeIndexFilesinPartitionedSegment(String segmentId, String tablePath,
+      String uniqueId) throws IOException {
+    CarbonIndexFileMergeWriter.SegmentIndexFIleMergeStatus segmentIndexFIleMergeStatus =
+        new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentId, tablePath);
+    if (segmentIndexFIleMergeStatus == null) {
+      // nothing was merged, the caller's id stays valid
+      return uniqueId;
+    }
+    String newUniqueId = String.valueOf(System.currentTimeMillis());
+    String newSegmentFileName = segmentId + "_" + newUniqueId + CarbonTablePath.SEGMENT_EXT;
+    String path = CarbonTablePath.getSegmentFilesLocation(tablePath)
+        + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
+    SegmentFileStore.writeSegmentFile(segmentIndexFIleMergeStatus.getSegmentFile(), path);
+    if (!updateSegmentFile(tablePath, segmentId, newSegmentFileName)) {
+      // the table status was not repointed, so the old files must not be removed
+      LOGGER.error("Table status was not updated with segment file " + newSegmentFileName
+          + " for segment " + segmentId + ", keeping the existing index files");
+      return uniqueId;
+    }
+    deleteFiles(segmentIndexFIleMergeStatus.getFilesTobeDeleted());
+    return newUniqueId;
+  }
+
+  /**
+   * Deletes each of the given files, resolving the file type from the file's own path.
+   *
+   * @param filesToBeDeleted paths of the files to delete
+   * @throws IOException if a deletion fails
+   */
+  private static void deleteFiles(List<String> filesToBeDeleted) throws IOException {
+    for (String obsoleteFile : filesToBeDeleted) {
+      FileFactory.deleteFile(
+          obsoleteFile,
+          FileFactory.getFileType(obsoleteFile));
+    }
+  }
 }

Reply via email to